您的位置:首页 > 编程语言 > Java开发

springboot+rabbitmq整合

2018-10-19 13:06 786 查看

1.安装好rabbitmq

2.新建一个springBoot项目:rabbitmq_demo

3.添加pom依赖:

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>

4.application.properties:

server.port=8080
spring.application.name=rabbitmq_demo
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

5.启动类声明一个Queue,用于测试:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqDemoApplication {
@Bean
public Queue helloQueue() {
return new Queue("helloQueue");
}

public static void main(String[] args) {
SpringApplication.run(RabbitmqDemoApplication.class, args);
}
}

多场景实现:

1.单生产者和单消费者

    生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Description:
* 生产者1
*/
@Component
public class Sender1 {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String sendMsg = "hello1 " + new Date();
System.out.println("Sender1:" + sendMsg);
rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}
}

    消费者1:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* helloQueue消费者1
*/
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1:" + hello);
}
}

    测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @Description: 测试类
*/
@RestController
public class RabbitController {

@Autowired
private Sender1 helloSender1;

@RequestMapping("/hello")
public String hello() {
helloSender1.send();
return "ok";
}
}

    运行项目,访问http:localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017

    Receiver1:hello1 Thu May 11 17:23:31 CST 2017

2.单生产者-多消费者

    生产者1不变

    增加消费者2:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* helloQueue消费者2
*/
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {
@RabbitHandler
public void process(String mesg) {
System.out.println("Receiver2:" + mesg);
}
}

    测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @Description: 测试类
*/
@RestController
public class RabbitController {

@Autowired
private Sender1 helloSender1;

@RequestMapping("/hello")
public String hello() {
helloSender1.send();
helloSender1.send();
return "ok";
}
}

    运行项目,访问http:localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017
 Sender1:hello1 Thu May 11 17:23:31 CST 2017

    Receiver1:hello1 Thu May 11 17:23:31 CST 2017
    Receiver2:hello1 Thu May 11 17:23:31 CST 2017

    消息会被多个消费者交替消费,每条消息只能被一个消费者所接收。

3.多生产者-多消费者

    增加生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Description: 生产者2
*/
@Component
public class Sender2 {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String sendMsg = "hello2 " + new Date();
System.out.println("Sender2:" + sendMsg);
rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}
}

    消费者1、2不变

    测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @Description: 测试类
*/
@RestController
public class RabbitController {

@Autowired
private Sender1 helloSender1;

@Autowired
private Sender2 helloSender2;

@RequestMapping("/hello")
public String hello() {
helloSender1.send();
helloSender2.send();
return "ok";
}
}

    运行项目,访问http:localhost:8080/hello :

    Sender1:hello1 Thu May 11 17:23:31 CST 2017
 Sender2:hello2 Thu May 11 17:23:31 CST 2017

    Receiver1:hello2 Thu May 11 17:23:31 CST 2017
    Receiver2:hello1 Thu May 11 17:23:31 CST 2017

    多个生产者将消息放入helloQueue的队列中,队列中的消息会被多个消费者交替消费,每条消息只能被一个消费者所接收。

4.实体类传输

    支持对象的发送和接收,实体类只需要支持序列化即可。

    实体类

package com.demo.model;

import java.io.Serializable;

/**
* @Description:
*/
public class User implements Serializable {
private String userName;

private String password;

private String sex;

private String level;

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getSex() {
return sex;
}

public void setSex(String sex) {
this.sex = sex;
}

public String getLevel() {
return level;
}

public void setLevel(String level) {
this.level = level;
}
}

    生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Description:
* 生产者1
*/
@Component
public class Sender1 {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String sendMsg = "hello1 " + new Date();
System.out.println("Sender1:" + sendMsg);
rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}

public void sendUser(User user){
System.out.println("user Sender1:" + user.getUserName()+"/"+user.getPassword());
rabbitTemplate.convertAndSend("helloQueue", user);
}
}

    生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Description: 生产者2
*/
@Component
public class Sender2 {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String sendMsg = "hello2 " + new Date();
System.out.println("Sender2:" + sendMsg);
rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}

public void sendUser(User user) {
System.out.println("user Sender2:" + user.getUserName() + "/" + user.getPassword());
rabbitTemplate.convertAndSend("helloQueue", user);
}
}

    消费者1:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* helloQueue消费者1
*/
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1:" + hello);
}

