您的位置:首页 > 编程语言 > Java开发

spring boot 使用RabbitMQ

2019-06-10 14:32 99 查看
版权声明:Copyright ©2018-2019 凉白开不加冰 版权所有 https://blog.csdn.net/qq_21082615/article/details/91375573

介绍:通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。

那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。

  • 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
  • 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
    这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
  • 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系

第一步:pom文件

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

第二步:application.yml文件

spring:
application:
name: rabbitmq
rabbitmq:
host: 队列IP
username: admin
password: admin
port: 5672

一、点对点模式

点对点配置

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 点对点配置
**/
@Configuration
public class RabbitP2PConfigure {
public static final String QUEUE_NAME = "p2p_queue";

@Bean
public Queue queue() {
return new Queue(QUEUE_NAME, true);
}
}

点对点模式:发送者消息保留在RabbitMQ中,当有订阅者是推送消息给订阅者

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 点对点模式:发送者消息保留在RabbitMQ中,当有订阅者是推送消息给订阅者
**/
@Component
public class Cousmer {

@RabbitListener(queues = RabbitP2PConfigure.QUEUE_NAME)
public void receive(String msg){
System.out.println("点对点模式 --> "+msg);
}
}

生产消息

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 生产消息
**/
@Component
public class Procuder {
@Autowired
private AmqpTemplate amqpTemplate;

public void send(String msg) {
amqpTemplate.convertAndSend(RabbitP2PConfigure.QUEUE_NAME, msg);
}
}

二、TOPIC模式

TOPIC模式配置

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: TOPIC模式配置
**/
@Configuration
public class RabbitTopicConfigure {

public final static String message = "topic.message";
public final static String message2 = "topic.message2";
public final static String messages = "topic.messages";
public final static String exchange = "topicExchange";

@Bean
public Queue queueMessage() {
return new Queue(RabbitTopicConfigure.message);
}

@Bean
public Queue queueMessage2() {
return new Queue(RabbitTopicConfigure.message2);
}

@Bean
public Queue queueMessages() {
return new Queue(RabbitTopicConfigure.messages);
}

@Bean
TopicExchange exchange() {
return new TopicExchange(RabbitTopicConfigure.exchange);
}

@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}

@Bean
Binding bindingExchangeMessage2(Queue queueMessage2, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage2).to(exchange).with("topic.message2");
}

@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}

消息消费者

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 消息消费者
**/
@Component
@RabbitListener(queues = {RabbitTopicConfigure.message})
public class ReceiverA {

@RabbitHandler
public void message(String message) {
System.out.println("message --> "+message);
}
}

消息消费者

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 消息消费者
**/
@Component
@RabbitListener(queues = {RabbitTopicConfigure.messages})
public class ReceiverB {

@RabbitHandler
public void messages(String message) {
System.out.println("messages --> "+message);
}
}
/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 消息消费者
**/
@Component
@RabbitListener(queues = {RabbitTopicConfigure.message2})
public class ReceiverC {

@RabbitHandler
public void message2(String message) {
System.out.println("message2 --> "+message);
}
}

消息生产者

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 消息生产者
**/
@Component
public class ProducerTopic {
@Autowired
private AmqpTemplate rabbitTemplate;

public void message() {
rabbitTemplate.convertAndSend(RabbitTopicConfigure.exchange, RabbitTopicConfigure.message, "message");
}

public void message2() {
rabbitTemplate.convertAndSend(RabbitTopicConfigure.exchange, RabbitTopicConfigure.message2, "message2");
}

public void messages() {
rabbitTemplate.convertAndSend(RabbitTopicConfigure.exchange, RabbitTopicConfigure.messages, "messages");
}
}

三、交换机模式

交换机模式配置

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 交换机模式
**/
@Configuration
public class RabbitFanoutConfigure {

public final static String fanoutA = "fanout.A";
public final static String fanoutB = "fanout.B";
public final static String fanoutC = "fanout.C";
public final static String fanout_exchange = "fanoutExchange";

@Bean
public Queue messageA() {
return new Queue(fanoutA);
}

@Bean
public Queue messageB() {
return new Queue(fanoutB);
}

@Bean
public Queue messageC() {
return new Queue(fanoutC);
}

@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(fanout_exchange);
}

@Bean
Binding bindingExchangeA(Queue messageA,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(messageA).to(fanoutExchange);
}

@Bean
Binding bindingExchangeB(Queue messageB, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(messageB).to(fanoutExchange);
}

@Bean
Binding bindingExchangeC(Queue messageC, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(messageC).to(fanoutExchange);
}

}

