Jstorm可靠性分析
2016-05-31 10:08
387 查看
去掉storm可靠性有三种方式:
1、Config.TOPOLOGY_ACKERS 设置为0;
2、在发送数据时不带上mesage id;
3、将tuple不做anchor发送到下一个节点,因为没有anchor到任何spout的tuple,就算没有成功处理被ack,也不会造成tuple fail
通过SpoutCollector, emit方法最后会调用sendMsg方法,其中判断条件为:
boolean needAck = (message_id != null ) && (ackNum > 0);
如果设置storm的信息处理不需要可靠性保证,spout的等待队列一直为空,因为无需缓存数据。
保证storm可靠性,需要构建tuple树,如果整个树在规定超时时间内不能没有处理完成,则认为是失败。为保证tuple树的构造:
1、在Spout发送数据时带上messageId(collector.emit(new Values(“test),msgId)),这样,当storm认为该tuple被成功处理,将调用spout的ack方法,并将msgId传给spout
2、在每个bolt处理tuple的时候,因tuple树需要生长,所以在bolt处理完tuple并发送数据的时候,需要通知storm在tuple树添加新的链路,通过anchor即可,anchor链接一个节点时,emit(tuple, new Value(“test”));如果需要同时链接两个节点,通过emit(List<Tupple>, new Value(“test”))
因为storm通过内存追踪元组的状态,所以需要保证可靠性时,一定要ack或者fail,不然会耗尽内存, messageId在发射数据时,根据anchors去生成新的messageId
参考 BoltCollector代码:
实现Bolt有两种方式,一种是实现IRichBolt,一种是继承实现IBasicBolt的BasicBolt,在TopologyBuilder中,可以看到在调用setBolt的方法时,对比两种类型的实现方法:
IRichBolt:
在该方法中,直接将IRichBolt作为参数传入topologyBuilder中初始化。
IBasicBolt:
将IBasicBolt包装成BasicBoltExecutor,这个类中的execute方法将会自动ack tuple,这也就是为什么继承BasicBolt会自动ack tuple而无需手动ack:
1、Config.TOPOLOGY_ACKERS 设置为0;
2、在发送数据时不带上mesage id;
3、将tuple不做anchor发送到下一个节点,因为没有anchor到任何spout的tuple,就算没有成功处理被ack,也不会造成tuple fail
通过SpoutCollector, emit方法最后会调用sendMsg方法,其中判断条件为:
boolean needAck = (message_id != null ) && (ackNum > 0);
如果设置storm的信息处理不需要可靠性保证,spout的等待队列一直为空,因为无需缓存数据。
保证storm可靠性,需要构建tuple树,如果整个树在规定超时时间内不能没有处理完成,则认为是失败。为保证tuple树的构造:
1、在Spout发送数据时带上messageId(collector.emit(new Values(“test),msgId)),这样,当storm认为该tuple被成功处理,将调用spout的ack方法,并将msgId传给spout
2、在每个bolt处理tuple的时候,因tuple树需要生长,所以在bolt处理完tuple并发送数据的时候,需要通知storm在tuple树添加新的链路,通过anchor即可,anchor链接一个节点时,emit(tuple, new Value(“test”));如果需要同时链接两个节点,通过emit(List<Tupple>, new Value(“test”))
因为storm通过内存追踪元组的状态,所以需要保证可靠性时,一定要ack或者fail,不然会耗尽内存, messageId在发射数据时,根据anchors去生成新的messageId
参考 BoltCollector代码:
for (Integer t : out_tasks) { MessageId msgid = getMessageId(anchors); TupleImplExt tp = new TupleImplExt(topologyContext, values, task_id, out_stream_id, msgid); tp.setTargetTaskId(t); taskTransfer.transfer(tp); }
protected MessageId getMessageId(Collection<Tuple> anchors) { Map<Long, Long> anchors_to_ids = new HashMap<Long, Long>(); if (anchors != null) { for (Tuple a : anchors) { Long edge_id = MessageId.generateId(random); put_xor(pending_acks, a, edge_id); for (Long root_id : a.getMessageId().getAnchorsToIds().keySet()) { put_xor(anchors_to_ids, root_id, edge_id); } } }
实现Bolt有两种方式,一种是实现IRichBolt,一种是继承实现IBasicBolt的BasicBolt,在TopologyBuilder中,可以看到在调用setBolt的方法时,对比两种类型的实现方法:
IRichBolt:
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) { validateUnusedId(id); initCommon(id, bolt, parallelism_hint); _bolts.put(id, bolt); return new BoltGetter(id); }
在该方法中,直接将IRichBolt作为参数传入topologyBuilder中初始化。
IBasicBolt:
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) { return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint); }
将IBasicBolt包装成BasicBoltExecutor,这个类中的execute方法将会自动ack tuple,这也就是为什么继承BasicBolt会自动ack tuple而无需手动ack:
public void execute(Tuple input) { _collector.setContext(input); try { _bolt.execute(input, _collector); _collector.getOutputter().ack(input); } catch (FailedException e) { if (e instanceof ReportedFailedException) { _collector.reportError(e); } _collector.getOutputter().fail(input); } }
相关文章推荐
- js中的面向对象
- js弹出框、对话框、提示框、弹窗实现方法总结(推荐)
- Ajax 和 JavaScript 验证用户登录
- js和andorid之间相互调用
- JavaScript里面三个等号和两个等号有什么区别?
- javascript事件模型介绍
- 使用JavaScript实现弹出层效果的简单实例
- JVM虚拟机内存量详细显示--JSP网页版
- js正则表达式语法
- 浅析JSONP解决Ajax跨域访问问题的思路详解
- javascript事件处理模型实例说明
- 原生js实现的星级评分效果
- js文字循环滚动
- JS性能优化详细
- js 获取 视频时间
- 解决js中字符串格式的时间比较问题
- 浅谈JavaScript中小数和大整数的精度丢失
- 深入浅析Jsonp解决ajax跨域问题
- 为什么需要JS里常需要做两次encodeURI(url)
- javascript小数精度丢失的完美解决方法