@RabbitHandler
public void processUser(User user) {
System.out.println("user receive1:" + user.getUserName()+"/"+user.getPassword());
}
}

    消费者2:

package com.demo.receiver;

import com.demo.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* helloQueue消费者2
*/
@Component
@RabbitListener(queues = "helloQueue")
public class HelloReceiver2 {
@RabbitHandler
public void process(String mesg) {
System.out.println("Receiver2:" + mesg);
}

@RabbitHandler
public void processUser(User user) {
System.out.println("user receive2:" + user.getUserName()+"/"+user.getPassword());
}
}

    测试的controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @Description: 测试类
*/
@RestController
public class RabbitController {

@Autowired
private Sender1 helloSender1;

@Autowired
private Sender2 helloSender2;

@RequestMapping("/hello")
public String hello() {
helloSender1.send();
helloSender2.send();
return "ok";
}

@RequestMapping("/user")
public String user() {
User user=new User();
user.setUserName("a");
user.setPassword("1");
user.setSex("m");
user.setLevel("1");
helloSender1.sendUser(user);
helloSender2.sendUser(user);
return "ok";
}
}

    运行项目,访问http:localhost:8080/user :

    user Sender1:a/1
 user Sender2:a/1

    user receive1:a/1
    user receive2:a/1

5.TopicExchange的使用

    启动类新增声明两个Queue,用于测试:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqDemoApplication {
/***************************************队列***********************************************/
@Bean
public Queue helloQueue() {
return new Queue("helloQueue");
}

@Bean
public Queue topicMessage() {
return new Queue("topicMessage");
}

@Bean
public Queue topicMessages() {
return new Queue("topicMessages");
}
/***************************************exchange***********************************************/
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}

/***************************************将队列和exchange绑定***********************************************/

/**
* 将队列topicMessage与topicExchange绑定,
* 只有栏目名为topic.Message才能匹配,
* 得到当前的Queue
* @param topicMessage
* @param topicExchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) {
return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message");
}

/**
* 将队列topicMessages与topicExchange绑定,
* 以topic开头的栏目名均会模糊匹配,
* 得到当前的Queue
* @param topicMessages
* @param topicExchange
* @return
*/
@Bean
Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) {
return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#");
}

public static void main(String[] args) {
SpringApplication.run(RabbitmqDemoApplication.class, args);
}
}

    生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Description:
* 生产者1
*/
@Component
public class Sender1 {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String sendMsg = "hello1 " + new Date();
System.out.println("Sender1:" + sendMsg);
rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}

public void sendUser(User user){
System.out.println("user Sender1:" + user.getUserName()+"/"+user.getPassword());
rabbitTemplate.convertAndSend("helloQueue", user);
}

public void testTopPicMessage() {
String msg = "sendTopPicMessage";
System.out.println("sendTopPicMessage1:" + msg);
//第一个参数:指定了exchange
//第二个参数:指定了接受消息的栏目名
//第三个参数:消息内容
//到指定exchange找出第二个参数符合的正则表达式,得到对应的Queue,监听相应Queue的消费者接受到消息
rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合

msg = "sendTopPicMessages";
System.out.println("sendTopPicMessages1:" + msg);
rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合
}
}

    生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Description: 生产者2
*/
@Component
public class Sender2 {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String sendMsg = "hello2 " + new Date();
System.out.println("Sender2:" + sendMsg);
rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}

public void sendUser(User user) {
System.out.println("user Sender2:" + user.getUserName() + "/" + user.getPassword());
rabbitTemplate.convertAndSend("helloQueue", user);
}

public void testTopPicMessage() {
String msg = "sendTopPicMessage";
System.out.println("sendTopPicMessage2:" + msg);
//第一个参数:指定了exchange
//第二个参数:指定了接受消息的栏目名
//第三个参数:消息内容
//到指定exchange找出第二个参数符合的正则表达式,得到对应的Queue,监听相应Queue的消费者接受到消息
rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合

msg = "sendTopPicMessages";
System.out.println("sendTopPicMessages2:" + msg);
rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合
}
}

    topicMessage消费者:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* topicMessage消费者
*/
@Component
@RabbitListener(queues = "topicMessage")
public class TopMessageReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("topMessageReceiver:" +msg);
}
}

     topicMessages消费者:

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* topicMessages消费者
*/
@Component
@RabbitListener(queues = "topicMessages")
public class TopMessagesReceiver {
@RabbitHandler
public void process(String msg) {
System.out.println("topMessagesReceiver:" +msg);
}
}

    测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @Description: 测试类
*/
@RestController
public class RabbitController {

@Autowired
private Sender1 helloSender1;

@Autowired
private Sender2 helloSender2;

@RequestMapping("/hello")
public String hello() {
helloSender1.send();
helloSender2.send();
return "ok";
}

@RequestMapping("/user")
public String user() {
User user=new User();
user.setUserName("a");
user.setPassword("1");
user.setSex("m");
user.setLevel("1");
helloSender1.sendUser(user);
helloSender2.sendUser(user);
return "ok";
}

@RequestMapping("/topMessage")
public String topMessage() {
helloSender1.testTopPicMessage();
helloSender2.testTopPicMessage();
return "ok";
}
}

    运行项目,访问http:localhost:8080/topMessage :

    sendTopPicMessage1:sendTopPicMessage
sendTopPicMessages1:sendTopPicMessages

    sendTopPicMessage2:sendTopPicMessage
sendTopPicMessages2:sendTopPicMessages

 topMessageReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessages

    topMessageReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessage
    topMessagesReceiver:sendTopPicMessages

    通过exchange发送的每条消息,所有的消费者都能收到。

需要注意:

    rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合,因此两个消费者都收到消息
    rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合,只有topMessages符合接受消息的条件

6.FanoutExchange的使用

    启动类新增声明三个Queue,用于测试:

package com;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class RabbitmqDemoApplication {
/***************************************队列***********************************************/
@Bean
public Queue helloQueue() {
return new Queue("helloQueue");
}

@Bean
public Queue topicMessage() {
return new Queue("topicMessage");
}

@Bean
public Queue topicMessages() {
return new Queue("topicMessages");
}

@Bean
public Queue fanoutA() {
return new Queue("fanoutA");
}

@Bean
public Queue fanoutB() {
return new Queue("fanoutB");
}

@Bean
public Queue fanoutC() {
return new Queue("fanoutC");
}
/***************************************exchange***********************************************/
@Bean
TopicExchange topicExchange() {
return new TopicExchange("topicExchange");
}

@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}

/***************************************将队列和exchange绑定***********************************************/

/**
* 将队列topicMessage与topicExchange绑定,
* 只有栏目名为topic.Message才能匹配,
* 得到当前的Queue
* @param topicMessage
* @param topicExchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue topicMessage, TopicExchange topicExchange) {
return BindingBuilder.bind(topicMessage).to(topicExchange).with("topic.Message");
}

/**
* 将队列topicMessages与topicExchange绑定,
* 以topic开头的栏目名均会模糊匹配,
* 得到当前的Queue
* @param topicMessages
* @param topicExchange
* @return
*/
@Bean
Binding bindingExchangeMessages(Queue topicMessages, TopicExchange topicExchange) {
return BindingBuilder.bind(topicMessages).to(topicExchange).with("topic.#");
}

/**
* 将队列fanoutA与fanoutExchange绑定
*
* @param fanoutA
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeA(Queue fanoutA, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutA).to(fanoutExchange);
}

/**
* 将队列fanoutA与fanoutExchange绑定
*
* @param fanoutB
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeB(Queue fanoutB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutB).to(fanoutExchange);
}

/**
* 将队列fanoutA与fanoutExchange绑定
*
* @param fanoutC
* @param fanoutExchange
* @return
*/
@Bean
Binding bindingExchangeC(Queue fanoutC, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutC).to(fanoutExchange);
}

public static void main(String[] args) {
SpringApplication.run(RabbitmqDemoApplication.class, args);
}
}

    生产者1:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Description:
* 生产者1
*/
@Component
public class Sender1 {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String sendMsg = "hello1 " + new Date();
System.out.println("Sender1:" + sendMsg);
rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}

public void sendUser(User user){
System.out.println("user Sender1:" + user.getUserName()+"/"+user.getPassword());
rabbitTemplate.convertAndSend("helloQueue", user);
}

