RabbitMQ-主题模式Topic
2018-03-11 21:08
513 查看
Topic Exchange
生产者
消费者1
消费者2
将路由键(routing key)和某个模式进行匹配:# 匹配零个或者多个单词,* 匹配恰好一个单词。例如绑定键为 Good.# 的队列,Good.insert、Good.delete 等路由键的消息都能获得
商品 : 发布 删除 修改 查询....
生产者
package com.ithzk.rabbitmq.topic;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the topic-exchange example: declares the topic exchange and
 * publishes a single message with a dotted routing key.
 *
 * @author hzk
 * @date 2018/3/10
 */
public class Send {

    private final static String EXCHANGER_NAME = "test_exchange_topic";

    /**
     * Publishes one message to the topic exchange and then releases the
     * channel and connection.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening or closing the channel/connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = RabbitMQConnectionUtils.getConnection();
        try {
            Channel channel = connection.createChannel();

            // Declare the exchange as type "topic".
            channel.exchangeDeclare(EXCHANGER_NAME, "topic");

            String msg = "goods......";

            // Publishing with "goods.add": consumer 1 and consumer 2 both receive the message.
            // Publishing with "goods.delete": only consumer 2 receives the message.
            String routingKey = "goods.delete";

            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            channel.basicPublish(EXCHANGER_NAME, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("Send msg" + msg);

            channel.close();
        } finally {
            // Always release the connection, even when declaring or publishing failed.
            // The isOpen() guard avoids a secondary failure masking the original
            // exception when the connection has already been shut down.
            if (connection.isOpen()) {
                connection.close();
            }
        }
    }
}
消费者1
package com.ithzk.rabbitmq.topic;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Consumer 1 for the topic-exchange example: binds its queue with the exact
 * routing key {@code goods.add} and acknowledges each message manually.
 *
 * @author hzk
 * @date 2018/3/10
 */
public class Recv1 {

    private static final String QUEUE_NAME = "test_queue_topic_one";
    private final static String EXCHANGER_NAME = "test_exchange_topic";

    /**
     * Declares the exchange and queue, binds them, and starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if obtaining the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Obtain a connection.
        Connection connection = RabbitMQConnectionUtils.getConnection();

        // Open a channel on the connection.
        final Channel channel = connection.createChannel();

        // Declare the exchange here as well so that binding does not depend on
        // the producer having been started first.
        channel.exchangeDeclare(EXCHANGER_NAME, "topic");

        // Declare the queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange.
        String routingKey = "goods.add";
        channel.queueBind(QUEUE_NAME, EXCHANGER_NAME, routingKey);

        // Deliver at most one unacknowledged message at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("[1] Recv msg:" + msg);

                boolean interrupted = false;
                try {
                    // Simulate slow processing.
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    interrupted = true;
                } finally {
                    try {
                        System.out.println("[1] done");
                        // Manual ack for this single delivery (multiple = false).
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } finally {
                        // Restore the interrupt flag after the ack so callers up the
                        // stack can still observe the interruption.
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        };

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        System.out.println("[Consumer 1 start]");
    }
}
消费者2
package com.ithzk.rabbitmq.topic;

import com.ithzk.rabbitmq.utils.RabbitMQConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Consumer 2 for the topic-exchange example: binds its queue with the wildcard
 * pattern {@code goods.#} and acknowledges each message manually.
 *
 * @author hzk
 * @date 2018/3/10
 */
public class Recv2 {

    private static final String QUEUE_NAME = "test_queue_topic_two";
    private final static String EXCHANGER_NAME = "test_exchange_topic";

    /**
     * Declares the exchange and queue, binds them, and starts consuming.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if obtaining the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // Obtain a connection.
        Connection connection = RabbitMQConnectionUtils.getConnection();

        // Open a channel on the connection.
        final Channel channel = connection.createChannel();

        // Declare the exchange here as well so that binding does not depend on
        // the producer having been started first.
        channel.exchangeDeclare(EXCHANGER_NAME, "topic");

        // Declare the queue.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Bind the queue to the exchange with a wildcard pattern.
        String routingKey = "goods.#";
        channel.queueBind(QUEUE_NAME, EXCHANGER_NAME, routingKey);

        // Deliver at most one unacknowledged message at a time.
        channel.basicQos(1);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, StandardCharsets.UTF_8);
                System.out.println("[2] Recv msg:" + msg);

                boolean interrupted = false;
                try {
                    // Simulate slow processing.
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    interrupted = true;
                } finally {
                    try {
                        System.out.println("[2] done");
                        // Manual ack for this single delivery (multiple = false).
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    } finally {
                        // Restore the interrupt flag after the ack so callers up the
                        // stack can still observe the interruption.
                        if (interrupted) {
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
        };

        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        System.out.println("[Consumer 2 start]");
    }
}
相关文章推荐
- (八)RabbitMQ消息队列-通过Topic主题模式分发消息
- (八)RabbitMQ消息队列-通过Topic主题模式分发消息
- RabbitMQ的Exchange 模式之topic(主题模式)
- RabbitMQ消息分发模式----"Topic"主题模式
- RabbitMQ消息分发模式----"Topic"主题模式
- RabbitMQ (五)主题(Topic)
- RabbitMQ入门-Topic模式
- RabbitMQ (七) 主题(Topic)
- [转]RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较
- RabbitMQ三种Exchange模式(fanout,direct,topic)的特性 -摘自网络
- RabbitMQ系列教程之五:主题(Topic)(转载)
- 消息队列MQ实践----实现Queue(队列消息)和Topic(主题消息)两种模式
- RabbitMQ (五)主题(Topic)
- RabbitMQ (五)主题(Topic)
- RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较(转)
- RabbitMQ (五)主题(Topic)
- RabbitMQ (五)主题(Topic)
- Spring Boot 整合 RabbitMQ 之 Topic转发模式 (二)
- rabbitmq 交换模式-Topic
- RabbitMQ三种Exchange模式(fanout,direct,topic)介绍