您的位置:首页 > 编程语言 > Java开发

spring boot实战(第十二篇)整合RabbitMQ

2017-04-11 18:42 323 查看
目录(?)[+]

前言

本篇主要讲述Spring Boot与RabbitMQ的整合,内容非常简单,纯API的调用操作。 操作之间需要加入依赖Jar

[html]
view plain
copy





<dependency>  
<groupId>org.springframework.boot</groupId>  
<artifactId>spring-boot-starter-amqp</artifactId>  
lt;/dependency>  

消息生产者

不论是创建消息消费者或生产者都需要ConnectionFactory

ConnectionFactory配置

创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)

[html]
view plain
copy





@Configuration  
public class AmqpConfig {  
  
    public static final String EXCHANGE   = "spring-boot-exchange";  
    public static final String ROUTINGKEY = "spring-boot-routingKey";  
  
    @Bean  
    public ConnectionFactory connectionFactory() {  
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
        connectionFactory.setAddresses("127.0.0.1:5672");  
        connectionFactory.setUsername("guest");  
        connectionFactory.setPassword("guest");  
        connectionFactory.setVirtualHost("/");  
        connectionFactory.setPublisherConfirms(true); //必须要设置  
        return connectionFactory;  
    }  
}  

这里需要显示调用

[html]
view plain
copy





connectionFactory.setPublisherConfirms(true);  

才能进行消息的回调。

RabbitTemplate

通过使用RabbitTemplate来对开发者提供API操作

[html]
view plain
copy





@Bean  
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
//必须是prototype类型  
public RabbitTemplate rabbitTemplate() {  
    RabbitTemplate template = new RabbitTemplate(connectionFactory());  
    return template;  
}  

这里设置为原型,具体的原因在后面会讲到

  在发送消息时通过调用RabbitTemplate中的如下方法

[html]
view plain
copy





public void convertAndSend(String exchange, String routingKey, final Object object, CorrelationData correlationData)  

exchange:交换机名称

routingKey:路由关键字

object:发送的消息内容

correlationData:消息ID

因此生产者代码详单简洁

Send.java

[html]
view plain
copy





@Component  
public class Send  {  
  
    private RabbitTemplate rabbitTemplate;  
  
    /**  
     * 构造方法注入  
     */  
    @Autowired  
    public Send(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
    }  
  
    public void sendMsg(String content) {  
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);  
    }  
  
       
}  

如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate
实际的ConfirmCallback为最后一次申明的ConfirmCallback。
下面给出完整的生产者代码:

[html]
view plain
copy





package com.lkl.springboot.amqp;  
  
import java.util.UUID;  
  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.support.CorrelationData;  
import org.springframework.beans.factory.annotation.Autowired;  
import org.springframework.stereotype.Component;  
  
/**  
 * 消息生产者  
 *   
 * @author liaokailin  
 * @version $Id: Send.java, v 0.1 2015年11月01日 下午4:22:25 liaokailin Exp $  
 */  
@Component  
public class Send implements RabbitTemplate.ConfirmCallback {  
  
    private RabbitTemplate rabbitTemplate;  
  
    /**  
     * 构造方法注入  
     */  
    @Autowired  
    public Send(RabbitTemplate rabbitTemplate) {  
        this.rabbitTemplate = rabbitTemplate;  
        rabbitTemplate.setConfirmCallback(this); //rabbitTemplate如果为单例的话,那回调就是最后设置的内容  
    }  
  
    public void sendMsg(String content) {  
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());  
        rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE, AmqpConfig.ROUTINGKEY, content, correlationId);  
    }  
  
    /**  
     * 回调  
     */  
    @Override  
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {  
        System.out.println(" 回调id:" + correlationData);  
        if (ack) {  
            System.out.println("消息成功消费");  
        } else {  
            System.out.println("消息消费失败:" + cause);  
        }  
    }  
  
}  

消息消费者

消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。

交换机

[html]
view plain
copy





/**  
     * 针对消费者配置  
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
        return new DirectExchange(EXCHANGE);  
    }  

在Spring Boot中交换机继承AbstractExchange类

队列

[html]
view plain
copy





@Bean  
    public Queue queue() {  
        return new Queue("spring-boot-queue", true); //队列持久  
  
    }  

绑定

[html]
view plain
copy





@Bean  
  public Binding binding() {  
      return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
  }  

完成以上工作后,在spring boot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。

消息消费

[html]
view plain
copy





@Bean  
  public SimpleMessageListenerContainer messageContainer() {  
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
      container.setQueues(queue());  
      container.setExposeListenerChannel(true);  
      container.setMaxConcurrentConsumers(1);  
      container.setConcurrentConsumers(1);  
      container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
      container.setMessageListener(new ChannelAwareMessageListener() {  
  
          @Override  
          public void onMessage(Message message, Channel channel) throws Exception {  
              byte[] body = message.getBody();  
              System.out.println("receive msg : " + new String(body));  
              channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
          }  
      });  
      return container;  
  }  

下面给出完整的配置文件:

[html]
view plain
copy





package com.lkl.springboot.amqp;  
  
import org.springframework.amqp.core.AcknowledgeMode;  
import org.springframework.amqp.core.Binding;  
import org.springframework.amqp.core.BindingBuilder;  
import org.springframework.amqp.core.DirectExchange;  
import org.springframework.amqp.core.Message;  
import org.springframework.amqp.core.Queue;  
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;  
import org.springframework.amqp.rabbit.connection.ConnectionFactory;  
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;  
import org.springframework.beans.factory.config.ConfigurableBeanFactory;  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.context.annotation.Scope;  
  
import com.rabbitmq.client.Channel;  
  
/**  
 * Qmqp Rabbitmq  
 *   
 * http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/  
 *   
 * @author lkl  
 * @version $Id: AmqpConfig.java, v 0.1 2015年11月01日 下午2:05:37 lkl Exp $  
 */  
  
@Configuration  
public class AmqpConfig {  
  
    public static final String EXCHANGE   = "spring-boot-exchange";  
    public static final String ROUTINGKEY = "spring-boot-routingKey";  
  
    @Bean  
    public ConnectionFactory connectionFactory() {  
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();  
        connectionFactory.setAddresses("127.0.0.1:5672");  
        connectionFactory.setUsername("guest");  
        connectionFactory.setPassword("guest");  
        connectionFactory.setVirtualHost("/");  
        connectionFactory.setPublisherConfirms(true); //必须要设置  
        return connectionFactory;  
    }  
  
    @Bean  
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  
    //必须是prototype类型  
    public RabbitTemplate rabbitTemplate() {  
        RabbitTemplate template = new RabbitTemplate(connectionFactory());  
        return template;  
    }  
  
    /**  
     * 针对消费者配置  
     * 1. 设置交换机类型  
     * 2. 将队列绑定到交换机  
     *   
     *   
        FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念  
        HeadersExchange :通过添加属性key-value匹配  
        DirectExchange:按照routingkey分发到指定队列  
        TopicExchange:多关键字匹配  
     */  
    @Bean  
    public DirectExchange defaultExchange() {  
        return new DirectExchange(EXCHANGE);  
    }  
  
    @Bean  
    public Queue queue() {  
        return new Queue("spring-boot-queue", true); //队列持久  
  
    }  
  
    @Bean  
    public Binding binding() {  
        return BindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);  
    }  
  
    @Bean  
    public SimpleMessageListenerContainer messageContainer() {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());  
        container.setQueues(queue());  
        container.setExposeListenerChannel(true);  
        container.setMaxConcurrentConsumers(1);  
        container.setConcurrentConsumers(1);  
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认  
        container.setMessageListener(new ChannelAwareMessageListener() {  
  
            @Override  
            public void onMessage(Message message, Channel channel) throws Exception {  
                byte[] body = message.getBody();  
                System.out.println("receive msg : " + new String(body));  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费  
            }  
        });  
        return container;  
    }  
  
}  

以上完成 Spring Boot与RabbitMQ的整合 

自动配置

在Spring Boot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息

[html]
view plain
copy





spring.rabbitmq.host=localhost  
spring.rabbitmq.port=5672  
spring.rabbitmq.username=test  
spring.rabbitmq.password=test  
spring.rabbitmq.virtualHost=test  

后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?

自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码

[html]
view plain
copy





connectionFactory.setPublisherConfirms(true);  

具体分析见后续文章的源码解读.

转载请注明 
http://blog.csdn.net/liaokailin/article/details/48186331
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: