springboot研究五:springboot整合rabbitmq
rabbitmq是当下非常流行的消息队列,本文主要介绍springboot中如何配置使用rabbitmq。
文中代码基于springboot2.1.6,源代码见文末地址。
1.为了自己玩方便,可以用docker安装rabbitmq,见专栏内文章
《docker安装rabbitmq》
2.相关配置
spring.rabbitmq.host=192.168.59.128 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin #这个如果不配置,就会默认找"/" spring.rabbitmq.virtual-host=my_vhost #指定心跳超时,单位秒,0为不指定;默认60s spring.rabbitmq.requested-heartbeat=20 #是否启用【发布确认】 spring.rabbitmq.publisher-confirms=true #是否启用【发布返回】 spring.rabbitmq.publisher-returns=true #连接超时,单位毫秒,0表示无穷大,不超时 spring.rabbitmq.connection-timeout=10
3.pom依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
4.rabbitmq有4种exchange
Exchange type | Default pre-declared names |
---|---|
Direct exchange | (Empty string) and amq.direct |
Fanout exchange | amq.fanout |
Topic exchange | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
a.direct exchange使用routing key进行消息传输,如下图,routing key其实就是queue和exchange的绑定。适用于多工作者协同工作的场景。
绑定代码如下:代码中queue名称和routing key名称都是"direct"
@Configuration public class DirectRabbitConfig { @Bean public Queue direct() { return new Queue("direct"); } @Bean public DirectExchange directExchange() { return new DirectExchange("directExchange"); } @Bean public Binding directBindingExchange(Queue direct, DirectExchange directExchange) { return BindingBuilder.bind(direct).to(directExchange).with("direct"); } }
sender如下:
@Service public class DirectSenderService { private Logger logger = LoggerFactory.getLogger(getClass()); @Resource private AmqpTemplate rabbitTemplate; public void sendString(String message) { logger.info("direct sender : " + message); rabbitTemplate.convertAndSend("directExchange", "direct", message); } public void sendObject(Object message) { String messageStr = JSONObject.toJSONString(message); logger.info(messageStr); rabbitTemplate.convertAndSend("directExchange", "direct", messageStr); } }
receiver:
@RabbitHandler @RabbitListener(queues = {"direct"}) public void processDirect(Message message) { logger.info("Receiver direct: {}", new String(message.getBody())); }
b.fanout exchange就是广播模式,把消息路有给所有的绑定队列,可以适用于群聊天的场景。
配置代码如下:其中有3个队列绑定一个fanout exchange
@Configuration public class FanoutRabbitConfig { @Bean public Queue queueA(){ return new Queue("fanout.a"); } @Bean public Queue queueB(){ return new Queue("fanout.b"); } @Bean public Queue queueC(){ return new Queue("fanout.c"); } @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean public Binding bindingExchangeA(Queue queueA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueA).to(fanoutExchange); } @Bean public Binding bindingExchangeB(Queue queueB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueB).to(fanoutExchange); } @Bean public Binding bindingExchangeC(Queue queueC, FanoutExchange fanoutExchange) { return BindingBuilder.bind(queueC).to(fanoutExchange); } }
sender:
@Service public class FanoutSenderService { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private AmqpTemplate rabbitTemplate; public void send(String message) { logger.info("fanout sender : {}", message); rabbitTemplate.convertAndSend("fanoutExchange","", message); } }
receiver:
@RabbitHandler @RabbitListener(queues = {"fanout.a", "fanout.b", "fanout.c"}) public void processFanout1(Message message) { logger.info("Receiver fanout: {}", new String(message.getBody())); }
c.topic exchange通过routing key和通配符来路由消息,适用于发布订阅场景。
配置代码:
@Configuration public class TopicRabbitConfig { @Bean public Queue queueMessage() { return new Queue("topic.message"); } @Bean public Queue queueMessage2() { return new Queue("topic.message2"); } /** * 将队列绑定到Topic交换器 * @return */ @Bean public TopicExchange exchange() { return new TopicExchange("topicExchange"); } /** * 将队列绑定到Topic交换器 * @param queueMessage * @param exchange * @return */ @Bean public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } /** * 将队列绑定到Topic交换器 采用#的方式 * @param exchange * @param queueMessage2 * @return */ @Bean Binding bindingExchangeMessage2(TopicExchange exchange, Queue queueMessage2) { return BindingBuilder.bind(queueMessage2).to(exchange).with("topic.#"); } }
sender:
@Service public class TopicSenderService { private Logger logger = LoggerFactory.getLogger(getClass()); @Resource private AmqpTemplate rabbitTemplate; public void send1(String message) { logger.info("topic sender1 : " + message); rabbitTemplate.convertAndSend("topicExchange", "topic.message", message); } public void send2(String message) { logger.info("topic sender2 : " + message); rabbitTemplate.convertAndSend("topicExchange", "topic.message2", message); } }
接受
@RabbitHandler @RabbitListener(queues = {"topic.message"}) public void processTopic(Message message) { logger.info("Receiver topic: {}", new String(message.getBody())); } @RabbitHandler @RabbitListener(queues = {"topic.message2"}) public void processTopic2(Message message) { logger.info("Receiver topic2: {}", new String(message.getBody())); }
d.header exchange忽略routing key参数,用header来取代
配置
@Configuration public class HeadersRabbitConfig { @Bean public Queue headerQueue() { return new Queue("headerQueue"); } @Bean public Queue headerQueue2() { return new Queue("headerQueue2"); } @Bean public HeadersExchange headerExchange() { return new HeadersExchange("headerExchange"); } @Bean public HeadersExchange headerExchange2() { return new HeadersExchange("headerExchange2"); } @Bean public Binding bindingExchange(Queue headerQueue, HeadersExchange headerExchange) { Map<String,Object> headerValues = new HashMap<>(3); headerValues.put("param1", "value1"); headerValues.put("param2", "value2"); return BindingBuilder.bind(headerQueue).to(headerExchange).whereAll(headerValues).match(); } @Bean public Binding bindingExchange2(Queue headerQueue2, HeadersExchange headerExchange2) { Map<String,Object> header = new HashMap<>(3); header.put("param1", "value1"); header.put("param2", "value2"); return BindingBuilder.bind(headerQueue2).to(headerExchange2).whereAny(header).match(); } }
发送:
@Service public class HeadersSenderService { private Logger logger = LoggerFactory.getLogger(getClass()); @Resource private AmqpTemplate rabbitTemplate; public void headerSend(Map<String, Object> head, String msg){ logger.info("header send message: "+msg); rabbitTemplate.convertAndSend("headerExchange", "headerQueue", getMessage(head, msg)); } public void headerSend2(Map<String, Object> head, String msg){ logger.info("header1 send message: "+msg); rabbitTemplate.convertAndSend("headerExchange2", "headerQueue2", getMessage(head, msg)); } private Message getMessage(Map<String, Object> head, Object msg){ MessageProperties messageProperties = new MessageProperties(); for (Map.Entry<String, Object> entry : head.entrySet()) { messageProperties.setHeader(entry.getKey(), entry.getValue()); } MessageConverter messageConverter = new SimpleMessageConverter(); return messageConverter.toMessage(msg, messageProperties); } }
接收:
@RabbitHandler @RabbitListener(queues = {"headerQueue"}) public void processHeaders(Message message) { logger.info("Receiver header: {}", new String(message.getBody())); } @RabbitHandler @RabbitListener(queues = {"headerQueue2"}) public void processHeaders1(Message message) { logger.info("Receiver header2: {}", new String(message.getBody())); }
5.测试,测试代码写在RabbitMqController中,启动Application即可进行url测试。见源码。
说明:
a.topic exchange,浏览器输入http://localhost:8082/mq/topic后,topic.#的routing key收到了2条消息,topic.message的routing key收到了1条,可以看出通配符的作用
b.headers exchange:浏览器输入http://localhost:8082/mq/headers,发送了4条消息,但是第1条没有收到。因为headerExchange绑定时使用了whereAll,headerExchange2绑定时使用了whereAny。
- RabbitMQ3.7 研究-SpringBoot整合RibbitMQ
- spring boot整合rabbitmq踩坑
- Springboot整合RabbitMQ,良心推荐
- RabbitMQ(三):RabbitMQ与Spring Boot简单整合
- 第一次使用spring boot整合RabbitMQ
- Spring Boot整合RabbitMQ
- spring boot实战(番外篇)整合RabbitMQ
- (转载)Springboot 整合RabbitMq
- Spring boot 整合RabbitMq有简单到深入
- SpringBoot学习(六)—— springboot快速整合RabbitMQ
- Spring Boot 整合 RabbitMQ 之 Topic转发模式 (二)
- Spring Boot教程(二)关于RabbitMQ服务器整合
- springboot2.x整合rabbitMQ:简单的生产者和消费者
- Spring Boot 整合 RabbitMQ
- Spring Boot整合RabbitMQ实例
- SpringBoot非官方教程 | 第十五篇:Springboot整合RabbitMQ
- SpringBoot整合RabbitMQ
- springboot整合rabbitmq入门(一)
- Spring-Boot整合RabbitMQ
- spring boot整合RabbitMQ(Direct模式)