Storm Source Code Analysis: Streaming Grouping (backtype.storm.daemon.executor)
2013-07-26 11:16
When an executor sends an outbound message, it has to decide which tasks of the next component should receive it.
This is where streaming grouping comes in.
1. mk-grouper
For every grouping except direct grouping, mk-grouper returns a grouper function; calling that function yields the list of target tasks.
For direct grouping it simply returns the keyword :direct.
(defn- mk-grouper
  "Returns a function that returns a vector of which task indices to send tuple to, or just a single task index."
  [^WorkerTopologyContext context component-id stream-id ^Fields out-fields thrift-grouping ^List target-tasks]
  (let [num-tasks (count target-tasks)
        random (Random.)
        target-tasks (vec (sort target-tasks))]
    (condp = (thrift/grouping-type thrift-grouping)
      :fields ;; 1.1 fields-grouping: group on one or more fields
        (if (thrift/global-grouping? thrift-grouping) ;; 1.2 an empty field list means global-grouping: every tuple goes to one task
          (fn [task-id tuple]
            ;; It's possible for target to have multiple tasks if it reads multiple sources
            (first target-tasks)) ;; global-grouping takes the first task of the sorted list, i.e. the smallest task id
          (let [group-fields (Fields. (thrift/field-grouping thrift-grouping))] ;; extract group-fields
            (mk-fields-grouper out-fields group-fields target-tasks)
            ))
      :all
        (fn [task-id tuple] target-tasks) ;; 1.3 all-grouping: send to every task, so return the whole target-tasks
      :shuffle
        (mk-shuffle-grouper target-tasks) ;; 1.4 shuffle-grouping
      :local-or-shuffle ;; 1.5 prefer local: if some target tasks live in this worker, shuffle among those only
        (let [same-tasks (set/intersection
                           (set target-tasks)
                           (set (.getThisWorkerTasks context)))]
          (if-not (empty? same-tasks)
            (mk-shuffle-grouper (vec same-tasks))
            (mk-shuffle-grouper target-tasks)))
      :none ;; 1.6 a simple random: pick one of target-tasks at random
        (fn [task-id tuple]
          (let [i (mod (.nextInt random) num-tasks)]
            (.get target-tasks i)
            ))
      :custom-object
        (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :custom-serialized
        (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
          (mk-custom-grouper grouping context component-id stream-id target-tasks))
      :direct
        :direct
      )))
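1.1 fields-grouping
.select extracts the list of values that group-fields correspond to in the tuple; you can group on several fields at once.
tuple/list-hash-code then hashes that values list.
The hash is taken mod num-tasks, and task-getter fetches the target task at that index.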
(defn- mk-fields-grouper [^Fields out-fields ^Fields group-fields ^List target-tasks]
  (let [num-tasks (count target-tasks)
        task-getter (fn [i] (.get target-tasks i))]
    (fn [task-id ^List values]
      (-> (.select out-fields group-fields values)
          tuple/list-hash-code
          (mod num-tasks)
          task-getter))))
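To make the select, hash, mod, lookup chain concrete, here is a minimal Java sketch of the same idea. It is not Storm code: the class FieldsGrouperSketch and its method names are hypothetical, and List.hashCode() is only assumed to be a reasonable stand-in for tuple/list-hash-code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for mk-fields-grouper; not Storm's actual API.
class FieldsGrouperSketch {
    private final List<String> outFields;    // all fields declared on the stream
    private final List<String> groupFields;  // the fields to group on
    private final List<Integer> targetTasks; // task ids of the consuming component

    FieldsGrouperSketch(List<String> outFields, List<String> groupFields, List<Integer> targetTasks) {
        this.outFields = outFields;
        this.groupFields = groupFields;
        this.targetTasks = targetTasks;
    }

    // Pick out the grouping values, hash them, and map the hash onto a task.
    int chooseTask(List<Object> values) {
        List<Object> selected = new ArrayList<>(groupFields.size());
        for (String f : groupFields) {
            selected.add(values.get(outFields.indexOf(f)));
        }
        // Assumption: List.hashCode() stands in for tuple/list-hash-code.
        // floorMod keeps the index non-negative, like Clojure's mod.
        int i = Math.floorMod(selected.hashCode(), targetTasks.size());
        return targetTasks.get(i);
    }

    public static void main(String[] args) {
        FieldsGrouperSketch g = new FieldsGrouperSketch(
            Arrays.asList("user", "url"), Arrays.asList("user"), Arrays.asList(3, 4, 5));
        int a = g.chooseTask(Arrays.<Object>asList("alice", "/a"));
        int b = g.chooseTask(Arrays.<Object>asList("alice", "/b"));
        System.out.println(a == b); // true: same "user" value, so same task
    }
}
```

Note the use of Math.floorMod rather than %: Java's % can return a negative index for a negative hash code, whereas Clojure's mod cannot.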
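Besides the list of fields itself, the Fields class keeps an index for fast field lookup.
Building the index is simple: it records each field name together with its position in declaration order.
To use it, call select with the fields whose values you want and the tuple.
select reads each field's position from the index and then reads the value at that position straight out of the tuple (which of course means the tuple must have been built in the same field order).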
public class Fields implements Iterable<String>, Serializable {
    private List<String> _fields;
    private Map<String, Integer> _index = new HashMap<String, Integer>();

    // (excerpt: constructors and the remaining methods are omitted)

    private void index() {
        for(int i=0; i<_fields.size(); i++) {
            _index.put(_fields.get(i), i);
        }
    }

    public List<Object> select(Fields selector, List<Object> tuple) {
        List<Object> ret = new ArrayList<Object>(selector.size());
        for(String s: selector) {
            ret.add(tuple.get(_index.get(s)));
        }
        return ret;
    }
}
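As a usage illustration of the index and select, the following self-contained sketch mirrors the excerpt with plain lists. FieldIndexSketch is a hypothetical miniature written for this example, not the real Fields class, and the field names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of the Fields index; names are illustrative only.
class FieldIndexSketch {
    private final Map<String, Integer> index = new HashMap<>();

    FieldIndexSketch(List<String> fields) {
        for (int i = 0; i < fields.size(); i++) {
            index.put(fields.get(i), i); // field name -> position in the tuple
        }
    }

    // Return the values of the selected fields, in selector order.
    List<Object> select(List<String> selector, List<Object> tuple) {
        List<Object> ret = new ArrayList<>(selector.size());
        for (String s : selector) {
            ret.add(tuple.get(index.get(s)));
        }
        return ret;
    }

    public static void main(String[] args) {
        FieldIndexSketch fields = new FieldIndexSketch(Arrays.asList("word", "count", "ts"));
        List<Object> tuple = Arrays.<Object>asList("storm", 7, 1374830160L);
        System.out.println(fields.select(Arrays.asList("ts", "word"), tuple)); // [1374830160, storm]
    }
}
```

The result follows the order of the selector, not the order of the tuple, which is why the same group-fields always produce the same values list for hashing.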
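1.2 global-grouping
This is a fields grouping whose field list is empty, which denotes global grouping: all tuples are sent to a single task.
By default the first task (the one with the smallest task id after sorting) is chosen.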
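1.3 all-grouping
Sends the tuple to all tasks.
1.4 shuffle-grouper
Unlike none-grouping, this does not simply draw a random value for every tuple.
With load balance in mind, it uses the pseudo-random scheme below.
target-tasks is first shuffled into a random order.
acquire-random-range-id then reads the tasks one after another, which guarantees that although the order is random, every task is chosen exactly once per pass.
When curr runs past the end of the list, curr is reset and target-tasks is shuffled again.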
(defn- mk-shuffle-grouper [^List target-tasks]
  (let [choices (rotating-random-range target-tasks)]
    (fn [task-id tuple]
      (acquire-random-range-id choices))))

(defn rotating-random-range [choices]
  (let [rand (Random.)
        choices (ArrayList. choices)]
    (Collections/shuffle choices rand)
    [(MutableInt. -1) choices rand]))

(defn acquire-random-range-id [[^MutableInt curr ^List state ^Random rand]]
  (when (>= (.increment curr) (.size state))
    (.set curr 0)
    (Collections/shuffle state rand))
  (.get state (.get curr)))
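The shuffle-then-walk scheme can be sketched in Java as follows. This is an illustrative rendering of the two Clojure functions above, not Storm code; ShuffleGrouperSketch and next() are hypothetical names, and the sketch assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

// Hypothetical Java rendering of rotating-random-range / acquire-random-range-id.
class ShuffleGrouperSketch {
    private final List<Integer> choices;
    private final Random rand = new Random();
    private int curr = -1; // starts before the first element, like (MutableInt. -1)

    ShuffleGrouperSketch(List<Integer> targetTasks) {
        this.choices = new ArrayList<>(targetTasks); // private copy, shuffled in place
        Collections.shuffle(choices, rand);
    }

    // Walk the shuffled list; reshuffle and restart once it is exhausted.
    int next() {
        curr++;
        if (curr >= choices.size()) {
            curr = 0;
            Collections.shuffle(choices, rand);
        }
        return choices.get(curr);
    }

    public static void main(String[] args) {
        ShuffleGrouperSketch g = new ShuffleGrouperSketch(Arrays.asList(1, 2, 3, 4));
        Set<Integer> seen = new HashSet<>();
        for (int i = 0; i < 4; i++) {
            seen.add(g.next());
        }
        System.out.println(seen.size()); // 4: one full pass visits every task once
    }
}
```

Compared with drawing an independent random index per tuple, this bounds the imbalance: after any whole number of passes, every task has received exactly the same number of tuples.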
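1.5 local-or-shuffle
Tasks local to the current worker are preferred, and the choice among them is made by shuffle; if there are no local target tasks, it shuffles over all target tasks.
1.6 none-grouping
For when you do not care how tuples are grouped; the current implementation is just a simple random pick.
1.7 custom-grouping
You can define your own CustomStreamGrouping. The key is the chooseTasks logic, which implements your own task selection strategy.
(defn- mk-custom-grouper [^CustomStreamGrouping grouping ^WorkerTopologyContext context ^String component-id ^String stream-id target-tasks]
  (.prepare grouping context (GlobalStreamId. component-id stream-id) target-tasks)
  (fn [task-id ^List values]
    (.chooseTasks grouping task-id values)
    ))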
public interface CustomStreamGrouping extends Serializable {

    /**
     * Tells the stream grouping at runtime the tasks in the target bolt.
     * This information should be used in chooseTasks to determine the target tasks.
     *
     * It also tells the grouping the metadata on the stream this grouping will be used on.
     */
    void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

    /**
     * This function implements a custom stream grouping. It takes in as input
     * the number of tasks in the target bolt in prepare and returns the
     * tasks to send the tuples to.
     *
     * @param values the values to group on
     */
    List<Integer> chooseTasks(int taskId, List<Object> values);
}
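:custom-object and :custom-serialized differ only in whether the grouping carried by thrift-grouping has been serialized.
If it has not, the object is obtained directly through thrift/instantiate-java-object; otherwise it first has to be deserialized back into an object with Utils/deserialize.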
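To show the prepare / chooseTasks division of labour, here is a small sketch of a custom strategy. So that it compiles without Storm on the classpath, it uses SimpleStreamGrouping, a hypothetical trimmed-down stand-in for CustomStreamGrouping that drops the context and stream arguments; FirstValueGrouping is likewise an invented example strategy.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical, trimmed-down stand-in for CustomStreamGrouping so the sketch
// compiles without Storm (the context and stream id arguments are dropped).
interface SimpleStreamGrouping {
    void prepare(List<Integer> targetTasks);
    List<Integer> chooseTasks(int taskId, List<Object> values);
}

// Example strategy: route each tuple by the hash of its first value only.
class FirstValueGrouping implements SimpleStreamGrouping {
    private List<Integer> targetTasks;

    @Override
    public void prepare(List<Integer> targetTasks) {
        this.targetTasks = targetTasks; // remembered for chooseTasks
    }

    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
        int i = Math.floorMod(values.get(0).hashCode(), targetTasks.size());
        return Collections.singletonList(targetTasks.get(i));
    }
}

class CustomGroupingDemo {
    public static void main(String[] args) {
        FirstValueGrouping g = new FirstValueGrouping();
        g.prepare(Arrays.asList(10, 11, 12));
        // Integer.hashCode() is the value itself: 7 mod 3 = 1, so task 11.
        System.out.println(g.chooseTasks(0, Arrays.<Object>asList(7, "payload"))); // [11]
    }
}
```

As in mk-custom-grouper, prepare is called once when the grouper is built, and chooseTasks is then called for every tuple.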
1.8 direct-grouping
The producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams.
mk-grouper just returns :direct here: with direct grouping, the destination task is already decided by the producer when it emits the tuple, so no grouping work is needed at this point.
2 stream->component->grouper
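outbound-components
An executor corresponds to exactly one component, so the current executor's component-id is passed in.
getTargets returns all outbound components in the form [streamid, [target-componentid, grouping]].
outbound-groupings is then called for each stream.
The final result is a hashmap of [streamid [component grouper]], which is assigned to stream->component->grouper in executor-data.
When a task finally sends a message, it uses stream->component->grouper to produce the actual list of target tasks.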
(defn outbound-components
  "Returns map of stream id to component id to grouper"
  [^WorkerTopologyContext worker-context component-id]
  (->> (.getTargets worker-context component-id) ;; [streamid, [target-componentid, grouping]]
       clojurify-structure
       (map (fn [[stream-id component->grouping]]
              [stream-id
               (outbound-groupings
                 worker-context
                 component-id
                 stream-id
                 (.getComponentOutputFields worker-context component-id stream-id)
                 component->grouping)]))
       (into {})
       (HashMap.)))
outbound-groupings
mk-grouper is called for every target component whose task list is not empty.
Since mk-grouper returns a grouper fn, the final result is a map of [component, grouper].
(defn- outbound-groupings [^WorkerTopologyContext worker-context this-component-id stream-id out-fields component->grouping]
  (->> component->grouping
       (filter-key #(-> worker-context ;; keep only components that have at least one task
                        (.getComponentTasks %)
                        count
                        pos?))
       (map (fn [[component tgrouping]]
              [component
               (mk-grouper worker-context
                           this-component-id
                           stream-id
                           out-fields
                           tgrouping
                           (.getComponentTasks worker-context component)
                           )]))
       (into {})
       (HashMap.)))
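The shape of the resulting stream->component->grouper table, and how a sender would consult it, can be modelled roughly as below. This is only a sketch: GrouperTableSketch, register and targets are hypothetical names, the groupers are plain java.util.function.Function values, and the stream and component ids are made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the stream->component->grouper table; the stream and
// component names used in main are made up for illustration.
class GrouperTableSketch {
    // stream id -> (target component id -> grouper function)
    final Map<String, Map<String, Function<List<Object>, List<Integer>>>> table = new HashMap<>();

    void register(String streamId, String componentId, List<Integer> tasks,
                  Function<List<Object>, List<Integer>> grouper) {
        if (tasks.isEmpty()) {
            return; // mirrors the filter-key step: skip components with no tasks
        }
        table.computeIfAbsent(streamId, k -> new HashMap<>()).put(componentId, grouper);
    }

    // Resolve the target tasks for one tuple on one stream towards one component.
    List<Integer> targets(String streamId, String componentId, List<Object> values) {
        return table.get(streamId).get(componentId).apply(values);
    }

    public static void main(String[] args) {
        GrouperTableSketch t = new GrouperTableSketch();
        List<Integer> counterTasks = Arrays.asList(5, 6);
        t.register("default", "counter", counterTasks, values -> counterTasks); // behaves like all-grouping
        t.register("default", "idle", Collections.<Integer>emptyList(), values -> null);
        System.out.println(t.targets("default", "counter", Arrays.<Object>asList("x"))); // [5, 6]
        System.out.println(t.table.get("default").containsKey("idle"));                  // false
    }
}
```

Building the table once per executor means the per-tuple cost is two map lookups plus one grouper call.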