您的位置:首页 > 编程语言 > Java开发

Spring-Rabbit消费多个mq中相同的队列

2017-01-11 20:40 239 查看
因为业务规模比较大,采用多台rabbitmq服务器进行处理,在每台rabbitmq建立相同的exchange,采用客户端分片的方式,生产者根据hash分发消息到不同的服务器中。

作为消费者,必须能支持消费全部rabbitmq的相同的exchange。

这里因为4台mq,就在代码中写死数组下标

package cn.jpush.sms.common;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

@Configuration
@EnableRabbit
public class RabbitMQConfig {

@Autowired
private RabbitProperties rabbitProperties;

@Bean
public ConnectionFactory connectionFactory() {
return buildConnectionFactory(0);
}

@Bean
public ConnectionFactory connectionFactory1() {
return buildConnectionFactory(1);
}

@Bean
public ConnectionFactory connectionFactory2() {
return buildConnectionFactory(2);
}

@Bean
public ConnectionFactory connectionFactory3() {
return buildConnectionFactory(3);
}

private ConnectionFactory buildConnectionFactory(int i) {
List<RabbitAccount> list = rabbitProperties.getAccount();
RabbitAccount rabbitAccount = list.get(i);
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitAccount.getHostname());
connectionFactory.setUsername(rabbitAccount.getUsername());
connectionFactory.setPassword(rabbitAccount.getPassword());
connectionFactory.setPort(rabbitAccount.getPort());
return connectionFactory;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory1() {
return buildRabbitListenerContainerFactory(connectionFactory1(),rabbitAdmin1());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory2() {
return buildRabbitListenerContainerFactory(connectionFactory2(),rabbitAdmin2());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory3() {
return buildRabbitListenerContainerFactory(connectionFactory3(),rabbitAdmin3());
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
return buildRabbitListenerContainerFactory(connectionFactory(),rabbitAdmin());
}

@Bean
public RabbitAdmin rabbitAdmin() {
RabbitAdmin r = new RabbitAdmin(connectionFactory());
return r;
}

@Bean
public RabbitAdmin rabbitAdmin1() {
RabbitAdmin r = new RabbitAdmin(connectionFactory1());
return r;
}

@Bean
public RabbitAdmin rabbitAdmin2() {
RabbitAdmin r = new RabbitAdmin(connectionFactory2());
return r;
}

@Bean
public RabbitAdmin rabbitAdmin3() {
RabbitAdmin r = new RabbitAdmin(connectionFactory3());
return r;
}

private MySimpleRabbitListenerContainerFactory buildRabbitListenerContainerFactory(ConnectionFactory connectionFactory, RabbitAdmin rabbitAdmin) {
MySimpleRabbitListenerContainerFactory factory = new MySimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setRabbitAdmin(rabbitAdmin);
factory.setConcurrentConsumers(8);
factory.setMaxConcurrentConsumers(10);
factory.setPrefetchCount(1);
factory.setAcknowledgeMode(AcknowledgeMode.NONE);
return factory;
}

}


因为原生的SimpleRabbitListenerContra会默认配置一个RabbitAdmin进行队列维护,但是这个RabbitmqAdmin生成的时候配置的ConnectionFactory是全局的,这就导致这个RabbitAdmin不可用。

我们必须手动注入RabbitAdmin,并且指定正确的CinnectionFactory。

package cn.jpush.sms.common;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

public class MySimpleRabbitListenerContainerFactory extends SimpleRabbitListenerContainerFactory {

private RabbitAdmin rabbitAdmin;

@Override
protected void initializeContainer(SimpleMessageListenerContainer instance) {
super.initializeContainer(instance);
// 初始化Container完成后,手动注入一个RabbitAdmin
// RabbitAdmin原本设计为ApplicationContext中只需要一个实例即可,它主要做的事是队列声明。
// RabbitAdmin会扫描Context中全部的Quene,Exchange,routeKey,在所配置的的Connection上进行队列配置声明。这种设计在只连接一台rabbitmq服务器的时候是正常的。
// 但是在接入多台mq的情况下,要必须在每个Connection中都配置一个RabbitAdmin,否则就不能进行向服务器进行redeclared。
// 自定义多个RabbitAdmin的副作用是每个Connection都会声明工程中配置的全部的队列,不会区分containerFactory。
// 工程中连接多个mq服务器,但是都是同名称的队列,即使存在声明多次的副作用,最终的体现和声明一次是一样的。
instance.setRabbitAdmin(rabbitAdmin);
}

public void setRabbitAdmin(RabbitAdmin rabbitAdmin) {
this.rabbitAdmin = rabbitAdmin;
}

}


如果多台mq都连着,队列都相同,不会产生问题。

如果多台mq都连着,但是队列都不同,那么RabbitAdmin声明队列的时候都会把工程中全部的队列配置向每一个服务器声明一遍,每台mq的quene,exchange都是一样。

如果多台mq都连着,但是某些quene,exchange只针对地配置单台mq,建议剥离出工程,单独起服务消费。

或者有能力改造一下RabbitAdmin的代码,声明队列要根据containerFactory区分开,只负责声明所属服务器的配置。

使用注解的方式添加handle

package cn.jpush.sms.handler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class PushDeliveryProcessHandler {
private static final Logger LOG = LoggerFactory.getLogger(PushDeliveryProcessHandler.class);

@RabbitListener(containerFactory = "rabbitListenerContainerFactory1", bindings = @QueueBinding(value = @Queue(value = "${rb.queue}", durable = "false"), exchange = @Exchange(value = "${rb.exchange}", type = "direct"), key = "${rb.routeKey}"))
@RabbitListener(containerFactory = "rabbitListenerContainerFactory2", bindings = @QueueBinding(value = @Queue(value = "${rb.queue}", durable = "false"), exchange = @Exchange(value = "${rb.exchange}", type = "direct"), key = "${rb.routeKey}"))
@RabbitListener(containerFactory = "rabbitListenerContainerFactory3", bindings = @QueueBinding(value = @Queue(value = "${rb.queue}", durable = "false"), exchange = @Exchange(value = "${rb.exchange}", type = "direct"), key = "${rb.routeKey}"))
@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "${rb.queue}", durable = "false"), exchange = @Exchange(value = "${rb.exchange}", type = "direct"), key = "${rb.routeKey}"))

public void handle(Message message) {
//to do something
}

}


建议对Bean起名的时候参考一下是否有对应的autoConfigurer配置。

因为发现,spring-boot中,如果没有存在名称
rabbitListenerContainerFactory
的bean,它就会创建这样的类型bean,但是我们并不想让它创建,则我们在起名的就占用掉它,这样就不会让spring-boot的自动配置误会了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  exchange