您的位置:首页 > 其它

RocketMq的学习二:广播模式

2018-08-07 15:33 169 查看

第一期中已经按官网搭建好了RocketMQ,这一期主要探讨RocketMQ帮我们解决的问题;
首先是官网介绍的七个例子,向我们简单展示了rocketMQ的主要应用范围,七个例子分别为:订单,广播,计划,批处理,过滤,Logappender,OpenMessaging;
官网链接:http://rocketmq.apache.org/docs/quick-start/

因为项目中需要用到广播,先看下广播的例子;
消息生产者代码:

[code]public class BroadcastProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();

for (int i = 0; i < 100; i++){
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
}

}

消息消费者代码:

[code]public class BroadcastConsumer {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//set to broadcast mode
consumer.setMessageModel(MessageModel.BROADCASTING);

consumer.subscribe("TopicTest", "TagA || TagC || TagD");

consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
}

}

官网代码很简洁,DefaultMQProducer通过send方法直接发送message对象,返回SendResult; DefaultMQPushConsumer设置从上次接收位位置,设置接收方式,设置订阅名称;设置监听事件,重写消费消息相关代码(此处可以加上接收到相关消息的处理逻辑);

上次在工作中遇到如下问题:A,B两个系统,由于业务发展,关于同一产品两个系统中说明不一致,需要综合A系统与B系统的说明;我们最后就采用了广播模式;
主要工作分为三部分:1.现有不一致数据的处理;2.RocketMq环境的搭建;3.对A,B系统的改造;

对于现有数据不一致的处理,原本以为可以通过查A库的表,B库的表来对比数据,但客户说有导出导入数据功能,可以通过对比结果,写一个小程序来比对字段差异,然后求并集;客户就是爸爸,按照需求做就好了;实际情况是仍然有一部分字段取不到,需要与导出execl解析;此部分花费了我们大量精力去书写不同字段的正则提取代码;最后生成导入文件,重新导入;

RocketMq环境的搭建,此处由于第一部分的存在,是我们已经有了一定的数据不一致的处理办法,加上内部系统实际也没多少并发异常,所以不需考虑中间件集群,单MQ即可处理;

3.对A,B系统的改造,此部分将原有插入部分代码的sql,A,B系统均将涉及到的数据丢入MQ,然后广播给A,B,A,B系统通过在系统监听MQ消息,然后执行消息中的更新Sql即可

关于监听,此部分还没来得及写,想想继承ServletContextListener,注入spring,即系统启动时自动执行消费相关程序即可;代码我后续补充到这篇文章里

由于不是金融项目,加上前期先写了异常数据小程序,导致MQ配置与后期事务保证较差,因为根本不需要同时一致,只要收到就好,收不到,大不了再跑一次;

关于MQ事务看了一篇文章,文章链接:
分布式消息队列RocketMQ–事务消息–解决分布式事务的最佳实践:
https://www.geek-share.com/detail/2693502112.html

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: