RocketMq的学习二:广播模式
第一期中已经按官网搭建好了RocketMQ,这一期主要探讨RocketMQ帮我们解决的问题;
首先是官网介绍的七个例子,向我们简单展示了rocketMQ的主要应用范围,七个例子分别为:订单,广播,计划,批处理,过滤,Logappender,OpenMessaging;
官网链接:http://rocketmq.apache.org/docs/quick-start/
因为项目中需要用到广播,先看下广播的例子;
消息生产者代码:
[code]public class BroadcastProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 100; i++){ Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); }
}
消息消费者代码:
[code]public class BroadcastConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //set to broadcast mode consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); }
}
官网代码很简洁,DefaultMQProducer通过send方法直接发送message对象,返回SendResult; DefaultMQPushConsumer设置从上次接收位位置,设置接收方式,设置订阅名称;设置监听事件,重写消费消息相关代码(此处可以加上接收到相关消息的处理逻辑);
上次在工作中遇到如下问题:A,B两个系统,由于业务发展,关于同一产品两个系统中说明不一致,需要综合A系统与B系统的说明;我们最后就采用了广播模式;
主要工作分为三部分:1.现有不一致数据的处理;2.RocketMq环境的搭建;3.对A,B系统的改造;
对于现有数据不一致的处理,原本以为可以通过查A库的表,B库的表来对比数据,但客户说有导出导入数据功能,可以通过对比结果,写一个小程序来比对字段差异,然后求并集;客户就是爸爸,按照需求做就好了;实际情况是仍然有一部分字段取不到,需要与导出execl解析;此部分花费了我们大量精力去书写不同字段的正则提取代码;最后生成导入文件,重新导入;
RocketMq环境的搭建,此处由于第一部分的存在,是我们已经有了一定的数据不一致的处理办法,加上内部系统实际也没多少并发异常,所以不需考虑中间件集群,单MQ即可处理;
3.对A,B系统的改造,此部分将原有插入部分代码的sql,A,B系统均将涉及到的数据丢入MQ,然后广播给A,B,A,B系统通过在系统监听MQ消息,然后执行消息中的更新Sql即可
关于监听,此部分还没来得及写,想想继承ServletContextListener,注入spring,即系统启动时自动执行消费相关程序即可;代码我后续补充到这篇文章里
由于不是金融项目,加上前期先写了异常数据小程序,导致MQ配置与后期事务保证较差,因为根本不需要同时一致,只要收到就好,收不到,大不了再跑一次;
关于MQ事务看了一篇文章,文章链接:
分布式消息队列RocketMQ–事务消息–解决分布式事务的最佳实践:
https://www.geek-share.com/detail/2693502112.html
- rocket mq 的广播模式示例
- ActiveMQ学习(三)——MQ的通讯模式
- rocketmq学习笔记 二 官方实例<消息过滤>
- rocketmq 学习记录-2
- rocketmq 双master模式
- MQ学习(三)--- 订阅模式
- RocketMQ源码学习--消息存储篇
- RocketMQ多Master多Slave模式部署
- rocketmq学习笔记 四 rocketmq运行架构
- Androidc学习笔记三之BroadcastReceiver广播接收器及单双页模式
- RocketMQ 学习笔记
- rocketmq学习笔记 六 流程之拉消息
- rocketmq学习笔记 五 源码之rocketmq-tools
- rocketmq学习笔记 六 流程之取消息
- ActiveMQ学习(三)——MQ的通讯模式
- RocketMQ多Master模式
- rocketmq学习笔记 五 源码之rocketmq-broker
- RocketMQ 服务器3模式
- rocketmq学习笔记 五 源码之rocketmq-filtersrv
- (转)RocketMQ源码学习--消息存储篇