
Spring Boot in Action (Part 12): Integrating RabbitMQ

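2016-11-30 14:17
Introduction

This post covers integrating Spring Boot with RabbitMQ. The material is straightforward and consists almost entirely of API calls. Before starting, add the following dependency: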

 

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

 


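Message producer

Both message producers and message consumers need a ConnectionFactory.

ConnectionFactory configuration

Create a configuration class, AmqpConfig.java (all later configuration goes into this file):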
 

@Configuration
public class AmqpConfig {

    public static final String EXCHANGE   = "spring-boot-exchange";
    public static final String ROUTINGKEY = "spring-boot-routingKey";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // required for confirm callbacks
        return connectionFactory;
    }
}


Note that you must explicitly call

connectionFactory.setPublisherConfirms(true);

for the publisher confirm callback to fire.
 
 


RabbitTemplate

RabbitTemplate is the API that developers use to send messages.

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // must be prototype scope
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    return template;
}

The bean is declared with prototype scope; the reason is explained later.
To send a message, call the following RabbitTemplate method:

public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)


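exchange: the exchange name

routingKey: the routing key

object: the message payload

correlationData: the message ID, used to correlate publisher confirms with the messages that were sent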
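To make the role of the correlation ID concrete, here is a minimal plain-Java sketch of how a producer might keep track of messages that have not been confirmed yet. It does not use Spring AMQP at all; PendingConfirms, register, and onConfirm are hypothetical names invented for this illustration.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of Spring AMQP): remembers each message
// under its correlation ID until the broker's confirm arrives.
class PendingConfirms {

    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Called just before sending: stores the payload and returns the new ID.
    String register(String payload) {
        String id = UUID.randomUUID().toString();
        pending.put(id, payload);
        return id;
    }

    // Called from the confirm callback. The entry is always removed.
    // On ack this returns null; on nack it returns the payload so the
    // caller can decide whether to resend it.
    String onConfirm(String id, boolean ack) {
        String payload = pending.remove(id);
        return ack ? null : payload;
    }

    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        PendingConfirms confirms = new PendingConfirms();
        String first = confirms.register("hello");
        String second = confirms.register("world");

        confirms.onConfirm(first, true);                    // broker acked
        String toRetry = confirms.onConfirm(second, false); // broker nacked

        System.out.println("retry payload: " + toRetry);
        System.out.println("still pending: " + confirms.size());
    }
}
```

In a real producer the ID returned by register would be wrapped in a CorrelationData and passed to convertAndSend, and onConfirm would be driven by the confirm callback.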

 
As a result, the producer code is quite concise:


Send.java

@Component
public class Send {

    private RabbitTemplate rabbitTemplate;

    /**
     * Constructor injection
     */
    @Autowired
    public Send(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
    }

}


 
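If a producer needs a callback after a message has been sent, set a ConfirmCallback on its rabbitTemplate. Different producers need different ConfirmCallbacks, but each RabbitTemplate holds only one. If rabbitTemplate were a singleton bean, every producer would share the same instance, and the ConfirmCallback actually in effect would be whichever one was set last. (Depending on the Spring AMQP version, the template may instead reject a second, different callback with an exception.) This is why the bean is declared with prototype scope.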
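The "last callback wins" problem can be reproduced without a broker. The sketch below is a plain-Java model, not the real RabbitTemplate: FakeTemplate, FakeProducer, and CallbackScopeDemo are hypothetical classes that only mimic a template with a single callback slot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for a template that holds exactly one confirm callback.
class FakeTemplate {
    private Consumer<String> confirmCallback;

    void setConfirmCallback(Consumer<String> callback) {
        this.confirmCallback = callback; // a later call replaces the earlier one
    }

    // Pretends the broker confirmed the message with the given ID.
    void confirm(String messageId) {
        confirmCallback.accept(messageId);
    }
}

// Hypothetical producer that registers its own callback in the constructor,
// in the same way the Send class does.
class FakeProducer {
    private final FakeTemplate template;

    FakeProducer(String name, FakeTemplate template, List<String> log) {
        this.template = template;
        template.setConfirmCallback(id -> log.add(name + " confirmed " + id));
    }

    void send(String id) {
        template.confirm(id);
    }
}

class CallbackScopeDemo {

    // shared = true models a singleton template, false models prototype scope.
    static List<String> run(boolean shared) {
        List<String> log = new ArrayList<>();
        FakeTemplate first = new FakeTemplate();
        FakeTemplate second = shared ? first : new FakeTemplate();
        FakeProducer orders = new FakeProducer("orders", first, log);
        FakeProducer emails = new FakeProducer("emails", second, log);
        orders.send("m1");
        emails.send("m2");
        return log;
    }

    public static void main(String[] args) {
        System.out.println("singleton: " + run(true));
        System.out.println("prototype: " + run(false));
    }
}
```

With a shared template both confirms are reported by the "emails" callback, because it was registered last; with one template per producer each confirm reaches its own producer.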
The complete producer code follows:
 

package com.u51.lkl.springboot.amqp;

import java.util.UUID;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Message producer
 *
 * @author liaokailin
 * @version $Id: Send.java, v 0.1 2015-11-01 16:22:25 liaokailin Exp $
 */
@Component
public class Send implements RabbitTemplate.ConfirmCallback {

    private RabbitTemplate rabbitTemplate;

    /**
     * Constructor injection
     */
    @Autowired
    public Send(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        // If rabbitTemplate were a singleton, the callback in effect would be whichever was set last
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendMsg(String content) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);
    }

    /**
     * Publisher confirm callback.
     * An ack means the broker accepted the message; it does not mean a consumer has processed it.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm callback id: " + correlationData);
        if (ack) {
            System.out.println("message confirmed by the broker");
        } else {
            System.out.println("message not confirmed by the broker: " + cause);
        }
    }

}


 


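Message consumer

The consumer is responsible for declaring the exchange (the producer may declare it as well), the queue, and the binding between the two.


Exchange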

/**
 * Consumer-side configuration
 *
 * FanoutExchange:  delivers each message to every bound queue; the routing key is ignored
 * HeadersExchange: matches on key-value pairs in the message headers
 * DirectExchange:  delivers to the queues bound with exactly the message's routing key
 * TopicExchange:   matches the routing key against binding patterns
 */
@Bean
public DirectExchange defaultExchange() {
    return new DirectExchange(EXCHANGE);
}


In Spring AMQP, these exchange classes all extend AbstractExchange.
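The difference between the exchange types comes down to how a message's routing key is compared with the key a queue was bound with. The following plain-Java sketch models that comparison for direct, fanout, and topic exchanges. It is a simplified illustration of the routing rules, not RabbitMQ's implementation, and RoutingModel is a hypothetical class name.

```java
// Simplified model of how exchange types decide whether a message's routing
// key matches a binding key. Headers exchanges are left out for brevity.
class RoutingModel {

    // Direct: the routing key must equal the binding key exactly.
    static boolean direct(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    // Fanout: every bound queue receives the message; keys are ignored.
    static boolean fanout(String bindingKey, String routingKey) {
        return true;
    }

    // Topic: keys are dot-separated words; "*" matches exactly one word
    // and "#" matches zero or more words.
    static boolean topic(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // "#" may absorb zero or more of the remaining words.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false;
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(direct("spring-boot-routingKey", "spring-boot-routingKey"));
        System.out.println(topic("order.*", "order.created"));
        System.out.println(topic("order.*", "order.created.eu"));
        System.out.println(topic("order.#", "order.created.eu"));
    }
}
```

The example in this article uses a DirectExchange, so only a message sent with exactly "spring-boot-routingKey" reaches the queue.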
 
 


Queue

@Bean
public Queue queue() {
    return new Queue("spring-boot-queue", true); // durable queue
}



Binding

@Bean
public Binding binding() {
    return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}


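With the above in place, Spring AMQP uses a message listener container to listen for messages and invokes the listener callback whenever a message arrives.


Consuming messages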

@Bean
public SimpleMessageListenerContainer messageContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    container.setQueues(queue());
    container.setExposeListenerChannel(true);
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // manual acknowledgement
    container.setMessageListener(new ChannelAwareMessageListener() {

        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            byte[] body = message.getBody();
            System.out.println("receive msg : " + new String(body));
            // acknowledge that the message was consumed successfully
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    });
    return container;
}
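With manual acknowledgement the listener is responsible for settling every delivery, including the ones it fails to process. One common pattern is to ack on success and reject on failure. The sketch below shows that control flow in plain Java against a hypothetical AckChannel interface, which only mirrors the shape of the client's ack and nack calls and is not the RabbitMQ Channel interface. ManualAckHandler, process, and RecordingChannel are likewise illustrative names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal channel abstraction used only for this illustration.
interface AckChannel {
    void ack(long deliveryTag);
    void nack(long deliveryTag, boolean requeue);
}

class ManualAckHandler {

    // Processes one delivery and always settles it: ack on success, nack
    // without requeue on failure so a bad message is not redelivered forever.
    static void handle(byte[] body, long deliveryTag, AckChannel channel) {
        try {
            String text = new String(body, StandardCharsets.UTF_8);
            process(text);
            channel.ack(deliveryTag);
        } catch (RuntimeException e) {
            channel.nack(deliveryTag, false);
        }
    }

    // Placeholder business logic: rejects blank payloads.
    static void process(String text) {
        if (text.trim().isEmpty()) {
            throw new IllegalArgumentException("empty message");
        }
        System.out.println("receive msg : " + text);
    }

    // Records the calls made on the channel so the behaviour can be inspected.
    static class RecordingChannel implements AckChannel {
        final List<String> calls = new ArrayList<>();

        @Override
        public void ack(long deliveryTag) {
            calls.add("ack " + deliveryTag);
        }

        @Override
        public void nack(long deliveryTag, boolean requeue) {
            calls.add("nack " + deliveryTag + " requeue=" + requeue);
        }
    }

    public static void main(String[] args) {
        RecordingChannel channel = new RecordingChannel();
        handle("hello".getBytes(StandardCharsets.UTF_8), 1L, channel);
        handle("   ".getBytes(StandardCharsets.UTF_8), 2L, channel);
        System.out.println(channel.calls);
    }
}
```

Whether a failed message should be requeued, dropped, or routed elsewhere depends on the application; the sketch simply picks "do not requeue" as one reasonable default.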


The complete configuration class follows:

package com.u51.lkl.springboot.amqp;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import com.rabbitmq.client.Channel;

/**
 * AMQP / RabbitMQ configuration
 *
 * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/
 *
 * @author lkl
 * @version $Id: AmqpConfig.java, v 0.1 2015-11-01 14:05:37 lkl Exp $
 */
@Configuration
public class AmqpConfig {

    public static final String EXCHANGE   = "spring-boot-exchange";
    public static final String ROUTINGKEY = "spring-boot-routingKey";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); // required for confirm callbacks
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // must be prototype scope
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

    /**
     * Consumer-side configuration
     * 1. Choose the exchange type
     * 2. Bind the queue to the exchange
     *
     * FanoutExchange:  delivers each message to every bound queue; the routing key is ignored
     * HeadersExchange: matches on key-value pairs in the message headers
     * DirectExchange:  delivers to the queues bound with exactly the message's routing key
     * TopicExchange:   matches the routing key against binding patterns
     */
    @Bean
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE);
    }

    @Bean
    public Queue queue() {
        return new Queue("spring-boot-queue", true); // durable queue
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
    }

    @Bean
    public SimpleMessageListenerContainer messageContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
        container.setQueues(queue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // manual acknowledgement
        container.setMessageListener(new ChannelAwareMessageListener() {

            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("receive msg : " + new String(body));
                // acknowledge that the message was consumed successfully
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        });
        return container;
    }

}


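That completes the integration of Spring Boot with RabbitMQ.

Auto-configuration

Spring Boot provides auto-configuration for RabbitMQ. Add the following settings to the configuration file: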

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtualHost=test


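With these settings in place, Spring Boot automatically creates the ConnectionFactory and RabbitTemplate beans. So why did we declare them by hand above?

The automatically created ConnectionFactory cannot deliver confirm callbacks, because the following call is never made on it: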

connectionFactory.setPublisherConfirms(true);
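For readers on Spring Boot releases newer than the one this article was written against: to my knowledge, later versions expose publisher confirms as a configuration property, so the hand-written ConnectionFactory may no longer be needed just for this. Which property applies depends on the version, so treat the fragment below as a pointer to check against the documentation for your release rather than a drop-in setting.

```properties
# Older property name (deprecated in later Spring Boot releases)
spring.rabbitmq.publisher-confirms=true

# Newer property name (Spring Boot 2.2 and later, as far as I know);
# "correlated" pairs each confirm with the CorrelationData passed at send time
spring.rabbitmq.publisher-confirm-type=correlated
```

Only one of the two lines should be used, matching the Spring Boot version in the project.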


A detailed analysis follows in a later post that walks through the source code.
Reposted from: http://blog.csdn.net/liaokailin/article/details/49559571
 