springboot+rabbitmq整合
1.安装好rabbitmq
2.新建一个springBoot项目:rabbitmq_demo
3.添加pom依赖:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </dependencies>
4.application.properties:
# Embedded web server port and application name
server.port=8080
spring.application.name=rabbitmq_demo

# RabbitMQ broker connection settings
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
# NOTE(review): newer Spring Boot releases replace this key with
# spring.rabbitmq.publisher-confirm-type - check against the Boot version in use.
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/
5.启动类声明一个Queue,用于测试:
package com; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitmqDemoApplication { @Bean public Queue helloQueue() { return new Queue("helloQueue"); } public static void main(String[] args) { SpringApplication.run(RabbitmqDemoApplication.class, args); } }
多场景实现:
1.单生产者和单消费者
生产者1:
package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes greeting messages using {@code helloQueue} as routing key.
 */
@Component
public class Sender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped greeting, prints it to stdout and publishes it
     * with routing key {@code helloQueue}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello1 " + now;
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend("helloQueue", payload);
    }
}
消费者1:
package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 1 for the {@code helloQueue} queue.
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param hello the received message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver1:" + hello;
        System.out.println(line);
    }
}
测试controller:
package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: exposes an HTTP endpoint that triggers the producer.
 */
@RestController
public class RabbitController {

    /** Response body returned by the endpoint. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    /**
     * Sends one message through {@code Sender1}.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        return OK;
    }
}
运行项目,访问 http://localhost:8080/hello :
Sender1:hello1 Thu May 11 17:23:31 CST 2017 Receiver1:hello1 Thu May 11 17:23:31 CST 2017
2.单生产者-多消费者
生产者1不变
增加消费者2:
package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 2 for the {@code helloQueue} queue.
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param mesg the received message body
     */
    @RabbitHandler
    public void process(String mesg) {
        final String line = "Receiver2:" + mesg;
        System.out.println(line);
    }
}
测试controller:
package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: exposes an HTTP endpoint that triggers the producer twice.
 */
@RestController
public class RabbitController {

    /** Response body returned by the endpoint. */
    private static final String OK = "ok";

    /** Number of messages sent per request. */
    private static final int SEND_COUNT = 2;

    @Autowired
    private Sender1 helloSender1;

    /**
     * Sends two messages through {@code Sender1}, one after the other.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        for (int sent = 0; sent < SEND_COUNT; sent++) {
            helloSender1.send();
        }
        return OK;
    }
}
运行项目,访问 http://localhost:8080/hello :
Sender1:hello1 Thu May 11 17:23:31 CST 2017 Sender1:hello1 Thu May 11 17:23:31 CST 2017 Receiver1:hello1 Thu May 11 17:23:31 CST 2017 Receiver2:hello1 Thu May 11 17:23:31 CST 2017
消息会被多个消费者交替消费,每条消息只能被一个消费者所接收。
3.多生产者-多消费者
增加生产者2:
package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes greeting messages using {@code helloQueue} as routing key.
 */
@Component
public class Sender2 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped greeting, prints it to stdout and publishes it
     * with routing key {@code helloQueue}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello2 " + now;
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend("helloQueue", payload);
    }
}
消费者1、2不变
测试controller:
package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: exposes an HTTP endpoint that triggers both producers.
 */
@RestController
public class RabbitController {

    /** Response body returned by the endpoint. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Sends one message through {@code Sender1}, then one through {@code Sender2}.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }
}
运行项目,访问 http://localhost:8080/hello :
Sender1:hello1 Thu May 11 17:23:31 CST 2017 Sender2:hello2 Thu May 11 17:23:31 CST 2017 Receiver1:hello2 Thu May 11 17:23:31 CST 2017 Receiver2:hello1 Thu May 11 17:23:31 CST 2017
多个生产者将消息放入helloQueue的队列中,队列中的消息会被多个消费者交替消费,每条消息只能被一个消费者所接收。
4.实体类传输
支持对象的发送和接收,实体类只需要支持序列化即可。
实体类
package com.demo.model;

import java.io.Serializable;

/**
 * Simple serializable user entity used as a message payload in the demo.
 *
 * <p>All properties are plain mutable strings and may be {@code null}
 * until set.
 */
public class User implements Serializable {

    /**
     * Explicit serialization version. Without it the JVM derives a value from
     * the class structure, so recompiling or editing the class can make
     * previously serialized instances undeserializable.
     */
    private static final long serialVersionUID = 1L;

    private String userName;
    private String password;
    private String sex;
    private String level;

    /** @return the user name, or {@code null} if not set */
    public String getUserName() {
        return userName;
    }

    /** @param userName the user name to store */
    public void setUserName(String userName) {
        this.userName = userName;
    }

    /** @return the password, or {@code null} if not set */
    public String getPassword() {
        return password;
    }

    /** @param password the password to store */
    public void setPassword(String password) {
        this.password = password;
    }

    /** @return the sex value, or {@code null} if not set */
    public String getSex() {
        return sex;
    }

    /** @param sex the sex value to store */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** @return the level value, or {@code null} if not set */
    public String getLevel() {
        return level;
    }

    /** @param level the level value to store */
    public void setLevel(String level) {
        this.level = level;
    }
}
生产者1:
package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes text and {@link User} messages using
 * {@code helloQueue} as routing key.
 */
@Component
public class Sender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped greeting, prints it to stdout and publishes it
     * with routing key {@code helloQueue}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello1 " + now;
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend("helloQueue", payload);
    }

    /**
     * Prints the user's name and password to stdout, then publishes the
     * user object with routing key {@code helloQueue}.
     *
     * @param user the entity to publish
     */
    public void sendUser(User user) {
        // NOTE(review): the password is printed in clear text.
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender1:" + summary);
        rabbitTemplate.convertAndSend("helloQueue", user);
    }
}
生产者2:
package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes text and {@link User} messages using
 * {@code helloQueue} as routing key.
 */
@Component
public class Sender2 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped greeting, prints it to stdout and publishes it
     * with routing key {@code helloQueue}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello2 " + now;
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend("helloQueue", payload);
    }

    /**
     * Prints the user's name and password to stdout, then publishes the
     * user object with routing key {@code helloQueue}.
     *
     * @param user the entity to publish
     */
    public void sendUser(User user) {
        // NOTE(review): the password is printed in clear text.
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender2:" + summary);
        rabbitTemplate.convertAndSend("helloQueue", user);
    }
}
消费者1:
package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 1 for the {@code helloQueue} queue, with one handler per payload type.
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param hello the received message body
     */
    @RabbitHandler
    public void process(String hello) {
        final String line = "Receiver1:" + hello;
        System.out.println(line);
    }

    /**
     * Handles a {@link User} message by printing its name and password to stdout.
     *
     * @param user the received entity
     */
    @RabbitHandler
    public void processUser(User user) {
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user receive1:" + summary);
    }
}
消费者2:
package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer 2 for the {@code helloQueue} queue, with one handler per payload type.
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param mesg the received message body
     */
    @RabbitHandler
    public void process(String mesg) {
        final String line = "Receiver2:" + mesg;
        System.out.println(line);
    }

    /**
     * Handles a {@link User} message by printing its name and password to stdout.
     *
     * @param user the received entity
     */
    @RabbitHandler
    public void processUser(User user) {
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user receive2:" + summary);
    }
}
测试的controller:
package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: HTTP endpoints that trigger the text and entity producers.
 */
@RestController
public class RabbitController {

