RabbitMq05——订阅模式
2018-07-08 13:29
344 查看
有时候我们希望的是当生产者发送一条消息的时候,与它相关的消费者都能接收到该消息,而不是每一条消息只能一个消费者消费。即相当于我们关注的公众号,公众号发送一条消息,但是所有关注它的用户都可以收到该条消息。
要实现这种模式,需要加入交换机来作为中间转换。即生产者发送消息不再直接发到队列,而是发送到交换机,再由交换机发送到对应的队列之中。但是每一个消费者都有一个自己的队列,并与之绑定,同时再将队列都绑定到交换机上,这样就可以实现一条消息有多个消费者来消费,也就是订阅模式。
生产者
[code]package com.mmr.rabbitmq.fanout; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.mmr.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "fanout_exchange"; public static void main(String[] args) throws IOException, TimeoutException { //获取MQ连接 Connection connection = ConnectionUtils.getConnection(); //获取信道 Channel channel = connection.createChannel(); //声明交换机,设置交换机类型为fanout channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = "i am coming------>"; for (int i = 0; i < 20; i++) { //向交换机中发送消息 channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes()); System.out.println(message + i); } channel.close(); connection.close(); } }
消费者1
[code]package com.mmr.rabbitmq.fanout; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.mmr.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Rev1 { private final static String EXCHANGE_NAME = "fanout_exchange"; private final static String QUEUE_NAME = "fanout_queue01"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); //声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //声明每次向消费者发送一条消息 channel.basicQos(1); //将队列与交换机绑定 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //创建一个消费者 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); try { Thread.sleep(1000); } catch (InterruptedException e) { //一般用于异常处理,告知MQ消息处理失败, //第二个参数为true,则该条消息被重新放回队列,为false则放弃该条消息 // channel.basicReject(envelope.getDeliveryTag(), false); e.printStackTrace(); }finally { System.out.println("Recv1---->" + msg + "---->Done"); //向MQ发送确认,告诉MQ该条消息已经消费,可以重新发送了 //第二个参数为false表示只确认当前这一条消息, //如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认(批量确认针对的是整个信道) channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); } }
消费者2
消费者2与消费者1类似
[code]package com.mmr.rabbitmq.fanout; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.mmr.rabbitmq.util.ConnectionUtils; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; public class Rev2 { private final static String EXCHANGE_NAME = "fanout_exchange"; private final static String QUEUE_NAME = "fanout_queue02"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println("Recv1---->" + msg + "---->Done"); channel.basicAck(envelope.getDeliveryTag(), false); } } }; channel.basicConsume(QUEUE_NAME, false, consumer); } }
运行两个消费者,然后运行生产者,从控制台我们可以看到消费者1和消费者2同时都收到了生产者所发送的消息。
长按关注公众号“魔性JAVA”,会不定时免费推送JAVA相关的学习资料和知识
阅读更多
相关文章推荐
- JavaScript设计模式系列05_观察者模式(发布订阅)写的数据联动(类似于vue的数据绑定)
- RabbitMQ 一二事(3) - 订阅模式(微信公众号模式)的应用
- RabbitMQ教程 (三):Publish/Subscribe 发布订阅模式
- rabbitmq(5)订阅模式
- RabbitMQ下的生产消费者模式与订阅发布模式
- RabbitMQ 使用 | 第三篇:发布/订阅模式
- (java)简单实现原生RabbitMQ中的广播订阅(fanout)模式
- RabbitMQ3.7.2入门到进阶之订阅模式Publish/Subscribe
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- RabbitMQ入门-发布订阅模式
- RabbitMQ五种消息队列学习(四)--发布订阅模式
- RabbitMQ-订阅模式publish/subscribe
- 【RabbitMQ】三种Exchange模式——订阅、路由、通配符模式
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- RabbitMq六种使用模式(3)_订阅发布模式
- RabbitMQ下的生产消费者模式与订阅发布模式
- RabbitMQ下的生产消费者模式与订阅发布模式
- 05-rabbitmq-发布订阅
- RabbitMQ下的生产消费者模式与订阅发布模式
- .Net下RabbitMQ发布订阅模式实践