您的位置:首页 > 编程语言 > Java开发

spring boot实战(第十二篇)整合RabbitMQ

2016-10-10 11:06 281 查看

前言

最近几篇文章将围绕消息中间件RabbitMQ展开,对于RabbitMQ基本概念这里不阐述,主要讲解RabbitMQ的基本用法、Java客户端API介绍、springBoot与RabbitMQ整合、

SpringBoot与RabbitMQ整合源码分析。

RabbitMQ安装

在使用消息中间件RabbitMQ之前就是安装RabbitMQ。

安装erlang:yuminstallerlang

下载RabbitMQ安装包:https://www.rabbitmq.com/releases/rabbitmq-server/v3.5.6/rabbitmq-server-generic-unix-3.5.6.tar.gz
解压安装包、配置环境变量RABBITMQ_HOME

参考网址:https://www.rabbitmq.com/install-generic-unix.html
windows:https://www.rabbitmq.com/install-windows.html

RabbitMQ配置

1.安装完成后需要对RabbitMQ进行配置,在etc/rabbitmq目录下创建两个文件:

rabbitmq-env.conf环境信息配置

RABBITMQ_NODE_IP_ADDRESS=127.0.0.1
RABBITMQ_NODE_PORT=5672
RABBITMQ_NODENAME=node01


rabbitmq.config 核心配置文件

[{rabbit,[{loopback_users,[]}]}].

该配置表示是的默认用户guest用户可以远程访问mq(广域网不能访问,内网可以访问)

2.启动RabbitMQ执行命令rabbitmq-server

RabbitMQ3.5.4.Copyright(C)2007-2015PivotalSoftware,Inc.
####LicensedundertheMPL.Seehttp://www.rabbitmq.com/####
##########Logs:/Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01.log
########/Users/liaokailin/software/rabbitmq_server-3.5.4/sbin/../var/log/rabbitmq/node01-sasl.log
##########
Startingbroker...completedwith0plugins.


3.RabbitMQ提供WEB-UI管理控制台,使用rabbitmq-pluginsenablerabbitmq_management命令启用,重启后可以看到

  Startingbroker...completedwith6plugins.

表明WEB-UI控制台启动成功,访问:http://localhost:15672/

登陆进入:

通过该控制台可以方便管理RabbitMQ。

创建Test用户

RabbitMQ默认使用guest用户,下面讲述如何创建一个test用户,最快捷的做法使用web管理控制台

这里使用命令创建:

rabbitmqctladd_usertesttest

rabbitmqctlset_user_tagstestadministrator

tag分为四种"management","policymaker","monitoring""administrator" 详见 http://www.rabbitmq.com/management.html

RabbitMQ其他

在实际使用RabbitMQ中还需要涉及到RabbitMQ的集群、高可用(采用镜像队列实现)以后有机会再详细阐述,有兴趣可参考https://www.rabbitmq.com/documentation.html

RabbitMQJavaClient

RabbitMQ客户端支持语言种类繁多,官方都一一举例:https://www.rabbitmq.com/getstarted.html

这里主要自己开发一个小的demo

消息消费者

操作步骤:

创建连接工厂ConnectionFactory

获取连接Connection

通过连接获取通信通道Channel

声明交换机Exchange:交换机类型分为四类:
  FanoutExchange:将消息分发到所有的绑定队列,无routingkey的概念

HeadersExchange:通过添加属性key-value匹配

DirectExchange:按照routingkey分发到指定队列

TopicExchange:多关键字匹配

声明队列Queue

将队列和交换机绑定

创建消费者

执行消息的消费

packageorg.lkl.mq.rabbitmq.test;

importjava.io.IOException;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.TimeoutException;

importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
importcom.rabbitmq.client.ConsumerCancelledException;
importcom.rabbitmq.client.QueueingConsumer;
importcom.rabbitmq.client.QueueingConsumer.Delivery;
importcom.rabbitmq.client.ShutdownSignalException;

/**
*客户端01
*
*@authorliaokailin
*@version$Id:Receive01.java,v0.12015年11月01日下午3:47:58liaokailinExp$
*/
publicclassReceive01{
publicstaticvoidmain(String[]args)throwsIOException,TimeoutException,ShutdownSignalException,
ConsumerCancelledException,InterruptedException{
ConnectionFactoryfacotry=newConnectionFactory();
facotry.setUsername("test");
facotry.setPassword("test");
facotry.setVirtualHost("test");
facotry.setHost("localhost");

Connectionconn=facotry.newConnection();//获取一个链接
//通过Channel进行通信
Channelchannel=conn.createChannel();
intprefetchCount=1;
channel.basicQos(prefetchCount);//保证公平分发

booleandurable=true;
//声明交换机
channel.exchangeDeclare(Send.EXCHANGE_NAME,"direct",durable);//按照routingKey过滤
//声明队列
StringqueueName=channel.queueDeclare("queue-01",true,true,false,null).getQueue();
//将队列和交换机绑定
StringroutingKey="lkl-0";
//队列可以多次绑定,绑定不同的交换机或者路由key
channel.queueBind(queueName,Send.EXCHANGE_NAME,routingKey);

//创建消费者
QueueingConsumerconsumer=newQueueingConsumer(channel);

//将消费者和队列关联
channel.basicConsume(queueName,false,consumer);//设置为false表面手动确认消息消费

//获取消息

System.out.println("Waitmessage....");
while(true){
Deliverydelivery=consumer.nextDelivery();
Stringmsg=newString(delivery.getBody());
Stringkey=delivery.getEnvelope().getRoutingKey();

System.out.println("Received'"+key+"':'"+msg+"'");
System.out.println("Handlemessage");
TimeUnit.SECONDS.sleep(3);//mockhandlemessage
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);//确定该消息已成功消费
}

}
}


消息生产者

操作步骤:

创建连接工厂ConnectionFactory

获取连接Connection

通过连接获取通信通道Channel

发送消息

packageorg.lkl.mq.rabbitmq.test;

importjava.io.IOException;
importjava.util.concurrent.TimeoutException;

importcom.rabbitmq.client.Channel;
importcom.rabbitmq.client.ConfirmListener;
importcom.rabbitmq.client.Connection;
importcom.rabbitmq.client.ConnectionFactory;
importcom.rabbitmq.client.MessageProperties;

/**
*消息publish
*
*@authorliaokailin
*@version$Id:Send.java,v0.12015年10月22日下午3:48:09liaokailinExp$
*/
publicclassSend{
publicfinalstaticStringEXCHANGE_NAME="test-exchange";

publicstaticvoidmain(String[]args)throwsIOException,TimeoutException,InterruptedException{
/**
*配置amqpbroker连接信息
*/
ConnectionFactoryfacotry=newConnectionFactory();
facotry.setUsername("test");
facotry.setPassword("test");
facotry.setVirtualHost("test");
facotry.setHost("localhost");

Connectionconn=facotry.newConnection();//获取一个链接
//通过Channel进行通信
Channelchannel=conn.createChannel();

//channel.exchangeDeclare(Send.EXCHANGE_NAME,"direct",true);//如果消费者已创建,这里可不声明
channel.confirmSelect();//Enablespublisheracknowledgementsonthischannel
channel.addConfirmListener(newConfirmListener(){

@Override
publicvoidhandleNack(longdeliveryTag,booleanmultiple)throwsIOException{
System.out.println("[handleNack]:"+deliveryTag+","+multiple);

}

@Override
publicvoidhandleAck(longdeliveryTag,booleanmultiple)throwsIOException{
System.out.println("[handleAck]:"+deliveryTag+","+multiple);
}
});

Stringmessage="lkl-";
//消息持久化MessageProperties.PERSISTENT_TEXT_PLAIN
//发送多条信息,每条消息对应routekey都不一致
for(inti=0;i<10;i++){
channel.basicPublish(EXCHANGE_NAME,message+(i%2),MessageProperties.PERSISTENT_TEXT_PLAIN,
(message+i).getBytes());
System.out.println("[send]msg"+(message+i)+"ofroutingKeyis"+(message+(i%2)));
}

}
}


在设置消息被消费的回调前需显示调用

channel.confirmSelect()


否则回调函数无法调用

先执行消费者,消费者会轮询是否有消息的到来,在web控制也可以观察哦~~,再启动生产者发送消息。

================================

前言

本篇主要讲述springBoot与RabbitMQ的整合,内容非常简单,纯API的调用操作。操作之间需要加入依赖Jar

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> /dependency>

消息生产者


不论是创建消息消费者或生产者都需要ConnectionFactory


ConnectionFactory配置


创建AmqpConfig文件AmqpConfig.java(后期的配置都在该文件中)


@Configuration publicclassAmqpConfig{ publicstaticfinalStringEXCHANGE="spring-boot-exchange"; publicstaticfinalStringROUTINGKEY="spring-boot-routingKey"; @Bean publicConnectionFactoryconnectionFactory(){ CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(); connectionFactory.setAddresses("127.0.0.1:5672"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true);//必须要设置 returnconnectionFactory; } }

这里需要显示调用

connectionFactory.setPublisherConfirms(true);


才能进行消息的回调。


RabbitTemplate

通过使用RabbitTemplate来对开发者提供API操作


@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
publicRabbitTemplaterabbitTemplate(){
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
returntemplate;
}


这里设置为原型,具体的原因在后面会讲到


在发送消息时通过调用RabbitTemplate中的如下方法

publicvoidconvertAndSend(Stringexchange,StringroutingKey,finalObjectobject,CorrelationDatacorrelationData)


exchange:交换机名称

routingKey:路由关键字

object:发送的消息内容

correlationData:消息ID


因此生产者代码详单简洁

Send.java

@Component
publicclassSend{

privateRabbitTemplaterabbitTemplate;

/**
*构造方法注入
*/
@Autowired
publicSend(RabbitTemplaterabbitTemplate){
this.rabbitTemplate=rabbitTemplate;
}

publicvoidsendMsg(Stringcontent){
CorrelationDatacorrelationId=newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE,AmqpConfig.ROUTINGKEY,content,correlationId);
}

}


如果需要在生产者需要消息发送后的回调,需要对rabbitTemplate设置ConfirmCallback对象,由于不同的生产者需要对应不同的ConfirmCallback,如果rabbitTemplate设置为单例bean,则所有的rabbitTemplate

实际的ConfirmCallback为最后一次申明的ConfirmCallback。

下面给出完整的生产者代码:

packagecom.lkl.springboot.amqp;

importjava.util.UUID;

importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.amqp.rabbit.support.CorrelationData;
importorg.springframework.beans.factory.annotation.Autowired;
importorg.springframework.stereotype.Component;

/**
*消息生产者
*
*@authorliaokailin
*@version$Id:Send.java,v0.12015年11月01日下午4:22:25liaokailinExp$
*/
@Component
publicclassSendimplementsRabbitTemplate.ConfirmCallback{

privateRabbitTemplaterabbitTemplate;

/**
*构造方法注入
*/
@Autowired
publicSend(RabbitTemplaterabbitTemplate){
this.rabbitTemplate=rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);//rabbitTemplate如果为单例的话,那回调就是最后设置的内容
}

publicvoidsendMsg(Stringcontent){
CorrelationDatacorrelationId=newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(AmqpConfig.EXCHANGE,AmqpConfig.ROUTINGKEY,content,correlationId);
}

/**
*回调
*/
@Override
publicvoidconfirm(CorrelationDatacorrelationData,booleanack,Stringcause){
System.out.println("回调id:"+correlationData);
if(ack){
System.out.println("消息成功消费");
}else{
System.out.println("消息消费失败:"+cause);
}
}

}


消息消费者

消费者负责申明交换机(生产者也可以申明)、队列、两者的绑定操作。

交换机

/**
*针对消费者配置
FanoutExchange:将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange:通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
publicDirectExchangedefaultExchange(){
returnnewDirectExchange(EXCHANGE);
}


在SpringBoot中交换机继承AbstractExchange类


队列

@Bean
publicQueuequeue(){
returnnewQueue("spring-boot-queue",true);//队列持久

}


绑定

@Bean
publicBindingbinding(){
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}


完成以上工作后,在springboot中通过消息监听容器实现消息的监听,在消息到来时执行回调操作。



消息消费

@Bean
publicSimpleMessageListenerContainermessageContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener(){

@Override
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
byte[]body=message.getBody();
System.out.println("receivemsg:"+newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息成功消费
}
});
returncontainer;
}


下面给出完整的配置文件:

packagecom.lkl.springboot.amqp;

importorg.springframework.amqp.core.AcknowledgeMode;
importorg.springframework.amqp.core.Binding;
importorg.springframework.amqp.core.BindingBuilder;
importorg.springframework.amqp.core.DirectExchange;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.core.Queue;
importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;
importorg.springframework.amqp.rabbit.connection.ConnectionFactory;
importorg.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
importorg.springframework.beans.factory.config.ConfigurableBeanFactory;
importorg.springframework.context.annotation.Bean;
importorg.springframework.context.annotation.Configuration;
importorg.springframework.context.annotation.Scope;

importcom.rabbitmq.client.Channel;

/**
*QmqpRabbitmq
*
*http://docs.spring.io/spring-amqp/docs/1.4.5.RELEASE/reference/html/*
*@authorlkl
*@version$Id:AmqpConfig.java,v0.12015年11月01日下午2:05:37lklExp$
*/

@Configuration
publicclassAmqpConfig{

publicstaticfinalStringEXCHANGE="spring-boot-exchange";
publicstaticfinalStringROUTINGKEY="spring-boot-routingKey";

@Bean
publicConnectionFactoryconnectionFactory(){
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);//必须要设置
returnconnectionFactory;
}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
//必须是prototype类型
publicRabbitTemplaterabbitTemplate(){
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
returntemplate;
}

/**
*针对消费者配置
*1.设置交换机类型
*2.将队列绑定到交换机
*
*
FanoutExchange:将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange:通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
publicDirectExchangedefaultExchange(){
returnnewDirectExchange(EXCHANGE);
}

@Bean
publicQueuequeue(){
returnnewQueue("spring-boot-queue",true);//队列持久

}

@Bean
publicBindingbinding(){
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}

@Bean
publicSimpleMessageListenerContainermessageContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener(){

@Override
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
byte[]body=message.getBody();
System.out.println("receivemsg:"+newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息成功消费
}
});
returncontainer;
}

}


以上完成SpringBoot与RabbitMQ的整合





自动配置


在SpringBoot中实现了RabbitMQ的自动配置,在配置文件中添加如下配置信息

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=test
spring.rabbitmq.password=test
spring.rabbitmq.virtualHost=test


后会自动创建ConnectionFactory以及RabbitTemplate对应Bean,为什么上面我们还需要手动什么呢?

自动创建的ConnectionFactory无法完成事件的回调,即没有设置下面的代码

connectionFactory.setPublisherConfirms(true);




具体分析见后续文章的源码解读.


=========================================

前言

本篇开始讲述SpringBoot如何整合RabbitMQ(实际上Spring就整合了RabbitMQ)。

RabbitAdmin

在上篇中遗留AmqpAdmin没有讲解,现在来看下该部分代码

publicAmqpAdminamqpAdmin(CachingConnectionFactoryconnectionFactory){
returnnewRabbitAdmin(connectionFactory);
}


创建RabbitAdmin实例,调用构造方法

publicRabbitAdmin(ConnectionFactoryconnectionFactory){
this.connectionFactory=connectionFactory;
Assert.notNull(connectionFactory,"ConnectionFactorymustnotbenull");
this.rabbitTemplate=newRabbitTemplate(connectionFactory);
}


创建连接工厂、rabbitTemplate,其中ConnectionFactory采用上一篇中自定义bean

publicConnectionFactoryconnectionFactory(){
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory();
connectionFactory.setAddresses("127.0.0.1:5672");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPublisherConfirms(true);//必须要设置
returnconnectionFactory;
}


为CachingConnectionFactory实例,其缓存模式为通道缓存

privatevolatileCacheModecacheMode=CacheMode.CHANNEL;


接下来看下RabbitAdmin类定义:

publicclassRabbitAdminimplementsAmqpAdmin,ApplicationContextAware,InitializingBean{
...
}


实现接口AmqpAdmin(定义若干RabbitMQ操作父接口),这里需要强调的是InitializingBean,实现该接口则会调用afterPropertiesSet方法

publicvoidafterPropertiesSet(){

synchronized(this.lifecycleMonitor){

if(this.running||!this.autoStartup){
return;
}

if(this.connectionFactoryinstanceofCachingConnectionFactory&&
((CachingConnectionFactory)this.connectionFactory).getCacheMode()==CacheMode.CONNECTION){
logger.warn("RabbitAdminautodeclarationisnotsupportedwithCacheMode.CONNECTION");
return;
}

this.connectionFactory.addConnectionListener(newConnectionListener(){

//Preventstackoverflow...
privatefinalAtomicBooleaninitializing=newAtomicBoolean(false);

@Override
publicvoidonCreate(Connectionconnection){
if(!initializing.compareAndSet(false,true)){
//Ifwearealreadyinitializing,wedon'tneedtodoitagain...
return;
}
try{

initialize();
}
finally{
initializing.compareAndSet(true,false);
}
}

@Override
publicvoidonClose(Connectionconnection){
}

});

this.running=true;

}
}


synchronized(this.lifecycleMonitor)加锁保证同一时间只有一个线程访问该代码,随后调用this.connectionFactory.addConnectionListener添加连接监听,各连接工厂关系:

实际调用为CachingConnectionFactory

publicvoidaddConnectionListener(ConnectionListenerlistener){
super.addConnectionListener(listener);
//Iftheconnectionisalreadyaliveweassumethatthenewlistenerwantstobenotified
if(this.connection!=null){
listener.onCreate(this.connection);
}
}


此时connection为null,无法执行到listener.onCreate(this.connection);
往CompositeConnectionListenerconnectionListener中添加监听信息,最终保证在集合中

privateList<ConnectionListener>delegates=newCopyOnWriteArrayList<ConnectionListener>();


这里添加的监听代码执行,在后面调用时再来讲解。

至此~~RabbitAdmin创建完成。

Exchange

接下来继续来看AmqpConfig.java中的代码

@Bean
publicDirectExchangedefaultExchange(){
returnnewDirectExchange(EXCHANGE);
}


以上代码创建一个交换机,交换机类型为direct

在申明交换机时需要指定交换机名称,默认创建可持久交换机

Queue

publicQueuequeue(){
returnnewQueue("spring-boot-queue",true);//队列持久
}


默认创建可持久队列

Binding

@Bean
publicBindingbinding(){
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(AmqpConfig.ROUTINGKEY);
}


BindingBuilder.bind(queue())实现为:

publicstaticDestinationConfigurerbind(Queuequeue){
returnnewDestinationConfigurer(queue.getName(),DestinationType.QUEUE);
}


DestinationConfigurer通过name、type区分不同配置信息,其to()方法为重载方法,传递参数为四种交换机,分别返回XxxExchangeRoutingKeyConfigurer,其中with方法返回Bingding实例,因此在Binding信息中存储了
队列、交换机、路由key等相关信息

publicclassBindingextendsAbstractDeclarable{

publicstaticenumDestinationType{
QUEUE,EXCHANGE;
}

privatefinalStringdestination;

privatefinalStringexchange;

privatefinalStringroutingKey;

privatefinalMap<String,Object>arguments;

privatefinalDestinationTypedestinationType;
...
}


以上信息理解都非常简单,下面来看比较复杂点的SimpleMessageListenerContainer

SimpleMessageListenerContainer

@Bean
publicSimpleMessageListenerContainermessageContainer(){
SimpleMessageListenerContainercontainer=newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener(){

@Override
publicvoidonMessage(Messagemessage,Channelchannel)throwsException{
byte[]body=message.getBody();
System.out.println("receivemsg:"+newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息成功消费
}
});
returncontainer;
}





查看其实现的接口,注意SmartLifecycle

接下来设置队列信息,在AbstractMessageListenerContainer

privatevolatileList<String>queueNames=newCopyOnWriteArrayList<String>();

添加队列信息
AbstractMessageListenerContainer#exposeListenerChannel设置为true

container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);


设置并发消费者数量,默认情况为1

container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认


设置消费者成功消费消息后确认模式,分为两种

自动模式,默认模式,在RabbitMQBroker消息发送到消费者后自动删除

手动模式,消费者客户端显示编码确认消息消费完成,Broker给生产者发送回调,消息删除

接下来设置消费者端消息监听,为privatevolatileObjectmessageListener赋值

到这里消息监听容器也创建完成了,但令人纳闷的时,消费者如何去消费消息呢?从这里完全看不出来。那么接下来看下SmartLifecycle接口

SmartLifecycle

熟悉Spring都应该知道该接口,其定义为:

publicinterfaceSmartLifecycleextendsLifecycle,Phased{

booleanisAutoStartup();
voidstop(Runnablecallback);

}


其中的isAutoStartup设置为true时,会自动调用Lifecycle接口中的start方法,既然我们为源码分析,也简单看下这个聪明的声明周期接口是如何实现它的聪明方法的

springboot实战(第十篇)SpringbootBean加载源码分析中讲到执行Bean加载时,调用AbstractApplicationContext#refresh(),其中存在一个方法调用finishRefresh()

[html]viewplaincopy

protectedvoidfinishRefresh(){

//Initializelifecycleprocessorforthiscontext.

initLifecycleProcessor();

//Propagaterefreshtolifecycleprocessorfirst.

getLifecycleProcessor().onRefresh();

//Publishthefinalevent.

publishEvent(newContextRefreshedEvent(this));

//ParticipateinLiveBeansViewMBean,ifactive.

LiveBeansView.registerApplicationContext(this);

}

其中initLifecycleProcessor初始化生命周期处理器,

[html]viewplaincopy

protectedvoidinitLifecycleProcessor(){

ConfigurableListableBeanFactorybeanFactory=getBeanFactory();

if(beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)){

this.lifecycleProcessor=

beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME,LifecycleProcessor.class);

if(logger.isDebugEnabled()){

logger.debug("UsingLifecycleProcessor["+this.lifecycleProcessor+"]");

}

}

else{

DefaultLifecycleProcessordefaultProcessor=newDefaultLifecycleProcessor();

defaultProcessor.setBeanFactory(beanFactory);

this.lifecycleProcessor=defaultProcessor;

beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME,this.lifecycleProcessor);

if(logger.isDebugEnabled()){

logger.debug("UnabletolocateLifecycleProcessorwithname'"+

LIFECYCLE_PROCESSOR_BEAN_NAME+

"':usingdefault["+this.lifecycleProcessor+"]");

}

}

}

注册DefaultLifecycleProcessor对应bean

getLifecycleProcessor().onRefresh()调用DefaultLifecycleProcessor中方法onRefresh,调用startBeans(true)

[html]viewplaincopy

privatevoidstartBeans(booleanautoStartupOnly){

Map<String,Lifecycle>lifecycleBeans=getLifecycleBeans();

Map<Integer,LifecycleGroup>phases=newHashMap<Integer,LifecycleGroup>();

for(Map.Entry<String,?extendsLifecycle>entry:lifecycleBeans.entrySet()){

Lifecyclebean=entry.getValue();

if(!autoStartupOnly||(beaninstanceofSmartLifecycle&&((SmartLifecycle)bean).isAutoStartup())){

intphase=getPhase(bean);

LifecycleGroupgroup=phases.get(phase);

if(group==null){

group=newLifecycleGroup(phase,this.timeoutPerShutdownPhase,lifecycleBeans,autoStartupOnly);

phases.put(phase,group);

}

group.add(entry.getKey(),bean);

}

}

if(phases.size()>0){

List<Integer>keys=newArrayList<Integer>(phases.keySet());

Collections.sort(keys);

for(Integerkey:keys){

phases.get(key).start();

}

}

}

其中
Map<String,Lifecycle>lifecycleBeans=getLifecycleBeans();

获取所有实现Lifecycle接口bean,执行beaninstanceofSmartLifecycle&&((SmartLifecycle)bean).isAutoStartup()判断,如果bean同时也为Phased实例,则加入到LifecycleGroup中,随后phases.get(key).start()调用start方法

接下来要做的事情就很明显:要了解消费者具体如何实现,查看SimpleMessageListenerContainer中的start是如何实现的。

至此~~整合RabbitMQ源码分析准备工作完成,下一篇中正式解读消费者的实现。

==============================

踩坑记录

近日在用springboot架构一个微服务框架,服务发现与治理、发布REST接口各种轻松惬意。但是服务当设计MQ入口时,就发现遇到无数地雷,现在整理成下文,供各路大侠围观与嘲笑。

版本

当前使用的spring-boot-starter-amqp版本为2016.5发布的1.3.5.RELEASE

也许若干年后,你们版本都不会有这些问题了。:(

RabbitMQ

当需要用到MQ的时候,我的第一反映就是使用RabbitMQ,猫了一眼springboot的官方说明,上面说springboot为rabbit准备了spring-boot-starter-amqp,并且为RabbitTemplate和RabbitMQ提供了自动配置选项。暗自窃喜~~

瞅瞅[官方文档]http://docs.spring.io/spring-boot/docs/current-SNAPSHOT/reference/htmlsingle/#boot-features-rabbitmq和例子,SOEASY,再看一眼GITHUB上的官方例了,也有例子。

心情愉悦的照着例子,开干~~。

踩坑

十五分钟后的代码类似这样:

@Service
@RabbitListener(queues="merchant")
publicclassMQReceiver{
protectedLoggerlogger=Logger.getLogger(MQReceiver.class
.getName());

@RabbitHandler
publicvoidprocess(@PayloadUpdateMerchantrequest){
UpdateMerchantResponseresponse=newUpdateMerchantResponse();
logger.info(request.getMerchantId()+"->"+response.getReturnCode());
}
}

消费信息后,应该记录一条日志。
结果得到只有org.springframework.amqp.AmqpException:Nomethodfoundforclass[B这个异常,并且还无限循环抛出这个异常。。。

记得刚才官方文档好像说了异常什么的,转身去猫一眼,果然有:


Ifretriesarenotenabledandthelistenerthrowsanexception,bydefaultthedeliverywillberetriedindefinitely.Youcanmodifythisbehaviorintwoways;setthedefaultRequeueRejected
propertytofalse
andzerore-deliverieswillbeattempted;or,throwanAmqpRejectAndDontRequeueException
tosignalthemessageshouldberejected.Thisisthemechanismusedwhenretriesareenabledandthemaximumdeliveryattemptsarereached.


知道了为啥会无限重试了,下面来看看为啥会抛出这个异常,google搜一下,貌似还有一个倒霉鬼遇到了这个问题。

进去看完问题和大神的解答,豁然开朗。


Therearetwoconversionsinthe@RabbitListenerpipeline.
ThefirstconvertsfromaSpringAMQPMessagetoaspring-messagingMessage.
ThereiscurrentlynowaytochangethefirstconverterfromSimpleMessageConverterwhichhandlesString,Serializableandpasseseverythingelseasbyte[].
Thesecondconverterconvertsthemessagepayloadtothemethodparametertype(ifnecessary).
Withmethod-level@RabbitListenersthereisatightbindingbetweenthehandlerandthemethod.
Withclass-level@RabbitListeners,themessagepayloadfromthefirstconversionisusedtoselectwhichmethodtoinvoke.Onlythen,istheargumentconversionattempted.
ThismechanismworksfinewithJavaSerializableobjectssincethepayloadhasalreadybeenconvertedbeforethemethodisselected.
However,withJSON,thefirstconversionreturnsabyte[]andhencewefindnomatching@RabbitHandler.
Weneedamechanismsuchthatthefirstconverterissettablesothatthepayloadisconvertedearlyenoughinthepipelinetoselecttheappropriatehandlermethod.
AContentTypeDelegatingMessageConverterisprobablymostappropriate.
And,asstatedinAMQP-574,weneedtoclearlydocumenttheconversionneedsfora@RabbitListener,especiallywhenusingJSONoracustomconversion.


得嘞,官方示例果然是坑,试试大神的解决方案,手动新增下转换。

@Bean
publicRabbitTemplaterabbitTemplate(ConnectionFactoryconnectionFactory){
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory);
template.setMessageConverter(newJackson2JsonMessageConverter());
returntemplate;
}

@Bean
publicSimpleRabbitListenerContainerFactoryrabbitListenerContainerFactory(ConnectionFactoryconnectionFactory){
SimpleRabbitListenerContainerFactoryfactory=newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(newJackson2JsonMessageConverter());
returnfactory;
}

然后在生产和消费信息的地方使用他们:

@RabbitListener(queues="merchant",containerFactory="rabbitListenerContainerFactory")
publicvoidprocess(@PayloadUpdateMerchantrequest){
UpdateMerchantResponseresponse=newUpdateMerchantResponse();
logger.info(request.getMerchantId()+"->"+response.getReturnCode());
}

再来一次,果然可以了


c.l.s.m.service.MQReceiver:00000001->null


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐
章节导航