spring boot实战(第十二篇)整合RabbitMQ
2016-10-10 11:06
281 查看
前言
最近几篇文章将围绕消息中间件RabbitMQ展开,对于RabbitMQ基本概念这里不阐述,主要讲解RabbitMQ的基本用法、SpringBoot与RabbitMQ整合源码分析。
RabbitMQ安装
在使用消息中间件RabbitMQ之前就是安装RabbitMQ。安装erlang:yuminstallerlang
下载RabbitMQ安装包:
解压安装包、配置环境变量RABBITMQ_HOME
参考网址:https://www.rabbitmq.com/install-generic-unix.html
windows:
RabbitMQ配置
1.安装完成后需要对RabbitMQ进行配置,在etc/rabbitmq目录下创建两个文件:rabbitmq-env.conf环境信息配置
RABBITMQ_NODE_IP_ADDRESS=127.0.0.1 RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=node01
rabbitmq.config 核心配置文件
[{rabbit,[{loopback_users,[]}]}].
该配置表示是的默认用户guest用户可以远程访问mq(广域网不能访问,内网可以访问)
2.启动RabbitMQ执行命令rabbitmq-server
RabbitMQ3.5.4.Copyright(C)2007-2015PivotalSoftware,Inc. ####LicensedundertheMPL.Seehttp://www.rabbitmq.com/#### ##########Logs:/Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01.log ########/Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01-sasl.log ########## Startingbroker...completedwith0plugins.
3.RabbitMQ提供WEB-UI管理控制台,使用rabbitmq-pluginsenablerabbitmq_management命令启用,重启后可以看到
Startingbroker...completedwith6plugins.
表明WEB-UI控制台启动成功,访问:http://localhost:15672/
登陆进入:
通过该控制台可以方便管理RabbitMQ。
创建Test用户
RabbitMQ默认使用guest用户,下面讲述如何创建一个test用户,最快捷的做法使用web管理控制台这里使用命令创建:
rabbitmqctladd_usertesttest
rabbitmqctlset_user_tagstestadministrator
tag分为四种"management","policymaker","monitoring""administrator" 详见 http://www.rabbitmq.com/management.html
RabbitMQ其他
在实际使用RabbitMQ中还需要涉及到RabbitMQ的集群、高可用(采用镜像队列实现)以后有机会再详细阐述,有兴趣可参考https://www.rabbitmq.com/documentation.htmlRabbitMQJavaClient
RabbitMQ客户端支持语言种类繁多,官方都一一举例:https://www.rabbitmq.com/getstarted.html这里主要自己开发一个小的demo
消息消费者
操作步骤:创建连接工厂ConnectionFactory
获取连接Connection
通过连接获取通信通道Channel
声明交换机Exchange:交换机类型分为四类:
FanoutExchange:将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange:通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
声明队列Queue
将队列和交换机绑定
创建消费者
执行消息的消费
packageorg.lkl.mq.rabbitmq.test; importjava.io.IOException; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.TimeoutException; importcom.rabbitmq.client.Channel; importcom.rabbitmq.client.Connection; importcom.rabbitmq.client.ConnectionFactory; importcom.rabbitmq.client.ConsumerCancelledException; importcom.rabbitmq.client.QueueingConsumer; importcom.rabbitmq.client.QueueingConsumer.Delivery; importcom.rabbitmq.client.ShutdownSignalException; /** *客户端01 * *@authorliaokailin *@version$Id:Receive01.java,v0.12015年11月01日下午3:47:58liaokailinExp$ */ publicclassReceive01{ publicstaticvoidmain(String[]args)throwsIOException,TimeoutException,ShutdownSignalException, ConsumerCancelledException,InterruptedException{ ConnectionFactoryfacotry=newConnectionFactory(); facotry.setUsername("test"); facotry.setPassword("test"); facotry.setVirtualHost("test"); facotry.setHost("localhost"); Connectionconn=facotry.newConnection();//获取一个链接 //通过Channel进行通信 Channelchannel=conn.createChannel(); intprefetchCount=1; channel.basicQos(prefetchCount);//保证公平分发 booleandurable=true; //声明交换机 channel.exchangeDeclare(Send.EXCHANGE_NAME,"direct",durable);//按照routingKey过滤 //声明队列 StringqueueName=channel.queueDeclare("queue-01",true,true,false,null).getQueue(); //将队列和交换机绑定 StringroutingKey="lkl-0"; //队列可以多次绑定,绑定不同的交换机或者路由key channel.queueBind(queueName,Send.EXCHANGE_NAME,routingKey); //创建消费者 QueueingConsumerconsumer=newQueueingConsumer(channel); //将消费者和队列关联 channel.basicConsume(queueName,false,consumer);//设置为false表面手动确认消息消费 //获取消息 System.out.println("Waitmessage...."); while(true){ Deliverydelivery=consumer.nextDelivery(); Stringmsg=newString(delivery.getBody()); Stringkey=delivery.getEnvelope().getRoutingKey(); System.out.println("Received'"+key+"':'"+msg+"'"); System.out.println("Handlemessage"); TimeUnit.SECONDS.sleep(3);//mockhandlemessage channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);//确定该消息已成功消费 } } }
消息生产者
操作步骤:创建连接工厂ConnectionFactory
获取连接Connection
通过连接获取通信通道Channel
发送消息
packageorg.lkl.mq.rabbitmq.test; importjava.io.IOException; importjava.util.concurrent.TimeoutException; importcom.rabbitmq.client.Channel; importcom.rabbitmq.client.ConfirmListener; importcom.rabbitmq.client.Connection; importcom.rabbitmq.client.ConnectionFactory; importcom.rabbitmq.client.MessageProperties; /** *消息publish * *@authorliaokailin *@version$Id:Send.java,v0.12015年10月22日下午3:48:09liaokailinExp$ */ publicclassSend{ publicfinalstaticStringEXCHANGE_NAME="test-exchange"; publicstaticvoidmain(String[]args)throwsIOException,TimeoutException,InterruptedException{ /** *配置amqpbroker连接信息 */ ConnectionFactoryfacotry=newConnectionFactory(); facotry.setUsername("test"); facotry.setPassword("test"); facotry.setVirtualHost("test"); facotry.setHost("localhost"); Connectionconn=facotry.newConnection();//获取一个链接 //通过Channel进行通信 Channelchannel=conn.createChannel(); //channel.exchangeDeclare(Send.EXCHANGE_NAME,"direct",true);//如果消费者已创建,这里可不声明 channel.confirmSelect();//Enablespublisheracknowledgementsonthischannel channel.addConfirmListener(newConfirmListener(){ @Override publicvoidhandleNack(longdeliveryTag,booleanmultiple)throwsIOException{ System.out.println("[handleNack]:"+deliveryTag+","+multiple); } @Override publicvoidhandleAck(longdeliveryTag,booleanmultiple)throwsIOException{ System.out.println("[handleAck]:"+deliveryTag+","+multiple); } }); Stringmessage="lkl-"; //消息持久化MessageProperties.PERSISTENT_TEXT_PLAIN //发送多条信息,每条消息对应routekey都不一致 for(inti=0;i<10;i++){ channel.basicPublish(EXCHANGE_NAME,message+(i%2),MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes()); System.out.println("[send]msg"+(message+i)+"ofroutingKeyis"+(message+(i%2))); } } }
在设置消息被消费的回调前需显示调用
channel.confirmSelect()
否则回调函数无法调用
先执行消费者,消费者会轮询是否有消息的到来,在web控制也可以观察哦~~,再启动生产者发送消息。
================================
前言
本篇主要讲述消息生产者
不论是创建消息消费者或生产者都需要ConnectionFactory
ConnectionFactory配置
创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)
这里需要显示调用
connectionFactory.setPublisherConfirms(true);
才能进行消息的回调。
RabbitTemplate
通过使用RabbitTemplate来对开发者提供API操作@Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //必须是prototype类型 publicRabbitTemplaterabbitTemplate(){ RabbitTemplatetemplate=newRabbitTemplate(connectionFactory()); returntemplate; }
这里设置为原型,具体的原因在后面会讲到
在发送消息时通过调用RabbitTemplate中的如下方法
publicvoidconvertAndSend(Stringexchange,StringroutingKey,finalObjectobject,CorrelationDatacorrelationData)
exchange:交换机名称
routingKey:路由关键字
object:发送的消息内容
correlationData:消息ID
因此生产者代码详单简洁
Send.java
@Component
publicclassSend{
privateRabbitTemplaterabbitTemplate;
/**
*构造方法注入
*/
@Autowired
publicSend(RabbitTemplaterabbitTemplate){
this.rabbitTemplate=rabbitTemplate;
}
publicvoidsendMsg(Stringcontent){
CorrelationDatacorrelationId=newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE,AmqpConfig.ROUTINGKEY,content,correlationId);
}
}
如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate
实际的ConfirmCallback为最后一次申明的ConfirmCallback。
下面给出完整的生产者代码:
packagecom.lkl.springboot.amqp;
importjava.util.UUID;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.amqp.rabbit.support.CorrelationData;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.stereotype.Component;
/**
*消息生产者
*
*@authorliaokailin
*@version$Id:Send.java,v0.12015年11月01日下午4:22:25liaokailinExp$
*/
@Component
publicclassSendimplementsRabbitTemplate.ConfirmCallback{
privateRabbitTemplaterabbitTemplate;
/**
*构造方法注入
*/
@Autowired
publicSend(RabbitTemplaterabbitTemplate){
this.rabbitTemplate=rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);//rabbitTemplate如果为单例的话,那回调就是最后设置的内容
}
publicvoidsendMsg(Stringcontent){
CorrelationDatacorrelationId=newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE,AmqpConfig.ROUTINGKEY,content,correlationId);
}
/**
*回调
*/
@Override
publicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){
System.out.println("回调id:"+correlationData);
if(ack){
System.out.println("消息成功消费");
}else{
System.out.println("消息消费失败:"+cause);
}
}
}
消息消费者
消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。交换机
/**
*针对消费者配置
FanoutExchange:将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange:通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
publicDirectExchangedefaultExchange(){
returnnewDirectExchange(EXCHANGE);
}
在SpringBoot中交换机继承AbstractExchange类
队列
@Bean
publicQueuequeue(){
returnnewQueue("spring-boot-queue",true);//队列持久
}
绑定
@Bean
publicBindingbinding(){
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}
完成以上工作后,在springboot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。
消息消费
@Bean
publicSimpleMessageListenerContainermessageContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener(){
@Override
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
byte[]body=message.getBody();
System.out.println("receivemsg:"+newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息成功消费
}
});
returncontainer;
}
下面给出完整的配置文件:
packagecom.lkl.springboot.amqp;
importorg.springframework.amqp.core.AcknowledgeMode;
importorg.springframework.amqp.core.Binding;
importorg.springframework.amqp.core.BindingBuilder;
importorg.springframework.amqp.core.DirectExchange;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.core.Queue;
importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;
importorg.springframework.amqp.rabbit.connection.ConnectionFactory;
importorg.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
importorg.springframework.beans.factory.config.ConfigurableBeanFactory;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.context.annotation.Scope;
importcom.rabbitmq.client.Channel;
/**
*QmqpRabbitmq
*
*http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/*
*@authorlkl
*@version$Id:AmqpConfig.java,v0.12015年11月01日下午2:05:37lklExp$
*/
@Configuration
publicclassAmqpConfig{
publicstaticfinalStringEXCHANGE="spring-boot-exchange";
publicstaticfinalStringROUTINGKEY="spring-boot-routingKey";
@Bean
publicConnectionFactoryconnectionFactory(){
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);//必须要设置
returnconnectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
publicRabbitTemplaterabbitTemplate(){
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
returntemplate;
}
/**
*针对消费者配置
*1.设置交换机类型
*2.将队列绑定到交换机
*
*
FanoutExchange:将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange:通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
publicDirectExchangedefaultExchange(){
returnnewDirectExchange(EXCHANGE);
}
@Bean
publicQueuequeue(){
returnnewQueue("spring-boot-queue",true);//队列持久
}
@Bean
publicBindingbinding(){
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}
@Bean
publicSimpleMessageListenerContainermessageContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener(){
@Override
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
byte[]body=message.getBody();
System.out.println("receivemsg:"+newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息成功消费
}
});
returncontainer;
}
}
以上完成SpringBoot与RabbitMQ的整合
自动配置
在SpringBoot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtualHost=test
后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?
自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码
connectionFactory.setPublisherConfirms(true);
具体分析见后续文章的源码解读.
=========================================
前言
本篇开始讲述SpringBoot如何整合RabbitMQ(实际上Spring就整合了RabbitMQ)。RabbitAdmin
在上篇中遗留AmqpAdmin没有讲解,现在来看下该部分代码
publicAmqpAdminamqpAdmin(CachingConnectionFactoryconnectionFactory){
returnnewRabbitAdmin(connectionFactory);
}
创建RabbitAdmin实例,调用构造方法
publicRabbitAdmin(ConnectionFactoryconnectionFactory){
this.connectionFactory=connectionFactory;
Assert.notNull(connectionFactory,"ConnectionFactorymustnotbenull");
this.rabbitTemplate=newRabbitTemplate(connectionFactory);
}
创建连接工厂、rabbitTemplate,其中ConnectionFactory采用上一篇中自定义bean
publicConnectionFactoryconnectionFactory(){
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPublisherConfirms(true);//必须要设置
returnconnectionFactory;
}
为CachingConnectionFactory实例,其缓存模式为通道缓存
privatevolatileCacheModecacheMode=CacheMode.CHANNEL;
接下来看下RabbitAdmin类定义:
publicclassRabbitAdminimplementsAmqpAdmin,ApplicationContextAware,InitializingBean{
...
}
实现接口AmqpAdmin(定义若干RabbitMQ操作父接口),这里需要强调的是InitializingBean,实现该接口则会调用afterPropertiesSet方法
publicvoidafterPropertiesSet(){
synchronized(this.lifecycleMonitor){
if(this.running||!this.autoStartup){
return;
}
if(this.connectionFactoryinstanceofCachingConnectionFactory&&
((CachingConnectionFactory)this.connectionFactory).getCacheMode()==CacheMode.CONNECTION){
logger.warn("RabbitAdminautodeclarationisnotsupportedwithCacheMode.CONNECTION");
return;
}
this.connectionFactory.addConnectionListener(newConnectionListener(){
//Preventstackoverflow...
privatefinalAtomicBooleaninitializing=newAtomicBoolean(false);
@Override
publicvoidonCreate(Connectionconnection){
if(!initializing.compareAndSet(false,true)){
//Ifwearealreadyinitializing,wedon'tneedtodoitagain...
return;
}
try{
initialize();
}
finally{
initializing.compareAndSet(true,false);
}
}
@Override
publicvoidonClose(Connectionconnection){
}
});
this.running=true;
}
}
synchronized(this.lifecycleMonitor)加锁保证同一时间只有一个线程访问该代码,随后调用this.connectionFactory.addConnectionListener添加连接监听,各连接工厂关系:
实际调用为CachingConnectionFactory
publicvoidaddConnectionListener(ConnectionListenerlistener){
super.addConnectionListener(listener);
//Iftheconnectionisalreadyaliveweassumethatthenewlistenerwantstobenotified
if(this.connection!=null){
listener.onCreate(this.connection);
}
}
此时connection为null,无法执行到listener.onCreate(this.connection);
往CompositeConnectionListenerconnectionListener中添加监听信息,最终保证在集合中
privateList<ConnectionListener>delegates=newCopyOnWriteArrayList<ConnectionListener>();
这里添加的监听代码执行,在后面调用时再来讲解。
至此~~RabbitAdmin创建完成。
Exchange
接下来继续来看AmqpConfig.java中的代码
@Bean
publicDirectExchangedefaultExchange(){
returnnewDirectExchange(EXCHANGE);
}
以上代码创建一个交换机,交换机类型为direct
在申明交换机时需要指定交换机名称,默认创建可持久交换机
Queue
publicQueuequeue(){
returnnewQueue("spring-boot-queue",true);//队列持久
}
默认创建可持久队列
Binding
@Bean
publicBindingbinding(){
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}
BindingBuilder.bind(queue())实现为:
publicstaticDestinationConfigurerbind(Queuequeue){
returnnewDestinationConfigurer(queue.getName(),DestinationType.QUEUE);
}
DestinationConfigurer通过name、type区分不同配置信息,其to()方法为重载方法,传递参数为四种交换机,分别返回XxxExchangeRoutingKeyConfigurer,其中with方法返回Bingding实例,因此在Binding信息中存储了
队列、交换机、路由key等相关信息
publicclassBindingextendsAbstractDeclarable{
publicstaticenumDestinationType{
QUEUE,EXCHANGE;
}
privatefinalStringdestination;
privatefinalStringexchange;
privatefinalStringroutingKey;
privatefinalMap<String,Object>arguments;
privatefinalDestinationTypedestinationType;
...
}
以上信息理解都非常简单,下面来看比较复杂点的SimpleMessageListenerContainer
SimpleMessageListenerContainer
@Bean
publicSimpleMessageListenerContainermessageContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener(){
@Override
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
byte[]body=message.getBody();
System.out.println("receivemsg:"+newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息成功消费
}
});
returncontainer;
}
查看其实现的接口,注意SmartLifecycle
接下来设置队列信息,在AbstractMessageListenerContainer
privatevolatileList<String>queueNames=newCopyOnWriteArrayList<String>();
添加队列信息
AbstractMessageListenerContainer#exposeListenerChannel设置为true
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
设置并发消费者数量,默认情况为1
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
设置消费者成功消费消息后确认模式,分为两种
自动模式,默认模式,在RabbitMQBroker消息发送到消费者后自动删除
手动模式,消费者客户端显示编码确认消息消费完成,Broker给生产者发送回调,消息删除
接下来设置消费者端消息监听,为privatevolatileObjectmessageListener赋值
到这里消息监听容器也创建完成了,但令人纳闷的时,消费者如何去消费消息呢?从这里完全看不出来。那么接下来看下SmartLifecycle接口
SmartLifecycle
熟悉Spring都应该知道该接口,其定义为:publicinterfaceSmartLifecycleextendsLifecycle,Phased{
booleanisAutoStartup();
voidstop(Runnablecallback);
}
其中的isAutoStartup设置为true时,会自动调用Lifecycle接口中的start方法,既然我们为源码分析,也简单看下这个聪明的声明周期接口是如何实现它的聪明方法的
在
[html]
protectedvoidfinishRefresh(){
//Initializelifecycleprocessorforthiscontext.
initLifecycleProcessor();
//Propagaterefreshtolifecycleprocessorfirst.
getLifecycleProcessor().onRefresh();
//Publishthefinalevent.
publishEvent(newContextRefreshedEvent(this));
//ParticipateinLiveBeansViewMBean,ifactive.
LiveBeansView.registerApplicationContext(this);
}
其中initLifecycleProcessor初始化生命周期处理器,
[html]
protectedvoidinitLifecycleProcessor(){
ConfigurableListableBeanFactorybeanFactory=getBeanFactory();
if(beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)){
this.lifecycleProcessor=
beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME,LifecycleProcessor.class);
if(logger.isDebugEnabled()){
logger.debug("UsingLifecycleProcessor["+this.lifecycleProcessor+"]");
}
}
else{
DefaultLifecycleProcessordefaultProcessor=newDefaultLifecycleProcessor();
defaultProcessor.setBeanFactory(beanFactory);
this.lifecycleProcessor=defaultProcessor;
beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME,this.lifecycleProcessor);
if(logger.isDebugEnabled()){
logger.debug("UnabletolocateLifecycleProcessorwithname'"+
LIFECYCLE_PROCESSOR_BEAN_NAME+
"':usingdefault["+this.lifecycleProcessor+"]");
}
}
}
注册DefaultLifecycleProcessor对应bean
getLifecycleProcessor().onRefresh()调用DefaultLifecycleProcessor中方法onRefresh,调用startBeans(true)
[html]
privatevoidstartBeans(booleanautoStartupOnly){
Map<String,Lifecycle>lifecycleBeans=getLifecycleBeans();
Map<Integer,LifecycleGroup>phases=newHashMap<Integer,LifecycleGroup>();
for(Map.Entry<String,?extendsLifecycle>entry:lifecycleBeans.entrySet()){
Lifecyclebean=entry.getValue();
if(!autoStartupOnly||(beaninstanceofSmartLifecycle&&((SmartLifecycle)bean).isAutoStartup())){
intphase=getPhase(bean);
LifecycleGroupgroup=phases.get(phase);
if(group==null){
group=newLifecycleGroup(phase,this.timeoutPerShutdownPhase,lifecycleBeans,autoStartupOnly);
phases.put(phase,group);
}
group.add(entry.getKey(),bean);
}
}
if(phases.size()>0){
List<Integer>keys=newArrayList<Integer>(phases.keySet());
Collections.sort(keys);
for(Integerkey:keys){
phases.get(key).start();
}
}
}
其中
Map<String,Lifecycle>lifecycleBeans=getLifecycleBeans();
获取所有实现Lifecycle接口bean,执行beaninstanceofSmartLifecycle&&((SmartLifecycle)bean).isAutoStartup()判断,如果bean同时也为Phased实例,则加入到LifecycleGroup中,随后phases.get(key).start()调用start方法
接下来要做的事情就很明显:要了解消费者具体如何实现,查看SimpleMessageListenerContainer中的start是如何实现的。
至此~~整合RabbitMQ源码分析准备工作完成,下一篇中正式解读消费者的实现。
==============================
踩坑记录
近日在用springboot架构一个微服务框架,服务发现与治理、发布REST接口各种轻松惬意。但是服务当设计MQ入口时,就发现遇到无数地雷,现在整理成下文,供各路大侠围观与嘲笑。版本
当前使用的spring-boot-starter-amqp版本为2016.5发布的也许若干年后,你们版本都不会有这些问题了。:(
RabbitMQ
当需要用到MQ的时候,我的第一反映就是使用RabbitMQ,猫了一眼springboot的官方说明,上面说springboot为rabbit准备了spring-boot-starter-amqp,并且为RabbitTemplate和RabbitMQ提供了自动配置选项。暗自窃喜~~瞅瞅[官方文档]
心情愉悦的照着例子,开干~~。
踩坑
十五分钟后的代码类似这样:@Service
@RabbitListener(queues="merchant")
publicclassMQReceiver{
protectedLoggerlogger=Logger.getLogger(MQReceiver.class
.getName());
@RabbitHandler
publicvoidprocess(@PayloadUpdateMerchantrequest){
UpdateMerchantResponseresponse=newUpdateMerchantResponse();
logger.info(request.getMerchantId()+"->"+response.getReturnCode());
}
}
消费信息后,应该记录一条日志。
结果得到只有org.springframework.amqp.AmqpException:Nomethodfoundforclass[B这个异常,并且还无限循环抛出这个异常。。。
记得刚才官方文档好像说了异常什么的,转身去猫一眼,果然有:
Ifretriesarenotenabledandthelistenerthrowsanexception,bydefaultthedeliverywillberetriedindefinitely.Youcanmodifythisbehaviorintwoways;setthedefaultRequeueRejected
propertytofalse
andzerore-deliverieswillbeattempted;or,throwanAmqpRejectAndDontRequeueException
tosignalthemessageshouldberejected.Thisisthemechanismusedwhenretriesareenabledandthemaximumdeliveryattemptsarereached.
知道了为啥会无限重试了,下面来看看为啥会抛出这个异常,google搜一下,貌似还有一个倒霉鬼遇到了
进去看完问题和大神的解答,豁然开朗。
Therearetwoconversionsinthe@RabbitListenerpipeline.
ThefirstconvertsfromaSpringAMQPMessagetoaspring-messagingMessage.
ThereiscurrentlynowaytochangethefirstconverterfromSimpleMessageConverterwhichhandlesString,Serializableandpasseseverythingelseasbyte[].
Thesecondconverterconvertsthemessagepayloadtothemethodparametertype(ifnecessary).
Withmethod-level@RabbitListenersthereisatightbindingbetweenthehandlerandthemethod.
Withclass-level@RabbitListeners,themessagepayloadfromthefirstconversionisusedtoselectwhichmethodtoinvoke.Onlythen,istheargumentconversionattempted.
ThismechanismworksfinewithJavaSerializableobjectssincethepayloadhasalreadybeenconvertedbeforethemethodisselected.
However,withJSON,thefirstconversionreturnsabyte[]andhencewefindnomatching@RabbitHandler.
Weneedamechanismsuchthatthefirstconverterissettablesothatthepayloadisconvertedearlyenoughinthepipelinetoselecttheappropriatehandlermethod.
AContentTypeDelegatingMessageConverterisprobablymostappropriate.
And,asstatedinAMQP-574,weneedtoclearlydocumenttheconversionneedsfora@RabbitListener,especiallywhenusingJSONoracustomconversion.
得嘞,官方示例果然是坑,试试大神的解决方案,手动新增下转换。
@Bean
publicRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory){
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory);
template.setMessageConverter(newJackson2JsonMessageConverter());
returntemplate;
}
@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){
SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(newJackson2JsonMessageConverter());
returnfactory;
}
然后在生产和消费信息的地方使用他们:
@RabbitListener(queues="merchant",containerFactory="rabbitListenerContainerFactory")
publicvoidprocess(@PayloadUpdateMerchantrequest){
UpdateMerchantResponseresponse=newUpdateMerchantResponse();
logger.info(request.getMerchantId()+"->"+response.getReturnCode());
}
再来一次,果然可以了
c.l.s.m.service.MQReceiver:00000001->null