storm是如何保证at least once语义的
2015-07-24 13:57
453 查看
问题导读
1.spout、bolt、acker的关系是什么?
2.storm如何如何追踪消息(tuple)的处理?
3.2.storm示例说明什么问题?
背景
[align=left]本篇看看storm是通过什么机制来保证消息至少处理一次的语义的。[/align]storm中的一些原语
要说明上面的问题,得先了解storm中的一些原语,比如:
tuple和message
在storm中,消息是通过tuple来抽象表示的,每个tuple知道它从哪里来,应往哪里去,包含了其在tuple-tree(如果是anchored的话)或者DAG中的位置,等等信息。
spout
spout充当了tuple的发送源,spout通过和其它消息源,比如kafka交互,将消息封装为tuple,发送到流的下游。
bolt
bolt是tuple的实际处理单元,通过从spout或者另一个bolt接收tuple,进行业务处理,将自己加入tuple-tree(通过在emit方法中设置anchors)或DAG,然后继续将tuple发送到流的下游。
acker
acker是一种特殊的bolt,其接收来自spout和bolt的消息,主要功能是追踪tuple的处理情况,如果处理完成,会向tuple的源头spout发送确认消息,否则,会发送失败消息,spout收到失败的消息,根据配置和自定义的情况会进行消息的丢弃、重放处理。
spout、bolt、acker的关系
spout将tuple发送给流的下游的bolts.
bolt收到tuple,处理后发送给下游的bolts.
spout向acker发送请求ack的消息.
bolt向acker发送请求ack的消息.
acker向bolt和spout发送确认ack的消息.
[align=left]简单的关系如下所示:[/align]
[align=left]上图展示了spout、bolts等形成了一个DAG,如何追踪这个DAG的执行过程,就是storm保证仅处理一次消息的语义的机制所在。[/align]storm如何追踪消息(tuple)的处理
[align=left]spout在调用emit/emitDirect方法发送tuple时,会以单播或者广播的方式,将消息发送给流的下游的component/task/bolt,如果配置了acker,那么会在每次emit调用之后,向acker发送请求ack的消息:[/align]
[Bashshell]纯文本查看复制代码
[align=left]从上面的代码可以看出,每次emittuple后,spout会向acker发送一个流ID为ACKER-INIT-STREAM-ID的消息,用于将DAG或者tuple-tree中的节点信息交给acker,acker会利用这个信息来追踪tuple-tree或DAG的完成。[/align]
[align=left]而spout调用emit/emitDirect方法,将tuple发到下游的bolts,也同时会发送用于追踪DAG完成情况的信息:[/align]
[Bashshell]纯文本查看复制代码
[align=left]这个追踪信息是什么呢?[/align]
[align=left]如果是spout->bolt或者bolt->bolt,这个信息就是tuple的MessageId,其内部维护一个哈希表:[/align]
[Bashshell]纯文本查看复制代码
[align=left]键为root-id,表示spout,值表示tuple在tuple-tree或者DAG的根(spout)或者经过的边(bolt),但这里没有利用任何常规意义上的“树”的算法,而是采用异或的方式来存储这个值:[/align]
spout->bolt,值被初始化为一个long型64位整数.
bolt->bolt,值被初始化为一个long型64位整数,并和_anchorsToIds中的旧值进行按位异或,将结果更新到_anchorsToIds中.
[align=left]如果是spout->acker,或者bolt->acker,那么用于追踪的是tuple的values:[/align]
spout->acker:[root-id(bit-xor-valsout-ids)task-id]bolt->acker:[root(bit-xoridack-val)..]
[align=left]下面给出上面调用的bit-xor-vals和bit-xor方法的代码:[/align]
[Bashshell]纯文本查看复制代码
示例
[align=left]说起来有点抽象,看个例子。[/align][align=left]假设我们有1个spout,n个bolt,1个acker:[/align]1.spout
[align=left]spout发送tuple到下游的bolts:[/align]
[Bashshell]纯文本查看复制代码
2.bolt
[align=left]bolt收到tuple,在execute方法中进行必要的处理,然后调用emit方法,最后调用ack方法:[/align]
[Bashshell]纯文本查看复制代码
[align=left]以上,可以看出bolt进行了emit-ack组合后,其自身在异或链中的作用消失了,也就是说tuple在此bolt得到了处理。[/align][align=left](当然,此时的ack还没有得到acker的确认,假设acker确认了,那么上面所说的tuple在bolt得到了处理就成立了。)[/align][align=left]来看看acker的确认。[/align]3.acker
[align=left]acker收到来自spout的tuple:[/align]
[Bashshell]纯文本查看复制代码
[align=left]acker收到来自bolt的tuple:[/align]
[Bashshell]纯文本查看复制代码
[align=left]可以看出,bolt_1向acker请求ack,acker收到请求ack,异或之后,id_1的作用消失。也就是说,bolt_1已处理完毕这个tuple。[/align]
[align=left]所以,在acker看来,如果某个bolt的处理完成,则此bolt在异或链中的作用就消失了。[/align]
[align=left]如果所有的bolt都得到处理,那么acker将会观察到ackVal值变成了0:[/align]
[Bashshell]纯文本查看复制代码
[align=left]如果出现了ackVal=0,说明两个可能:[/align]
spout发送的tuple都处理完成,tuple-tree或者DAG已完成。
概率性出错,也就是说在极小的概率下,即使不按上面的确认流程来走,异或链的结果也可能出现0.但这个概率极小,小到什么程度呢?
用官方的话说就是,如果每秒发送1万个ack消息,50,000,000年时才可能发生这种情况。
[align=left]如果ackVal不为0,说明tuple-tree或DAG没有完成。如果长时间不为0,通过超时,可以触发一个超时回调,在这个回调中调用spout的fail方法,来进行重放。[/align][align=left]如此,就保证了消息处理不会漏掉,但可能会重复。[/align]
结语
以上,就是storm保证消息至少处理一次的语义的机制。
1.spout、bolt、acker的关系是什么?
2.storm如何如何追踪消息(tuple)的处理?
3.2.storm示例说明什么问题?
背景
[align=left]本篇看看storm是通过什么机制来保证消息至少处理一次的语义的。[/align]storm中的一些原语
要说明上面的问题,得先了解storm中的一些原语,比如:
tuple和message
在storm中,消息是通过tuple来抽象表示的,每个tuple知道它从哪里来,应往哪里去,包含了其在tuple-tree(如果是anchored的话)或者DAG中的位置,等等信息。
spout
spout充当了tuple的发送源,spout通过和其它消息源,比如kafka交互,将消息封装为tuple,发送到流的下游。
bolt
bolt是tuple的实际处理单元,通过从spout或者另一个bolt接收tuple,进行业务处理,将自己加入tuple-tree(通过在emit方法中设置anchors)或DAG,然后继续将tuple发送到流的下游。
acker
acker是一种特殊的bolt,其接收来自spout和bolt的消息,主要功能是追踪tuple的处理情况,如果处理完成,会向tuple的源头spout发送确认消息,否则,会发送失败消息,spout收到失败的消息,根据配置和自定义的情况会进行消息的丢弃、重放处理。
spout、bolt、acker的关系
spout将tuple发送给流的下游的bolts.
bolt收到tuple,处理后发送给下游的bolts.
spout向acker发送请求ack的消息.
bolt向acker发送请求ack的消息.
acker向bolt和spout发送确认ack的消息.
[align=left]简单的关系如下所示:[/align]
[align=left]上图展示了spout、bolts等形成了一个DAG,如何追踪这个DAG的执行过程,就是storm保证仅处理一次消息的语义的机制所在。[/align]storm如何追踪消息(tuple)的处理
[align=left]spout在调用emit/emitDirect方法发送tuple时,会以单播或者广播的方式,将消息发送给流的下游的component/task/bolt,如果配置了acker,那么会在每次emit调用之后,向acker发送请求ack的消息:[/align]
[Bashshell]纯文本查看复制代码
01 | ;;;;;;;;;;;;;;;;;;;;;;;;;;;; |
02 | ;;spout向acker发送请求ack消息 |
03 | ;;;;;;;;;;;;;;;;;;;;;;;;;;;; |
04 |
05 | ;;rooted?表示是否设置了acker |
06 | ( if (androoted? |
07 | (not(.isEmptyout-ids))) |
08 | ( do |
09 | (.putpendingroot- id [task- id |
10 | message- id |
11 | {:streamout-stream- id :valuesvalues} |
12 | ( if (sampler)(System/currentTimeMillis))]) |
13 | (task/send-unanchoredtask-data |
14 | ;;表示这是一个流初始化的消息 |
15 | ACKER-INIT-STREAM-ID |
16 | ;;将下游组件的out- id 和0组成一个异或链,发送给acker用于追踪 |
17 | [root- id (bit-xor-valsout-ids)task- id ] |
18 | overflow-buffer)) |
19 |
20 | ;;如果没有配置acker,则调用自身的ack方法 |
21 | (whenmessage- id |
22 | (ack-spout-msgexecutor-datatask-datamessage- id |
23 | {:streamout-stream- id :valuesvalues} |
24 | ( if (sampler)0) "0:" ))) |
[align=left]而spout调用emit/emitDirect方法,将tuple发到下游的bolts,也同时会发送用于追踪DAG完成情况的信息:[/align]
[Bashshell]纯文本查看复制代码
01 | ;;;;;;;;;;;;;;;;;;;;;;;;;;;; |
02 | ;;spout向流的下游emit消息 |
03 | ;;;;;;;;;;;;;;;;;;;;;;;;;;;; |
04 |
05 | ( let [tuple- id ( if rooted? |
06 | ;;如果有acker,tuple的MessageId会包含一个<root- id , id >的哈希表 |
07 | ;;root- id 和 id 都是long型64位整数 |
08 | (MessageId/makeRootIdroot- id id ) |
09 | (MessageId/makeUnanchored)) |
10 | ;;实例化tuple |
11 | out-tuple(TupleImpl.worker-context |
12 | values |
13 | task- id |
14 | out-stream- id |
15 | tuple- id )] |
16 |
17 | ;;发送至队列,最终发送给流的下游的task/bolt |
18 | (transfer-fnout-task |
19 | out-tuple |
20 | overflow-buffer) |
21 | )) |
[align=left]如果是spout->bolt或者bolt->bolt,这个信息就是tuple的MessageId,其内部维护一个哈希表:[/align]
[Bashshell]纯文本查看复制代码
01 | //mapanchorto id |
02 | privateMap<Long,Long>_anchorsToIds; |
spout->bolt,值被初始化为一个long型64位整数.
bolt->bolt,值被初始化为一个long型64位整数,并和_anchorsToIds中的旧值进行按位异或,将结果更新到_anchorsToIds中.
[align=left]如果是spout->acker,或者bolt->acker,那么用于追踪的是tuple的values:[/align]
spout->acker:[root-id(bit-xor-valsout-ids)task-id]bolt->acker:[root(bit-xoridack-val)..]
[align=left]下面给出上面调用的bit-xor-vals和bit-xor方法的代码:[/align]
[Bashshell]纯文本查看复制代码
01 | (defnbit-xor-vals |
02 | [vals] |
03 | (reducebit-xor0vals)) |
04 |
05 | (defnbit-xor |
06 | "Bitwiseexclusiveor" |
07 | {:inline(nary-inline'xor) |
08 | :inline-arities>1? |
09 | :added "1.0" } |
10 | ([xy](.clojure.lang.Numbersxorxy)) |
11 | ([xy& more ] |
12 | (reduce1bit-xor(bit-xorxy) more ))) |
[align=left]说起来有点抽象,看个例子。[/align][align=left]假设我们有1个spout,n个bolt,1个acker:[/align]1.spout
[align=left]spout发送tuple到下游的bolts:[/align]
[Bashshell]纯文本查看复制代码
01 | ;;id_1是发送到bolt_1的tuple- id ,依此类推 |
02 | spout: |
03 | ->bolt_1:id_1 |
04 | ->bolt_2:id_2 |
05 | .. |
06 | ->bolt_n:id_n |
[align=left]bolt收到tuple,在execute方法中进行必要的处理,然后调用emit方法,最后调用ack方法:[/align]
[Bashshell]纯文本查看复制代码
01 | ;;bolt_1调用emit方法,追踪消息的这样一个值:让id_1和bid_1按位进行异或. |
02 | ;;bid_1和id_1类似,是个long型的64位随机整数,在emit这一步生成 |
03 | bolt_1emit:id_1^bid_1 |
04 |
05 | ;;bolt_1调用ack方法,并将值表达为如下方式的异或链的结果 |
06 | bolt_1ack:0^bid_1^id_1^bid_1=0^id_1 |
[align=left]acker收到来自spout的tuple:[/align]
[Bashshell]纯文本查看复制代码
01 | ;;spout发消息给acker,tuple的MessageId包含下面的异或链的结果 |
02 | spout->acker:0^id_1^id_2^..^id_n |
03 |
04 | ;;acker收到来spout的消息,对tuple的ackVal进行处理,如下所示: |
05 | acker:0^(0^id_1^id_2^..^id_n)=0^id_1^id_2^..^id_n |
[Bashshell]纯文本查看复制代码
01 | ;;bolt_1发消息给acker: |
02 | bolt_1->acker:0^id_1 |
03 |
04 | ;;acker维护的对应此tuple的源spout的ackVal: |
05 | ackVal:0^id_1^id_2^..^id_n |
06 |
07 | ;;acker进行确认,也就是拿上面的两个值进行异或: |
08 | acker:(0^id_1)^(0^id_1^id_2^..^id_n)=0^id_2^..^id_n |
[align=left]所以,在acker看来,如果某个bolt的处理完成,则此bolt在异或链中的作用就消失了。[/align]
[align=left]如果所有的bolt都得到处理,那么acker将会观察到ackVal值变成了0:[/align]
[Bashshell]纯文本查看复制代码
01 | ackVal=0 |
02 | =(0^id_1)^(0^id_1^..^id_n)^..(0^id_n) |
03 | =(0^0)^(id_1^id_1)^(id_2^id_2)^..^(id_n^id_n) |
spout发送的tuple都处理完成,tuple-tree或者DAG已完成。
概率性出错,也就是说在极小的概率下,即使不按上面的确认流程来走,异或链的结果也可能出现0.但这个概率极小,小到什么程度呢?
用官方的话说就是,如果每秒发送1万个ack消息,50,000,000年时才可能发生这种情况。
[align=left]如果ackVal不为0,说明tuple-tree或DAG没有完成。如果长时间不为0,通过超时,可以触发一个超时回调,在这个回调中调用spout的fail方法,来进行重放。[/align][align=left]如此,就保证了消息处理不会漏掉,但可能会重复。[/align]
结语
以上,就是storm保证消息至少处理一次的语义的机制。
相关文章推荐
- 微信公众平台和微信开放平台的区别
- 相对路径 System.Web HttpServerUtilityBase Server.MapPath("~/")
- JS设计模式
- 该 App 的 Info.plist 不能包含阻碍在任何 iOS 设备上打开此 App 的 UIRequiredDeviceCapabilities 键值
- Linux下高性能网络编程中的几个TCP/IP选项_SO_REUSEADDR、SO_RECVBUF、SO_SNDBUF、SO_KEEPALIVE、SO_LINGER、TCP_CORK、TCP_NODE
- 测试
- Angular的一些知识点
- 同步机制摘要
- 创业项目招一起发展的java
- Swift Standard Library: Documented and undocumented built-in functions in the Swift standard library – the complete list with all 74 functions
- php计算两个经纬度地点之间的距离(转)
- 网络上一些基本知识的理解
- 【Caffe】训练MNIST数据集模型
- iOS开发UI篇—UIScrollView控件介绍
- 2015 Multi-University Training Contest 2
- HDU 5308 I Wanna Become A 24-Point Master
- 字典树 hdu 1251
- [JS学习笔记]Event对象
- C#关键字列表
- 2015 Multi-University Training Contest 2