    /** Response body returned by every endpoint. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Sends one text message through each producer.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }

    /**
     * Builds a sample user and sends the same instance through each producer.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/user")
    public String user() {
        final User sample = sampleUser();
        helloSender1.sendUser(sample);
        helloSender2.sendUser(sample);
        return OK;
    }

    /** Creates the fixed sample user sent by {@link #user()}. */
    private static User sampleUser() {
        final User sample = new User();
        sample.setUserName("a");
        sample.setPassword("1");
        sample.setSex("m");
        sample.setLevel("1");
        return sample;
    }
}
运行项目,访问 http://localhost:8080/user :
user Sender1:a/1 user Sender2:a/1 user receive1:a/1 user receive2:a/1
5.TopicExchange的使用
启动类新增声明两个Queue,用于测试:
package com; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitmqDemoApplication { /***************************************队列***********************************************/ @Bean public Queue helloQueue() { return new Queue("helloQueue"); } @Bean public Queue topicMessage() { return new Queue("topicMessage"); } @Bean public Queue topicMessages() { return new Queue("topicMessages"); } /***************************************exchange***********************************************/ @Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } /***************************************将队列和exchange绑定***********************************************/ /** * 将队列topicMessage与topicExchange绑定, * 只有栏目名为topic.Message才能匹配, * 得到当前的Queue * @param topicMessage * @param topicExchange * @return */ @Bean Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) { return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message"); } /** * 将队列topicMessages与topicExchange绑定, * 以topic开头的栏目名均会模糊匹配, * 得到当前的Queue * @param topicMessages * @param topicExchange * @return */ @Bean Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) { return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#"); } public static void main(String[] args) { SpringApplication.run(RabbitmqDemoApplication.class, args); } }
生产者1:
package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes text, {@link User} and topic-exchange messages.
 */
@Component
public class Sender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped greeting, prints it to stdout and publishes it
     * with routing key {@code helloQueue}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello1 " + now;
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend("helloQueue", payload);
    }

    /**
     * Prints the user's name and password to stdout, then publishes the
     * user object with routing key {@code helloQueue}.
     *
     * @param user the entity to publish
     */
    public void sendUser(User user) {
        // NOTE(review): the password is printed in clear text.
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender1:" + summary);
        rabbitTemplate.convertAndSend("helloQueue", user);
    }

    /**
     * Publishes two messages to {@code topicExchange}, one per routing key.
     *
     * <p>{@code convertAndSend} takes the exchange name, the routing key and
     * the message body, in that order. The exchange compares the routing key
     * with each binding key (wildcard matching, not regular expressions) and
     * delivers the message to every queue whose binding matches.
     */
    public void testTopPicMessage() {
        final String exchange = "topicExchange";

        // Routing key "topic.Message" matches both binding keys:
        // "topic.Message" and "topic.#".
        final String first = "sendTopPicMessage";
        System.out.println("sendTopPicMessage1:" + first);
        rabbitTemplate.convertAndSend(exchange, "topic.Message", first);

        // Routing key "topic.Messages" matches only the "topic.#" binding key.
        final String second = "sendTopPicMessages";
        System.out.println("sendTopPicMessages1:" + second);
        rabbitTemplate.convertAndSend(exchange, "topic.Messages", second);
    }
}
生产者2:
package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes text, {@link User} and topic-exchange messages.
 */
@Component
public class Sender2 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped greeting, prints it to stdout and publishes it
     * with routing key {@code helloQueue}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello2 " + now;
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend("helloQueue", payload);
    }

    /**
     * Prints the user's name and password to stdout, then publishes the
     * user object with routing key {@code helloQueue}.
     *
     * @param user the entity to publish
     */
    public void sendUser(User user) {
        // NOTE(review): the password is printed in clear text.
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender2:" + summary);
        rabbitTemplate.convertAndSend("helloQueue", user);
    }

    /**
     * Publishes two messages to {@code topicExchange}, one per routing key.
     *
     * <p>{@code convertAndSend} takes the exchange name, the routing key and
     * the message body, in that order. The exchange compares the routing key
     * with each binding key (wildcard matching, not regular expressions) and
     * delivers the message to every queue whose binding matches.
     */
    public void testTopPicMessage() {
        final String exchange = "topicExchange";

        // Routing key "topic.Message" matches both binding keys:
        // "topic.Message" and "topic.#".
        final String first = "sendTopPicMessage";
        System.out.println("sendTopPicMessage2:" + first);
        rabbitTemplate.convertAndSend(exchange, "topic.Message", first);

        // Routing key "topic.Messages" matches only the "topic.#" binding key.
        final String second = "sendTopPicMessages";
        System.out.println("sendTopPicMessages2:" + second);
        rabbitTemplate.convertAndSend(exchange, "topic.Messages", second);
    }
}
topicMessage消费者:
package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code topicMessage} queue.
 */
@Component
@RabbitListener(queues = "topicMessage")
public class TopMessageReceiver {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "topMessageReceiver:" + msg;
        System.out.println(line);
    }
}
topicMessages消费者:
package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code topicMessages} queue.
 */
@Component
@RabbitListener(queues = "topicMessages")
public class TopMessagesReceiver {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "topMessagesReceiver:" + msg;
        System.out.println(line);
    }
}
测试controller:
package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Test controller: HTTP endpoints that trigger the text, entity and
 * topic-exchange producers.
 */
@RestController
public class RabbitController {

    /** Response body returned by every endpoint. */
    private static final String OK = "ok";

    @Autowired
    private Sender1 helloSender1;

    @Autowired
    private Sender2 helloSender2;

    /**
     * Sends one text message through each producer.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/hello")
    public String hello() {
        helloSender1.send();
        helloSender2.send();
        return OK;
    }

    /**
     * Builds a sample user and sends the same instance through each producer.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/user")
    public String user() {
        final User sample = sampleUser();
        helloSender1.sendUser(sample);
        helloSender2.sendUser(sample);
        return OK;
    }

    /**
     * Runs the topic-exchange send of each producer in turn.
     *
     * @return the literal {@code "ok"}
     */
    @RequestMapping("/topMessage")
    public String topMessage() {
        helloSender1.testTopPicMessage();
        helloSender2.testTopPicMessage();
        return OK;
    }

    /** Creates the fixed sample user sent by {@link #user()}. */
    private static User sampleUser() {
        final User sample = new User();
        sample.setUserName("a");
        sample.setPassword("1");
        sample.setSex("m");
        sample.setLevel("1");
        return sample;
    }
}
运行项目,访问 http://localhost:8080/topMessage :
sendTopPicMessage1:sendTopPicMessage sendTopPicMessages1:sendTopPicMessages sendTopPicMessage2:sendTopPicMessage sendTopPicMessages2:sendTopPicMessages topMessageReceiver:sendTopPicMessage topMessagesReceiver:sendTopPicMessage topMessagesReceiver:sendTopPicMessages topMessageReceiver:sendTopPicMessage topMessagesReceiver:sendTopPicMessage topMessagesReceiver:sendTopPicMessages
通过TopicExchange发送的消息,会被路由到所有binding key与该消息routing key相匹配的队列,只有监听这些队列的消费者才能收到。
需要注意:
rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合,因此两个消费者都收到消息
rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合,只有topMessages符合接受消息的条件
6.FanoutExchange的使用
启动类新增声明三个Queue,用于测试:
package com; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; @SpringBootApplication public class RabbitmqDemoApplication { /***************************************队列***********************************************/ @Bean public Queue helloQueue() { return new Queue("helloQueue"); } @Bean public Queue topicMessage() { return new Queue("topicMessage"); } @Bean public Queue topicMessages() { return new Queue("topicMessages"); } @Bean public Queue fanoutA() { return new Queue("fanoutA"); } @Bean public Queue fanoutB() { return new Queue("fanoutB"); } @Bean public Queue fanoutC() { return new Queue("fanoutC"); } /***************************************exchange***********************************************/ @Bean TopicExchange topicExchange() { return new TopicExchange("topicExchange"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } /***************************************将队列和exchange绑定***********************************************/ /** * 将队列topicMessage与topicExchange绑定, * 只有栏目名为topic.Message才能匹配, * 得到当前的Queue * @param topicMessage * @param topicExchange * @return */ @Bean Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) { return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message"); } /** * 将队列topicMessages与topicExchange绑定, * 以topic开头的栏目名均会模糊匹配, * 得到当前的Queue * @param topicMessages * @param topicExchange * @return */ @Bean Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) { return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#"); } /** * 
将队列fanoutA与fanoutExchange绑定 * * @param fanoutA * @param fanoutExchange * @return */ @Bean Binding bindingExchangeA(Queue fanoutA, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutA).to(fanoutExchange); } /** * 将队列fanoutA与fanoutExchange绑定 * * @param fanoutB * @param fanoutExchange * @return */ @Bean Binding bindingExchangeB(Queue fanoutB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutB).to(fanoutExchange); } /** * 将队列fanoutA与fanoutExchange绑定 * * @param fanoutC * @param fanoutExchange * @return */ @Bean Binding bindingExchangeC(Queue fanoutC, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanoutC).to(fanoutExchange); } public static void main(String[] args) { SpringApplication.run(RabbitmqDemoApplication.class, args); } }
生产者1:
package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 1: publishes text, {@link User}, topic-exchange and
 * fanout-exchange messages.
 */
@Component
public class Sender1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped greeting, prints it to stdout and publishes it
     * with routing key {@code helloQueue}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello1 " + now;
        System.out.println("Sender1:" + payload);
        rabbitTemplate.convertAndSend("helloQueue", payload);
    }

    /**
     * Prints the user's name and password to stdout, then publishes the
     * user object with routing key {@code helloQueue}.
     *
     * @param user the entity to publish
     */
    public void sendUser(User user) {
        // NOTE(review): the password is printed in clear text.
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender1:" + summary);
        rabbitTemplate.convertAndSend("helloQueue", user);
    }

    /**
     * Publishes two messages to {@code topicExchange}, one per routing key.
     *
     * <p>{@code convertAndSend} takes the exchange name, the routing key and
     * the message body, in that order. The exchange compares the routing key
     * with each binding key (wildcard matching, not regular expressions) and
     * delivers the message to every queue whose binding matches.
     */
    public void testTopPicMessage() {
        final String exchange = "topicExchange";

        // Routing key "topic.Message" matches both binding keys:
        // "topic.Message" and "topic.#".
        final String first = "sendTopPicMessage";
        System.out.println("sendTopPicMessage1:" + first);
        rabbitTemplate.convertAndSend(exchange, "topic.Message", first);

        // Routing key "topic.Messages" matches only the "topic.#" binding key.
        final String second = "sendTopPicMessages";
        System.out.println("sendTopPicMessages1:" + second);
        rabbitTemplate.convertAndSend(exchange, "topic.Messages", second);
    }

    /**
     * Publishes one message to {@code fanoutExchange}.
     */
    public void testFanoutMessage() {
        final String payload = "sendFanoutMessage";
        System.out.println("fanout Sender1:" + payload);
        // A fanout exchange does not filter on the routing key. The empty key
        // is still passed so the (exchange, routingKey, message) overload is
        // used; the two-argument overload would treat "fanoutExchange" as a
        // routing key instead.
        rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
    }
}
生产者2:
package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * Producer 2: publishes text, {@link User}, topic-exchange and
 * fanout-exchange messages.
 */
@Component
public class Sender2 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Builds a timestamped greeting, prints it to stdout and publishes it
     * with routing key {@code helloQueue}.
     */
    public void send() {
        final Date now = new Date();
        final String payload = "hello2 " + now;
        System.out.println("Sender2:" + payload);
        rabbitTemplate.convertAndSend("helloQueue", payload);
    }

    /**
     * Prints the user's name and password to stdout, then publishes the
     * user object with routing key {@code helloQueue}.
     *
     * @param user the entity to publish
     */
    public void sendUser(User user) {
        // NOTE(review): the password is printed in clear text.
        final String summary = user.getUserName() + "/" + user.getPassword();
        System.out.println("user Sender2:" + summary);
        rabbitTemplate.convertAndSend("helloQueue", user);
    }

    /**
     * Publishes two messages to {@code topicExchange}, one per routing key.
     *
     * <p>{@code convertAndSend} takes the exchange name, the routing key and
     * the message body, in that order. The exchange compares the routing key
     * with each binding key (wildcard matching, not regular expressions) and
     * delivers the message to every queue whose binding matches.
     */
    public void testTopPicMessage() {
        final String exchange = "topicExchange";

        // Routing key "topic.Message" matches both binding keys:
        // "topic.Message" and "topic.#".
        final String first = "sendTopPicMessage";
        System.out.println("sendTopPicMessage2:" + first);
        rabbitTemplate.convertAndSend(exchange, "topic.Message", first);

        // Routing key "topic.Messages" matches only the "topic.#" binding key.
        final String second = "sendTopPicMessages";
        System.out.println("sendTopPicMessages2:" + second);
        rabbitTemplate.convertAndSend(exchange, "topic.Messages", second);
    }

    /**
     * Publishes one message to {@code fanoutExchange}.
     */
    public void testFanoutMessage() {
        final String payload = "sendFanoutMessage";
        System.out.println("fanout Sender2:" + payload);
        // A fanout exchange does not filter on the routing key. The empty key
        // is still passed so the (exchange, routingKey, message) overload is
        // used; the two-argument overload would treat "fanoutExchange" as a
        // routing key instead.
        rabbitTemplate.convertAndSend("fanoutExchange", "", payload);
    }
}
fanoutA消费者
package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code fanoutA} queue.
 */
