Spring-Rabbit消费多个mq中相同的队列
2017-01-11 20:40
239 查看
因为业务规模比较大,采用多台rabbitmq服务器进行处理,在每台rabbitmq建立相同的exchange,采用客户端分片的方式,生产者根据hash分发消息到不同的服务器中。
作为消费者,必须能支持消费全部rabbitmq的相同的exchange。
这里因为4台mq,就在代码中写死数组下标
因为原生的SimpleRabbitListenerContra会默认配置一个RabbitAdmin进行队列维护,但是这个RabbitmqAdmin生成的时候配置的ConnectionFactory是全局的,这就导致这个RabbitAdmin不可用。
我们必须手动注入RabbitAdmin,并且指定正确的CinnectionFactory。
如果多台mq都连着,队列都相同,不会产生问题。
如果多台mq都连着,但是队列都不同,那么RabbitAdmin声明队列的时候都会把工程中全部的队列配置向每一个服务器声明一遍,每台mq的quene,exchange都是一样。
如果多台mq都连着,但是某些quene,exchange只针对地配置单台mq,建议剥离出工程,单独起服务消费。
或者有能力改造一下RabbitAdmin的代码,声明队列要根据containerFactory区分开,只负责声明所属服务器的配置。
使用注解的方式添加handle
建议对Bean起名的时候参考一下是否有对应的autoConfigurer配置。
因为发现,spring-boot中,如果没有存在名称
作为消费者,必须能支持消费全部rabbitmq的相同的exchange。
这里因为4台mq,就在代码中写死数组下标
package cn.jpush.sms.common; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.annotation.EnableRabbit; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.List; @Configuration @EnableRabbit public class RabbitMQConfig { @Autowired private RabbitProperties rabbitProperties; @Bean public ConnectionFactory connectionFactory() { return buildConnectionFactory(0); } @Bean public ConnectionFactory connectionFactory1() { return buildConnectionFactory(1); } @Bean public ConnectionFactory connectionFactory2() { return buildConnectionFactory(2); } @Bean public ConnectionFactory connectionFactory3() { return buildConnectionFactory(3); } private ConnectionFactory buildConnectionFactory(int i) { List<RabbitAccount> list = rabbitProperties.getAccount(); RabbitAccount rabbitAccount = list.get(i); CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitAccount.getHostname()); connectionFactory.setUsername(rabbitAccount.getUsername()); connectionFactory.setPassword(rabbitAccount.getPassword()); connectionFactory.setPort(rabbitAccount.getPort()); return connectionFactory; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory1() { return buildRabbitListenerContainerFactory(connectionFactory1(),rabbitAdmin1()); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory2() { return buildRabbitListenerContainerFactory(connectionFactory2(),rabbitAdmin2()); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory3() { return buildRabbitListenerContainerFactory(connectionFactory3(),rabbitAdmin3()); } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { return buildRabbitListenerContainerFactory(connectionFactory(),rabbitAdmin()); } @Bean public RabbitAdmin rabbitAdmin() { RabbitAdmin r = new RabbitAdmin(connectionFactory()); return r; } @Bean public RabbitAdmin rabbitAdmin1() { RabbitAdmin r = new RabbitAdmin(connectionFactory1()); return r; } @Bean public RabbitAdmin rabbitAdmin2() { RabbitAdmin r = new RabbitAdmin(connectionFactory2()); return r; } @Bean public RabbitAdmin rabbitAdmin3() { RabbitAdmin r = new RabbitAdmin(connectionFactory3()); return r; } private MySimpleRabbitListenerContainerFactory buildRabbitListenerContainerFactory(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) { MySimpleRabbitListenerContainerFactory factory = new MySimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setRabbitAdmin(rabbitAdmin); factory.setConcurrentConsumers(8); factory.setMaxConcurrentConsumers(10); factory.setPrefetchCount(1); factory.setAcknowledgeMode(AcknowledgeMode.NONE); return factory; } }
因为原生的SimpleRabbitListenerContra会默认配置一个RabbitAdmin进行队列维护,但是这个RabbitmqAdmin生成的时候配置的ConnectionFactory是全局的,这就导致这个RabbitAdmin不可用。
我们必须手动注入RabbitAdmin,并且指定正确的CinnectionFactory。
package cn.jpush.sms.common; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; public class MySimpleRabbitListenerContainerFactory extends SimpleRabbitListenerContainerFactory { private RabbitAdmin rabbitAdmin; @Override protected void initializeContainer(SimpleMessageListenerContainer instance) { super.initializeContainer(instance); // 初始化Container完成后,手动注入一个RabbitAdmin // RabbitAdmin原本设计为ApplicationContext中只需要一个实例即可,它主要做的事是队列声明。 // RabbitAdmin会扫描Context中全部的Quene,Exchange,routeKey,在所配置的的Connection上进行队列配置声明。这种设计在只连接一台rabbitmq服务器的时候是正常的。 // 但是在接入多台mq的情况下,要必须在每个Connection中都配置一个RabbitAdmin,否则就不能进行向服务器进行redeclared。 // 自定义多个RabbitAdmin的副作用是每个Connection都会声明工程中配置的全部的队列,不会区分containerFactory。 // 工程中连接多个mq服务器,但是都是同名称的队列,即使存在声明多次的副作用,最终的体现和声明一次是一样的。 instance.setRabbitAdmin(rabbitAdmin); } public void setRabbitAdmin(RabbitAdmin rabbitAdmin) { this.rabbitAdmin = rabbitAdmin; } }
如果多台mq都连着,队列都相同,不会产生问题。
如果多台mq都连着,但是队列都不同,那么RabbitAdmin声明队列的时候都会把工程中全部的队列配置向每一个服务器声明一遍,每台mq的quene,exchange都是一样。
如果多台mq都连着,但是某些quene,exchange只针对地配置单台mq,建议剥离出工程,单独起服务消费。
或者有能力改造一下RabbitAdmin的代码,声明队列要根据containerFactory区分开,只负责声明所属服务器的配置。
使用注解的方式添加handle
package cn.jpush.sms.handler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.util.List; @Component public class PushDeliveryProcessHandler { private static final Logger LOG = LoggerFactory.getLogger(PushDeliveryProcessHandler.class); @RabbitListener(containerFactory = "rabbitListenerContainerFactory1", bindings = @QueueBinding(value = @Queue(value = "${rb.queue}", durable = "false"), exchange = @Exchange(value = "${rb.exchange}", type = "direct"), key = "${rb.routeKey}")) @RabbitListener(containerFactory = "rabbitListenerContainerFactory2", bindings = @QueueBinding(value = @Queue(value = "${rb.queue}", durable = "false"), exchange = @Exchange(value = "${rb.exchange}", type = "direct"), key = "${rb.routeKey}")) @RabbitListener(containerFactory = "rabbitListenerContainerFactory3", bindings = @QueueBinding(value = @Queue(value = "${rb.queue}", durable = "false"), exchange = @Exchange(value = "${rb.exchange}", type = "direct"), key = "${rb.routeKey}")) @RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "${rb.queue}", durable = "false"), exchange = @Exchange(value = "${rb.exchange}", type = "direct"), key = "${rb.routeKey}")) public void handle(Message message) { //to do something } }
建议对Bean起名的时候参考一下是否有对应的autoConfigurer配置。
因为发现,spring-boot中,如果没有存在名称
rabbitListenerContainerFactory的bean,它就会创建这样的类型bean,但是我们并不想让它创建,则我们在起名的就占用掉它,这样就不会让spring-boot的自动配置误会了。
相关文章推荐
- RabbitMQ消息队列+spring监听mq服务器多个ip,接收消费mq消息(三)
- RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息
- RabbitMQ消息队列+spring监听mq服务器多个ip,接收消费mq消息(二)
- spring boot Rabbitmq集成,延时消息队列实现
- 详细介绍Spring Boot + RabbitMQ实现延迟队列
- RabbitMQ消息队列+spring监听mq服务器,接收消费mq消息
- spring+rabbitmq的简单实现和延迟队列实现
- spring boot rabbitmq 多MQ配置 自动 创建 队列 RPC
- spring boot Rabbitmq集成,延时消息队列实现
- 集群与负载均衡系列(6)——消息队列之rabbitMQ+spring-boot+spring amqp发送可靠的消息
- spring+activemq实战之配置监听多队列实现不同队列消息消费
- Spring Boot + RabbitMQ 实现消息队列场景
- spring rabbitmq 动态绑定exchange,routingkey,queue
- Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程
- Spring + JMS + ActiveMQ实现简单的消息队列(监听器异步实现)
- RabbitMQ .NET消息队列使用入门(一)【简单示例】
- RabbitMQ+PHP 消息队列环境配置
- Spring Rabbitmq HelloWorld实例
- RabbitMQ spring 使用总结
- 对Rabbitmq rpc返回队列的一点理解