RabbitMQ3.7.2入门到进阶之订阅模式Publish/Subscribe
2018-02-01 16:15
465 查看
RabbitMQ录播视频
此套免费课程视频,本人录播到了腾讯课堂更多请关注腾讯课堂牧码人或 登录网站http://www.51mmr.net
https://ke.qq.com/course/288116#tuin=5740604a
1.消息中间件概述,使用场景(日志处理,异步,系统解耦,流量削锋)
2.Rabbitmq3.7.2安装,控制台简介,管理员添加
3.用户vhost配置以及介绍
4.java操作简单队列,生产者发送消息到队列,消费者接收消息
5.简单队列的缺陷,工作队列work queues之 轮询分发(Round-robin),以及轮询分发现象
6.工作队列work queues 公平分发(fair dispatch);prefetchCount = 1来限制RabbitMQ发送的消息,手动应答ack。
7.消息应答ack与消息持久化durable
8.publish/subscribe发布订阅模式 交换机(转发器)匿名转发Nameless exchange, Fanout Exchange不处理路由键 , Direct Exchange处理路由键, Topic Exchange将路由键和某模式进行匹配。队列绑定交换机(Exchange) ;
9.routing路由模式
10.topic主题模式
11.Rabbitmq之消息确认机制(AMQP事务机制)txSelect(), txCommit()以及txRollback(),事务机制的缺陷
12.Rabbitmq之消息确认机制(Confirm机制串行) waitForConfirms
13.Rabbitmq之消息确认机制(Confirm机制异步) ConfirmListener deliveryTag unconfirm集合维护
14.spring集成rabbitmq-client,template 的使用
15.大数据日志收集系统消息中间件应用
16.搜索系统DIH消息中间件应用
publish/subscribe模型图
我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多个消费者消费,这时候怎么办? 这时候我们就得用到了消息中的发布订阅模型在前面的教程中,我们创建了一个工作队列,都是一个任务只交给一个消费者。
这次我们做 将消息发送给多个消费者。这种模式叫做“发布/订阅”。
这种模式类似微信订阅号 发布文章消息 就可以广播给所有的接收者。(订阅者)
那么咱们来看一下图,我们学过前两种有一些不一样,work模式 是不是同一个队列 多个消费者,而ps这种模式呢,是一个队列对应一个消费者,pb模式还多了一个X(交换机 转发器) ,这时候我们要获取消息 就需要队列绑定到交换机上,交换机把消息发送到队列 , 消费者才能获取队列的消息
解读:
1、1个生产者,多个消费者
2、每一个消费者都有自己的一个队列
3、生产者没有将消息直接发送到队列,而是发送到了交换机(转发器)
4、每个队列都要绑定到交换机
5、生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
案例
注册完 —>发短信和 发邮件
生产者
public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); // 声明exchange 交换机 转发器 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //fanout 分裂 // 消息内容 String message = "Hello PB"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
那么先看一下控制台 是不是有这个交换机
但是这个发送的消息到哪了呢? 消息丢失了!!!因为交换机没有存储消息的能力,在rabbitmq中只有队列存储消息的能力.因为这时还没有队列,所以就会丢失;
小结:消息发送到了一个没有绑定队列的交换机时,消息就会丢失!
那么我们再来写消费者
消费者1
邮件发送系统public class Recv { private final static String QUEUE_NAME = "test_queue_fanout_email"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); //------------下面逻辑和work模式一样----- // 同一时刻服务器只会发一条消息给消费者 channel.basicQos(1); // 定义一个消费者 Consumer consumer = new DefaultConsumer(channel) { // 消息到达 触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[1] Recv msg:" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[1] done "); // 手动回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } }
消费者2
类似短信发送系统public class Recv2 { private final static String QUEUE_NAME = "test_queue_fanout_2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 获取到连接以及mq通道 Connection connection = ConnectionUtils.getConnection(); final Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 绑定队列到交换机 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一时刻服务器只会发一条消息给消费者 // 定义一个消费者 Consumer consumer = new DefaultConsumer(channel) { // 消息到达 触发这个方法 @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, "utf-8"); System.out.println("[2] Recv msg:" + msg); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("[2] done "); // 手动回执 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer); }
测试
一个消息 可以被多个消费者获取 ,交换机绑定界面
相关文章推荐
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- RabbitMQ-订阅模式publish/subscribe
- RabbitMQ入门教程(五):扇形交换机发布/订阅(Publish/Subscribe)
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ入门(3)——发布/订阅(Publish/Subscribe)
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
- RabbitMQ3.7.2入门到进阶之路由模式(Routing)
- RabbitMQ消息分发模式----"Publish/Subscribe"发布/订阅模式
- RabbitMQ系列教程之三:发布/订阅(Publish/Subscribe)(转载)
- RabbitMQ系列教程之三:发布\/订阅(Publish\/Subscribe)
- 设计模式---订阅发布模式(Subscribe/Publish)
- 订阅模式(publish-subscribe)
- 观察者模式 /发布-订阅(Publish/Subscribe)/模型-视图(Model/View)/源-监听器(Source/Listener)/从属者(Dependents) 模式
- 订阅发布模式(Subscribe/Publish)
- redis之发布与订阅(publish/subscribe模式)
- 消息队列 RabbitMQ系列 第四篇:发布/订阅 Publish/Subscribe
- RabbitMQ入门-发布订阅模式
- 观察者模式(Observer)和发布(Publish/订阅模式(Subscribe)的区别