消息消费者

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2019/2/18
* @Description: 消息消费者
**/
@Component
@RabbitListener(queues = RabbitFanoutConfigure.fanoutA)
public class FanoutReceiverA {

@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver A  : " + message);
}
}
/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2019/2/18
* @Description: 消息消费者
**/
@Component
@RabbitListener(queues = RabbitFanoutConfigure.fanoutB)
public class FanoutReceiverB {

@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver B  : " + message);
}
}
/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2019/2/18
* @Description: 消息消费者
**/
@Component
@RabbitListener(queues = RabbitFanoutConfigure.fanoutC)
public class FanoutReceiverC {

@RabbitHandler
public void process(String message) {
System.out.println("fanout Receiver C  : " + message);
}
}

消息生产者

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/24
* @Description: 消息生产者
**/
@Component
public class Procuder {

@Autowired
private AmqpTemplate amqpTemplate;

public void sendA(Object msg){
amqpTemplate.convertAndSend(RabbitFanoutConfigure.fanout_exchange,RabbitFanoutConfigure.fanoutA,msg);
}

public void send(String msg) {
String context = "hi, fanout msg "+msg;
System.out.println("Sender : " + context);
this.amqpTemplate.convertAndSend(RabbitFanoutConfigure.fanout_exchange,"", context);
}

}

四、延迟消费

延迟消费配置文件

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/25
* @Description: 延迟消费配置文件
**/
@Configuration
public class RabbitLazyConfigure {

/**
* 延迟队列 TTL 名称
*/
private static final String REGISTER_DELAY_QUEUE = "dev.activity.register.delay.queue";

/**
* DLX,dead letter发送到的 exchange
* 此处的 exchange 很重要,具体消息就是发送到该交换机的
*/
public static final String REGISTER_DELAY_EXCHANGE = "dev.activity.register.delay.exchange";

/**
* routing key 名称
* 此处的 routingKey 很重要要,具体消息发送在该 routingKey 的
*/
public static final String DELAY_ROUTING_KEY = "activity";

/**
* 队列名称
*/
public static final String REGISTER_QUEUE_NAME = "dev.activity.register.queue";

/**
* 交换机名称
*/
public static final String REGISTER_EXCHANGE_NAME = "dev.activity.register.exchange";

public static final String ROUTING_KEY = "all";

/**
* 延迟队列配置
* <p>
* 1、params.put("x-message-ttl", 5 * 1000);
* 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
* 2、rabbitTemplate.convertAndSend(book, message -> {
* message.getMessageProperties().setExpiration(2 * 1000 + "");
* return message;
* });
* 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
**/
@Bean
public Queue delayProcessQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
params.put("x-dead-letter-exchange", REGISTER_EXCHANGE_NAME);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
params.put("x-dead-letter-routing-key", ROUTING_KEY);
return new Queue(REGISTER_DELAY_QUEUE, true, false, false, params);
}

/**
* 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
* 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
* 它不像 TopicExchange 那样可以使用通配符适配多个
* @return DirectExchange
*/
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(REGISTER_DELAY_EXCHANGE);
}

/**
* 延迟队列交换机绑定
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY);
}

/**
* 活动队列
*/
@Bean
public Queue registerActivityQueue() {
return new Queue(REGISTER_QUEUE_NAME, true);
}

/**
* 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
* 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
**/
@Bean
public TopicExchange registerActivityTopicExchange() {
return new TopicExchange(REGISTER_EXCHANGE_NAME);
}

/**
* 队列交换器绑定
*/
@Bean
public Binding registerActivityBinding() {
//如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
return BindingBuilder.bind(registerActivityQueue()).to(registerActivityTopicExchange()).with(ROUTING_KEY);
}

}

消费消息

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/25
* @Description: 消费消息
**/
@Component
public class LazyConsumer {

@RabbitListener(queues = {RabbitLazyConfigure.REGISTER_QUEUE_NAME})
public void listenerDelayQueue(String msg, Message message, Channel channel) {
System.out.println("[listenerDelayQueue 监听的消息] - [消费时间] -> "+ LocalDateTime.now()+"-[消费信息] -> "+ msg);
//        try {
//            //通知 MQ 消息已被成功消费,可以ACK了
//            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//        } catch (Exception e) {
//            //如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列
//
//        }
}
}

延迟发送消息

/**
* @Author: 凉白开不加冰
* @Version: 0.0.1V
* @Date: 2018/10/25
* @Description: 延迟发送消息
**/
@Component
public class LazySender {
@Autowired
private AmqpTemplate rabbitTemplate;

public void sendlazy(Object msg){
rabbitTemplate.convertAndSend(RabbitLazyConfigure.REGISTER_DELAY_EXCHANGE, RabbitLazyConfigure.DELAY_ROUTING_KEY, msg, message -> {
message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, msg);
//如果配置了 params.put("x-message-ttl", 5 * 1000);
// 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
message.getMessageProperties().setExpiration(5 * 1000 + "");
return message;
});
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: