您的位置:首页 > 编程语言 > Java开发

Springboot+RabbitMQ整合示例

2018-09-25 10:58 633 查看
版权声明: https://blog.csdn.net/typ1805/article/details/82835318

一、RabbitMQ简介

         MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

1、MQ特点: MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

2、含义:RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统。他遵循Mozilla Public License开源协议

3、概念:RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。如果不熟悉AMQP,直接看RabbitMQ的文档会比较困难。不过它也只有几个关键概念,这里简单介绍。

RabbitMQ的结构图如下:

Broker:简单来说就是消息队列服务器实体。

  •   Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
  •   Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
  •   Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
  •   Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
  •   vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
  •   producer:消息生产者,就是投递消息的程序。
  •   consumer:消息消费者,就是接受消息的程序。
  •   channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

消息队列的使用过程大概如下:

  •        客户端连接到消息队列服务器,打开一个channel。
  •   客户端声明一个exchange,并设置相关属性。
  •   客户端声明一个queue,并设置相关属性。
  •   客户端使用routing key,在exchange和queue之间建立好绑定关系。
  •   客户端投递消息到exchange。

      exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

      exchange也有几个类型,完全根据key进行投递的叫做Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫做Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不需要key的,叫做Fanout交换机,它采取广播模式,一个消息进来时,投递到与该交换机绑定的所有队列。

       RabbitMQ支持消息的持久化,也就是数据写在磁盘上,为了数据安全考虑,我想大多数用户都会选择持久化。消息队列持久化包括3个部分:

  •   exchange持久化,在声明时指定durable => 1
  •   queue持久化,在声明时指定durable => 1
  •   消息持久化,在投递时指定delivery_mode => 2(1是非持久化)

        如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的。如果exchange和queue两者之间有一个持久化,一个非持久化,就不允许建立绑定。

4、安装(Centos7):https://blog.csdn.net/typ1805/article/details/82744899

访问:http://192.168.0.132:15672/#/queues

 

二、Springboot整合RabbitMQ

1、添加pom.xml依赖

[code]<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.example.demo.rabbitmq</groupId>
<artifactId>rabbitmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>rabbitmq-demo</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

2、application.yml配置文件主要是对rabbimq的配置信息

[code]server:
port: 8081

spring:
application:
name: rabbitmq-demo
rabbitmq:
host: 192.168.0.132
port: 5672
username: admin
password: admin

3、初始化创建队列、转发器,并把队列绑定到转发器(RabbitConfig.java)

[code]package com.example.demo.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.boot.SpringApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 路径:com.example.demo.rabbitmq.config
* 类名:
* 功能:队列配置
* 备注:
* 创建人:typ
* 创建时间:2018/9/23 21:46
* 修改人:
* 修改备注:
* 修改时间:
*/
@Configuration
public class RabbitConfig {

@Bean
public Queue helloQueue() {
return new Queue("hello");
}

@Bean
public Queue userQueue() {
return new Queue("user");
}

//===============以下是验证topic Exchange的队列==========
@Bean
public Queue queueMessage() {
return new Queue("topic.message");
}

@Bean
public Queue queueMessages() {
return new Queue("topic.messages");
}
//===============以上是验证topic Exchange的队列==========

//===============以下是验证Fanout Exchange的队列==========
@Bean
public Queue AMessage() {
return new Queue("fanout.A");
}

@Bean
public Queue BMessage() {
return new Queue("fanout.B");
}

@Bean
public Queue CMessage() {
return new Queue("fanout.C");
}
//===============以上是验证Fanout Exchange的队列==========

@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}

/**
* 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}

/**
* 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}

@Bean
Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(AMessage).to(fanoutExchange);
}

@Bean
Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(BMessage).to(fanoutExchange);
}

@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}
}

4、最简单的hello生产和消费实现(单生产者和单消费者)

生产者:

[code]package com.example.demo.rabbitmq.service.oneToOne;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:生产者
* 备注:单生产者-单消费者
* 创建人:typ
* 创建时间:2018/9/23 21:49
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloSender {

private static final Logger log = LoggerFactory.getLogger(HelloSender.class);

@Autowired
public AmqpTemplate amqpTemplate;

public void send(){
String context = "hello " + new Date();
log.info("Sender:" + context);
this.amqpTemplate.convertAndSend("hello",context);
}
}

消费者:

[code]package com.example.demo.rabbitmq.service.oneToOne;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:消费者
* 备注:单生产者-单消费者
* 创建人:typ
* 创建时间:2018/9/23 22:14
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloReceiver {

private static final Logger log = LoggerFactory.getLogger(HelloReceiver.class);

//监听器监听指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("Receiver:"+hello);

}

}

controller测试:

[code]package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.oneToOne.HelloSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 路径:com.example.demo.rabbitmq.controller
* 类名:
* 功能:《用一句描述一下》
* 备注:单生产者-单消费者
* 创建人:typ
* 创建时间:2018/9/23 22:35
* 修改人:
* 修改备注:
* 修改时间:
*/
@RestController
public class RabbitOneToOneTest {

@Autowired
private HelloSender helloSender;

@PostMapping("/hello")
public void hello(){
helloSender.send();
}
}

启动程序,执行:

http://localhost:8081/hello

结果如下:

[code]Sender : hello1 Thu September 24 17:23:31 CST 2018
Receiver  : hello1 Thu September 24 17:23:31 CST 2018

5、单生产者-多消费者

生产者:

[code]package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:生产者
* 备注:单生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 21:49
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloSender1 {

private static final Logger log = LoggerFactory.getLogger(HelloSender1.class);

@Autowired
public AmqpTemplate amqpTemplate;

public void send(String msg){
String context = msg + new Date();
log.info("Sender1:" + context);
this.amqpTemplate.convertAndSend("hello",context);
}
}

消费者1:

[code]package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:消费者1
* 备注:单生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 22:14
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloReceiver1 {

private static final Logger log = LoggerFactory.getLogger(HelloReceiver1.class);

//监听器监听指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("Receiver1:"+hello);
}

}

消费者2:

[code]package com.example.demo.rabbitmq.service.oneToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:消费者2
* 备注:单生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 22:14
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloReceiver2 {

private static final Logger log = LoggerFactory.getLogger(HelloReceiver2.class);

//监听器监听指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("Receiver2:"+hello);

}

}

controller测试:

[code]package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.oneToMany.HelloSender1;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 路径:com.example.demo.rabbitmq.controller
* 类名:
* 功能:《用一句描述一下》
* 备注:单生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 22:35
* 修改人:
* 修改备注:
* 修改时间:
*/
@RestController
public class RabbitOneToManyTest {

@Autowired
private HelloSender1 helloSender;

/**
* 方法名:
* 功能:单生产者-多消费者
* 描述:
* 创建人:typ
* 创建时间:2018/9/23 22:46
* 修改人:
* 修改描述:
* 修改时间:
*/
@PostMapping("/oneToMany")
public void ontToMany(){
for (int i=0;i<10;i++){
helloSender.send("hello smg:"+i);
}
}
}

6、多生产者-多消费者

生产者1:

[code]package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:生产者1
* 备注:多生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 21:49
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloSenderA {

private static final Logger log = LoggerFactory.getLogger(HelloSenderA.class);

@Autowired
public AmqpTemplate amqpTemplate;

public void send(String msg){
String context = msg + new Date();
log.info("SenderA:" + context);
this.amqpTemplate.convertAndSend("hello",context);
}
}

生产者2:

[code]package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:生产者2
* 备注:多生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 21:49
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloSenderB {

private static final Logger log = LoggerFactory.getLogger(HelloSenderB.class);

@Autowired
public AmqpTemplate amqpTemplate;

public void send(String msg){
String context = msg + new Date();
log.info("SenderB:" + context);
this.amqpTemplate.convertAndSend("hello",context);
}
}

消费者1:

[code]package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:消费者1
* 备注:多生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 22:14
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloReceiverA {

private static final Logger log = LoggerFactory.getLogger(HelloReceiverA.class);

//监听器监听指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("ReceiverA:"+hello);
}

}

消费者2:

[code]package com.example.demo.rabbitmq.service.manyToMany;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service
* 类名:
* 功能:消费者2
* 备注:多生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 22:14
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class HelloReceiverB {

private static final Logger log = LoggerFactory.getLogger(HelloReceiverB.class);

//监听器监听指定的Queue
@RabbitListener(queues="hello")
public void process(String hello){
log.info("ReceiverB:"+hello);
}

}

controller测试:

[code]package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.manyToMany.HelloSenderA;
import com.example.demo.rabbitmq.service.manyToMany.HelloSenderB;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 路径:com.example.demo.rabbitmq.controller
* 类名:
* 功能:《用一句描述一下》
* 备注:多生产者-多消费者
* 创建人:typ
* 创建时间:2018/9/23 22:35
* 修改人:
* 修改备注:
* 修改时间:
*/
@RestController
public class RabbitManyToManyTest {

@Autowired
private HelloSenderA helloSenderA;

@Autowired
private HelloSenderB helloSenderB;

/**
* 方法名:
* 功能:多生产者-多消费者
* 描述:
* 创建人:typ
* 创建时间:2018/9/23 22:46
* 修改人:
* 修改描述:
* 修改时间:
*/
@PostMapping("/manyToMany")
public void ontToMany(){
for (int i=0;i<10;i++){
helloSenderA.send("hello smg:"+i);
helloSenderB.send("hello smg:"+i);
}
}
}

