springboot+RabbitMq之topic和fanout
2018-11-22 00:06
459 查看
1.创建一个springboot项目,导入如下maven坐标
[code]
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.0.RELEASE</version>
</dependency>
2. application.properties 配置
[code]
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
3. topic 通配符交换器使用
3.1生产者
[code]package com.czxy.controller; import com.czxy.config.TopicRabbitConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; //topic-exange交换机 ROUTING_KEY1 = "topic.message"; @GetMapping("/send1") public void send1() { String context = "hi, i am message 1"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend(TopicRabbitConfig.EXCHANGE_NAME, TopicRabbitConfig.ROUTING_KEY1, context); } //topic-exange交换机 ROUTING_KEYx = "topic.#"; @GetMapping("/send2") public void send2() { String context = "hi, i am messages 2"; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend(TopicRabbitConfig.EXCHANGE_NAME, TopicRabbitConfig.ROUTING_KEYx, context); } }
3.2 消费者
[code]package com.czxy.controller; import com.czxy.config.FanoutRabbitConfig; import com.czxy.config.TopicRabbitConfig; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { @RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME1) public void consumeMessage(String message) { System.out.println(message+"topic1"); } @RabbitListener(queues = TopicRabbitConfig.QUEUE_NAME2) public void consumeMessage1(String message) { System.out.println(message+"topic2"); } }
3.3 配置类
[code]package com.czxy.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class TopicRabbitConfig { public final static String QUEUE_NAME1 = "topic.message"; public final static String QUEUE_NAME2 = "topic.messages"; public final static String EXCHANGE_NAME = "TopicExchange"; public final static String ROUTING_KEY1 = "topic.message"; public final static String ROUTING_KEY2 = "topic.messages"; public final static String ROUTING_KEYx = "topic.#"; public final static String ROUTING_KEYy = "topic.*"; // 创建队列 @Bean public Queue queue1() { return new Queue(TopicRabbitConfig.QUEUE_NAME1); } @Bean public Queue queue2() { return new Queue(TopicRabbitConfig.QUEUE_NAME2); } // 创建一个 topic 类型的交换器 @Bean public TopicExchange exchange() { return new TopicExchange(TopicRabbitConfig.EXCHANGE_NAME); } // 使用路由键(routingKey)把队列(Queue)绑定到交换器(Exchange) @Bean public Binding binding1(Queue queue1, TopicExchange exchange) { return BindingBuilder.bind(queue1).to(exchange).with(TopicRabbitConfig.ROUTING_KEY1); } @Bean public Binding binding2(Queue queue2, TopicExchange exchange) { return BindingBuilder.bind(queue2).to(exchange).with(TopicRabbitConfig.ROUTING_KEYx); } }
4. fanout 广播模式交换器使用
4.1生产者
[code]package com.czxy.controller; import com.czxy.config.TopicRabbitConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @RestController public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; // fanout-exange交换机 不需要路由键 @GetMapping("/send") public void send() { String context = "hi, fanout msg "; System.out.println("Sender : " + context); this.rabbitTemplate.convertAndSend("fanoutExchange", "", context); } }
4.2 消费者
[code]package com.czxy.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { public final static String FINDOUT1 = ("fanout.a"); public final static String FINDOUT2 = ("fanout.b"); public final static String FINDOUT3 = ("fanout.c"); public final static String FindoutExange = ("fanoutExchange"); //创建队列 @Bean public Queue AMessage() { return new Queue(FanoutRabbitConfig.FINDOUT1); } @Bean public Queue BMessage() { return new Queue(FanoutRabbitConfig.FINDOUT2); } @Bean public Queue CMessage() { return new Queue(FanoutRabbitConfig.FINDOUT3); } //创建交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FanoutRabbitConfig.FindoutExange); } //为队列模式绑定交换机 @Bean Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
4.3配置类
[code]package com.czxy.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutRabbitConfig { public final static String FINDOUT1 = ("fanout.a"); public final static String FINDOUT2 = ("fanout.b"); public final static String FINDOUT3 = ("fanout.c"); public final static String FindoutExange = ("fanoutExchange"); //创建队列 @Bean public Queue AMessage() { return new Queue(FanoutRabbitConfig.FINDOUT1); } @Bean public Queue BMessage() { return new Queue(FanoutRabbitConfig.FINDOUT2); } @Bean public Queue CMessage() { return new Queue(FanoutRabbitConfig.FINDOUT3); } //创建交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FanoutRabbitConfig.FindoutExange); } //为队列模式绑定交换机 @Bean Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
阅读更多
相关文章推荐
- RabbitMQ Exchange Type(Direct/Fanout/Topic)理解测试
- spring amqp rabbitmq fanout配置
- rabbitmq direct、fanout、topic 三种Exchange java 代码比较
- 基于dubbo的分布式项目框架搭建 开发工具idea (springboot+dubbo+zookeeper+redis+rabbitmq+基于Swagger2的restful api) --(一)
- springboot+Rabit实战一:(Rabbit MQ windows 环境搭建)
- springboot+rabbitMq整合开发实战一
- 集群与负载均衡系列(6)——消息队列之rabbitMQ+spring-boot+spring amqp发送可靠的消息
- 带着新人学springboot的应用07(springboot+RabbitMQ 下)
- Python RabbitMQ fanout
- 第四十六章:SpringBoot & RabbitMQ完成消息延迟消费
- spring boot Rabbitmq集成,延时消息队列实现
- SpringBoot + RabbitMQ 使用Demo
- 4 微服务实战系列 - SpringBoot RabbitMQ 实战解决项目中实践
- RabbitMQ Exchange模式之Fanout
- springboot+rabbitmq整合示例程
- springboot+dubbo+zookeeper+mybatis+redis+druid+rabbitmq
- RabbitMQ Exchange中的fanout类型
- springboot RabbitMq的安装以及使用
- 基于dubbo的分布式项目框架搭建 开发工具idea (springboot+dubbo+zookeeper+redis+rabbitmq+基于Swagger2的restful api) --(五)
- 带着新人学springboot的应用05(springboot+RabbitMQ 上)