rocketmq 延迟队列的实现fei
2016-04-19 13:24
375 查看
jdk 里的delayQueue 和 scheduleExecutor 利用的是堆结构. 排序后 getTask
其他定时器实现 : IBM Linux 下定时器的实现方式分析https://www.ibm.com/developerworks/cn/linux/l-cn-timers/
流程描述:
1. producer发消息,设置一个延迟level值.“设置消息延时 10s 消费”的 Producer 端代码如下:
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
Message msg = newMessage(topic, tags, keys, body); msg.setDelayTimeLevel(3); ... SendResult sendResult = getMQProducer().send(msg);
2. broker 保存消息时替换了topic,和queueId(一个level计算得到一个queueId,并将实际的topic和queueId作为properties保存).
3. broker有定时任务(其实是个consumer)消费延迟消息,如果到达延迟时间,将消息取出,改回原来的topic和queueId,放入到commitLog中,然后被真正的消费者.
疑难点:
问: 如何保证rocketMq的offset移动和延迟消息不冲突?答: rocketMq当消息真正要消费的时候才把消息放到对应的topic中. 中间先保存到其他地方(利用原有的存储,消费机制,自然是一个topic). rocketMq很巧妙的将用户配置的level和queueId进行了一对一映射. 这样就能保证同一个queue下的消息肯定是顺序消费的.
代码细节:
1. producer 发 消息,设置一个level值.服务端MessageStoreConfig.messageDelayLevel 默认值是
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
2. 服务端接受到消息后
MessageConst.PROPERTY_REAL_TOPIcommitLog.putMessage(MessageExtBrokerInner)里,会把配置了延迟level的消息,存到
ScheduleMessageService.SCHEDULE_TOPIC(值为SCHEDULE_TOPIC_XXXX) ,
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());中 真正的topic和queueId作为msg暂时存起来.
// Backup real topic, queueId MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
3.有个定时任务模拟消费者消费该queue
ScheduleMessageService.DeliverDelayedMessageTimerTask 内,判断是否可消息,可以就取出消息,将topic和quueeId还原,放到commitLog中
PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); //
其中
private static final long FIRST_DELAY_TIME = 1000L; //定时任务第一次启动时延迟时间 private static final long DELAY_FOR_A_WHILE = 100L; // 死循环轮训时延迟时间. 又生成一个task,这样避免很多线程一直在执行.比较好的死循环策略. private static final long DELAY_FOR_A_PERIOD = 10000L; // put到commitLog出错时的延迟时间
参考文献:http://www.tuicool.com/articles/aU7JRz7 主要根据该文章的关键字去看对应的源代码,https://github.com/alibaba/RocketMQ
相关文章推荐
- web开发-服务器Controller到前端中的数据传递
- js 函数定义三种方式
- [RFC1867] HTML中基于表单的文件上传
- CSS3实现32种基本图形
- css的div垂直居中的方法,百分比div垂直居中 2014年11月16日 19922次浏览 前言 我们都知道,固定高宽的div在网页中垂直居中很简单,相信大家也很容易的写出来,但是不是固定高宽的d
- js与jquery实时监听输入框值的oninput与onpropertychange方法
- 利用原生JavaScript获取样式的方式小结
- gulp使用技巧-删除node_modules文件夹,解决目录层次太深删除报错的问题
- JS Date parse
- Javascript 面向对象编程:(1)封装;(2)构造函数的继承;(3)非构造函数的继承
- js和jquery中ajax使用
- JavaScript prototype
- HTML加载顺序总结测试
- JSP学习
- 用jquery.form.js 实现ajax提交含有上传文件和普通字段的表单
- 提高CSS开发能力的技巧集
- A first chance exception of type 'System.NullReferenceException' occurred发生的时候你确实是遇到了Bug
- 原生JS实现的简单“瀑布流”布局
- HTML5 在canvas中绘制复杂形状
- 四,细说 HTML5 之 新增的非主题结构元素