7、实体类传输,springboot完美的支持对象的发送和接收,不需要格外的配置。

实体类(必须实现序列化接口):

[code]package com.example.demo.rabbitmq.service.entity;

import java.io.Serializable;

/**
* 路径:com.example.demo.rabbitmq.service.entity
* 类名:
* 功能:《用一句描述一下》
* 备注:
* 创建人:typ
* 创建时间:2018/9/24 19:59
* 修改人:
* 修改备注:
* 修改时间:
*/
public class User implements Serializable{

private String name;
private String pass;

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getPass() {
return pass;
}

public void setPass(String pass) {
this.pass = pass;
}
}

生产者:

[code]package com.example.demo.rabbitmq.service.entity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.entity
* 类名:
* 功能:实体类传输
* 备注:生产者
* 创建人:typ
* 创建时间:2018/9/24 20:01
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class UserSender {

private static final Logger log = LoggerFactory.getLogger(UserSender.class);

@Autowired
private AmqpTemplate amqpTemplate;

public void send() {
User user = new User();
user.setName("test");
user.setPass("123456");
log.info("user Sender:" + user.getName() + "," + user.getPass());
amqpTemplate.convertAndSend("user", user);
}
}

消费者:

[code]package com.example.demo.rabbitmq.service.entity;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.entity
* 类名:
* 功能:实体类传输
* 备注:消费者
* 创建人:typ
* 创建时间:2018/9/24 20:07
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
@RabbitListener(queues = "user")
public class UserReceiver {

private static final Logger log = LoggerFactory.getLogger(UserReceiver.class);

@RabbitHandler
public void process(User user) {
log.info("user Receive:" + user.getName() + "," + user.getPass());
}
}

controller测试:

[code]package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.entity.UserSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 路径:com.example.demo.rabbitmq.controller
* 类名:
* 功能:实体类传输测试
* 备注:
* 创建人:typ
* 创建时间:2018/9/24 20:09
* 修改人:
* 修改备注:
* 修改时间:
*/
@RestController
public class RabbitUserTest {

@Autowired
private UserSender userSender;

@PostMapping("/userTest")
public void userTets(){
userSender.send();
}
}

8、topic ExChange示例

     topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列。首先对topic规则配置,这里使用两个队列来测试(也就是在Application类中创建和绑定的topic.message和topic.messages两个队列),其中topic.message的bindting_key为“topic.message”,topic.messages的binding_key为“topic.#”。

生产者:

[code]package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.topic
* 类名:
* 功能:topic ExChange示例------生产者
* 备注:topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列
* 创建人:typ
* 创建时间:2018/9/24 20:12
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class TopicSender {

private static final Logger log = LoggerFactory.getLogger(TopicSender.class);

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String msg1 = "I am topic.mesaage msg1!";
log.info("sender1 : " + msg1);
this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);

String msg2 = "I am topic.mesaages msg2!";
log.info("sender2 : " + msg2);
this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
}
}

消费者1:

[code]package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.topic
* 类名:
* 功能:topic ExChange示例
* 备注:消费者1(topic.message)
* 创建人:typ
* 创建时间:2018/9/24 20:12
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

private static final Logger log = LoggerFactory.getLogger(TopicReceiver1.class);

@RabbitHandler
public void process(String msg) {
log.info("topicReceiver1: " +msg);
}

}

消费者2:

[code]package com.example.demo.rabbitmq.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.topic
* 类名:
* 功能:topic ExChange示例
* 备注:消费者2(topic.messages)
* 创建人:typ
* 创建时间:2018/9/24 20:12
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

private static final Logger log = LoggerFactory.getLogger(TopicReceiver2.class);

@RabbitHandler
public void process(String msg) {
log.info("topicReceiver2 : " +msg);
}

}

controller测试:

[code]package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.topic.TopicSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 路径:com.example.demo.rabbitmq.controller
* 类名:
* 功能:topic ExChange示例
* 备注:
* 创建人:typ
* 创建时间:2018/9/24 20:21
* 修改人:
* 修改备注:
* 修改时间:
*/
@RestController
public class RabbitTopicTest {

@Autowired
private TopicSender topicSender;

@PostMapping("/topicTest")
public void topicTest(){
topicSender.send();
}
}

9、fanout ExChange示例

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。

生产者:

[code]package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.fanout
* 类名:
* 功能:fanout ExChange示例---生产者
* 备注:Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。
* 创建人:typ
* 创建时间:2018/9/24 21:10
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class FanoutSender {

private static final Logger log = LoggerFactory.getLogger(FanoutSender.class);

@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
String msg="fanoutSender :hello i am fanout";
log.info(msg);
this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msg);
}
}

消费者1:

