SpringBoot 1.7 整合RabbitMQ
1.7 整合RabbitMQ
1.7.1 RabbitMQ简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。 AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。 RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
在开发过程中,我们需要了解RabbitMQ的消息交换类型(Exchange 类型):
Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。只说前三种模式。
- (1) Direct模式
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配
- (2) Topic模式
topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词。
- (3) Fanout模式
每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
1.7.2 配置工程
Pom配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application-dev.properties 配置 #================== RabbitMq ===================# spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin #================== RabbitMq 队列配置 ===================# mq.env=local basic.info.mq.exchange.name=${mq.env}:sys:info:mq:exchange basic.info.mq.routing.key.name=${mq.env}:sys:info:mq:routing:key basic.info.mq.queue.name=${mq.env}:sys:info:mq:queue
创建RabbitmqConfig,在配置类中创建队列、交换机、路由及其绑定。
package com.zone7.demo.helloworld.config.rabbitmq; import com.zone7.demo.helloworld.sys.service.impl.CustomerMqServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; /** * RabbitMq配置类 * */ @Configuration public class RabbitMqConfig { private static final Logger log= LoggerFactory.getLogger(RabbitMqConfig.class); @Autowired private Environment env; @Autowired private CachingConnectionFactory connectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; /** * 单一消费者 * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); return factory; } /** * 多个消费者 * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer(){ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory,connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setAcknowledgeMode(AcknowledgeMode.NONE); factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class)); factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class)); factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class)); return factory; } @Bean public RabbitTemplate rabbitTemplate(){ connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause); } }); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message); } }); return rabbitTemplate; } //TODO:基本消息模型构建 @Bean public DirectExchange basicExchange(){ return new DirectExchange(env.getProperty("basic.info.mq.exchange.name"), true,false); } @Bean(name = "basicQueue") public Queue basicQueue(){ return new Queue(env.getProperty("basic.info.mq.queue.name"), true); } @Bean public Binding basicBinding(){ return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(env.getProperty("basic.info.mq.routing.key.name")); } }
1.7.3 案例开发
- (1) 创建服务层代码
首先开发一个带有消息接收监听功能和消息发送功能的服务。
package com.zone7.demo.helloworld.sys.service.impl; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.zone7.demo.helloworld.sys.service.RabbitService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageBuilder; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.env.Environment; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Service; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; /** * 消息监听器以及发送服务 */ @Service public class RabbitServiceImpl implements RabbitService { private static final Logger log= LoggerFactory.getLogger(RabbitServiceImpl.class); @Autowired private Environment env; @Autowired private RabbitTemplate rabbitTemplate; /** * 消息消费 * @param message */ @RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer") public void consumeMessage(@Payload byte[] message){ try { //TODO:接收String String result=new String(message,"UTF-8"); log.info("接收String消息: {} ",result); }catch (Exception e){ log.error("监听消费消息 发生异常: ",e.fillInStackTrace()); } } /** * 消息发送 * @param message */ @Override public void sendMessage(String message) { try { log.info("待发送的消息: {} ",message); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(env.getProperty("basic.info.mq.exchange.name")); rabbitTemplate.setRoutingKey(env.getProperty("basic.info.mq.routing.key.name")); Message msg= MessageBuilder.withBody(message.getBytes(Charset.forName("UTF-8"))).build(); rabbitTemplate.convertAndSend(msg); }catch (Exception e){ log.error("发送简单消息发生异常: ",e.fillInStackTrace()); } } }
- (2) 创建控制层代码
package com.zone7.demo.helloworld.sys.controller; import com.zone7.demo.helloworld.commons.response.ResponseData; import com.zone7.demo.helloworld.sys.service.RabbitService; import com.zone7.demo.helloworld.sys.service.RedisService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; /** * @Author: zone7 * @Date: 2019/06/17 * @Version 1.0 */ @RestController @RequestMapping("/rabbit") public class RabbitController { @Autowired private RabbitService rabbitService; @GetMapping("/send/{message}") public ResponseData send(@PathVariable String message ){ rabbitService.sendMessage(message); return ResponseData.successMessage("发送消息: "+message+"成功"); } }
- (3) 测试效果
确保Rabbit已经启动,启动工程后在浏览器中输入
http://localhost:8080/rabbit/send/hello
我们将会看到以下信息:
同时查看控制台输出,确认消息发送成功并已经被消费。
通过rabbit监控网页http://localhost:15672/ 我们可看到队列、交换机、路由等信息。
- 点赞
- 收藏
- 分享
- 文章举报
- springboot整合rabbitmq的五种模式示例
- SpringBoot系列八:SpringBoot整合消息服务(SpringBoot 整合 ActiveMQ、SpringBoot 整合 RabbitMQ、SpringBoot 整合 Kafka)
- 集群与负载均衡系列(5)——消息队列之spring-boot整合Rabbitmq
- SpringBoot整合RabbitMQ之典型应用场景实战一
- SpringBoot整合RabbitMQ
- springboot整合rabbitmq,支持消息确认机制
- SpringBoot整合Rabbitmq设置消息请求头
- springboot整合rabbitmq
- Spring Boot整合RabbitMQ开发实战详解
- 【推荐】springboot学习笔记-6 springboot整合RabbitMQ
- spring boot实战(第十二篇)整合RabbitMQ
- spring boot实战(第十二篇)整合RabbitMQ
- Spring Boot 构建应用——整合消息中间件 RabbitMQ
- 【SpringBoot】整合RabbitMQ
- rabbitmq学习(五):springboot整合rabbitmq
- SpringBoot 整合 RabbitMQ(包含三种消息确认机制以及消费端限流)
- Rabbitmq 整合Spring,SpringBoot与Docker
- RabbitMQ详解以及和SpringBoot整合
- SpringBoot整合RabbitMQ之典型应用场景实战一
- SpringBoot整合RabbitMQ 实现五种消息模型 详细教程