spring boot 使用RabbitMQ
2019-06-10 14:32
99 查看
版权声明:Copyright ©2018-2019 凉白开不加冰 版权所有 https://blog.csdn.net/qq_21082615/article/details/91375573
介绍:通常我们谈到队列服务, 会有三个概念: 发消息者、队列、收消息者,RabbitMQ 在这个基本概念之上, 多做了一层抽象, 在发消息者和 队列之间, 加入了交换器 (Exchange). 这样发消息者和队列就没有直接联系, 转而变成发消息者把消息给交换器, 交换器根据调度策略再把消息再给队列。
那么,其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
- 虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
- 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。
这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。 - 绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系
第一步:pom文件
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
第二步:application.yml文件
spring: application: name: rabbitmq rabbitmq: host: 队列IP username: admin password: admin port: 5672
一、点对点模式
点对点配置
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 点对点配置 **/ @Configuration public class RabbitP2PConfigure { public static final String QUEUE_NAME = "p2p_queue"; @Bean public Queue queue() { return new Queue(QUEUE_NAME, true); } }
点对点模式:发送者消息保留在RabbitMQ中,当有订阅者是推送消息给订阅者
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 点对点模式:发送者消息保留在RabbitMQ中,当有订阅者是推送消息给订阅者 **/ @Component public class Cousmer { @RabbitListener(queues = RabbitP2PConfigure.QUEUE_NAME) public void receive(String msg){ System.out.println("点对点模式 --> "+msg); } }
生产消息
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 生产消息 **/ @Component public class Procuder { @Autowired private AmqpTemplate amqpTemplate; public void send(String msg) { amqpTemplate.convertAndSend(RabbitP2PConfigure.QUEUE_NAME, msg); } }
二、TOPIC模式
TOPIC模式配置
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: TOPIC模式配置 **/ @Configuration public class RabbitTopicConfigure { public final static String message = "topic.message"; public final static String message2 = "topic.message2"; public final static String messages = "topic.messages"; public final static String exchange = "topicExchange"; @Bean public Queue queueMessage() { return new Queue(RabbitTopicConfigure.message); } @Bean public Queue queueMessage2() { return new Queue(RabbitTopicConfigure.message2); } @Bean public Queue queueMessages() { return new Queue(RabbitTopicConfigure.messages); } @Bean TopicExchange exchange() { return new TopicExchange(RabbitTopicConfigure.exchange); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message"); } @Bean Binding bindingExchangeMessage2(Queue queueMessage2, TopicExchange exchange) { return BindingBuilder.bind(queueMessage2).to(exchange).with("topic.message2"); } @Bean Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#"); } }
消息消费者
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 消息消费者 **/ @Component @RabbitListener(queues = {RabbitTopicConfigure.message}) public class ReceiverA { @RabbitHandler public void message(String message) { System.out.println("message --> "+message); } }
消息消费者
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 消息消费者 **/ @Component @RabbitListener(queues = {RabbitTopicConfigure.messages}) public class ReceiverB { @RabbitHandler public void messages(String message) { System.out.println("messages --> "+message); } }
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 消息消费者 **/ @Component @RabbitListener(queues = {RabbitTopicConfigure.message2}) public class ReceiverC { @RabbitHandler public void message2(String message) { System.out.println("message2 --> "+message); } }
消息生产者
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 消息生产者 **/ @Component public class ProducerTopic { @Autowired private AmqpTemplate rabbitTemplate; public void message() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.exchange, RabbitTopicConfigure.message, "message"); } public void message2() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.exchange, RabbitTopicConfigure.message2, "message2"); } public void messages() { rabbitTemplate.convertAndSend(RabbitTopicConfigure.exchange, RabbitTopicConfigure.messages, "messages"); } }
三、交换机模式
交换机模式配置
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 交换机模式 **/ @Configuration public class RabbitFanoutConfigure { public final static String fanoutA = "fanout.A"; public final static String fanoutB = "fanout.B"; public final static String fanoutC = "fanout.C"; public final static String fanout_exchange = "fanoutExchange"; @Bean public Queue messageA() { return new Queue(fanoutA); } @Bean public Queue messageB() { return new Queue(fanoutB); } @Bean public Queue messageC() { return new Queue(fanoutC); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(fanout_exchange); } @Bean Binding bindingExchangeA(Queue messageA,FanoutExchange fanoutExchange) { return BindingBuilder.bind(messageA).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue messageB, FanoutExchange fanoutExchange) { return BindingBuilder.bind(messageB).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue messageC, FanoutExchange fanoutExchange) { return BindingBuilder.bind(messageC).to(fanoutExchange); } }
消息消费者
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2019/2/18 * @Description: 消息消费者 **/ @Component @RabbitListener(queues = RabbitFanoutConfigure.fanoutA) public class FanoutReceiverA { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver A : " + message); } }
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2019/2/18 * @Description: 消息消费者 **/ @Component @RabbitListener(queues = RabbitFanoutConfigure.fanoutB) public class FanoutReceiverB { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver B : " + message); } }
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2019/2/18 * @Description: 消息消费者 **/ @Component @RabbitListener(queues = RabbitFanoutConfigure.fanoutC) public class FanoutReceiverC { @RabbitHandler public void process(String message) { System.out.println("fanout Receiver C : " + message); } }
消息生产者
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/24 * @Description: 消息生产者 **/ @Component public class Procuder { @Autowired private AmqpTemplate amqpTemplate; public void sendA(Object msg){ amqpTemplate.convertAndSend(RabbitFanoutConfigure.fanout_exchange,RabbitFanoutConfigure.fanoutA,msg); } public void send(String msg) { String context = "hi, fanout msg "+msg; System.out.println("Sender : " + context); this.amqpTemplate.convertAndSend(RabbitFanoutConfigure.fanout_exchange,"", context); } }
四、延迟消费
延迟消费配置文件
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/25 * @Description: 延迟消费配置文件 **/ @Configuration public class RabbitLazyConfigure { /** * 延迟队列 TTL 名称 */ private static final String REGISTER_DELAY_QUEUE = "dev.activity.register.delay.queue"; /** * DLX,dead letter发送到的 exchange * 此处的 exchange 很重要,具体消息就是发送到该交换机的 */ public static final String REGISTER_DELAY_EXCHANGE = "dev.activity.register.delay.exchange"; /** * routing key 名称 * 此处的 routingKey 很重要要,具体消息发送在该 routingKey 的 */ public static final String DELAY_ROUTING_KEY = "activity"; /** * 队列名称 */ public static final String REGISTER_QUEUE_NAME = "dev.activity.register.queue"; /** * 交换机名称 */ public static final String REGISTER_EXCHANGE_NAME = "dev.activity.register.exchange"; public static final String ROUTING_KEY = "all"; /** * 延迟队列配置 * <p> * 1、params.put("x-message-ttl", 5 * 1000); * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先) * 2、rabbitTemplate.convertAndSend(book, message -> { * message.getMessageProperties().setExpiration(2 * 1000 + ""); * return message; * }); * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制 **/ @Bean public Queue delayProcessQueue() { Map<String, Object> params = new HashMap<>(); // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, params.put("x-dead-letter-exchange", REGISTER_EXCHANGE_NAME); // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。 params.put("x-dead-letter-routing-key", ROUTING_KEY); return new Queue(REGISTER_DELAY_QUEUE, true, false, false, params); } /** * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。 * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。 * 它不像 TopicExchange 那样可以使用通配符适配多个 * @return DirectExchange */ @Bean public DirectExchange delayExchange() { return new DirectExchange(REGISTER_DELAY_EXCHANGE); } /** * 延迟队列交换机绑定 */ @Bean public Binding dlxBinding() { return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY); } /** * 活动队列 */ @Bean public Queue registerActivityQueue() { return new Queue(REGISTER_QUEUE_NAME, true); } /** * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。 * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。 **/ @Bean public TopicExchange registerActivityTopicExchange() { return new TopicExchange(REGISTER_EXCHANGE_NAME); } /** * 队列交换器绑定 */ @Bean public Binding registerActivityBinding() { //如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键 return BindingBuilder.bind(registerActivityQueue()).to(registerActivityTopicExchange()).with(ROUTING_KEY); } }
消费消息
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/25 * @Description: 消费消息 **/ @Component public class LazyConsumer { @RabbitListener(queues = {RabbitLazyConfigure.REGISTER_QUEUE_NAME}) public void listenerDelayQueue(String msg, Message message, Channel channel) { System.out.println("[listenerDelayQueue 监听的消息] - [消费时间] -> "+ LocalDateTime.now()+"-[消费信息] -> "+ msg); // try { // //通知 MQ 消息已被成功消费,可以ACK了 // channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // } catch (Exception e) { // //如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列 // // } } }
延迟发送消息
/** * @Author: 凉白开不加冰 * @Version: 0.0.1V * @Date: 2018/10/25 * @Description: 延迟发送消息 **/ @Component public class LazySender { @Autowired private AmqpTemplate rabbitTemplate; public void sendlazy(Object msg){ rabbitTemplate.convertAndSend(RabbitLazyConfigure.REGISTER_DELAY_EXCHANGE, RabbitLazyConfigure.DELAY_ROUTING_KEY, msg, message -> { message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, msg); //如果配置了 params.put("x-message-ttl", 5 * 1000); // 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间 message.getMessageProperties().setExpiration(5 * 1000 + ""); return message; }); } }
相关文章推荐
- Spring Boot中使用RabbitMQ
- springboot(3)-Spring Boot中使用RabbitMQ
- Spring boot下使用RabbitMQ实例
- Spring boot 下使用RabbitMQ报错:406
- SpringBoot使用RabbitMQ做消息中间件
- Spring Boot 2.0.0.M7 使用异步消息服务-AMQP(RabbitMQ)
- Spring Boot中使用RabbitMQ
- SpringBoot | 第十二章:RabbitMQ的集成和使用
- Spring Boot中使用RabbitMQ
- Spring Boot与RabbitMQ延迟队列使用示例
- RabbitMQ的Java应用(3) -- 使用spring-boot-starter-amqp开发生产者应用
- springboot使用rabbitMQ的坑
- spring-boot使用rabbitmq示例
- Spring boot使用Rabbitmq注解
- Spring Boot 中使用 RabbitMQ
- Spring Boot中使用RabbitMQ
- RabbitMQ在SpringBoot中使用的一些注意点
- SpringBoot之RabbitMQ的使用方法
- RabbitMQ进阶使用-延时队列的配置(Spring Boot)
- RabbitMQ的安装及在Springboot项目中的简单使用