[code]package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.fanout
* 类名:
* 功能:fanout ExChange示例
* 备注:消费者A
* 创建人:typ
* 创建时间:2018/9/24 21:10
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiverA {

private static final Logger log = LoggerFactory.getLogger(FanoutReceiverA.class);

@RabbitHandler
public void process(String msg) {
log.info("FanoutReceiverA  : " + msg);
}

}

消费者2:

[code]package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.fanout
* 类名:
* 功能:fanout ExChange示例
* 备注:消费者B
* 创建人:typ
* 创建时间:2018/9/24 21:10
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiverB {

private static final Logger log = LoggerFactory.getLogger(FanoutReceiverB.class);

@RabbitHandler
public void process(String msg) {
log.info("FanoutReceiverB  : " + msg);
}

}

消费者3:

[code]package com.example.demo.rabbitmq.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.fanout
* 类名:
* 功能:fanout ExChange示例
* 备注:消费者C
* 创建人:typ
* 创建时间:2018/9/24 21:10
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiverC {

private static final Logger log = LoggerFactory.getLogger(FanoutReceiverC.class);

@RabbitHandler
public void process(String msg) {
log.info("FanoutReceiverC  : " + msg);
}

}

controller测试:

[code]package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.fanout.FanoutSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 路径:com.example.demo.rabbitmq.controller
* 类名:
* 功能:fanout ExChange示例
* 备注:
* 创建人:typ
* 创建时间:2018/9/24 22:11
* 修改人:
* 修改备注:
* 修改时间:
*/
@RestController
public class RabbitFanoutTest {

@Autowired
private FanoutSender fanoutSender;

@PostMapping("/fanoutTest")
public void fanoutTest() {
fanoutSender.send();
}
}

10、callback的消息发送

       增加回调处理,这里不再使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息。

rabbitmq配置类:

[code]package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Scope;

/**
* 路径:com.example.demo.rabbitmq.service.callback
* 类名:
* 功能:增加回调处理,这里不再使用application.properties默认配置的方式,会在程序中显示的使用文件中的配置信息。
* 备注:
* 创建人:typ
* 创建时间:2018/9/24 20:09
* 修改人:
* 修改备注:
* 修改时间:
*/
public class RabbitConfig {

private static final Logger log = LoggerFactory.getLogger(RabbitConfig.class);

@Value("${spring.rabbitmq.host}")
private String addresses;

@Value("${spring.rabbitmq.port}")
private String port;

@Value("${spring.rabbitmq.username}")
private String username;

@Value("${spring.rabbitmq.password}")
private String password;

@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;

@Value("${spring.rabbitmq.publisher-confirms}")
private boolean publisherConfirms;

@Bean
public ConnectionFactory connectionFactory() {

CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses+":"+port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);

//如果要进行消息回调,则这里必须要设置为true
connectionFactory.setPublisherConfirms(publisherConfirms);
return connectionFactory;
}

//因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public RabbitTemplate rabbitTemplatenew() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}

}

生产者:

[code]package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
* 路径:com.example.demo.rabbitmq.service.callback
* 类名:CallBackSender
* 功能:callback的消息发送-----生产者
* 创建人:typ
* 创建时间:2018/9/24 20:09
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
public class CallBackSender implements  RabbitTemplate.ConfirmCallback{

private static final Logger log = LoggerFactory.getLogger(CallBackSender.class);

@Autowired
private RabbitTemplate rabbitTemplatenew;

public void send() {
rabbitTemplatenew.setConfirmCallback(this);
String msg="callbackSender : i am callback sender";
log.info(msg);
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("callbackSender UUID: " + correlationData.getId());
this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);
}

public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("callbakck confirm: " + correlationData.getId());
}
}

消费者:

[code]package com.example.demo.rabbitmq.service.callback;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 路径:com.example.demo.rabbitmq.service.callback
* 类名:
* 功能:callback的消息发送
* 备注:消费者
* 创建人:typ
* 创建时间:2018/9/24 20:12
* 修改人:
* 修改备注:
* 修改时间:
*/
@Component
@RabbitListener(queues = "topic.messages")
public class CallBackReceiver {

private static final Logger log = LoggerFactory.getLogger(CallBackReceiver.class);

@RabbitHandler
public void process(String msg) {
log.info("CallBackReceiver : " +msg);
}

}

controller测试:

[code]package com.example.demo.rabbitmq.controller;

import com.example.demo.rabbitmq.service.callback.CallBackSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* 路径:com.example.demo.rabbitmq.controller
* 类名:
* 功能:callback的消息发送
* 备注:
* 创建人:typ
* 创建时间:2018/9/24 22:20
* 修改人:
* 修改备注:
* 修改时间:
*/
@RestController
public class RabbitCallBackTest {

@Autowired
private CallBackSender callBackSender;

//执行代码可以看出callbackSender发出的UUID,收到了回应,又传回来了。
@PostMapping("/callback")
public void callbak() {
callBackSender.send();
}
}

 

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: