
Storm Source Code Analysis - Topology Submit - Executor - mk-threads

2013-08-05 17:22
The executor thread is where most of Storm's real work actually happens; components such as the supervisor and the worker are largely wrappers and plumbing around it.

The executor's mk-threads is a multimethod that defines separate logic for spouts and for bolts.

1. Spout Thread

(defmethod mk-threads :spout [executor-data task-datas]
  (let [{:keys [storm-conf component-id worker-context transfer-fn report-error sampler open-or-prepare-was-called?]} executor-data
        ;; 1.1 define pending
        ^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
        max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
        ^Integer max-spout-pending (if max-spout-pending (int max-spout-pending))
        last-active (atom false)
        spouts (ArrayList. (map :object (vals task-datas)))
        rand (Random. (Utils/secureRandomLong))
        pending (RotatingMap.
                 2 ;; micro optimize for performance of .size method
                 (reify RotatingMap$ExpiredCallback
                   (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
                     (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))] ;; start-time-ms is set only when sampled and is usually nil; time-delta is computed only if start-time-ms exists
                       (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
                       ))))


        ;; 1.2 define tuple-action-fn
        tuple-action-fn (fn [task-id ^TupleImpl tuple]
                          (let [stream-id (.getSourceStreamId tuple)]
                            (condp = stream-id
                              Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
                              (let [id (.getValue tuple 0) ;; tuple values; values[0] is the id
                                    [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)] ;; remove the tuple from pending, important!
                                (when spout-id
                                  (when-not (= stored-task-id task-id)
                                    (throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
                                  (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
                                    (condp = stream-id
                                      ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id) ;; ack
                                                                         spout-id tuple-finished-info time-delta)
                                      ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id) ;; fail
                                                                           spout-id tuple-finished-info time-delta)
                                      )))
                                ;; TODO: on failure, emit tuple to failure stream
                                ))))
        receive-queue (:receive-queue executor-data) ;; get the receive disruptor queue
        event-handler (mk-task-receiver executor-data tuple-action-fn) ;; create the disruptor/clojure-handler that processes tuples taken off receive-queue with tuple-action-fn
        has-ackers? (has-ackers? storm-conf)
        emitted-count (MutableLong. 0)
        empty-emit-streak (MutableLong. 0)

        ;; the overflow buffer is used to ensure that spouts never block when emitting
        ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
        ;; prevents deadlock from occurring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
        ;; buffers filled up)
        ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
        ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
        ;; preventing memory issues
        overflow-buffer (LinkedList.)]

    ;; 1.3 async-loop thread
    [(async-loop
      (fn []
        ;; If topology was started in inactive state, don't call (.open spout) until it's activated first.
        (while (not @(:storm-active-atom executor-data))
          (Thread/sleep 100))

        (log-message "Opening spout " component-id ":" (keys task-datas))
        (doseq [[task-id task-data] task-datas
                :let [^ISpout spout-obj (:object task-data)
                      tasks-fn (:tasks-fn task-data)

                      ;; 1.3.1 send-spout-msg
                      send-spout-msg (fn [out-stream-id values message-id out-task-id]
                                       (.increment emitted-count)
                                       (let [out-tasks (if out-task-id
                                                         (tasks-fn out-task-id out-stream-id values) ;; direct grouping
                                                         (tasks-fn out-stream-id values)) ;; call the grouper to produce the target tasks
                                             rooted? (and message-id has-ackers?) ;; a message-id was given and ackers exist, so the message must be tracked; rooted? means it is the root of a tracked DAG
                                             root-id (if rooted? (MessageId/generateId rand)) ;; rand.nextLong, a random long used as the root-id
                                             out-ids (fast-list-for [t out-tasks] (if rooted? (MessageId/generateId rand)))] ;; one out-id (out-edge id) per target task
                                         (fast-list-iter [out-task out-tasks id out-ids]
                                           (let [tuple-id (if rooted?
                                                            (MessageId/makeRootId root-id id) ;; MessageId holding the hashmap {root-id out-id}
                                                            (MessageId/makeUnanchored)) ;; MessageId holding an empty hashmap {}
                                                 out-tuple (TupleImpl. worker-context ;; build the tuple object
                                                                       values
                                                                       task-id
                                                                       out-stream-id
                                                                       tuple-id)]
                                             (transfer-fn out-task ;; call executor->transfer-fn to push the tuple onto the spout's send queue
                                                          out-tuple
                                                          overflow-buffer)))
                                         (if rooted?
                                           (do ;; tracking needed
                                             (.put pending root-id [task-id ;; add the tuple info that must be tracked into pending
                                                                    message-id
                                                                    {:stream out-stream-id :values values}
                                                                    (if (sampler) (System/currentTimeMillis))]) ;; the start time is set only when the sampler fires; only then will metrics and stats be updated later
                                             (task/send-unanchored task-data ;; send a message on ACKER-INIT-STREAM telling the acker to track this message
                                                                   ACKER-INIT-STREAM-ID
                                                                   [root-id (bit-xor-vals out-ids) task-id]
                                                                   overflow-buffer))
                                           (when message-id ;; rooted? is false but there is a message-id, which means there are no ackers (has-ackers? is false)
                                             (ack-spout-msg executor-data task-data message-id ;; with no ackers, ack directly
                                                            {:stream out-stream-id :values values}
                                                            (if (sampler) 0))))
                                         (or out-tasks []) ;; return value of send-spout-msg: the list of target tasks, or []
                                         ))]]
          (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data)) ;; register builtin-metrics

          ;; 1.3.2 spout.open
          (.open spout-obj
                 storm-conf
                 (:user-context task-data)
                 (SpoutOutputCollector.
                  (reify ISpoutOutputCollector ;; implement ISpoutOutputCollector
                    (^List emit [this ^String stream-id ^List tuple ^Object message-id] ;; implement emit
                      (send-spout-msg stream-id tuple message-id nil)
                      )
                    (^void emitDirect [this ^int out-task-id ^String stream-id
                                       ^List tuple ^Object message-id]
                      (send-spout-msg stream-id tuple message-id out-task-id)
                      )
                    (reportError [this error]
                      (report-error error)
                      )))))
        (reset! open-or-prepare-was-called? true)
        (log-message "Opened spout " component-id ":" (keys task-datas))

        ;; 1.3.3 setup-metrics!
        (setup-metrics! executor-data) ;; use schedule-recurring to periodically send itself a METRICS_TICK tuple

        (disruptor/consumer-started! (:receive-queue executor-data)) ;; set the queue's consumerStartedFlag to mark the consumer as started
        ;; 1.3.4 fn

        (fn []
          ;; This design requires that spouts be non-blocking
          (disruptor/consume-batch receive-queue event-handler) ;; take a batch of tuples from receive-queue and process them with tuple-action-fn

          ;; try to clear the overflow-buffer, moving its contents to the send queue
          (try-cause
            (while (not (.isEmpty overflow-buffer))
              (let [[out-task out-tuple] (.peek overflow-buffer)]
                (transfer-fn out-task out-tuple false nil)
                (.removeFirst overflow-buffer)))
            (catch InsufficientCapacityException e
              ))

          (let [active? @(:storm-active-atom executor-data)
                curr-count (.get emitted-count)]
            (if (and (.isEmpty overflow-buffer) ;; the spout may keep emitting only if overflow-buffer is empty and pending has not hit its limit
                     (or (not max-spout-pending)
                         (< (.size pending) max-spout-pending)))
              (if active? ;; is the topology active?
                (do ;; active
                  (when-not @last-active ;; if the spout is currently inactive
                    (reset! last-active true)
                    (log-message "Activating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.activate spout))) ;; activate the spout first

                  (fast-list-iter [^ISpout spout spouts] (.nextTuple spout))) ;; call nextTuple to emit new tuples
                (do ;; inactive
                  (when @last-active ;; if the spout is currently active
                    (reset! last-active false)
                    (log-message "Deactivating spout " component-id ":" (keys task-datas))
                    (fast-list-iter [^ISpout spout spouts] (.deactivate spout))) ;; deactivate the spout and sleep
                  ;; TODO: log that it's getting throttled
                  (Time/sleep 100))))
            (if (and (= curr-count (.get emitted-count)) active?) ;; no new tuples were emitted (emitted-count unchanged)
              (do (.increment empty-emit-streak)
                  (.emptyEmit spout-wait-strategy (.get empty-emit-streak))) ;; sleep according to spout-wait-strategy
              (.set empty-emit-streak 0)
              ))
          0)) ;; return 0: the async-loop sleep time is 0
      :kill-fn (:report-error-and-die executor-data)
      :factory? true
      :thread-name component-id)]))


1.1 Define pending

After a spout emits a tuple it has to wait for an ack or a fail, so the tuple cannot simply be dropped. It is first put into the pending map and is removed only once it has finally been acked or failed.

First, the number of pending tuples is bounded by p * num-tasks,
where p is TOPOLOGY-MAX-SPOUT-PENDING and num-tasks is the number of spout tasks in this executor (count task-datas).

max-spout-pending (executor-max-spout-pending storm-conf (count task-datas))
(defn executor-max-spout-pending [storm-conf num-tasks]
  (let [p (storm-conf TOPOLOGY-MAX-SPOUT-PENDING)]
    (if p (* p num-tasks))))


Second, a spout needs to wait in two situations: nextTuple emits nothing, or the maxSpoutPending limit has been reached.

/**
 * The strategy a spout needs to use when its waiting. Waiting is
 * triggered in one of two conditions:
 *
 * 1. nextTuple emits no tuples
 * 2. The spout has hit maxSpoutPending and can't emit any more tuples
 *
 * The default strategy sleeps for one millisecond.
 */
public interface ISpoutWaitStrategy {
    void prepare(Map conf);
    void emptyEmit(long streak);
}

The default wait strategy sleeps for 1 millisecond; a specific wait strategy class can be configured via TOPOLOGY-SPOUT-WAIT-STRATEGY.

^ISpoutWaitStrategy spout-wait-strategy (init-spout-wait-strategy storm-conf)
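
As a rough illustration of this extension point, here is a minimal sketch of a custom wait strategy that backs off as the empty-emit streak grows. The class name and backoff constants are invented for the example; only the ISpoutWaitStrategy interface and the TOPOLOGY-SPOUT-WAIT-STRATEGY setting come from Storm.

import java.util.Map;
import backtype.storm.spout.ISpoutWaitStrategy;

// Hypothetical strategy: sleep longer the more consecutive "empty" calls there have been.
public class BackoffSpoutWaitStrategy implements ISpoutWaitStrategy {
    @Override
    public void prepare(Map conf) {
        // nothing to configure in this sketch
    }

    @Override
    public void emptyEmit(long streak) {
        // 1ms, 2ms, 4ms ... capped at 64ms as the empty streak grows
        long sleepMs = 1L << Math.min(6L, streak);
        try {
            Thread.sleep(sleepMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

It would be enabled with something like conf.put(Config.TOPOLOGY_SPOUT_WAIT_STRATEGY, "com.example.BackoffSpoutWaitStrategy").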


Finally, the pending structure itself. Entries in pending are given a timeout; otherwise a misbehaving downstream bolt could leave the spout blocked forever.

pending (RotatingMap.
         2 ;; micro optimize for performance of .size method; the number of buckets is 2
         (reify RotatingMap$ExpiredCallback
           (expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
             (let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
               (fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
               ))))



RotatingMap (backtype.storm.utils) is TimeCacheMap without the cleaner thread (see Storm starter - SingleJoinExample).

Apart from that it is essentially the same; the main data structure is LinkedList<HashMap<K, V>> _buckets;

The central operation is rotate, which drops the oldest bucket and adds a fresh one:

public Map<K, V> rotate() {
    Map<K, V> dead = _buckets.removeLast();
    _buckets.addFirst(new HashMap<K, V>());
    if (_callback != null) {
        for (Entry<K, V> entry : dead.entrySet()) {
            _callback.expire(entry.getKey(), entry.getValue());
        }
    }
    return dead;
}

RotatingMap, however, relies on an external trigger to call rotate; Storm drives it with SYSTEM_TICK, as shown below.
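
A small sketch of how the pending map behaves, assuming the RotatingMap constructor and the nested ExpiredCallback interface used in the listing above (the demo class and values are made up):

import backtype.storm.utils.RotatingMap;

public class RotatingMapDemo {
    public static void main(String[] args) {
        // entries that survive numBuckets rotations without being removed are expired via the callback
        RotatingMap<Long, String> pending = new RotatingMap<Long, String>(
                2, // two buckets, as in the spout's pending map
                new RotatingMap.ExpiredCallback<Long, String>() {
                    public void expire(Long msgId, String info) {
                        // in the spout this is where fail-spout-msg is called
                        System.out.println("expired: " + msgId + " -> " + info);
                    }
                });

        pending.put(1L, "tuple-1");
        pending.put(2L, "tuple-2");

        pending.remove(1L); // acked or failed in time, will never expire

        pending.rotate();   // tuple-2 moves one bucket closer to expiry
        pending.rotate();   // tuple-2 falls out of the oldest bucket -> expire() fires
    }
}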


1.2 Define tuple-action-fn

tuple-action-fn handles tuples arriving on the different streams.

1.2.1 SYSTEM_TICK_STREAM_ID

(.rotate pending) rotates the pending map.

1.2.2 METRICS_TICK_STREAM_ID

Executes (metrics-tick executor-data task-datas tuple).

This makes the component send its builtin-metrics data to METRICS_STREAM, from where it eventually reaches the metrics bolt that accounts for how this component is processing tuples.

Concretely, metrics-tick builds a task-info and the data-points and sends them to METRICS_STREAM:

(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
  (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
        interval (.getInteger tuple 0)] ;; values[0] of the metrics tick tuple is the interval
    (doseq [[task-id task-data] task-datas
            :let [name->imetric (-> interval->task->metric-registry (get interval) (get task-id)) ;; the topology context's _registeredMetrics actually points at interval->task->metric-registry
                  task-info (IMetricsConsumer$TaskInfo.
                             (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
                             (.getThisWorkerPort worker-context)
                             (:component-id executor-data)
                             task-id
                             (long (/ (System/currentTimeMillis) 1000))
                             interval)
                  data-points (->> name->imetric
                                   (map (fn [[name imetric]]
                                          (let [value (.getValueAndReset ^IMetric imetric)]
                                            (if value
                                              (IMetricsConsumer$DataPoint. name value)))))
                                   (filter identity)
                                   (into []))]]
      (if (seq data-points)
        (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))) ;; send [task-info data-points] to METRICS_STREAM
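
The [task-info data-points] pairs produced here are eventually delivered to whichever metrics consumers the topology registered. As a hedged sketch of the receiving side (the consumer class below is hypothetical; IMetricsConsumer with its nested TaskInfo and DataPoint types is the interface referenced by metrics-tick above):

import java.util.Collection;
import java.util.Map;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;

// Hypothetical consumer that just prints every data point arriving on METRICS_STREAM.
public class PrintingMetricsConsumer implements IMetricsConsumer {
    @Override
    public void prepare(Map stormConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) {
        // nothing to set up in this sketch
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        // taskInfo corresponds to the IMetricsConsumer$TaskInfo built in metrics-tick
        for (DataPoint p : dataPoints) {
            System.out.println(taskInfo.srcComponentId + "[" + taskInfo.srcTaskId + "] "
                    + p.name + " = " + p.value);
        }
    }

    @Override
    public void cleanup() {}
}

Such a consumer is normally registered on the topology configuration, e.g. via Config#registerMetricsConsumer.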


1.2.3 default: ordinary tuples

Since a spout is the source of the topology, the only ordinary tuples it receives come from ACKER-ACK-STREAM or ACKER-FAIL-STREAM.
So on receiving one, it takes the msg id and removes the corresponding entry from the pending map.

Finally, depending on the stream id, it calls ack-spout-msg or fail-spout-msg:

(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta]
  (let [storm-conf (:storm-conf executor-data)
        ^ISpout spout (:object task-data)
        task-id (:task-id task-data)]
    (when (= true (storm-conf TOPOLOGY-DEBUG))
      (log-message "Acking message " msg-id))
    (.ack spout msg-id) ;; ack
    (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) ;; run the ack hooks
    (when time-delta ;; the sample condition was met, so update builtin-metrics and stats
      (builtin-metrics/spout-acked-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info) time-delta)
      (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))


ack-spout-msg is shown here; fail-spout-msg is essentially the same, except that it calls .fail instead of .ack.
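
For reference, this is the user-side contract that (.ack spout msg-id) and (.fail spout msg-id) call into. The spout below is a hypothetical example (class name, payload, and replay policy are invented); only the BaseRichSpout / SpoutOutputCollector API it uses is Storm's:

import java.util.HashMap;
import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical user spout: ack/fail here are what ack-spout-msg / fail-spout-msg invoke.
public class ReplayingSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private final Map<Object, String> inFlight = new HashMap<Object, String>(); // msgId -> payload
    private long nextId = 0;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        Long msgId = nextId++;
        String payload = "record-" + msgId;         // stand-in for reading from a real source
        inFlight.put(msgId, payload);
        collector.emit(new Values(payload), msgId); // non-null message id => rooted, tracked by the ackers
    }

    @Override
    public void ack(Object msgId) {
        inFlight.remove(msgId);                     // fully processed, safe to forget
    }

    @Override
    public void fail(Object msgId) {
        String payload = inFlight.get(msgId);
        if (payload != null) {
            collector.emit(new Values(payload), msgId); // simple replay on failure
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("payload"));
    }
}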

1.3 The async-loop thread

This is the executor's main thread. It is not built on disruptor/consume-loop, because the loop has to do more than just handle received tuples, so async-loop is used directly.

As covered earlier, async-loop starts a new thread that runs afn, treats its return value as a sleep time, sleeps that long, then runs afn again, and so on.

The usage here is a bit unusual: afn itself only does preparation work, such as defining send-spout-msg and opening the spouts, and then returns a fn; the really important work happens inside that returned fn. Because async-loop is called with :factory? true (see the end of the listing), afn is evaluated once and the fn it returns becomes the actual loop body, as sketched below.
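
A rough Java sketch of that factory pattern (conceptual only, not Storm's util code; all names are invented):

import java.util.concurrent.Callable;

// Sketch of async-loop with :factory? true: the supplied function is called once to do the
// setup and to return the real loop body, which is then run repeatedly; its return value is
// the sleep time between iterations.
public class FactoryLoopSketch {
    interface LoopBody {
        long runOnce() throws Exception; // returns sleep millis, like the 0 returned by the spout's fn
    }

    static Thread asyncLoop(final Callable<LoopBody> factory) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    LoopBody body = factory.call();    // "afn": open the spouts, build send-spout-msg, ...
                    while (!Thread.currentThread().isInterrupted()) {
                        long sleepMs = body.runOnce(); // the returned fn: consume a batch, call nextTuple, ...
                        if (sleepMs > 0) Thread.sleep(sleepMs);
                    }
                } catch (Exception e) {
                    // Storm's real async-loop would invoke the :kill-fn (report-error-and-die) here
                }
            }
        });
        t.start();
        return t;
    }
}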

1.3.1 send-spout-msg

First the send-spout-msg function is built; it is what emit and emitDirect eventually call to send a spout message.

The logic: decide from message-id whether the message needs to be tracked, and if so use MessageId to generate the root-id and the out-ids.

Then build the tuple object (TupleImpl).

Let's first look at the definitions of MessageId and TupleImpl.

Note that this MessageId class has little to do with the message-id passed to emit; the name is easy to confuse.

The main operations are generateId, which produces a random id, and makeRootId, which puts the pair [root-id, out-id] into the map _anchorsToIds.

package backtype.storm.tuple;


public class MessageId {
    private Map<Long, Long> _anchorsToIds;

    public static long generateId(Random rand) {
        return rand.nextLong();
    }

    public static MessageId makeUnanchored() {
        return makeId(new HashMap<Long, Long>());
    }

    public static MessageId makeId(Map<Long, Long> anchorsToIds) {
        return new MessageId(anchorsToIds);
    }

    public static MessageId makeRootId(long id, long val) {
        Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
        anchorsToIds.put(id, val);
        return new MessageId(anchorsToIds);
    }


public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed, IMeta, Tuple {
    private List<Object> values;
    private int taskId;
    private String streamId;
    private GeneralTopologyContext context;
    private MessageId id;
    private IPersistentMap _meta = null;

    Long _processSampleStartTime = null;
    Long _executeSampleStartTime = null;
}


After that, send-spout-msg uses transfer-fn to push the tuple onto the send queue, adds an entry to pending for tracking, and sends a message to the acker telling it to track this message.

1.3.2 spout.open, initializing the spout

Straightforward; the key part is the ISpoutOutputCollector implementation providing emit and emitDirect.

1.3.3 setup-metrics!, the source of METRICS_TICK

It uses schedule-recurring to periodically send the executor a METRICS_TICK tuple, which triggers the periodic emission of builtin-metrics.

1.3.4 The returned fn

This fn does the spout thread's most important work and finally returns 0 as the async-loop sleep time:
handle the tuples in the receive-queue,

and call nextTuple...

Note that everything runs sequentially in a single thread, so none of this logic may block.

2. Bolt Thread

(defmethod mk-threads :bolt [executor-data task-datas]
  (let [execute-sampler (mk-stats-sampler (:storm-conf executor-data))
        executor-stats (:stats executor-data)
        {:keys [storm-conf component-id worker-context transfer-fn report-error sampler
                open-or-prepare-was-called?]} executor-data
        rand (Random. (Utils/secureRandomLong))

        ;; 2.1 tuple-action-fn
        tuple-action-fn (fn [task-id ^TupleImpl tuple]
                          (let [stream-id (.getSourceStreamId tuple)]
                            (condp = stream-id
                              Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
                              (let [task-data (get task-datas task-id)
                                    ^IBolt bolt-obj (:object task-data) ;; get the bolt object
                                    user-context (:user-context task-data)
                                    sampler? (sampler)
                                    execute-sampler? (execute-sampler)
                                    now (if (or sampler? execute-sampler?) (System/currentTimeMillis))] ;; if a sample condition fires, record the current time
                                (when sampler?
                                  (.setProcessSampleStartTime tuple now))
                                (when execute-sampler?
                                  (.setExecuteSampleStartTime tuple now))
                                (.execute bolt-obj tuple) ;; call the bolt's execute method
                                (let [delta (tuple-execute-time-delta! tuple)] ;; delta is non-nil only if now was set above
                                  (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) ;; run the boltExecute hooks
                                  (when delta ;; the sample condition was met, so update builtin-metrics and stats
                                    (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
                                                                         executor-stats
                                                                         (.getSourceComponent tuple)
                                                                         (.getSourceStreamId tuple)
                                                                         delta)
                                    (stats/bolt-execute-tuple! executor-stats
                                                               (.getSourceComponent tuple)
                                                               (.getSourceStreamId tuple)
                                                               delta)))))))]

    ;; TODO: can get any SubscribedState objects out of the context now

    ;; 2.2 async-loop
    [(async-loop
      (fn []
        ;; If topology was started in inactive state, don't call prepare bolt until it's activated first.
        (while (not @(:storm-active-atom executor-data))
          (Thread/sleep 100))

        (log-message "Preparing bolt " component-id ":" (keys task-datas))
        (doseq [[task-id task-data] task-datas
                :let [^IBolt bolt-obj (:object task-data)
                      tasks-fn (:tasks-fn task-data)
                      user-context (:user-context task-data)

                      ;; 2.2.1 bolt-emit
                      bolt-emit (fn [stream anchors values task]
                                  (let [out-tasks (if task
                                                    (tasks-fn task stream values) ;; direct grouping
                                                    (tasks-fn stream values))]
                                    (fast-list-iter [t out-tasks] ;; for each target out-task
                                      (let [anchors-to-ids (HashMap.)] ;; holds the relation between the edges produced for this tuple and their roots
                                        (fast-list-iter [^TupleImpl a anchors] ;; for each anchor (source tuple)
                                          (let [root-ids (-> a .getMessageId .getAnchorsToIds .keySet)] ;; all the root-ids; an anchor may descend from several roots
                                            (when (pos? (count root-ids))
                                              (let [edge-id (MessageId/generateId rand)] ;; generate a new edge-id for each anchor
                                                (.updateAckVal a edge-id) ;; XOR into the anchor tuple's _outAckVal, caching the newly created edge id
                                                (fast-list-iter [root-id root-ids]
                                                  (put-xor! anchors-to-ids root-id edge-id)) ;; record in anchors-to-ids the relation between the new edge and every root-id
                                                ))))
                                        (transfer-fn t
                                                     (TupleImpl. worker-context
                                                                 values
                                                                 task-id
                                                                 stream
                                                                 (MessageId/makeId anchors-to-ids)))))
                                    (or out-tasks [])))]] ;; return value: the target task ids
          (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)

          ;; 2.2.2 prepare
          (.prepare bolt-obj
                    storm-conf
                    user-context
                    (OutputCollector.
                     (reify IOutputCollector
                       (emit [this stream anchors values]
                         (bolt-emit stream anchors values nil))
                       (emitDirect [this task stream anchors values]
                         (bolt-emit stream anchors values task))
                       (^void ack [this ^Tuple tuple]
                         (let [^TupleImpl tuple tuple
                               ack-val (.getAckVal tuple)] ;; the cached new edges
                           (fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)] ;; ack every root recorded in anchors-to-ids
                             (task/send-unanchored task-data
                                                   ACKER-ACK-STREAM-ID
                                                   [root (bit-xor id ack-val)]) ;; send the ack message: ack this tuple and sync the new edges in one go
                             ))
                         (let [delta (tuple-time-delta! tuple)] ;; update metrics and stats
                           (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                           (when delta
                             (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
                                                                executor-stats
                                                                (.getSourceComponent tuple)
                                                                (.getSourceStreamId tuple)
                                                                delta)
                             (stats/bolt-acked-tuple! executor-stats
                                                      (.getSourceComponent tuple)
                                                      (.getSourceStreamId tuple)
                                                      delta))))
                       (^void fail [this ^Tuple tuple]
                         (fast-list-iter [root (.. tuple getMessageId getAnchors)]
                           (task/send-unanchored task-data
                                                 ACKER-FAIL-STREAM-ID
                                                 [root])) ;; fail is simpler: if any edge fails, the whole root fails
                         (let [delta (tuple-time-delta! tuple)]
                           (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                           (when delta
                             (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
                                                                 executor-stats
                                                                 (.getSourceComponent tuple)
                                                                 (.getSourceStreamId tuple))
                             (stats/bolt-failed-tuple! executor-stats
                                                       (.getSourceComponent tuple)
                                                       (.getSourceStreamId tuple)
                                                       delta))))
                       (reportError [this error]
                         (report-error error)
                         )))))
        (reset! open-or-prepare-was-called? true)
        (log-message "Prepared bolt " component-id ":" (keys task-datas))
        (setup-metrics! executor-data) ;; set up the METRICS_TICK

        (let [receive-queue (:receive-queue executor-data)
              event-handler (mk-task-receiver executor-data tuple-action-fn)] ;; create the receive queue's event-handler with tuple-action-fn
          (disruptor/consumer-started! receive-queue) ;; mark the consumer as started
          (fn []
            (disruptor/consume-batch-when-available receive-queue event-handler) ;; actually consume the receive-queue
            0))) ;; sleep 0 s
      :kill-fn (:report-error-and-die executor-data)
      :factory? true
      :thread-name component-id)]))


2.1 tuple-action-fn

First check the tuple's stream-id; the handling of METRICS_TICK was covered above.

Anything else is an ordinary tuple and is handled by the corresponding task. When one executor thread hosts several tasks, this is where the task-id is used to pick the right task-data.

Finally, execute is called on bolt-obj, which runs the user-defined bolt logic:

^IBolt bolt-obj (:object task-data)

(.execute bolt-obj tuple)
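
To connect this back to the user API: the bolt below is a hypothetical example (class name and logic invented; the BaseRichBolt / OutputCollector API is Storm's). Its execute is what (.execute bolt-obj tuple) invokes; the anchored emit goes through bolt-emit above, and collector.ack(input) ends up in the reified IOutputCollector's ack shown in the listing.

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical user bolt: uppercases the incoming word, anchoring the output on the input tuple.
public class UppercaseBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        collector.emit(input, new Values(word.toUpperCase())); // anchored emit: input becomes an anchor
        collector.ack(input);                                  // acks the input and syncs the new edge to the acker
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}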

2.2 async-loop, starting the thread

2.2.1 bolt-emit

Similar to send-spout-msg, this is what emit ends up calling to send tuples (Storm's naming is not very consistent).

Calling tasks-fn to compute the out-tasks and calling transfer-fn to push the tuples onto the send queue are both easy to follow.


The key part is the block that manipulates anchors-to-ids, which is puzzling at first: what is anchors-to-ids actually for?

It records, within the tuple-tracking DAG, the edges produced for this tuple and how they relate to the roots.

In the code, an anchor is a source tuple, although conceptually an anchor feels more like a relationship, which adds to the confusion. So the logic above generates a fresh edge-id per anchor: even for the same out-task, different anchors yield different edge-ids.

Then, for each anchor's root-ids, an entry [root-id, edge-id] is produced (the logic above XORs the values together, because different anchors may share the same root).

The end result is the relationship between the edges produced for this tuple and all of the roots involved, as sketched below.
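
A conceptual sketch of how anchors-to-ids ends up looking (plain Java, not Storm source; the helper mirrors put-xor!):

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

public class AnchorsToIdsSketch {
    // put-xor!: XOR the new edge id into whatever is already stored for this root.
    static void putXor(Map<Long, Long> anchorsToIds, long rootId, long edgeId) {
        Long cur = anchorsToIds.get(rootId);
        anchorsToIds.put(rootId, (cur == null ? 0L : cur) ^ edgeId);
    }

    public static void main(String[] args) {
        Random rand = new Random();
        Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();

        // two anchor tuples that happen to descend from the same root R
        long rootR = rand.nextLong();
        List<Set<Long>> anchorRoots = Arrays.asList(
                Collections.singleton(rootR),
                Collections.singleton(rootR));

        for (Set<Long> rootIds : anchorRoots) {
            long edgeId = rand.nextLong(); // a fresh edge id per anchor
            // (.updateAckVal a edge-id) would also XOR edgeId into that anchor's _outAckVal here
            for (long rootId : rootIds) {
                putXor(anchorsToIds, rootId, edgeId);
            }
        }
        // anchorsToIds now maps R to edge1 ^ edge2: one entry per root, not one per edge
        System.out.println(anchorsToIds);
    }
}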







So what is (.updateAckVal a edge-id) for?

It saves one message to the acker. In principle, when an edge is created one message should be sent to register it with the acker, and a second message should be sent later to ack it.

Storm optimizes away the first message: nothing is sent at edge-creation time.

The trick is to cache the newly created edge-id in the parent tuple's _outAckVal. Since the parent tuple is acked right after it has been processed, the new edge information is synced to the acker at that point; see the ack implementation below. So updateAckVal simply XORs the new edge-id into the parent tuple's _outAckVal, and no message is sent to the acker here.



On how Storm tracks all tuples

The traditional approach: generate a root id at the spout, then produce an edge id for every emitted tuple, so the whole DAG can be recorded.

On ack, simply mark or delete those edge ids to show they have been processed.

The problem is that for a complex DAG this structure gets large and does not scale well.

Storm's approach does not record the individual edges at all. It does not care which edges exist, only whether every edge has been acked, so it just keeps XORing ids together: ids that appear in pairs cancel out to 0. A worked example follows.
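
A worked example of the acker's bookkeeping for a single root id (plain Java, not Storm source):

import java.util.Random;

public class AckerXorExample {
    public static void main(String[] args) {
        Random rand = new Random();
        long e1 = rand.nextLong(), e2 = rand.nextLong(), e3 = rand.nextLong();

        // ACKER-INIT from the spout: the stored value starts as the XOR of the spout's out edges (here just e1).
        long ackVal = e1;

        // Bolt A processes the e1 tuple, emits to two downstream tasks (creating edges e2 and e3), then acks e1:
        // its ack message carries e1 ^ e2 ^ e3 (the edge being acked XOR the newly created edges).
        ackVal ^= e1 ^ e2 ^ e3;

        // The bolts that receive the e2 and e3 tuples ack them without emitting anything new.
        ackVal ^= e2;
        ackVal ^= e3;

        // Every edge id was XORed in exactly twice (once when created, once when acked), so everything cancels.
        System.out.println(ackVal == 0); // true -> the root tuple is fully processed
    }
}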



2.2.2 prepare

The main point is the OutputCollector implementation.

emit and emitDirect simply delegate to bolt-emit.

The interesting parts are the ack and fail implementations.

The harder bit to understand is that the ack message does not carry the tuple's own edge id directly, but (bit-xor id ack-val).

That one value actually does two things: it acks the current tuple and syncs the newly created edges.

Since the acker XORs both id and ack-val into its stored value anyway, XORing them before sending saves having to put two separate values in the message.

Summary

If you have had the patience to read this far, here are two more diagrams as a bonus...






