SpringBoot(十二):SpringBoot整合RabbitMQ
如果对RabbitMQ不熟悉的,建议先看RabbitMQ系列教程。
一、环境准备
RabbitMQ 3.7.4
SpringBoot 1.5.10.RELEASE
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml
spring:
rabbitmq:
host: 192.168.239.128
port: 5672
# username: test
# password: 123456
# virtual-host: /vhost_test
# publisher-confirms: true
二、简单队列
RabbitMQConfiguration.java 声明队列
package cn.saytime.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
private static final String QUEUE_SIMPLE_NAME = "test_simple_queue";
@Bean
public Queue queue(){
return new Queue(QUEUE_SIMPLE_NAME, false, false, false, null);
}
}
消费者
package cn.saytime.listener.simple;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者
*/
@RabbitListener(queues = "test_simple_queue")
@Component
public class SimpleRecv {
@RabbitHandler
public void process(String message) {
System.out.println("[x] rev : " + message);
}
}
测试生产者
package cn.saytime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testSimpleQueue() {
String message = "Hello RabbitMQ !";
amqpTemplate.convertAndSend("test_simple_queue", message);
System.out.println("[x] send " + message + " ok");
}
}
执行test:
[x] send Hello RabbitMQ ! ok
[x] rev : Hello RabbitMQ !
三、工作队列
公平分发模式在Spring-amqp中是默认的,这种情况也是日常工作中使用最为正常的,轮询模式用的较少,区别在于prefetch默认是1,如果设置为0就是轮询模式。
3.1 公平分发模式
RabbitMQConfiguration.java 声明队列
package cn.saytime.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
private static final String QUEUE_WORK_NAME = "test_work_queue";
@Bean
public Queue workQueue(){
return new Queue(QUEUE_WORK_NAME, false, false, false, null);
}
}
消费者1
package cn.saytime.listener.workfair;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者
*/
@RabbitListener(queues = "test_workfair_queue")
@Component
public class WorkRecv {
@RabbitHandler
public void process(String message){
System.out.println("[1] rev : " + message);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者2
package cn.saytime.listener.workfair;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消费者
*/
@RabbitListener(queues = "test_workfair_queue")
@Component
public class WorkRecv2 {
@RabbitHandler
public void process(String message){
System.out.println("[2] rev : " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
测试生产者
package cn.saytime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testWorkFairQueue(){
for (int i = 0; i < 20; i++) {
String message = "Hello RabbitMQ " + i;
// 发送消息
amqpTemplate.convertAndSend("test_workfair_queue", message);
System.out.println(" [x] Sent '" + message + "'");
try {
Thread.sleep(i*100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
执行test:
[x] Sent 'Hello RabbitMQ 0'
[x] Sent 'Hello RabbitMQ 1'
[2] rev : Hello RabbitMQ 1
[1] rev : Hello RabbitMQ 0
[x] Sent 'Hello RabbitMQ 2'
[x] Sent 'Hello RabbitMQ 3'
[x] Sent 'Hello RabbitMQ 4'
[x] Sent 'Hello RabbitMQ 5'
[1] rev : Hello RabbitMQ 2
[x] Sent 'Hello RabbitMQ 6'
[2] rev : Hello RabbitMQ 3
[1] rev : Hello RabbitMQ 4
[x] Sent 'Hello RabbitMQ 7'
[x] Sent 'Hello RabbitMQ 8'
[1] rev : Hello RabbitMQ 5
[x] Sent 'Hello RabbitMQ 9'
[2] rev : Hello RabbitMQ 6
[1] rev : Hello RabbitMQ 7
[x] Se
4000
nt 'Hello RabbitMQ 10'
[1] rev : Hello RabbitMQ 8
[x] Sent 'Hello RabbitMQ 11'
[2] rev : Hello RabbitMQ 9
[1] rev : Hello RabbitMQ 10
[x] Sent 'Hello RabbitMQ 12'
[1] rev : Hello RabbitMQ 11
[x] Sent 'Hello RabbitMQ 13'
[2] rev : Hello RabbitMQ 12
[1] rev : Hello RabbitMQ 13
[x] Sent 'Hello RabbitMQ 14'
[1] rev : Hello RabbitMQ 14
[x] Sent 'Hello RabbitMQ 15'
[1] rev : Hello RabbitMQ 15
[x] Sent 'Hello RabbitMQ 16'
[2] rev : Hello RabbitMQ 16
[x] Sent 'Hello RabbitMQ 17'
[1] rev : Hello RabbitMQ 17
[x] Sent 'Hello RabbitMQ 18'
[2] rev : Hello RabbitMQ 18
[x] Sent 'Hello RabbitMQ 19'
[1] rev : Hello RabbitMQ 19
公平分发模式测试正常。
3.2 轮询分发模式
修改application.yml
spring:
rabbitmq:
host: 192.168.239.128
port: 5672
# username: test
# password: 123456
# virtual-host: /vhost_test
# publisher-confirms: true
listener:
simple:
prefetch: 0
[x] Sent 'Hello RabbitMQ 0'
[x] Sent 'Hello RabbitMQ 1'
[1] rev : Hello RabbitMQ 0
[2] rev : Hello RabbitMQ 1
[x] Sent 'Hello RabbitMQ 2'
[x] Sent 'Hello RabbitMQ 3'
[x] Sent 'Hello RabbitMQ 4'
[x] Sent 'Hello RabbitMQ 5'
[1] rev : Hello RabbitMQ 2
[x] Sent 'Hello RabbitMQ 6'
[2] rev : Hello RabbitMQ 3
[1] rev : Hello RabbitMQ 4
[x] Sent 'Hello RabbitMQ 7'
[x] Sent 'Hello RabbitMQ 8'
[1] rev : Hello RabbitMQ 5
[x] Sent 'Hello RabbitMQ 9'
[2] rev : Hello RabbitMQ 6
[1] rev : Hello RabbitMQ 7
[x] Sent 'Hello RabbitMQ 10'
[1] rev : Hello RabbitMQ 8
[x] Sent 'Hello RabbitMQ 11'
[2] rev : Hello RabbitMQ 9
[1] rev : Hello RabbitMQ 10
[x] Sent 'Hello RabbitMQ 12'
[1] rev : Hello RabbitMQ 11
[x] Sent 'Hello RabbitMQ 13'
[2] rev : Hello RabbitMQ 12
[1] rev : Hello RabbitMQ 13
[x] Sent 'Hello RabbitMQ 14'
[1] rev : Hello RabbitMQ 14
[x] Sent 'Hello RabbitMQ 15'
[1] rev : Hello RabbitMQ 15
[x] Sent 'Hello RabbitMQ 16'
[2] rev : Hello RabbitMQ 16
[x] Sent 'Hello RabbitMQ 17'
[1] rev : Hello RabbitMQ 17
[x] Sent 'Hello RabbitMQ 18'
[2] rev : Hello RabbitMQ 18
[x] Sent 'Hello RabbitMQ 19'
[1] rev : Hello RabbitMQ 19
可以看到消费者1消费偶数,消费者2消费奇数,表示轮询分发正常。
四、订阅模式
定义交换机、队列、以及队列与交换机的绑定关系。
package cn.saytime.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfiguration {
private static final String EXCHANGE_FANNOUT_NAME = "test_exchange_fanout";
private static final String QUEUE_PS_SMS_NAME = "test_queue_fanout_sms";
private static final String QUEUE_PS_EMAIL_NAME = "test_queue_fanout_email";
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange(){
return new FanoutExchange(EXCHANGE_FANNOUT_NAME);
}
@Bean
public Queue fanoutSmsQueue(){
return new Queue(QUEUE_PS_SMS_NAME, false, false, false, null);
}
@Bean
public Queue fanoutEmailQueue(){
return new Queue(QUEUE_PS_EMAIL_NAME, false, false, false, null);
}
@Bean
public Binding smsQueueExchangeBinding(FanoutExchange fanoutExchange, Queue fanoutSmsQueue){
return BindingBuilder.bind(fanoutSmsQueue).to(fanoutExchange);
}
@Bean
public Binding emailQueueExchangeBinding(FanoutExchange fanoutExchange, Queue fanoutEmailQueue){
return BindingBuilder.bind(fanoutEmailQueue).to(fanoutExchange);
}
}
消费者1 sms
package cn.saytime.listener.ps;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* sms消费者
*/
@RabbitListener(queues = "test_queue_fanout_sms")
@Component
public class SmsRecv {
@RabbitHandler
public void process(String message){
System.out.println("[sms] rev : " + message);
}
}
消费者2
package cn.saytime.listener.ps;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* email消费者
*/
@RabbitListener(queues = "test_queue_fanout_email")
@Component
public class EmailRecv {
@RabbitHandler
public void process(String message){
System.out.println("[email] rev : " + message);
}
}
测试消息:
package cn.saytime;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMQApplicationTests {
@Autowired
private AmqpTemplate amqpTemplate;
@Test
public void testFanoutQueue() {
String message = "Hello, fanout message ";
// 发送消息
amqpTemplate.convertAndSend("test_exchange_fanout", "", message);
System.out.println(" [x] Sent '" + message + "'");
}
}
执行test结果:
[x] Sent 'Hello, fanout message '
[sms] rev : Hello, fanout message
[email] rev : Hello, fanout message
五、路由模式
跟订阅模式一样,只不过Configuration里面配置的是DirectExchange,并设定路由键。
绑定关系:
BindingBuilder.bind(queue).to(exchange).with("info");
发送消息:
amqpTemplate.convertSendAndReceive(exchange, routingkey, message);
六、主题模式
跟路由模式一样,只不过路由键可以模糊匹配,配置的是TopicExchange.
- springboot(十二)整合rabbitmq
- Spring-Boot整合RabbitMQ
- Springboot整合 二 集成 rabbitmq
- Spring Boot整合RabbitMQ实例(Topic模式)
- spring-boot整合rabbitmq启动报错no queue 'dev_pms2invoi_queue' in vhost '/'
- Spring Boot整合RabbitMQ
- Spring-Boot整合RabbitMQ
- RabbitMQ学习(十二)之spring整合发送异步消息(注解实现)
- Spring Boot系列(十二)Spring Boot整合ActiveQ实现消息收发和订阅
- spring boot实战(第十二篇)整合RabbitMQ
- Spring Boot整合RabbitMQ开发实战详解
- Rabbitmq 整合Spring,SpringBoot与Docker
- spring boot整合RabbitMQ实例详解(Fanout模式)
- RabbitMQ 实战(二)Spring Boot 整合 RabbitMQ
- SpringBoot--整合RabbitMq
- rabbitmq 整合springboot
- Springboot整合RabbitMQ,良心推荐
- springboot 学习之路 20 (整合RabbitMQ)
- 【SpringBoot】整合RabbitMQ
- SpringBoot非官方教程 | 第十五篇:Springboot整合RabbitMQ