@Component
@RabbitListener(queues = "fanoutA")
public class FanoutReceiverA {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverA:" + msg;
        System.out.println(line);
    }
}
fanoutB消费者
package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code fanoutB} queue.
 */
@Component
@RabbitListener(queues = "fanoutB")
public class FanoutReceiverB {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverB:" + msg;
        System.out.println(line);
    }
}
fanoutC消费者
package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for the {@code fanoutC} queue.
 */
@Component
@RabbitListener(queues = "fanoutC")
public class FanoutReceiverC {

    /**
     * Handles a text message by printing it to stdout.
     *
     * @param msg the received message body
     */
    @RabbitHandler
    public void process(String msg) {
        final String line = "FanoutReceiverC:" + msg;
        System.out.println(line);
    }
}
测试controller:
package com.demo.controller; import com.demo.model.User; import com.demo.sender.Sender1; import com.demo.sender.Sender2; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @Description: 测试类 */ @RestController public class RabbitController { @Autowired private Sender1 helloSender1; @Autowired private Sender2 helloSender2; @RequestMapping("/hello") public String hello() { helloSender1.send(); helloSender2.send(); return "ok"; } @RequestMapping("/user") public String user() { User user=new User(); user.setUserName("a"); user.setPassword("1"); user.setSex("m"); user.setLevel("1"); helloSender1.sendUser(user); helloSender2.sendUser(user); return "ok"; } @RequestMapping("/topMessage") public String topMessage() { helloSender1.testTopPicMessage(); helloSender2.testTopPicMessage(); return "ok"; } @RequestMapping("/fanoutMessage") public String fanoutMessage() { helloSender1.testFanoutMessage(); helloSender2.testFanoutMessage(); return "ok"; } }
运行项目,访问 http://localhost:8080/fanoutMessage :
fanout Sender1:sendFanoutMessage fanout Sender2:sendFanoutMessage FanoutReceiverA:sendFanoutMessage FanoutReceiverB:sendFanoutMessage FanoutReceiverC:sendFanoutMessage FanoutReceiverA:sendFanoutMessage FanoutReceiverB:sendFanoutMessage FanoutReceiverC:sendFanoutMessage
通过exchange发送的每条消息,所有的消费者都能收到。
- spring boot整合activemq rabbitmq
- Springboot+RabbitMQ整合示例
- springboot+rabbitmq整合示例程
- SpringBoot RabbitMQ 整合进阶版
- springboot+rabbitMq整合开发实战二:模拟用户下单的过程
- Springboot+RabbitMQ整合
- springboot+rabbitmq整合示例程
- springboot+rabbitmq整合示例程
- springboot+rabbitMq整合开发实战一
- springboot+rabbitmq 整合实例
- springboot+rabbitmq例子
- Springboot + rabbitMQ实现延迟消费以及spring与策略模式联合处理不同的业务(二)
- SpringBoot + RabbitMQ 使用Demo
- springboot+dubbo+zookeeper+mybatis+redis+druid+rabbitmq
- 集群与负载均衡系列(6)——消息队列之rabbitMQ+spring-boot+spring amqp发送可靠的消息
- SpringBoot ActiveMQ 整合使用
- Spring Boot RabbitMQ快速入门 (1)
- springboot整合mq同时监听queue和topic
- Spring Boot RabbitMQ快速入门 (2)