RabbitMQ的Java应用(3) -- 使用spring-boot-starter-amqp开发生产者应用
2017-01-12 11:09
1266 查看
上一篇我们介绍了如何使用Spring AMQP和RabbitMQ结合,开发消费者应用程序,使用的是Xml配置的Spring框架。
本篇我们仍然使用Spring AMQP开发生产者应用,不过我们使用零 XML配置的Spring Boot环境进行开发,使用的库是spring-boot-starter-amqp库。
我们使用Spring Boot 1.4.3 RELEASE版本,pom.xml文件内容如下:
需要注意的是,当使用spring-boot-configuration-processor库时会默认打开RabbitAutoConfiguration,它会自动创建一个 CachingConnectionFactory对象,一个RabbitTemplate对象,一个AmqpAdmin对象以及一个RabbitMessageTemplate对象。
如果我们想定制自己的ConnectionFactory,RabbitTemplate等对象,建议在Configuration中将其禁用:
在src/main/resources目录下创建application.properties文件,添加rabbitmq相关设置,以spring.rabbitmq为前缀,这样它会自动映射到RabbitProperties配置类
添加RabbitConfig类,用于创建自定义ConnectionFactory,RabbitAdmin,RabbitTemplate等。 这个类头部如下所示
这里用到了org.springframework.boot.autoconfigure.amqp.RabbitProperties类,记录application.properties中的RabbitMQ连接设置。
再创建ConnectionFactory:
我们的生产者-消费者场景是系列1中提到的RPC方式计算阶乘,使用Rest接口调用生产者应用,生产者应用发送消息给消费者应用,计算出阶乘结果返回给生产者。
为了实现这个场景,我们先在生产者程序中定义用于发送请求消息和接收返回消息的服务接口SendMessageService和它的实现类SendMessageServiceImpl类。
1.8引入)在后台线程池中异步执行,我们自定义了MQSenderSupplier类,实现了发送请求和接收返回消息的操作。
Context环境中直接获取,而没有使用自动注入方式。ApplicationContextUtil是我们定义的一个Context辅助类(类似代码可以从网上搜索到,这里就不列出了),使用RabbitTemplate.sendAndReceive方法发送请求消息和接收返回消息,没有另行定义Listener类。
再添加发送消息的Rest接口类SendMessageController
Listener,绑定springMessageQueue队列,替换之前定义的Message Listener。
这里计算阶乘的方法factorial和系列1中相同,代码不再列出。
修改applicationContext.xml文件,替换Message Listener Container中的Listener对象为CaculateListener对象。
修改完成后启动消费者和生产者程序,使用生产者的rest接口计算20的阶乘值(生产者程序使用8180端口启动),我们可以看到消费者应用计算出了20的阶乘结果,并返回给生产者应用。
假如消费者应用计算时间较长,生产者是否还能获取正确的结算结果呢?
我们修改消费者应用,在计算后睡眠10s,再返回计算结果。
重新启动消费者和生产者程序,调用计算阶乘的rest接口,我们会看到生产者端抛出了空引用异常,没有获取到返回消息。
这是因为RabbitTemplate对象默认的接收Reply消息的超时时间只有5000ms(5s),超过这个时间sendAndReceive方法会直接返回空对象。 查看RabbitTemplate的代码,我们可以看到是内置的PendingReply对象从它自身的阻塞队列queue中获取返回消息。 RabbitTemplate对象是在onMessage方法中获取到返回消息时才把消息放入这个阻塞队列的。
为了解决这个问题,我们可以把RabbitTemplate的replyTimeout时间设置的稍微长一点,例如1分钟(60000ms)。
使用RabbitTemplate.sendAndReceive方法发送消息给消费者,获得返回消息,这是一个同步的过程,发送消息和等待接收消息是 在同一个线程中执行,如果等待返回消息的时间过长,当前方法的后续操作将被卡住,无法执行。
为了验证这一点,我们在生产者程序中添加日志输出代码。
我们再调用阶乘接口,生产者端控制台的输出日志如下图
从日志可以看出“The following operation”日志信息的输出是在获得返回消息之后执行的,假如当前方法的后续操作不依赖于返回消息的内容,但却要等待返回消息的获取完成后才能执行,这样的同步操作是不合适的,我们需要分离消息发送和返回消息的接收,使这两个操作在不同的线程中异步执行。
要做到这一点有两种方式。
第一种方式是使用RabbitTemplate.send方法发送消息,定义一个Message Listener侦听返回队列,获取并处理返回消息,这种方式在前面的代码中已经使用过,不再赘述。
第二种方式是使用spring-amqp 1.6引入的AsyncRabbitTemplate类,实现消息的异步发送,下面我们详细介绍这种方式的实现。
为了区分同步消息和异步消息的发送,我们新定义两个消息队列,一个名为“springAsyncMessageQueue”,用于接收AsyncRabbitTemplate发送的异步消息,一个名为“springAsyncReplyMessageQueue”,用于接收异步消息处理后的返回消息,同时把这两个消息队列分别绑定到 “springMessageExchange”和“springReplyMessageExchange”上,routingKey分别是“springAsyncMessage”和“springAsyncReplyMessage”。
消费者端的applicationContext.xml修改如下:
再添加一个处理异步消息的MessageListener类AsyncMessageListener,这个Listener类接收异步消息后,休眠10s后发送返回消息。
在Message Listener Container中追加这个Listener
在生产者端,我们在RabitConfig中添加一个AsyncRabbitTemplate的bean定义
这里设置的ReceiveTimeout属性类似于RabbitTemplate的ReplyTimeout属性,是获取返回消息的超时时间。 AsyncRabbitTemplate构造函数中的四个参数分别是连接工厂,发送消息的Exchange名,发送异步消息的RoutingKey(“springAsyncMessage”,用于Exchange将异步消息转发到springAsyncMessageQueue队列),返回消息队列名(“springAsyncMessageQueue”,用于接收异步消息对应的返回消息)
在SendMessageService和SendMessageServiceImpl中添加一个发送异步消息的接口方法sendAsyncMessage
使用了CompletableFuture的runAsync方法,没有返回值。
负责发送消息的后台任务类AsyncMQSenderThread主要代码如下:
在run方法中,AsyncRabbitTemplate.sendAndReceive方法返回的结果是一个RabbitMessageFuture对象,它实现了Google Guava提供的ListenableFuture接口,可以通过添加CallBack对返回消息进行处理。我们这里是添加了一个匿名的ListenableFutureCallback对象。
在SendMessageController中添加一个Rest接口sendAsyncMessage
从日志可以看出,发送消息和处理返回消息是由独立的两个线程异步处理的,在发送完消息后,当前线程并没有阻塞,等待返回消息,而是直接执行后续的代码,对返回消息的处理是由RabbitMessageFuture对象绑定的Callback进行处理的。
AsyncRabbitTemplate方式和Message Listener方式都可以实现对返回消息的异步处理,它们的区别在于对返回消息中包含的CorrelationId与发送消息中包含的CorrelationId的比较,前一种方式是交给AsyncRabbitTemplate对象的内置方法处理, 而Message Listener方式需要开发者自行处理。
使用RabbitTemplate和AsyncRabbitTemplate都是发送单条消息,每条消息发送完进行确认,但如果我们想发送20条消息,对这些消息进行一次性的确认,应该如何实现呢?我们可以使用spring-amqp 1.4.1引入的BatchingRabbitTemplate进行消息发送。
BatchingRabbitTemplate的构造函数形式如下:
使用BatchingRabbitTemplate发送消息的流程如下图所示
1)调用端调用BatchingRabbitTemplate的send方法发送消息。
2)send方法调用BatchingRabbitTemplate对象设置的BatchingStrategy属性的addToBatch方法。
3)以SimpleBatchingStrategy.addToBatch方法为例,如果累计的未发送消息数达到消息数上限,或者消息总长度大于缓存区大小,
会将未发送消息队列里的消息拼装成一条完整消息,封装在一个MessageBatch中返回给BatchingRabbitTemplate对象。
4)BatchingRabbitTemplate对象使用父类(RabbitTemplate)的send方法发送MessageBatch中包含的消息。
我们修改生产者程序,添加BatchingRabbitTemplate对象
我们这里的BatchingStrategy设置的消息条数上限为20条,消息总大小上限为1000。使用的TaskScheduler是java自带的 ConcurrentTaskScheduler对象,使用单线程池执行后续发送批量消息的计划任务,实际使用时可以设置为自定义的TaskScheduler.
为了测试方便,我们再添加两个消息队列,一个消息队列名为springBatchMessageQueue,用于接收发送的批量消息,一个消息队列名为springBatchReplyMessageQueue。在生产者和消费者端分别添加定义队列和消息绑定的代码,由于和异步消息类似,这里不再列出。 在消费者端定义一个BatchMessageListener,用于从springBatchMessageQueue接收和处理消息,它的主要代码如下:
在生产者端,我们在SendMessageService接口中定义发送批量消息的接口sendBatchMessages方法。
BatchMQSenderThread.run方法循环读取消息List中的消息,调用batchingRabbitTemplate对象进行消息发送,每条消息我们都生成一个uuid作为消息的correlationId,我们接下来会看到,在最后发送的批量消息中,只有第一条消息的correlationId被采用。
最后我们定义一个发送批量消息的rest接口,批量消息使用逗号进行分割。
从生产者端日志可以看出,SendMessageServiceImpl调用BatchRabbitTemplate.send方法,发送了30条消息,但是由于消息条数超过了我们预设的20条限制,实际只批量发送了20条消息(从消费者端的日志截图可以看到),消费者端仍然是把发送的批量消息作为20条独立的消息各自接收的。
由于我们定义的SimpleBatchingStrategy策略对象的超时时间是60000ms(1分钟),1分钟后,剩余的10条消息被BatchRabbitTemplate对象批量发出,被消费者接收,消费者日志显示了这一点。
消费者日志还显示,BatchRabbitTemplate对象发送的30条消息,它们都具有相同的correlationId,是第一条消息的correlationId,而不是创建这些消息时生成的uuid,这是由SimpleBatchingStrategy类的批量策略决定的。
下面我们修改一下生产者的SimpleBatchingStrategy属性设定,设定消息缓存上限为200,超时时间为120000ms(2分钟),再发送30条消息。 消费者端的输出日志如下:
我们可以看出,由于消息缓存上限的限制,BatchRabbitTemplate只批量发送了16条消息,后14条消息是在2分钟后延时发送。
在实际使用时,我们也可以根据业务需要,自定义BatchStrategy策略。
在系列2里我们提到了使用spring-rabbit的rabbit前缀简化applicationContext.xml配置文件中的RabbitMQ beans配置,spring-amqp还提供了RabbitMQ常用对象对应的Annoation,这些注解如下图所示:
下面我们使用这些Annotation简化我们的生产者应用代码。
我们直接修改ReplyBatchMessageListener,在它的onMessage方法头部添加以下注解:
这个注解中使用到了@RabbitListener,将onMessage方法定义为一个Message Listener, @QueueBinding用于定义Binding对象,@Queue定义Queue对象,@Exchange定义Exchange对象,key属性是Bind对象的routingKey. spring-amqp内置的RabbitAdmin对象根据@Queue和@Exchange的属性,在RabbitMQ服务器上创建消息队列和Exchange, 根据@QueueBinding的属性,在RabbitMQ服务器上建立对应的Bind关系。
admin属性是创建Message Listener Container使用的RabbitAdmin对象,我们这里引用RabbitConfig中定义的“rabbitAdmin”对象。
@RabbitListener定义的Message Listener,所在的Message Listener Container是spring-amqp根据默认设置创建的Message Listener Container对象。如果我们想根据需要使用定制的Message Listener Container,我们需要在RabbitConfig类中定制SimpleMessageListenerContainerFactory对象:
我们可以看出这个方法设置的factory对象属性,和我们之前定义的Message Listener Container bean对象的属性有相似之处, 不过factory对象的属性,并没有完全覆盖Message Listener Container对象的属性定义,例如messagePropertiesConverter属性的设置,在SimpleRabbitListenerContainerFactory类中就找不到,因此,如果想使用Message Listener Container比较复杂的属性,建议还是使用@Bean定义。
如果@RabbitListener关联的消息队列,Exchange和Bind关系在RabbitMQ服务器中已经创建,我们可以在@RabbitListener中不使用@QueueBinding,直接使用queues属性,此时onMessage方法头部的注解如下:
为了使Rabbit注解生效,还需要添加@EnableRabbit注解,我们在RabbitConfig类头部添加这个注解
启动生产者和消费者应用,使用批量接口发送20条消息,从生产者日志我们可以看出我们定义在ReplyBatchMessageListener.onMessage方法上的添加的@RabbitListener将onMessage方法变成了一个Message Listener,接收了springBatchReplyMessageQueue队列的返回消息。
如果有很多方法想定义为一个Message Listener,而他们使用的@RabbitListener注解内容又相同,我们可以自定义RabbitListener注解,例如,我们可以将上面的@RabbitListener注解定义为ReplyRabbitListener注解接口
Listener,在这个类头部我们还添加了@Component注解,使这个对象在SpringBoot启动时被初始化。
我们再启动生产者应用,发送批量消息,从生产者日志可以看出,@ReplyRabbitListener注解使处理返回消息的方法变成了ReplyConsumerDelegate.processReplyMessage
前面的例子中发送的消息消息本体都是String类型,如果我们想发送一个对象类型的消息时,应该怎么处理呢?下面的实例将演示如何发送对象消息。
我们定义两个实体类Company和Employee
我们创建一个名为“springRabbitHandlerQueue”的消息队列,绑定到“springMessageExchange”Exchange对象上。
我们设定分别发送一条Company类型的消息,一条Employee类型的消息到springRabbitHandlerQueue消息队列里,消费者在同一个Message Listener中使用不同方法处理不同类型的消息。
我们在生产者程序中添加HandlerMQSenderThread类,用于发送对象类型的消息,它的主要代码如下:
run方法里调用toByteArray方法,把对象消息转换成byte数组。在MessageProperties中,我们设定content-type为 “application/x-java-serialized-object”,表示发送的消息体类型为Java对象,便于消费者端进行反序列化。
在SendMessageService中我们定义一个接口方法sendObjectMessage,用于发送对象消息。
在SendMessageController类中添加两个Rest接口方法sendCompanyMessage和sendEmployeeMessage,这两个方法从客户端接收Json形式的报文,将其转换为Company对象和Employee对象。
消费者端,我们使用spring-boot-starter-amqp创建一个消费者应用,启动类和Config类的主要代码如下:
创建一个POJO类RabbitHandlerConsumer,通过添加@RabbitListener注解和@RabbitHandler注解,使它成为springRabbitHandlerQueue 消息的Message Listener对象,并且根据不同类型的消息分别处理。
RabbitHandler注解是spring-amqp 1.5开始引入的,它用于将POJO类方法转换为消息处理方法,RabbitListener注解则提升到类级别, 可以在POJO类头部添加,作为全局设定。需要指出的是RabbitHandler起作用必须是在RabbitMQ消息本体被正常转换为对象后,否则它将无法根据方法参数类型确定实际的handlerMethod,在实际使用时需要对MessageConverter做相关设定,具体做法请参考spring-amqp文档。
最后将Employee和Company实体类添加到消费者项目中。
启动生产者和消费者应用。
先通过sendCompanyMessage接口发送一个Company类型的消息。
消费者端日志显示这条消息被RabbitHandlerConsumer.processCompanyMessage方法接收并处理。
再发送一条Employee类型的消息
消费者端日志显示这条消息被RabbitHandlerConsumer.processEmployeeMessage方法接收并处理。
我们上面的消费者应用是基于Spring Boot环境的,如果我们想在Spring AMQP框架接收并处理对象消息,我们不需要添加@EnableRabbit和@Configuration注解,只需要在applicationContext.xml文件中添加<rabbit:annotation-driven>标签即可。
我们新建一个基于Maven的消费者项目,pom.xml添加的库为:
在src/main/resources目录下添加applicationContext.xml文件,它的主要内容如下:
再添加RabbitHandlerConsumer,Employee,Company类。
最后在主程序中加载applicationContext.xml文件,启动消费者应用
启动生产者应用,发送Company消息和Employee消息,我们可以看到消费者应用的响应与基于SpringBoot的消费者应用程序相同。
本篇我们仍然使用Spring AMQP开发生产者应用,不过我们使用零 XML配置的Spring Boot环境进行开发,使用的库是spring-boot-starter-amqp库。
使用Spring-boot-starter-amqp搭建框架
我们使用Spring Boot 1.4.3 RELEASE版本,pom.xml文件内容如下:<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>SpringMQProducer</artifactId> <version>0.1.0</version> <packaging>jar</packaging> <name>SpringMQProducer</name> <description>The MQ producer in spring boot environment</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.3.RELEASE</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
需要注意的是,当使用spring-boot-configuration-processor库时会默认打开RabbitAutoConfiguration,它会自动创建一个 CachingConnectionFactory对象,一个RabbitTemplate对象,一个AmqpAdmin对象以及一个RabbitMessageTemplate对象。
@Configuration @ConditionalOnClass({ RabbitTemplate.class, Channel.class }) @EnableConfigurationProperties(RabbitProperties.class) @Import(RabbitAnnotationDrivenConfiguration.class) public class RabbitAutoConfiguration { @Co 1e92c nfiguration @ConditionalOnMissingBean(ConnectionFactory.class) protected static class RabbitConnectionFactoryCreator { @Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties config) { ........ @Configuration @Import(RabbitConnectionFactoryCreator.class) protected static class RabbitTemplateConfiguration { @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean(RabbitTemplate.class) public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { ........ @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty(prefix = "spring.rabbitmq", name = "dynamic", matchIfMissing = true) @ConditionalOnMissingBean(AmqpAdmin.class) public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } ....... @Configuration @ConditionalOnClass(RabbitMessagingTemplate.class) @ConditionalOnMissingBean(RabbitMessagingTemplate.class) @Import(RabbitTemplateConfiguration.class) protected static class MessagingTemplateConfiguration { @Bean @ConditionalOnSingleCandidate(RabbitTemplate.class) public RabbitMessagingTemplate rabbitMessagingTemplate( RabbitTemplate rabbitTemplate) { return new RabbitMessagingTemplate(rabbitTemplate); }
如果我们想定制自己的ConnectionFactory,RabbitTemplate等对象,建议在Configuration中将其禁用:
@Configuration @ComponentScan("com.qf.rabbitmq") @EnableAutoConfiguration(exclude = RabbitAutoConfiguration.class) @SpringBootApplication public class SpringMqProducerApplication { public static void main(String[] args) { SpringApplication.run(SpringMqProducerApplication.class, args); } }
在src/main/resources目录下创建application.properties文件,添加rabbitmq相关设置,以spring.rabbitmq为前缀,这样它会自动映射到RabbitProperties配置类
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=rabbitmq_producer spring.rabbitmq.password=123456 spring.rabbitmq.virtualHost=test_vhosts
添加RabbitConfig类,用于创建自定义ConnectionFactory,RabbitAdmin,RabbitTemplate等。 这个类头部如下所示
@Configuration @EnableConfigurationProperties(RabbitProperties.class) public class RabbitConfig { @Autowired private RabbitProperties rabbitProperties;
这里用到了org.springframework.boot.autoconfigure.amqp.RabbitProperties类,记录application.properties中的RabbitMQ连接设置。
再创建ConnectionFactory:
@Bean("connectionFactory") public ConnectionFactory getConnectionFactory() { com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = new com.rabbitmq.client.ConnectionFactory(); rabbitConnectionFactory.setHost(rabbitProperties.getHost()); rabbitConnectionFactory.setPort(rabbitProperties.getPort()); rabbitConnectionFactory.setUsername(rabbitProperties.getUsername()); rabbitConnectionFactory.setPassword(rabbitProperties.getPassword()); rabbitConnectionFactory.setVirtualHost(rabbitProperties.getVirtualHost()); ConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory); return connectionFactory; }这里可以像消费者应用里提到的那样,设置CacheMode和最大缓存数,我们这里暂不设置。 再创建RabbitAdmin对象
@Bean(name="rabbitAdmin") public RabbitAdmin getRabbitAdmin() { RabbitAdmin rabbitAdmin = new RabbitAdmin(getConnectionFactory()); rabbitAdmin.setAutoStartup(true); return rabbitAdmin; }定义MessageConverter和MessagePropertiesConverter对象
@Bean(name="serializerMessageConverter") public MessageConverter getMessageConverter(){ return new SimpleMessageConverter(); } @Bean(name="messagePropertiesConverter") public MessagePropertiesConverter getMessagePropertiesConverter() { return new DefaultMessagePropertiesConverter(); }定义发送消息所用的RabbitTemplate对象,由于我们的场景要求发送之后立即从消费者处获得返回消息,因此我们在RabbitTemplate对象中设置了ReplyAddress,而且在下面的MessageListenerContainer中将这个对象作为Listener设置到Container中。
@Bean(name="rabbitTemplate") public RabbitTemplate getRabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(getConnectionFactory()); rabbitTemplate.setUseTemporaryReplyQueues(false); rabbitTemplate.setMessageConverter(getMessageConverter()); rabbitTemplate.setMessagePropertiesConverter(getMessagePropertiesConverter()); rabbitTemplate.setReplyAddress(AppConstants.REPLY_QUEUE_NAME); rabbitTemplate.setReceiveTimeout(60000); return rabbitTemplate; }使用RabbitAdmin对象定义发送消息Exchange/Queue/Binding,返回消息Exchange/Queue/Binding对象,如果它们在RabbitMQ中已经存在,下面的定义代码可以省略.这里的Exchange/Queue名称和前面的消费者应用使用的相同。
@Bean(name="springMessageQueue") public Queue createQueue(@Qualifier("rabbitAdmin")RabbitAdmin rabbitAdmin) { Queue sendQueue = new Queue(AppConstants.SEND_QUEUE_NAME,true,false,false); rabbitAdmin.declareQueue(sendQueue); return sendQueue; } @Bean(name="springMessageExchange") public Exchange createExchange(@Qualifier("rabbitAdmin")RabbitAdmin rabbitAdmin) { DirectExchange sendExchange = new DirectExchange(AppConstants.SEND_EXCHANGE_NAME,true,false); rabbitAdmin.declareExchange(sendExchange); return sendExchange; } @Bean(name="springMessageBinding") public Binding createMessageBinding(@Qualifier("rabbitAdmin")RabbitAdmin rabbitAdmin) { Map<String,Object> arguments = new HashMap<String,Object>(); Binding sendMessageBinding = new Binding(AppConstants.SEND_QUEUE_NAME, Binding.DestinationType.QUEUE, AppConstants.SEND_EXCHANGE_NAME, AppConstants.SEND_MESSAGE_KEY, arguments); rabbitAdmin.declareBinding(sendMessageBinding); return sendMessageBinding; } @Bean(name="springReplyMessageQueue") public Queue createReplyQueue(@Qualifier("rabbitAdmin")RabbitAdmin rabbitAdmin) { Queue replyQueue = new Queue(AppConstants.REPLY_QUEUE_NAME,true,false,false); rabbitAdmin.declareQueue(replyQueue); return replyQueue; } @Bean(name="springReplyMessageExchange") public Exchange createReplyExchange(@Qualifier("rabbitAdmin")RabbitAdmin rabbitAdmin) { DirectExchange replyExchange = new DirectExchange(AppConstants.REPLY_EXCHANGE_NAME,true,false); rabbitAdmin.declareExchange(replyExchange); return replyExchange; } @Bean(name="springReplyMessageBinding") public Binding createReplyMessageBinding(@Qualifier("rabbitAdmin")RabbitAdmin rabbitAdmin) { Map<String,Object> arguments = new HashMap<String,Object>(); Binding replyMessageBinding = new Binding(AppConstants.REPLY_QUEUE_NAME, Binding.DestinationType.QUEUE, AppConstants.REPLY_EXCHANGE_NAME, AppConstants.REPLY_MESSAGE_KEY, arguments); rabbitAdmin.declareBinding(replyMessageBinding); return replyMessageBinding; }最后定义接收返回消息的Message Listener Container,这里的Listener属性设置的是上面创建的RabbitTemplate对象。
@Bean(name="replyMessageListenerContainer") public SimpleMessageListenerContainer createReplyListenerContainer() { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(getConnectionFactory()); listenerContainer.setQueueNames(AppConstants.REPLY_QUEUE_NAME); listenerContainer.setMessageConverter(getMessageConverter()); listenerContainer.setMessageListener(getRabbitTemplate()); listenerContainer.setRabbitAdmin(getRabbitAdmin()); listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO); return listenerContainer; }我们还定义了一个ThreadPoolExecutor对象,用于RabbitTemplate异步执行,发送和接收消息使用。
@Bean(name="threadExecutor") public ThreadPoolTaskExecutor createThreadPoolTaskExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setMaxPoolSize(10); threadPoolTaskExecutor.setQueueCapacity(200); threadPoolTaskExecutor.setKeepAliveSeconds(20000); return threadPoolTaskExecutor; }
我们的生产者-消费者场景是系列1中提到的RPC方式计算阶乘,使用Rest接口调用生产者应用,生产者应用发送消息给消费者应用,计算出阶乘结果返回给生产者。
为了实现这个场景,我们先在生产者程序中定义用于发送请求消息和接收返回消息的服务接口SendMessageService和它的实现类SendMessageServiceImpl类。
public interface SendMessageService { String sendMessage(String message); } @Service("sendMessageService") public class SendMessageServiceImpl implements SendMessageService { @Autowired private ThreadPoolTaskExecutor executor; public String sendMessage(String message) { CompletableFuture<String> resultCompletableFuture = CompletableFuture.supplyAsync(new MQSenderSupplier(message), executor); try { String result = resultCompletableFuture.get(); return result; } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } }在实现类中,我们将发送请求消息和接收返回消息交给一个Supplier对象(JDK
1.8引入)在后台线程池中异步执行,我们自定义了MQSenderSupplier类,实现了发送请求和接收返回消息的操作。
public class MQSenderSupplier implements Supplier<String> { private String message; public MQSenderSupplier(String message) { this.message = message; } public String get() { Date sendTime = new Date(); String correlationId = UUID.randomUUID().toString(); MessagePropertiesConverter messagePropertiesConverter = (MessagePropertiesConverter) ApplicationContextUtil.getBean("messagePropertiesConverter"); RabbitTemplate rabbitTemplate = (RabbitTemplate)ApplicationContextUtil.getBean("rabbitTemplate"); AMQP.BasicProperties props = new AMQP.BasicProperties("text/plain", "UTF-8", null, 2, 0, correlationId, AppConstants.REPLY_EXCHANGE_NAME, null, null, sendTime, null, null, "SpringProducer", null); MessageProperties sendMessageProperties = messagePropertiesConverter.toMessageProperties(props, null,"UTF-8"); sendMessageProperties.setReceivedExchange(AppConstants.REPLY_EXCHANGE_NAME); sendMessageProperties.setReceivedRoutingKey(AppConstants.REPLY_MESSAGE_KEY); sendMessageProperties.setRedelivered(true); Message sendMessage = MessageBuilder.withBody(message.getBytes()) .andProperties(sendMessageProperties) .build(); Message replyMessage = rabbitTemplate.sendAndReceive(AppConstants.SEND_EXCHANGE_NAME, AppConstants.SEND_MESSAGE_KEY, sendMessage); String replyMessageContent = null; try { replyMessageContent = new String(replyMessage.getBody(),"UTF-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return replyMessageContent; } }由于这里的MQSenderSupplier对象是临时构建,因此它使用的RabbitTemplate和MessagePropertiesConverter对象都是从Spring
Context环境中直接获取,而没有使用自动注入方式。ApplicationContextUtil是我们定义的一个Context辅助类(类似代码可以从网上搜索到,这里就不列出了),使用RabbitTemplate.sendAndReceive方法发送请求消息和接收返回消息,没有另行定义Listener类。
再添加发送消息的Rest接口类SendMessageController
@RestController public class SendMessageController { @Autowired private SendMessageService sendMessageService; @RequestMapping(value = "/caculate", method = RequestMethod.POST) @ResponseBody public String sendMessage(@RequestBody String number) { String result = sendMessageService.sendMessage(number); return "Factorial(" + number + ") = " + result; } }修改消费者应用程序,添加一个用于计算阶乘的Message
Listener,绑定springMessageQueue队列,替换之前定义的Message Listener。
public class CaculateListener implements ChannelAwareMessageListener { @Autowired private MessagePropertiesConverter messagePropertiesConverter; @Autowired private RabbitTemplate rabbitTemplate; public void onMessage(Message message, Channel channel) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); AMQP.BasicProperties rabbitMQProperties = messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8"); String numberContent = null; numberContent = new String(message.getBody(),"UTF-8"); System.out.println("The received number is:" + numberContent); String consumerTag = messageProperties.getConsumerTag(); int number = Integer.parseInt(numberContent); String result = factorial(number); AMQP.BasicProperties replyRabbitMQProps = new AMQP.BasicProperties("text/plain", "UTF-8", null, 2, 0, rabbitMQProperties.getCorrelationId(), null, null, null, null, null, null, consumerTag, null); Envelope replyEnvelope = new Envelope(messageProperties.getDeliveryTag(), true, "springReplyMessageExchange", "springReplyMessage"); MessageProperties replyMessageProperties = messagePropertiesConverter.toMessageProperties(replyRabbitMQProps, replyEnvelope,"UTF-8"); Message replyMessage = MessageBuilder.withBody(result.getBytes()) .andProperties(replyMessageProperties) .build(); rabbitTemplate.send("springReplyMessageExchange","springReplyMessage", replyMessage); channel.basicAck(messageProperties.getDeliveryTag(), false); } ..........
这里计算阶乘的方法factorial和系列1中相同,代码不再列出。
修改applicationContext.xml文件,替换Message Listener Container中的Listener对象为CaculateListener对象。
<bean id="caculateListener" class="com.qf.rabbitmq.listener.CaculateListener" /> <rabbit:listener-container message-converter="serializerMessageConverter" .................. recovery-back-off="backOff"> <rabbit:listener ref="caculateListener" queues="springMessageQueue"/> </rabbit:listener-container>
修改完成后启动消费者和生产者程序,使用生产者的rest接口计算20的阶乘值(生产者程序使用8180端口启动),我们可以看到消费者应用计算出了20的阶乘结果,并返回给生产者应用。
假如消费者应用计算时间较长,生产者是否还能获取正确的结算结果呢?
我们修改消费者应用,在计算后睡眠10s,再返回计算结果。
重新启动消费者和生产者程序,调用计算阶乘的rest接口,我们会看到生产者端抛出了空引用异常,没有获取到返回消息。
这是因为RabbitTemplate对象默认的接收Reply消息的超时时间只有5000ms(5s),超过这个时间sendAndReceive方法会直接返回空对象。 查看RabbitTemplate的代码,我们可以看到是内置的PendingReply对象从它自身的阻塞队列queue中获取返回消息。 RabbitTemplate对象是在onMessage方法中获取到返回消息时才把消息放入这个阻塞队列的。
public class RabbitTemplate { //默认Reply Timeout时间5000ms private volatile long replyTimeout = DEFAULT_REPLY_TIMEOUT; .............. //这个方法被sendAndReceive调用,实际处理消息的发送和返回消息的接收。 private Message exchangeMessages(final String exchange, final String routingKey, final Message message, final CorrelationData correlationData, Channel channel, final PendingReply pendingReply, String messageTag) throws Exception { ............ //发送消息到阻塞队列 doSend(channel, exchange, routingKey, message, mandatory, correlationData); //从pendingReply的阻塞队列queue中获取返回消息。 reply = this.replyTimeout < 0 ? pendingReply.get() : pendingReply.get(this.replyTimeout, TimeUnit.MILLISECONDS); return reply; } public void onMessage(Message message) { try { String messageTag; ............... PendingReply pendingReply = this.replyHolder.get(messageTag); ............... message.getMessageProperties().setReplyTo(savedReplyTo); //PendingReply对象将返回消息放入阻塞队列中 pendingReply.reply(message); ............. private static class PendingReply { ............. //从阻塞队列中获取返回消息对象。 public Message get(long timeout, TimeUnit unit) throws InterruptedException { Object reply = this.queue.poll(timeout, unit); //如果返回消息不为空,将返回对象转换为Amqp Message对象。 return reply == null ? null : processReply(reply); } private Message processReply(Object reply) { if (reply instanceof Message) { return (Message) reply; } ............ } public void reply(Message reply) { this.queue.add(reply); }由于我们设置的消费者应用处理时间为10000ms,导致PendingReply从阻塞队列取返回消息时,onMessage方法还没有被触发,阻塞队列为空,因此返回的消息对象为空,从而抛出异常。
为了解决这个问题,我们可以把RabbitTemplate的replyTimeout时间设置的稍微长一点,例如1分钟(60000ms)。
@Bean(name="rabbitTemplate") public RabbitTemplate getRabbitTemplate() { ................ rabbitTemplate.setReplyTimeout(60000); return rabbitTemplate; }再次启动生产者程序,计算30的阶乘值,我们看到空引用异常不再抛出,阶乘计算的结果也正常返回。
使用AsyncRabbitTemplate发送异步消息
使用RabbitTemplate.sendAndReceive方法发送消息给消费者,获得返回消息,这是一个同步的过程,发送消息和等待接收消息是 在同一个线程中执行,如果等待返回消息的时间过长,当前方法的后续操作将被卡住,无法执行。 为了验证这一点,我们在生产者程序中添加日志输出代码。
Message replyMessage = rabbitTemplate.sendAndReceive(AppConstants.SEND_EXCHANGE_NAME, AppConstants.SEND_MESSAGE_KEY, sendMessage); logger.info("Send the caculate number to consumer"); String replyMessageContent = null; try { replyMessageContent = new String(replyMessage.getBody(),"UTF-8"); logger.info("The reply message is:" + replyMessageContent); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } logger.info("The following operation");
我们再调用阶乘接口,生产者端控制台的输出日志如下图
从日志可以看出“The following operation”日志信息的输出是在获得返回消息之后执行的,假如当前方法的后续操作不依赖于返回消息的内容,但却要等待返回消息的获取完成后才能执行,这样的同步操作是不合适的,我们需要分离消息发送和返回消息的接收,使这两个操作在不同的线程中异步执行。
要做到这一点有两种方式。
第一种方式是使用RabbitTemplate.send方法发送消息,定义一个Message Listener侦听返回队列,获取并处理返回消息,这种方式在前面的代码中已经使用过,不再赘述。
第二种方式是使用spring-amqp 1.6引入的AsyncRabbitTemplate类,实现消息的异步发送,下面我们详细介绍这种方式的实现。
为了区分同步消息和异步消息的发送,我们新定义两个消息队列,一个名为“springAsyncMessageQueue”,用于接收AsyncRabbitTemplate发送的异步消息,一个名为“springAsyncReplyMessageQueue”,用于接收异步消息处理后的返回消息,同时把这两个消息队列分别绑定到 “springMessageExchange”和“springReplyMessageExchange”上,routingKey分别是“springAsyncMessage”和“springAsyncReplyMessage”。
消费者端的applicationContext.xml修改如下:
<rabbit:queue id="springAsyncMessageQueue" name="springAsyncMessageQueue" auto-delete="false" durable="true" exclusive="false" auto-declare="true" declared-by="rabbitAdmin" /> <rabbit:direct-exchange id="springMessageExchange" name="springMessageExchange" durable="true" auto-declare="false" auto-delete="false" declared-by="rabbitAdmin"> <rabbit:bindings> <rabbit:binding queue="springMessageQueue" key="springMessage" /> <rabbit:binding queue="springAsyncMessageQueue" key="springAsyncMessage" /> </rabbit:bindings> </rabbit:direct-exchange> <rabbit:queue id="springAsyncReplyMessageQueue" name="springAsyncReplyMessageQueue" auto-delete="false" durable="true" exclusive="false" auto-declare="true" declared-by="rabbitAdmin" /> <rabbit:direct-exchange id="springReplyMessageExchange" name="springReplyMessageExchange" durable="true" auto-delete="false" auto-declare="true" declared-by="rabbitAdmin"> <rabbit:bindings> <rabbit:binding queue="springReplyMessageQueue" key="springReplyMessage" /> <rabbit:binding queue="springAsyncReplyMessageQueue" key="springAsyncReplyMessage" /> </rabbit:bindings> </rabbit:direct-exchange>
再添加一个处理异步消息的MessageListener类AsyncMessageListener,这个Listener类接收异步消息后,休眠10s后发送返回消息。
public class AsyncMessageListener implements ChannelAwareMessageListener { @Autowired private MessagePropertiesConverter messagePropertiesConverter; @Autowired private RabbitTemplate rabbitTemplate; @Override public void onMessage(Message message, Channel channel) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); AMQP.BasicProperties rabbitMQProperties = messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8"); String messageContent = new String(message.getBody(), "UTF-8"); System.out.println("The received message is:" + messageContent); String consumerTag = messageProperties.getConsumerTag(); String replyMessageContent = consumerTag + " have received the message '" + messageContent + "'"; Thread.sleep(10000); AMQP.BasicProperties replyRabbitMQProps = new AMQP.BasicProperties("text/plain", "UTF-8", null, 2, 0, rabbitMQProperties.getCorrelationId(), null, null, null, null, null, null, consumerTag, null); //创建返回消息的信封头 Envelope replyEnvelope = new Envelope(messageProperties.getDeliveryTag(), true, "springReplyMessageExchange", "springAsyncReplyMessage"); MessageProperties replyMessageProperties = messagePropertiesConverter.toMessageProperties(replyRabbitMQProps, replyEnvelope,"UTF-8"); Message replyMessage = MessageBuilder.withBody(replyMessageContent.getBytes()) .andProperties(replyMessageProperties) .build(); rabbitTemplate.send("springReplyMessageExchange","springAsyncReplyMessage", replyMessage); channel.basicAck(messageProperties.getDeliveryTag(), false); } }
在Message Listener Container中追加这个Listener
<bean id="asyncMessageListener" class="com.qf.rabbitmq.listener.AsyncMessageListener" /> <rabbit:listener-container message-converter="serializerMessageConverter" ........................ recovery-back-off="backOff"> <rabbit:listener ref="caculateListener" queues="springMessageQueue"/> <rabbit:listener ref="asyncMessageListener" queues="springAsyncMessageQueue" /> </rabbit:listener-container>
在生产者端,我们在RabitConfig中添加一个AsyncRabbitTemplate的bean定义
@Bean(name="asyncRabbitTemplate") public AsyncRabbitTemplate getAsyncRabbitTemplate() { AsyncRabbitTemplate asyncRabbitTemplate= new AsyncRabbitTemplate(getConnectionFactory(), AppConstants.SEND_EXCHANGE_NAME, AppConstants.SEND_ASYNC_MESSAGE_KEY, AppConstants.REPLY_ASYNC_QUEUE_NAME); asyncRabbitTemplate.setReceiveTimeout(60000); asyncRabbitTemplate.setAutoStartup(true); return rabbitTemplate; }
这里设置的ReceiveTimeout属性类似于RabbitTemplate的ReplyTimeout属性,是获取返回消息的超时时间。 AsyncRabbitTemplate构造函数中的四个参数分别是连接工厂,发送消息的Exchange名,发送异步消息的RoutingKey(“springAsyncMessage”,用于Exchange将异步消息转发到springAsyncMessageQueue队列),返回消息队列名(“springAsyncMessageQueue”,用于接收异步消息对应的返回消息)
在SendMessageService和SendMessageServiceImpl中添加一个发送异步消息的接口方法sendAsyncMessage
public interface SendMessageService { .......... void sendAsyncMessage(String message); } @Service("sendMessageService") public class SendMessageServiceImpl implements SendMessageService { @Autowired private ThreadPoolTaskExecutor executor; .......... @Override public void sendAsyncMessage(String message) { CompletableFuture<Void> resultCompletableFuture = CompletableFuture.runAsync(new AsyncMQSenderThread(message), executor); } }在SendMessageServiceImpl.sendAsyncMessage方法中,我们把发送异步消息的任务交给了一个后台线程异步执行,
使用了CompletableFuture的runAsync方法,没有返回值。
负责发送消息的后台任务类AsyncMQSenderThread主要代码如下:
public class AsyncMQSenderThread implements Runnable { ................. private String message; public AsyncMQSenderThread(String message) { this.message = message; } public void run() { AsyncRabbitTemplate rabbitTemplate = (AsyncRabbitTemplate) ApplicationContextUtil.getBean("asyncRabbitTemplate"); ......... Message sendMessage = MessageBuilder.withBody(message.getBytes()) .andProperties(sendMessageProperties) .build(); logger.info("Send message to consumer"); AsyncRabbitTemplate.RabbitMessageFuture replyMessageFuture = rabbitTemplate.sendAndReceive("springMessageExchange", "springAsyncMessage", sendMessage); replyMessageFuture.addCallback(new ListenableFutureCallback<Message>() { @Override public void onFailure(Throwable ex) { } @Override public void onSuccess(Message result) { logger.info("Received the reply message"); try { String replyMessageContent = new String(result.getBody(), "UTF-8"); logger.info("The reply message is:" + replyMessageContent); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } }); logger.info("The following operation"); }
在run方法中,AsyncRabbitTemplate.sendAndReceive方法返回的结果是一个RabbitMessageFuture对象,它实现了Google Guava提供的ListenableFuture接口,可以通过添加CallBack对返回消息进行处理。我们这里是添加了一个匿名的ListenableFutureCallback对象。
在SendMessageController中添加一个Rest接口sendAsyncMessage
@RestController public class SendMessageController { ............ @RequestMapping(value = "/sendMessage", method = RequestMethod.POST) public void sendAsyncMessage(@RequestBody String message) { sendMessageService.sendAsyncMessage(message); } }启动生产者和消费者应用,调用sendMessage接口,生产者端控制台的日志如下所示:
从日志可以看出,发送消息和处理返回消息是由独立的两个线程异步处理的,在发送完消息后,当前线程并没有阻塞,等待返回消息,而是直接执行后续的代码,对返回消息的处理是由RabbitMessageFuture对象绑定的Callback进行处理的。
AsyncRabbitTemplate方式和Message Listener方式都可以实现对返回消息的异步处理,它们的区别在于对返回消息中包含的CorrelationId与发送消息中包含的CorrelationId的比较,前一种方式是交给AsyncRabbitTemplate对象的内置方法处理, 而Message Listener方式需要开发者自行处理。
使用BatchingRabbitTemplate批量发送消息
使用RabbitTemplate和AsyncRabbitTemplate都是发送单条消息,每条消息发送完进行确认,但如果我们想发送20条消息,对这些消息进行一次性的确认,应该如何实现呢?我们可以使用spring-amqp 1.4.1引入的BatchingRabbitTemplate进行消息发送。BatchingRabbitTemplate的构造函数形式如下:
public BatchingRabbitTemplate(BatchingStrategy batchingStrategy, TaskScheduler scheduler)可以看出BatchingRabbitTemplate有两个属性,一个是批量策略,定义了批量发送消息时消息的组合方式以及消息Properties的设定, 另一个是任务调度器,用于定义发送批量消息的定时任务。 我们这里使用的批量策略类是spring-ampq自带的SimpleBatchingStrategy类,它的定义大致如下:
public class SimpleBatchingStrategy implements BatchingStrategy { private final int batchSize; private final int bufferLimit; private final long timeout; public SimpleBatchingStrategy(int batchSize, int bufferLimit, long timeout) { this.batchSize = batchSize; this.bufferLimit = bufferLimit; this.timeout = timeout; } ....... @Override public MessageBatch addToBatch(String exchange, String routingKey, Message message) { .........这个批量策略的思想是设置一个消息缓存区,以及批量消息数的上限,当待发送消息的条数达到上限(batchSize),或者待发送的消息的总大小超过缓存区上限(bufferLimit),即将消息拼装成一个大消息进行发送。
使用BatchingRabbitTemplate发送消息的流程如下图所示
1)调用端调用BatchingRabbitTemplate的send方法发送消息。
2)send方法调用BatchingRabbitTemplate对象设置的BatchingStrategy属性的addToBatch方法。
3)以SimpleBatchingStrategy.addToBatch方法为例,如果累计的未发送消息数达到消息数上限,或者消息总长度大于缓存区大小,
会将未发送消息队列里的消息拼装成一条完整消息,封装在一个MessageBatch中返回给BatchingRabbitTemplate对象。
@Override public MessageBatch addToBatch(String exchange, String routingKey, Message message) { ............ //消息队列里现存消息加待发送消息大小超过缓存区上限,先将消息队列里已有消息拼装为 //一条消息返回。 if (this.messages.size() > 0 && this.currentSize + bufferUse > this.bufferLimit) { //拼装消息队列里已有消息到MessageBatch batch = doReleaseBatch(); //使用新消息的exchange,routingKey初始化exchange和routingKey this.exchange = exchange; this.routingKey = routingKey; } this.currentSize += bufferUse; this.messages.add(message); //如果添加了新消息的消息队列的消息数超过消息上限或者消息总大小超过缓存区上限, //将消息队列里所有消息拼装成一条消息,封装到MessageBatch对象中返回。 if (batch == null && (this.messages.size() >= this.batchSize || this.currentSize >= this.bufferLimit)) { batch = doReleaseBatch(); } ................... } private MessageBatch doReleaseBatch() { if (this.messages.size() < 1) { return null; } Message message = assembleMessage(); MessageBatch messageBatch = new MessageBatch(this.exchange, this.routingKey, message); this.messages.clear(); this.currentSize = 0; this.exchange = null; this.routingKey = null; return messageBatch; }
4)BatchingRabbitTemplate对象使用父类(RabbitTemplate)的send方法发送MessageBatch中包含的消息。
我们修改生产者程序,添加BatchingRabbitTemplate对象
@Bean(name="batchingStrategy") public BatchingStrategy createBatchingStrategy() { SimpleBatchingStrategy batchingStrategy = new SimpleBatchingStrategy(20,1000,60000); return batchingStrategy; } @Bean(name="batchRabbitTemplate") public BatchingRabbitTemplate createBatchRabbitTemplate() { BatchingRabbitTemplate batchTemplate = new BatchingRabbitTemplate(createBatchingStrategy(), new ConcurrentTaskScheduler()); batchTemplate.setConnectionFactory(getConnectionFactory()); return batchTemplate; }
我们这里的BatchingStrategy设置的消息条数上限为20条,消息总大小上限为1000。使用的TaskScheduler是java自带的 ConcurrentTaskScheduler对象,使用单线程池执行后续发送批量消息的计划任务,实际使用时可以设置为自定义的TaskScheduler.
为了测试方便,我们再添加两个消息队列,一个消息队列名为springBatchMessageQueue,用于接收发送的批量消息,一个消息队列名为springBatchReplyMessageQueue。在生产者和消费者端分别添加定义队列和消息绑定的代码,由于和异步消息类似,这里不再列出。 在消费者端定义一个BatchMessageListener,用于从springBatchMessageQueue接收和处理消息,它的主要代码如下:
public class BatchMessageListener implements ChannelAwareMessageListener { private Logger logger = LoggerFactory.getLogger(BatchMessageListener.class); public void onMessage(Message message, Channel channel) throws Exception { MessageProperties messageProperties = message.getMessageProperties(); AMQP.BasicProperties rabbitMQProperties = messagePropertiesConverter.fromMessageProperties(messageProperties, "UTF-8"); String messageContent = new String(message.getBody(), "UTF-8"); String correlationId = rabbitMQProperties.getCorrelationId(); //打印接收到的批量消息内容和correlationId,用于检验是否使用第一条消息的correaltionId. logger.info("The received batch message is:" + messageContent); logger.info("The correlation id is:" + correlationId); ...... }
在生产者端,我们在SendMessageService接口中定义发送批量消息的接口sendBatchMessages方法。
public interface SendMessageService { ........ void sendBatchMessage(List<String> messages); } public class SendMessageServiceImpl implements SendMessageService { ......... @Override public void sendBatchMessage(List<String> messages) { CompletableFuture.runAsync(new BatchMQSenderThread(messages), executor); }我们定义了BatchMQSenderThread类来实现后台发送批量消息,它的主要代码如下:
public class BatchMQSenderThread implements Runnable { private Logger logger = LoggerFactory.getLogger(BatchMQSenderThread.class); private List<String> messageList; public BatchMQSenderThread(List<String> messageList) { this.messageList = messageList; } public void run() { BatchingRabbitTemplate rabbitTemplate = (BatchingRabbitTemplate) ApplicationContextUtil.getBean("batchRabbitTemplate"); for(String message:messageList) { Date sendTime = new Date(); String correlationId = UUID.randomUUID().toString(); AMQP.BasicProperties props = new AMQP.BasicProperties("text/plain", "UTF-8", null, 2, 0, correlationId, AppConstants.REPLY_EXCHANGE_NAME, null, null, sendTime, null, null, "SpringProducer", null); MessageProperties sendMessageProperties = messagePropertiesConverter.toMessageProperties(props, null,"UTF-8"); sendMessageProperties.setReceivedExchange(AppConstants.REPLY_EXCHANGE_NAME); sendMessageProperties.setReceivedRoutingKey(AppConstants.REPLY_BATCH_MESSAGE_KEY); sendMessageProperties.setRedelivered(true); Message sendMessage = MessageBuilder.withBody(message.getBytes()) .andProperties(sendMessageProperties) .build(); logger.info("Send message '" + message + "' to batch"); logger.info("The message's correlation id is:" + correlationId); rabbitTemplate.send(AppConstants.SEND_EXCHANGE_NAME, AppConstants.SEND_BATCH_MESSAGE_KEY, sendMessage, null); } }
BatchMQSenderThread.run方法循环读取消息List中的消息,调用batchingRabbitTemplate对象进行消息发送,每条消息我们都生成一个uuid作为消息的correlationId,我们接下来会看到,在最后发送的批量消息中,只有第一条消息的correlationId被采用。
最后我们定义一个发送批量消息的rest接口,批量消息使用逗号进行分割。
@RestController public class SendMessageController { ............ @RequestMapping(value = "/sendMessages", method = RequestMethod.POST) public void sendMessages(@RequestBody String messages) { List<String> messageList = Splitter.on(",").splitToList(messages); sendMessageService.sendBatchMessage(messageList); }修改完成后,我们启动生产者和消费者应用,通过生产者rest接口发送30条消息(消息正文从Message0到Message29)
从生产者端日志可以看出,SendMessageServiceImpl调用BatchRabbitTemplate.send方法,发送了30条消息,但是由于消息条数超过了我们预设的20条限制,实际只批量发送了20条消息(从消费者端的日志截图可以看到),消费者端仍然是把发送的批量消息作为20条独立的消息各自接收的。
由于我们定义的SimpleBatchingStrategy策略对象的超时时间是60000ms(1分钟),1分钟后,剩余的10条消息被BatchRabbitTemplate对象批量发出,被消费者接收,消费者日志显示了这一点。
消费者日志还显示,BatchRabbitTemplate对象发送的30条消息,它们都具有相同的correlationId,是第一条消息的correlationId,而不是创建这些消息时生成的uuid,这是由SimpleBatchingStrategy类的批量策略决定的。
下面我们修改一下生产者的SimpleBatchingStrategy属性设定,设定消息缓存上限为200,超时时间为120000ms(2分钟),再发送30条消息。 消费者端的输出日志如下:
我们可以看出,由于消息缓存上限的限制,BatchRabbitTemplate只批量发送了16条消息,后14条消息是在2分钟后延时发送。
在实际使用时,我们也可以根据业务需要,自定义BatchStrategy策略。
Rabbit Annotation的使用
在系列2里我们提到了使用spring-rabbit的rabbit前缀简化applicationContext.xml配置文件中的RabbitMQ beans配置,spring-amqp还提供了RabbitMQ常用对象对应的Annoation,这些注解如下图所示:下面我们使用这些Annotation简化我们的生产者应用代码。
我们直接修改ReplyBatchMessageListener,在它的onMessage方法头部添加以下注解:
@RabbitListener( bindings = @QueueBinding( value = @Queue(value="springBatchReplyMessageQueue", durable = "true", exclusive = "false",autoDelete = "false"), exchange = @Exchange(value="springReplyMessageExchange", durable = "true"), key = "springBatchReplyMessage"), admin="rabbitAdmin") public void onMessage(Message message) { try { String messageContent = new String(message.getBody(), "UTF-8"); logger.info("The reply message content is:" + messageContent); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } }
这个注解中使用到了@RabbitListener,将onMessage方法定义为一个Message Listener, @QueueBinding用于定义Binding对象,@Queue定义Queue对象,@Exchange定义Exchange对象,key属性是Bind对象的routingKey. spring-amqp内置的RabbitAdmin对象根据@Queue和@Exchange的属性,在RabbitMQ服务器上创建消息队列和Exchange, 根据@QueueBinding的属性,在RabbitMQ服务器上建立对应的Bind关系。
admin属性是创建Message Listener Container使用的RabbitAdmin对象,我们这里引用RabbitConfig中定义的“rabbitAdmin”对象。
@RabbitListener定义的Message Listener,所在的Message Listener Container是spring-amqp根据默认设置创建的Message Listener Container对象。如果我们想根据需要使用定制的Message Listener Container,我们需要在RabbitConfig类中定制SimpleMessageListenerContainerFactory对象:
@Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(getConnectionFactory()); factory.setConcurrentConsumers(5); factory.setMaxConcurrentConsumers(5); factory.setConsumerTagStrategy(new ReplyConsumerTagStrategy()); factory.setPrefetchCount(5); factory.setMessageConverter(getMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); BackOff backOff = new FixedBackOff(60000,100); factory.setRecoveryBackOff(backOff); return factory; }
我们可以看出这个方法设置的factory对象属性,和我们之前定义的Message Listener Container bean对象的属性有相似之处, 不过factory对象的属性,并没有完全覆盖Message Listener Container对象的属性定义,例如messagePropertiesConverter属性的设置,在SimpleRabbitListenerContainerFactory类中就找不到,因此,如果想使用Message Listener Container比较复杂的属性,建议还是使用@Bean定义。
如果@RabbitListener关联的消息队列,Exchange和Bind关系在RabbitMQ服务器中已经创建,我们可以在@RabbitListener中不使用@QueueBinding,直接使用queues属性,此时onMessage方法头部的注解如下:
@RabbitListener(admin="rabbitAdmin", queues="springBatchReplyMessageQueue") public void onMessage(Message message)
为了使Rabbit注解生效,还需要添加@EnableRabbit注解,我们在RabbitConfig类头部添加这个注解
@Configuration @EnableRabbit @EnableConfigurationProperties(RabbitProperties.class) public class RabbitConfig { ........
启动生产者和消费者应用,使用批量接口发送20条消息,从生产者日志我们可以看出我们定义在ReplyBatchMessageListener.onMessage方法上的添加的@RabbitListener将onMessage方法变成了一个Message Listener,接收了springBatchReplyMessageQueue队列的返回消息。
如果有很多方法想定义为一个Message Listener,而他们使用的@RabbitListener注解内容又相同,我们可以自定义RabbitListener注解,例如,我们可以将上面的@RabbitListener注解定义为ReplyRabbitListener注解接口
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) @RabbitListener(admin="rabbitAdmin", queues="springBatchReplyMessageQueue") public @interface ReplyRabbitListener { }我们注释掉ReplyBatchMessageListener.onMessage方法头部的@RabbitListener注解,重新定义一个POJO类ReplyConsumerDelegate, 在这个类中添加一个消息处理方法processReplyMessage,在这个方法头部添加我们自定义的注解ReplyRabbitListener,把这个方法变成了一个Message
Listener,在这个类头部我们还添加了@Component注解,使这个对象在SpringBoot启动时被初始化。
@Component public class ReplyConsumerDelegate { private Logger logger = LoggerFactory.getLogger(ReplyConsumerDelegate.class); @ReplyRabbitListener public void processReplyMessage(String message) { logger.info("The reply message content is:" + message); } }
我们再启动生产者应用,发送批量消息,从生产者日志可以看出,@ReplyRabbitListener注解使处理返回消息的方法变成了ReplyConsumerDelegate.processReplyMessage
对象类型消息的发送
前面的例子中发送的消息消息本体都是String类型,如果我们想发送一个对象类型的消息时,应该怎么处理呢?下面的实例将演示如何发送对象消息。我们定义两个实体类Company和Employee
public class Company implements java.io.Serializable { private int id; private String companyName; private String establishDate; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getCompanyName() { return companyName; } public void setCompanyName(String companyName) { this.companyName = companyName; } public String getEstablishDate() { return establishDate; } public void setEstablishDate(String establishDate) { this.establishDate = establishDate; } public String toString() { return "companyId=" + Integer.toString(getId()) + ",companyName=" + getCompanyName() + ",establishDate=" + getEstablishDate(); } } public class Employee implements java.io.Serializable { private String employeeId; private String employeeName; private String employeeDate; public String getEmployeeId() { return employeeId; } public void setEmployeeId(String employeeId) { this.employeeId = employeeId; } public String getEmployeeName() { return employeeName; } public void setEmployeeName(String employeeName) { this.employeeName = employeeName; } public String getEmployeeDate() { return employeeDate; } public void setEmployeeDate(String employeeDate) { this.employeeDate = employeeDate; } public String toString() { return "EmployeeId=" + getEmployeeId() + ",EmployeeName=" + getEmployeeName() + ",EmployeeDate=" + getEmployeeDate(); } }
我们创建一个名为“springRabbitHandlerQueue”的消息队列,绑定到“springMessageExchange”Exchange对象上。
我们设定分别发送一条Company类型的消息,一条Employee类型的消息到springRabbitHandlerQueue消息队列里,消费者在同一个Message Listener中使用不同方法处理不同类型的消息。
我们在生产者程序中添加HandlerMQSenderThread类,用于发送对象类型的消息,它的主要代码如下:
public class HandlerMQSenderThread implements Runnable { private Object message; public HandlerMQSenderThread(Object message) { this.message = message; } @Override public void run() { ............ RabbitTemplate rabbitTemplate = (RabbitTemplate)ApplicationContextUtil.getBean("rabbitTemplate"); AMQP.BasicProperties props = new AMQP.BasicProperties(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT, "UTF-8", null, 2, 0, correlationId, AppConstants.REPLY_EXCHANGE_NAME, null, null, sendTime, null, null, "SpringProducer", null); Message sendMessage = MessageBuilder.withBody(toByteArray(message)) .andProperties(sendMessageProperties) .build(); ...... rabbitTemplate.send(AppConstants.SEND_EXCHANGE_NAME, AppConstants.SEND_HANDLER_MESSAGE_KEY, sendMessage); } public byte[] toByteArray (Object obj) { byte[] bytes = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.flush(); bytes = bos.toByteArray (); oos.close(); bos.close(); } catch (IOException ex) { ex.printStackTrace(); } return bytes; }
run方法里调用toByteArray方法,把对象消息转换成byte数组。在MessageProperties中,我们设定content-type为 “application/x-java-serialized-object”,表示发送的消息体类型为Java对象,便于消费者端进行反序列化。
在SendMessageService中我们定义一个接口方法sendObjectMessage,用于发送对象消息。
public interface SendMessageService { .......... void sendObjectMessage(Object message); } public class SendMessageServiceImpl implements SendMessageService { ....... @Override public void sendObjectMessage(Object message) { CompletableFuture.runAsync(new HandlerMQSenderThread(message), executor); } }
在SendMessageController类中添加两个Rest接口方法sendCompanyMessage和sendEmployeeMessage,这两个方法从客户端接收Json形式的报文,将其转换为Company对象和Employee对象。
@RequestMapping(value = "/sendCompanyMessage", method = RequestMethod.POST) public void sendCompanyMessage(@RequestBody String message) { JSONObject obj = (JSONObject)JSON.parse(message); Company company = JSONObject.toJavaObject(obj, Company.class); sendMessageService.sendObjectMessage(company); } @RequestMapping(value = "/sendEmployeeMessage", method = RequestMethod.POST) public void sendEmployeeMessage(@RequestBody String message) { JSONObject obj = (JSONObject)JSON.parse(message); Employee employee = JSONObject.toJavaObject(obj, Employee.class); sendMessageService.sendObjectMessage(employee); }
消费者端,我们使用spring-boot-starter-amqp创建一个消费者应用,启动类和Config类的主要代码如下:
@SpringBootApplication @ComponentScan("com.qf.rabbitmq") @EnableAutoConfiguration(exclude = RabbitAutoConfiguration.class) public class SpringConsumerTestApplication { public static void main(String[] args) { SpringApplication.run(SpringConsumerTestApplication.class, args); } } @Configuration @EnableRabbit @EnableConfigurationProperties(RabbitProperties.class) //public class RabbitConfig implements RabbitListenerConfigurer public class RabbitConfig { @Autowired private RabbitProperties rabbitProperties; /** * 创建RabbitMQ连接工厂 * @return 根据application.properties设定参数建立的连接工厂对象 */ @Bean("connectionFactory") public ConnectionFactory getConnectionFactory() { com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = new com.rabbitmq.client.ConnectionFactory(); rabbitConnectionFactory.setHost(rabbitProperties.getHost()); rabbitConnectionFactory.setPort(rabbitProperties.getPort()); rabbitConnectionFactory.setUsername(rabbitProperties.getUsername()); rabbitConnectionFactory.setPassword(rabbitProperties.getPassword()); rabbitConnectionFactory.setVirtualHost(rabbitProperties.getVirtualHost()); ConnectionFactory connectionFactory = new CachingConnectionFactory(rabbitConnectionFactory); return connectionFactory; } /** * 创建Message Listener Container工厂,它负责创建RabbitListener注解的 * Message Listener所在的Container. * @return Message Listener Container工厂对象 */ @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(getConnectionFactory()); factory.setConcurrentConsumers(5); factory.setMaxConcurrentConsumers(5); factory.setPrefetchCount(5); return factory; } }
创建一个POJO类RabbitHandlerConsumer,通过添加@RabbitListener注解和@RabbitHandler注解,使它成为springRabbitHandlerQueue 消息的Message Listener对象,并且根据不同类型的消息分别处理。
@Component @RabbitListener(queues="springRabbitHandlerQueue") public class RabbitHandlerConsumer { private Logger logger = LoggerFactory.getLogger(RabbitHandlerConsumer.class); @RabbitHandler public void processCompanyMessage(Company message) { logger.info("Received the message having Company type."); logger.info("The message content is:" + message.toString()); } @RabbitHandler public void processEmployeeMessage(Employee message) { logger.info("Received the message having Employee type."); logger.info("The message content is:" + message.toString()); } }
RabbitHandler注解是spring-amqp 1.5开始引入的,它用于将POJO类方法转换为消息处理方法,RabbitListener注解则提升到类级别, 可以在POJO类头部添加,作为全局设定。需要指出的是RabbitHandler起作用必须是在RabbitMQ消息本体被正常转换为对象后,否则它将无法根据方法参数类型确定实际的handlerMethod,在实际使用时需要对MessageConverter做相关设定,具体做法请参考spring-amqp文档。
最后将Employee和Company实体类添加到消费者项目中。
启动生产者和消费者应用。
先通过sendCompanyMessage接口发送一个Company类型的消息。
消费者端日志显示这条消息被RabbitHandlerConsumer.processCompanyMessage方法接收并处理。
再发送一条Employee类型的消息
消费者端日志显示这条消息被RabbitHandlerConsumer.processEmployeeMessage方法接收并处理。
我们上面的消费者应用是基于Spring Boot环境的,如果我们想在Spring AMQP框架接收并处理对象消息,我们不需要添加@EnableRabbit和@Configuration注解,只需要在applicationContext.xml文件中添加<rabbit:annotation-driven>标签即可。
我们新建一个基于Maven的消费者项目,pom.xml添加的库为:
<dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-amqp</artifactId> <version>1.6.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.6.5.RELEASE</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>jcl-over-slf4j</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.21</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.21</version> </dependency> </dependencies>
在src/main/resources目录下添加applicationContext.xml文件,它的主要内容如下:
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.6.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd" > <context:annotation-config/> <context:property-placeholder ignore-unresolvable="true" location="classpath*:/application.properties" /> <context:component-scan base-package="com.qf.rabbitmq" /> <rabbit:annotation-driven container-factory="rabbitListenerContainerFactory" /> <bean id="rabbitMQConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"> <property name="username" value="${mq.userName}" /> <property name="password" value="${mq.password}" /> <property name="host" value="${mq.ip}" /> <property name="port" value="${mq.port}" /> <property name="virtualHost" value="${mq.virutalHost}" /> <property name="automaticRecoveryEnabled" value="false" /> <property name="topologyRecoveryEnabled" value="false" /> <property name="networkRecoveryInterval" value="60000" /> </bean> <rabbit:connection-factory id ="connectionFactory" connection-factory="rabbitMQConnectionFactory" connection-timeout="10000" cache-mode="CHANNEL" channel-cache-size="20"/> <bean id="rabbitListenerContainerFactory" class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory"> <property name="connectionFactory" ref="connectionFactory"/> <property name="concurrentConsumers" value="3"/> <property name="maxConcurrentConsumers" value="3"/> </bean> <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory" auto-startup="true"/> </beans>
再添加RabbitHandlerConsumer,Employee,Company类。
最后在主程序中加载applicationContext.xml文件,启动消费者应用
public class RabbitCustomerTest { public static void main( String[] args ) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( new String[] { "classpath:applicationContext.xml" }); context.start(); } }
启动生产者应用,发送Company消息和Employee消息,我们可以看到消费者应用的响应与基于SpringBoot的消费者应用程序相同。
相关文章推荐
- RabbitMQ的Java应用(2) -- 使用Spring AMQP开发消费者应用
- Spring Boot——开发新一代Spring Java应用
- 用Spring Boot颠覆Java应用开发
- Java 开发基于Zookeeper,Spring,vue.js的高并发多用户模块化微信商城系统(四) Java微框架Spring Boot的应用
- Spring Boot——开发新一代Spring Java应用
- Spring Boot——开发新一代Spring Java应用
- Spring Boot——开发新一代Spring Java应用
- 不使用 spring-boot-starter-parent 构建 spring boot 应用
- Spring Boot学习(三):开发新一代Spring Java应用
- Spring Boot——开发新一代Spring Java应用
- RabbitMQ入门之spring-boot-starter-amqp<一>
- 使用SpringBoot,优质快速开发Java项目
- Spring Boot——开发新一代Spring Java应用
- spring,springMVC的优点和区别 spring 是是一个开源框架,是为了解决企业应用程序开发,功能如下 ◆目的:解决企业应用开发的复杂性 ◆功能:使用基本的JavaBean代替EJB,并
- Spring Boot——开发新一代Spring Java应用
- spring boot中通过开发jar包,检查目标应用的注解的非法使用
- 在Spring Boot 应用中使用JSP开发网页
- 用spring Boot颠覆Java应用开发
- Spring Boot 2.0.0.M7 使用异步消息服务-AMQP(RabbitMQ)
- Spring Boot——开发新一代Spring Java应用