JStorm - ACK机制
2017-03-16 00:00
134 查看
摘要: 对jstorm的ack机制的整理
JStorm的ack机制可以追踪totology作业流中的每一个环节是否被正常处理,从而决定作为源头的spout是否需要重发tuple.
所以,该机制适用于对数据完整性与正确性有一定要求的topology,如果认为偶尔一两条数据的处理失败无伤大雅,则大可以取消该机制以获得性能上的提升.
emit: 在emit下一个tuple的时候,需要anchor输入的tuple,具体为调用OutputCollector类中带anchor参数的emit方法.
ack: 在执行完毕后需要ack输入的tuple,可以直接参考下BaseBasicBolt里对ack的处理:
可以看到,在业务逻辑执行完毕后,调用ack()方法提交状态,如果发生异常,则调用fail()方法。 同时可以发现,继承BaseBasicBolt时,如果执行体抛出的不是FailedException的话,任务处理线程不会自动fail(),这时线程会发生什么?
调用conf.setNumAckers(0),当acker num为0时,spout发出的tuple会被立即ack,storm不会追踪数据的处理结果.
当spout发射tuple时,不为其指定msgId.
Storm的ack机制是利用异或运算的特殊性质来实现的,异或运算相关性质:
在Storm中,每一个作业流都有一个初始状态值ack-val:0,每一个tuple在被发射的时候会有一个anchor操作,完成时有一个ack操作, 这两个操作实际上都是与ack-val进行异或操作,由异或的性质可以知道,如果所有tuple都被成功处理,则ack-val的最终值一定会等于初始值,也就是0:
这样通过检测ack-val的最终值就能得知任务流的执行结果,从而决定是ack还是fail.
bolt确认结果为fail时,spout会进行tuple重发,但上一轮处理过程中被持久化到第三方存储系统的数据无法回滚,需要自己增加额外的措施来保证数据不会被重复写入。JStorm自己实现了一套事务机制,还未研究。
spout为了实现数据重发,需要自己维护一份缓存信息,处理ack和fail方法。需要有重试次数的限制,否则一直失败的消息会占用大量的内存。
适用场景
为了确保每条数据仅会被正确的处理一次,JStorm提供了ack机制.JStorm的ack机制可以追踪totology作业流中的每一个环节是否被正常处理,从而决定作为源头的spout是否需要重发tuple.
所以,该机制适用于对数据完整性与正确性有一定要求的topology,如果认为偶尔一两条数据的处理失败无伤大雅,则大可以取消该机制以获得性能上的提升.
使用方式
在描述具体的启停实现前,需要先熟悉两个Bolt : BaseBasicBolt, BaseRichBolt, 他们的区别在于继承了BaseBasicBolt的bolt,每一个tuple处理完会被自动ack, 而BaseRichBolt的子类则需要自己处理ack相关的逻辑。启用ack
设置acker数量
首先需要确保topology的acker数量不能为0, 调用以下方法可以修改拓扑的acker数量:conf.setNumAckers(1);
为spout发射的tuple指定msgId
spout在emit一个tuple的时候,必须为每一个tuple指定一个全局唯一的msgId作为状态监控的依据,如果未指定msgId,则该tuple不会被ack或fail,可以看下SpoutOutputCollector类里对相关方法的说明:带有msgId的: /** * Emits a new tuple to the default output stream with the given message ID. * When Storm detects that this tuple has been fully processed, or has * failed to be fully processed, the spout will receive an ack or fail * callback respectively with the messageId as long as the messageId was not * null. If the messageId was null, Storm will not track the tuple and no * callback will be received. The emitted values must be immutable. * * @return the list of task ids that this tuple was sent to */ public List<Integer> emit(List<Object> tuple, Object messageId) { return emit(Utils.DEFAULT_STREAM_ID, tuple, messageId); } 不带msgId的: /** * Emits a tuple to the default output stream with a null message id. Storm * will not track this message so ack and fail will never be called for this * tuple. The emitted values must be immutable. */ public List<Integer> emit(List<Object> tuple) { eturn emit(tuple, null); }
bolt的处理
在满足前面两个条件的情况下,bolt任务直接继承BaseBasicBolt即可保证每一个tuple可以被正确监控。如果继承了BaseRichBolt,想自己处理ack的业务逻辑,也很简单,需要关注两点:emit: 在emit下一个tuple的时候,需要anchor输入的tuple,具体为调用OutputCollector类中带anchor参数的emit方法.
ack: 在执行完毕后需要ack输入的tuple,可以直接参考下BaseBasicBolt里对ack的处理:
public void execute(Tuple input) { _collector.setContext(input); try { _bolt.execute(input, _collector); _collector.getOutputter().ack(input); } catch (FailedException e) { if (e instanceof ReportedFailedException) { _collector.reportError(e); } _collector.getOutputter().fail(input); } }
可以看到,在业务逻辑执行完毕后,调用ack()方法提交状态,如果发生异常,则调用fail()方法。 同时可以发现,继承BaseBasicBolt时,如果执行体抛出的不是FailedException的话,任务处理线程不会自动fail(),这时线程会发生什么?
停用ack
全部停用
要停止对整条topology任务流的追踪有两个方案,任选其一即可:调用conf.setNumAckers(0),当acker num为0时,spout发出的tuple会被立即ack,storm不会追踪数据的处理结果.
当spout发射tuple时,不为其指定msgId.
部分停用
如果不关心某一bolt之后的处理结果,只想确保之前的任务流能被成功处理,则处于转折点的bolt需要继承BaseRichBolt,并且在emit新的tuple时,不与之前的tuple锚定,方式为调用OutputCollector的相关方法:/** * Emits a new unanchored tuple to the specified stream. Because it's * unanchored, if a failure happens downstream, this new tuple won't affect * whether any spout tuples are considered failed or not. The emitted values * must be immutable. * * @param streamId the stream to emit to * @param tuple the new output tuple from this bolt * @return the list of task ids that this new tuple was sent to */ public List<Integer> emit(String streamId, List<Object> tuple) { return emit(streamId, (List) null, tuple); }
实现原理
ack的实现原理十分精妙,作者大才...Storm的ack机制是利用异或运算的特殊性质来实现的,异或运算相关性质:
a ^ a = 0 a ^ 0 = a a ^ b ^ a = b a ^ b = b ^ a a ^ b ^ c = a ^ (b ^ c) = (a ^ b) ^ c;
在Storm中,每一个作业流都有一个初始状态值ack-val:0,每一个tuple在被发射的时候会有一个anchor操作,完成时有一个ack操作, 这两个操作实际上都是与ack-val进行异或操作,由异或的性质可以知道,如果所有tuple都被成功处理,则ack-val的最终值一定会等于初始值,也就是0:
ack-val ^ 1001(emit) ^ 1002(emit) ^ 1001(ack) ^ 1003(emit) ^ 1002(ack) ^ 1003(ack) = ack-val
这样通过检测ack-val的最终值就能得知任务流的执行结果,从而决定是ack还是fail.
需要关注的点
继承BaseBasicBolt时,如果方法抛出的异常不是FailedException,storm不会调用fail方法,task线程执行失败。bolt确认结果为fail时,spout会进行tuple重发,但上一轮处理过程中被持久化到第三方存储系统的数据无法回滚,需要自己增加额外的措施来保证数据不会被重复写入。JStorm自己实现了一套事务机制,还未研究。
spout为了实现数据重发,需要自己维护一份缓存信息,处理ack和fail方法。需要有重试次数的限制,否则一直失败的消息会占用大量的内存。
相关文章推荐
- 大数据实时处理storm,jstorm
- JStorm之Topology提交服务端
- jstorm 0.9.6.2安装配置
- jstorm
- jstorm系列-2:入门
- jstorm简介(转)
- JStorm集群的部署
- RabbitMQ消息队列:ACK机制
- JStorm如何保证消息不丢失
- Spark streaming vs JStorm
- JStorm学习笔记-基于Kafka、ElasticSearch、HBase简单实例
- [置顶] JStorm与Storm源码分析(五)--SpoutOutputCollector与代理模式
- JStorm集群的部署
- CentOS 6.8 实战部署JStorm集群
- JStorm注意事项
- 大数据之jstorm,storm,hbase,hadoop and so on
- (一个)kafka-jstorm集群实时日志分析 它 ---------kafka实时日志处理
- 阿里巴巴加入Apache基金会并捐赠项目JStorm
- kafka + jstorm版本升级安装(三)