RabbitMQ五种消息队列学习(四)--发布订阅模式
2017-12-15 17:00
726 查看
RabbitMQ五种消息队列学习(四)–发布订阅模式
标签(空格分隔): RabbitMQ由于生产者和消费者之间用相同的队列,形成了应用直接的耦合关系,那怎么解耦呢。使用发布订阅模式
队列模型
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
消息传递如下图所示:
代码实现
1、生产者注意:消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中
private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息内容 String message = "商品已经更新! id = 1000"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); }
2、消费者1
private final static String QUEUE_NAME = "test_queue_fanout_1"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
3、消费者2
private final static String QUEUE_NAME = "test_queue_fanout_2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义队列的消费者 QueueingConsumer consumer = new QueueingConsumer(channel); // 监听队列,手动返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 获取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }
测试结果
测试结果:同一个消息被多个消费者获取。
在管理工具中查看队列和交换机的绑定关系:
总结
使用订阅模式是一个消息经交换机之后,会发送给不同的消费者进行消费。MQ不能对不同的消息进行不同的分发模式。例如:根据Routing Key进行选择性的分发相关文章推荐
- RabbitMQ五种消息队列学习(六)--通配符模式(路由类型:Topic)
- RabbitMQ五种消息队列学习(三)--Work模式
- rabbitmq消息队列——"发布订阅"
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- Redis学习笔记(十)消息通知(任务队列和发布订阅模式)
- 【转】redis 消息队列发布订阅模式spring boot实现
- 使用redis的发布订阅模式实现消息队列
- 消息队列-ActiveMQ学习笔记(三)-发布-订阅消息模式实现
- Redis消息通知(任务队列和发布订阅模式)
- Redis学习笔记(十)消息通知(任务队列和发布订阅模式)
- Redis基础学习--Redis 事务(watch命令)、生存时间、排序、消息通知("发布/订阅"模式)、管道、节省空间
- 消息队列及发布/订阅模式
- (五)RabbitMQ消息队列-安装amqp扩展并订阅/发布Demo(PHP版)
- 消息队列-ActiveMQ学习笔记(三)-发布-订阅消息模式实现
- JMS消息队列ActiveMQ(发布/订阅模式)
- redis 消息队列发布订阅模式spring boot实现
- (五)RabbitMQ消息队列-安装amqp扩展并订阅/发布Demo(PHP版)
- Spring Data Redis实现消息队列——发布/订阅模式
- 系统拆分解耦利器之消息队列---RabbitMQ-发布/订阅
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式