Springboot+RabbitMQ整合示例
一、RabbitMQ简介
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
1、MQ特点: MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
2、含义:RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
3、概念:RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。
RabbitMQ的结构图如下:
Broker:简单来说就是消息队列服务器实体。
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
- Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
- Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
- vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
- producer:消息生产者,就是投递消息的程序。
- consumer:消息消费者,就是接受消息的程序。
- channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。
消息队列的使用过程大概如下:
- 客户端连接到消息队列服务器,打开一个channel。
- 客户端声明一个exchange,并设置相关属性。
- 客户端声明一个queue,并设置相关属性。
- 客户端使用routing key,在exchange和queue之间建立好绑定关系。
- 客户端投递消息到exchange。
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。
RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:
- exchange持久化,在声明时指定durable => 1
- queue持久化,在声明时指定durable => 1
- 消息持久化,在投递时指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。
4、安装(Centos7):https://blog.csdn.net/typ1805/article/details/82744899
访问:http://192.168.0.132:15672/#/queues
二、Springboot整合RabbitMQ
1、添加pom.xml依赖
[code]<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example.demo.rabbitmq</groupId> <artifactId>rabbitmq-demo</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>rabbitmq-demo</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.5.RELEASE</version> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2、application.yml配置文件主要是对rabbimq的配置信息
[code]server: port: 8081 spring: application: name: rabbitmq-demo rabbitmq: host: 192.168.0.132 port: 5672 username: admin password: admin
3、初始化创建队列、转发器,并把队列绑定到转发器(RabbitConfig.java)
[code]package com.example.demo.rabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.boot.SpringApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * 路径:com.example.demo.rabbitmq.config * 类名: * 功能:队列配置 * 备注: * 创建人:typ * 创建时间:2018/9/23 21:46 * 修改人: * 修改备注: * 修改时间: */ @Configuration public class RabbitConfig { @Bean public Queue helloQueue() { return new Queue("hello"); } @Bean public Queue userQueue() { return new Queue("user"); } //===============以下是验证topic Exchange的队列========== @Bean public Queue queueMessage() { return new Queue("topic.message"); } @Bean public Queue queueMessages() { return new Queue("topic.messages"); } //===============以上是验证topic Exchange的队列========== //===============以下是验证Fanout Exchange的队列========== @Bean public Queue AMessage() { return new Queue("fanout.A"); } @Bean public Queue BMessage() { return new Queue("fanout.B"); } @Bean public Queue CMessage() { return new Queue("fanout.C"); } //===============以上是验证Fanout Exchange的队列========== @Bean TopicExchange exchange() { return new TopicExchange("exchange"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } /** * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配 * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } /** * 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配 * @param queueMessage * @param exchange * @return */ @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } @Bean Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
4、最简单的hello生产和消费实现(单生产者和单消费者)
生产者:
[code]package com.example.demo.rabbitmq.service.oneToOne; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:生产者 * 备注:单生产者-单消费者 * 创建人:typ * 创建时间:2018/9/23 21:49 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloSender { private static final Logger log = LoggerFactory.getLogger(HelloSender.class); @Autowired public AmqpTemplate amqpTemplate; public void send(){ String context = "hello " + new Date(); log.info("Sender:" + context); this.amqpTemplate.convertAndSend("hello",context); } }
消费者:
[code]package com.example.demo.rabbitmq.service.oneToOne; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:消费者 * 备注:单生产者-单消费者 * 创建人:typ * 创建时间:2018/9/23 22:14 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloReceiver { private static final Logger log = LoggerFactory.getLogger(HelloReceiver.class); //监听器监听指定的Queue @RabbitListener(queues="hello") public void process(String hello){ log.info("Receiver:"+hello); } }
controller测试:
[code]package com.example.demo.rabbitmq.controller; import com.example.demo.rabbitmq.service.oneToOne.HelloSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * 路径:com.example.demo.rabbitmq.controller * 类名: * 功能:《用一句描述一下》 * 备注:单生产者-单消费者 * 创建人:typ * 创建时间:2018/9/23 22:35 * 修改人: * 修改备注: * 修改时间: */ @RestController public class RabbitOneToOneTest { @Autowired private HelloSender helloSender; @PostMapping("/hello") public void hello(){ helloSender.send(); } }
启动程序,执行:
结果如下:
[code]Sender : hello1 Thu September 24 17:23:31 CST 2018 Receiver : hello1 Thu September 24 17:23:31 CST 2018
5、单生产者-多消费者
生产者:
[code]package com.example.demo.rabbitmq.service.oneToMany; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:生产者 * 备注:单生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 21:49 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloSender1 { private static final Logger log = LoggerFactory.getLogger(HelloSender1.class); @Autowired public AmqpTemplate amqpTemplate; public void send(String msg){ String context = msg + new Date(); log.info("Sender1:" + context); this.amqpTemplate.convertAndSend("hello",context); } }
消费者1:
[code]package com.example.demo.rabbitmq.service.oneToMany; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:消费者1 * 备注:单生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 22:14 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloReceiver1 { private static final Logger log = LoggerFactory.getLogger(HelloReceiver1.class); //监听器监听指定的Queue @RabbitListener(queues="hello") public void process(String hello){ log.info("Receiver1:"+hello); } }
消费者2:
[code]package com.example.demo.rabbitmq.service.oneToMany; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:消费者2 * 备注:单生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 22:14 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloReceiver2 { private static final Logger log = LoggerFactory.getLogger(HelloReceiver2.class); //监听器监听指定的Queue @RabbitListener(queues="hello") public void process(String hello){ log.info("Receiver2:"+hello); } }
controller测试:
[code]package com.example.demo.rabbitmq.controller; import com.example.demo.rabbitmq.service.oneToMany.HelloSender1; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * 路径:com.example.demo.rabbitmq.controller * 类名: * 功能:《用一句描述一下》 * 备注:单生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 22:35 * 修改人: * 修改备注: * 修改时间: */ @RestController public class RabbitOneToManyTest { @Autowired private HelloSender1 helloSender; /** * 方法名: * 功能:单生产者-多消费者 * 描述: * 创建人:typ * 创建时间:2018/9/23 22:46 * 修改人: * 修改描述: * 修改时间: */ @PostMapping("/oneToMany") public void ontToMany(){ for (int i=0;i<10;i++){ helloSender.send("hello smg:"+i); } } }
6、多生产者-多消费者
生产者1:
[code]package com.example.demo.rabbitmq.service.manyToMany; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:生产者1 * 备注:多生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 21:49 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloSenderA { private static final Logger log = LoggerFactory.getLogger(HelloSenderA.class); @Autowired public AmqpTemplate amqpTemplate; public void send(String msg){ String context = msg + new Date(); log.info("SenderA:" + context); this.amqpTemplate.convertAndSend("hello",context); } }
生产者2:
[code]package com.example.demo.rabbitmq.service.manyToMany; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Date; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:生产者2 * 备注:多生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 21:49 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloSenderB { private static final Logger log = LoggerFactory.getLogger(HelloSenderB.class); @Autowired public AmqpTemplate amqpTemplate; public void send(String msg){ String context = msg + new Date(); log.info("SenderB:" + context); this.amqpTemplate.convertAndSend("hello",context); } }
消费者1:
[code]package com.example.demo.rabbitmq.service.manyToMany; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:消费者1 * 备注:多生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 22:14 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloReceiverA { private static final Logger log = LoggerFactory.getLogger(HelloReceiverA.class); //监听器监听指定的Queue @RabbitListener(queues="hello") public void process(String hello){ log.info("ReceiverA:"+hello); } }
消费者2:
[code]package com.example.demo.rabbitmq.service.manyToMany; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service * 类名: * 功能:消费者2 * 备注:多生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 22:14 * 修改人: * 修改备注: * 修改时间: */ @Component public class HelloReceiverB { private static final Logger log = LoggerFactory.getLogger(HelloReceiverB.class); //监听器监听指定的Queue @RabbitListener(queues="hello") public void process(String hello){ log.info("ReceiverB:"+hello); } }
controller测试:
[code]package com.example.demo.rabbitmq.controller; import com.example.demo.rabbitmq.service.manyToMany.HelloSenderA; import com.example.demo.rabbitmq.service.manyToMany.HelloSenderB; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * 路径:com.example.demo.rabbitmq.controller * 类名: * 功能:《用一句描述一下》 * 备注:多生产者-多消费者 * 创建人:typ * 创建时间:2018/9/23 22:35 * 修改人: * 修改备注: * 修改时间: */ @RestController public class RabbitManyToManyTest { @Autowired private HelloSenderA helloSenderA; @Autowired private HelloSenderB helloSenderB; /** * 方法名: * 功能:多生产者-多消费者 * 描述: * 创建人:typ * 创建时间:2018/9/23 22:46 * 修改人: * 修改描述: * 修改时间: */ @PostMapping("/manyToMany") public void ontToMany(){ for (int i=0;i<10;i++){ helloSenderA.send("hello smg:"+i); helloSenderB.send("hello smg:"+i); } } }
7、实体类传输,springboot完美的支持对象的发送和接收,不需要格外的配置。
实体类(必须实现序列化接口):
[code]package com.example.demo.rabbitmq.service.entity; import java.io.Serializable; /** * 路径:com.example.demo.rabbitmq.service.entity * 类名: * 功能:《用一句描述一下》 * 备注: * 创建人:typ * 创建时间:2018/9/24 19:59 * 修改人: * 修改备注: * 修改时间: */ public class User implements Serializable{ private String name; private String pass; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getPass() { return pass; } public void setPass(String pass) { this.pass = pass; } }
生产者:
[code]package com.example.demo.rabbitmq.service.entity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.entity * 类名: * 功能:实体类传输 * 备注:生产者 * 创建人:typ * 创建时间:2018/9/24 20:01 * 修改人: * 修改备注: * 修改时间: */ @Component public class UserSender { private static final Logger log = LoggerFactory.getLogger(UserSender.class); @Autowired private AmqpTemplate amqpTemplate; public void send() { User user = new User(); user.setName("test"); user.setPass("123456"); log.info("user Sender:" + user.getName() + "," + user.getPass()); amqpTemplate.convertAndSend("user", user); } }
消费者:
[code]package com.example.demo.rabbitmq.service.entity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.entity * 类名: * 功能:实体类传输 * 备注:消费者 * 创建人:typ * 创建时间:2018/9/24 20:07 * 修改人: * 修改备注: * 修改时间: */ @Component @RabbitListener(queues = "user") public class UserReceiver { private static final Logger log = LoggerFactory.getLogger(UserReceiver.class); @RabbitHandler public void process(User user) { log.info("user Receive:" + user.getName() + "," + user.getPass()); } }
controller测试:
[code]package com.example.demo.rabbitmq.controller; import com.example.demo.rabbitmq.service.entity.UserSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * 路径:com.example.demo.rabbitmq.controller * 类名: * 功能:实体类传输测试 * 备注: * 创建人:typ * 创建时间:2018/9/24 20:09 * 修改人: * 修改备注: * 修改时间: */ @RestController public class RabbitUserTest { @Autowired private UserSender userSender; @PostMapping("/userTest") public void userTets(){ userSender.send(); } }
8、topic ExChange示例
topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列。首先对topic规则配置,这里使用两个队列来测试(也就是在Application类中创建和绑定的topic.message和topic.messages两个队列),其中topic.message的bindting_key为“topic.message”,topic.messages的binding_key为“topic.#”。
生产者:
[code]package com.example.demo.rabbitmq.service.topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.topic * 类名: * 功能:topic ExChange示例------生产者 * 备注:topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列 * 创建人:typ * 创建时间:2018/9/24 20:12 * 修改人: * 修改备注: * 修改时间: */ @Component public class TopicSender { private static final Logger log = LoggerFactory.getLogger(TopicSender.class); @Autowired private AmqpTemplate rabbitTemplate; public void send() { String msg1 = "I am topic.mesaage msg1!"; log.info("sender1 : " + msg1); this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1); String msg2 = "I am topic.mesaages msg2!"; log.info("sender2 : " + msg2); this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2); } }
消费者1:
[code]package com.example.demo.rabbitmq.service.topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.topic * 类名: * 功能:topic ExChange示例 * 备注:消费者1(topic.message) * 创建人:typ * 创建时间:2018/9/24 20:12 * 修改人: * 修改备注: * 修改时间: */ @Component @RabbitListener(queues = "topic.message") public class TopicReceiver1 { private static final Logger log = LoggerFactory.getLogger(TopicReceiver1.class); @RabbitHandler public void process(String msg) { log.info("topicReceiver1: " +msg); } }
消费者2:
[code]package com.example.demo.rabbitmq.service.topic; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.topic * 类名: * 功能:topic ExChange示例 * 备注:消费者2(topic.messages) * 创建人:typ * 创建时间:2018/9/24 20:12 * 修改人: * 修改备注: * 修改时间: */ @Component @RabbitListener(queues = "topic.messages") public class TopicReceiver2 { private static final Logger log = LoggerFactory.getLogger(TopicReceiver2.class); @RabbitHandler public void process(String msg) { log.info("topicReceiver2 : " +msg); } }
controller测试:
[code]package com.example.demo.rabbitmq.controller; import com.example.demo.rabbitmq.service.topic.TopicSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * 路径:com.example.demo.rabbitmq.controller * 类名: * 功能:topic ExChange示例 * 备注: * 创建人:typ * 创建时间:2018/9/24 20:21 * 修改人: * 修改备注: * 修改时间: */ @RestController public class RabbitTopicTest { @Autowired private TopicSender topicSender; @PostMapping("/topicTest") public void topicTest(){ topicSender.send(); } }
9、fanout ExChange示例
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。
生产者:
[code]package com.example.demo.rabbitmq.service.fanout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.fanout * 类名: * 功能:fanout ExChange示例---生产者 * 备注:Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。 * 创建人:typ * 创建时间:2018/9/24 21:10 * 修改人: * 修改备注: * 修改时间: */ @Component public class FanoutSender { private static final Logger log = LoggerFactory.getLogger(FanoutSender.class); @Autowired private AmqpTemplate rabbitTemplate; public void send() { String msg="fanoutSender :hello i am fanout"; log.info(msg); this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msg); } }
消费者1:
[code]package com.example.demo.rabbitmq.service.fanout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.fanout * 类名: * 功能:fanout ExChange示例 * 备注:消费者A * 创建人:typ * 创建时间:2018/9/24 21:10 * 修改人: * 修改备注: * 修改时间: */ @Component @RabbitListener(queues = "fanout.A") public class FanoutReceiverA { private static final Logger log = LoggerFactory.getLogger(FanoutReceiverA.class); @RabbitHandler public void process(String msg) { log.info("FanoutReceiverA : " + msg); } }
消费者2:
[code]package com.example.demo.rabbitmq.service.fanout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.fanout * 类名: * 功能:fanout ExChange示例 * 备注:消费者B * 创建人:typ * 创建时间:2018/9/24 21:10 * 修改人: * 修改备注: * 修改时间: */ @Component @RabbitListener(queues = "fanout.B") public class FanoutReceiverB { private static final Logger log = LoggerFactory.getLogger(FanoutReceiverB.class); @RabbitHandler public void process(String msg) { log.info("FanoutReceiverB : " + msg); } }
消费者3:
[code]package com.example.demo.rabbitmq.service.fanout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.fanout * 类名: * 功能:fanout ExChange示例 * 备注:消费者C * 创建人:typ * 创建时间:2018/9/24 21:10 * 修改人: * 修改备注: * 修改时间: */ @Component @RabbitListener(queues = "fanout.C") public class FanoutReceiverC { private static final Logger log = LoggerFactory.getLogger(FanoutReceiverC.class); @RabbitHandler public void process(String msg) { log.info("FanoutReceiverC : " + msg); } }
controller测试:
[code]package com.example.demo.rabbitmq.controller; import com.example.demo.rabbitmq.service.fanout.FanoutSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * 路径:com.example.demo.rabbitmq.controller * 类名: * 功能:fanout ExChange示例 * 备注: * 创建人:typ * 创建时间:2018/9/24 22:11 * 修改人: * 修改备注: * 修改时间: */ @RestController public class RabbitFanoutTest { @Autowired private FanoutSender fanoutSender; @PostMapping("/fanoutTest") public void fanoutTest() { fanoutSender.send(); } }
10、callback的消息发送
增加回调处理,这里不再使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息。
rabbitmq配置类:
[code]package com.example.demo.rabbitmq.service.callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Scope; /** * 路径:com.example.demo.rabbitmq.service.callback * 类名: * 功能:增加回调处理,这里不再使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息。 * 备注: * 创建人:typ * 创建时间:2018/9/24 20:09 * 修改人: * 修改备注: * 修改时间: */ public class RabbitConfig { private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class); @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${spring.rabbitmq.publisher-confirms}") private boolean publisherConfirms; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses+":"+port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); //如果要进行消息回调,则这里必须要设置为true connectionFactory.setPublisherConfirms(publisherConfirms); return connectionFactory; } //因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplatenew() { RabbitTemplate template = new RabbitTemplate(connectionFactory()); return template; } }
生产者:
[code]package com.example.demo.rabbitmq.service.callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; /** * 路径:com.example.demo.rabbitmq.service.callback * 类名:CallBackSender * 功能:callback的消息发送-----生产者 * 创建人:typ * 创建时间:2018/9/24 20:09 * 修改人: * 修改备注: * 修改时间: */ @Component public class CallBackSender implements RabbitTemplate.ConfirmCallback{ private static final Logger log = LoggerFactory.getLogger(CallBackSender.class); @Autowired private RabbitTemplate rabbitTemplatenew; public void send() { rabbitTemplatenew.setConfirmCallback(this); String msg="callbackSender : i am callback sender"; log.info(msg); CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("callbackSender UUID: " + correlationData.getId()); this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData); } public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("callbakck confirm: " + correlationData.getId()); } }
消费者:
[code]package com.example.demo.rabbitmq.service.callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * 路径:com.example.demo.rabbitmq.service.callback * 类名: * 功能:callback的消息发送 * 备注:消费者 * 创建人:typ * 创建时间:2018/9/24 20:12 * 修改人: * 修改备注: * 修改时间: */ @Component @RabbitListener(queues = "topic.messages") public class CallBackReceiver { private static final Logger log = LoggerFactory.getLogger(CallBackReceiver.class); @RabbitHandler public void process(String msg) { log.info("CallBackReceiver : " +msg); } }
controller测试:
[code]package com.example.demo.rabbitmq.controller; import com.example.demo.rabbitmq.service.callback.CallBackSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RestController; /** * 路径:com.example.demo.rabbitmq.controller * 类名: * 功能:callback的消息发送 * 备注: * 创建人:typ * 创建时间:2018/9/24 22:20 * 修改人: * 修改备注: * 修改时间: */ @RestController public class RabbitCallBackTest { @Autowired private CallBackSender callBackSender; //执行代码可以看出callbackSender发出的UUID,收到了回应,又传回来了。 @PostMapping("/callback") public void callbak() { callBackSender.send(); } }
阅读更多
- springboot+rabbitmq整合示例程
- springboot+rabbitmq整合示例程
- springboot+rabbitmq整合示例程
- SpringBoot RabbitMQ 整合进阶版
- springboot+rabbitMq整合开发实战一
- springboot+rabbitMq整合开发实战二:模拟用户下单的过程
- Springboot+RabbitMQ整合
- Spring Boot RabbitMQ 延迟消息实现完整版示例
- spring boot整合activemq rabbitmq
- springboot+rabbitmq 整合实例
- Springboot+rabbitmq如何实现高并发的rpc调用
- springboot整合rabbitmq的示例代码
- Spring Boot+RabbitMQ学习笔记
- Spring Boot+Swagger整合示例
- springboot RabbitMq的安装以及使用
- springboot2.0整合dubbo的示例代码
- Spring Boot整合Swagger框架示例项目。
- spring boot + rabbitmq实例
- SpringBoot RocketMQ 整合使用和监控
- SpringBoot+rabbitMq的配置和使用Demo