public void testTopPicMessage() {
String msg = "sendTopPicMessage";
System.out.println("sendTopPicMessage1:" + msg);
//第一个参数:指定了exchange
//第二个参数:指定了接受消息的栏目名
//第三个参数:消息内容
//到指定exchange找出第二个参数符合的正则表达式,得到对应的Queue,监听相应Queue的消费者接受到消息
rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合

msg = "sendTopPicMessages";
System.out.println("sendTopPicMessages1:" + msg);
rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合
}

public void testFanoutMessage(){
String sendMsg = "sendFanoutMessage";
System.out.println("fanout Sender1:" + sendMsg);
//第二个参数不会进行正则表达式的过滤
//但是必须要填,才能根据exchange找到相关Queue
rabbitTemplate.convertAndSend("fanoutExchange","", sendMsg);
}
}

    生产者2:

package com.demo.sender;

import com.demo.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* @Description: 生产者2
*/
@Component
public class Sender2 {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String sendMsg = "hello2 " + new Date();
System.out.println("Sender2:" + sendMsg);
rabbitTemplate.convertAndSend("helloQueue", sendMsg);
}

public void sendUser(User user) {
System.out.println("user Sender2:" + user.getUserName() + "/" + user.getPassword());
rabbitTemplate.convertAndSend("helloQueue", user);
}

public void testTopPicMessage() {
String msg = "sendTopPicMessage";
System.out.println("sendTopPicMessage2:" + msg);
//第一个参数:指定了exchange
//第二个参数:指定了接受消息的栏目名
//第三个参数:消息内容
//到指定exchange找出第二个参数符合的正则表达式,得到对应的Queue,监听相应Queue的消费者接受到消息
rabbitTemplate.convertAndSend("topicExchange", "topic.Message", msg);//topic.Message、topic.#两个都符合

msg = "sendTopPicMessages";
System.out.println("sendTopPicMessages2:" + msg);
rabbitTemplate.convertAndSend("topicExchange", "topic.Messages", msg);//只有topic.#符合
}

public void testFanoutMessage(){
String sendMsg = "sendFanoutMessage";
System.out.println("fanout Sender2:" + sendMsg);
//第二个参数不会进行正则表达式的过滤
//但是必须要填,才能根据exchange找到相关Queue
rabbitTemplate.convertAndSend("fanoutExchange","", sendMsg);
}
}

    fanoutA消费者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* fanoutA消费者
*/
@Component
@RabbitListener(queues = "fanoutA")
public class FanoutReceiverA {

@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverA:" + msg);
}

}

    fanoutB消费者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* fanoutB消费者
*/
@Component
@RabbitListener(queues = "fanoutB")
public class FanoutReceiverB {

@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverB:" + msg);
}

}

    fanoutC消费者

package com.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @Description:
* fanoutC消费者
*/
@Component
@RabbitListener(queues = "fanoutC")
public class FanoutReceiverC {

@RabbitHandler
public void process(String msg) {
System.out.println("FanoutReceiverC:" + msg);
}

}

  测试controller:

package com.demo.controller;

import com.demo.model.User;
import com.demo.sender.Sender1;
import com.demo.sender.Sender2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* @Description: 测试类
*/
@RestController
public class RabbitController {

@Autowired
private Sender1 helloSender1;

@Autowired
private Sender2 helloSender2;

@RequestMapping("/hello")
public String hello() {
helloSender1.send();
helloSender2.send();
return "ok";
}

@RequestMapping("/user")
public String user() {
User user=new User();
user.setUserName("a");
user.setPassword("1");
user.setSex("m");
user.setLevel("1");
helloSender1.sendUser(user);
helloSender2.sendUser(user);
return "ok";
}

@RequestMapping("/topMessage")
public String topMessage() {
helloSender1.testTopPicMessage();
helloSender2.testTopPicMessage();
return "ok";
}

@RequestMapping("/fanoutMessage")
public String fanoutMessage() {
helloSender1.testFanoutMessage();
helloSender2.testFanoutMessage();
return "ok";
}
}

    运行项目,访问http:localhost:8080/fanoutMessage :

    fanout Sender1:sendFanoutMessage
fanout Sender2:sendFanoutMessage
   
 FanoutReceiverA:sendFanoutMessage
    FanoutReceiverB:sendFanoutMessage
    FanoutReceiverC:sendFanoutMessage

    FanoutReceiverA:sendFanoutMessage
    FanoutReceiverB:sendFanoutMessage
    FanoutReceiverC:sendFanoutMessage

    通过exchange发送的每条消息,所有的消费者都能收到。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息