Storm-源码分析-EventManager (backtype.storm.event)
2013-06-24 17:37
330 查看
Protocol and DataType
大体结构,
定义protocol EventManager, 其实就是定义interface
函数event-manager, 主要做2件事
1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行
2. 返回实现EventManager的匿名record(reify部分, 实现protocol)
这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner
使用的时候很简单, 如下
可以直接调用add或其他的function
相当于给event-manager增加EventManager protocol, 反过来说, 给add或其他接口functions增加对event-manager record的support, 因为protocol函数的第一个参数总是类型
比较神奇的是, 闭包产生的效果, 可以在完全没有queue, runner定义或声明的情况下, 方便的操作他们
大体结构,
定义protocol EventManager, 其实就是定义interface
函数event-manager, 主要做2件事
1. 启动event queue的处理线程, 不断从queue中取出event-fn并执行
2. 返回实现EventManager的匿名record(reify部分, 实现protocol)
这里使用了reify的close over特性, reify会将用到的局部变量打包到闭包内, 包含queue, runner
(ns backtype.storm.event (:use [backtype.storm log util]) (:import [backtype.storm.utils Time Utils]) (:import [java.util.concurrent LinkedBlockingQueue TimeUnit]) ) (defprotocol EventManager (add [this event-fn]) (waiting? [this]) (shutdown [this])) (defn event-manager "Creates a thread to respond to events. Any error will cause process to halt" [daemon?] (let [added (atom 0) processed (atom 0) ^LinkedBlockingQueue queue (LinkedBlockingQueue.) running (atom true) runner (Thread. (fn [] (try-cause (while @running (let [r (.take queue)] (r) (swap! processed inc))) (catch InterruptedException t (log-message "Event manager interrupted")) (catch Throwable t (log-error t "Error when processing event") (halt-process! 20 "Error when processing an event")) )))] (.setDaemon runner daemon?) (.start runner) (reify EventManager (add [this event-fn] ;; should keep track of total added and processed to know if this is finished yet (when-not @running (throw (RuntimeException. "Cannot add events to a shutdown event manager"))) (swap! added inc) (.put queue event-fn) ) (waiting? [this] (or (Time/isThreadWaiting runner) (= @processed @added) )) (shutdown [this] (reset! running false) (.interrupt runner) (.join runner) ) )))
使用的时候很简单, 如下
let [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)] (.add processes-event-manager sync-processes)
可以直接调用add或其他的function
相当于给event-manager增加EventManager protocol, 反过来说, 给add或其他接口functions增加对event-manager record的support, 因为protocol函数的第一个参数总是类型
比较神奇的是, 闭包产生的效果, 可以在完全没有queue, runner定义或声明的情况下, 方便的操作他们
相关文章推荐
- Storm-源码分析-acker (backtype.storm.daemon.acker)
- Storm-源码分析- bolt (backtype.storm.task)
- Storm-源码分析-Stats (backtype.storm.stats)
- Storm-源码分析-LocalState (backtype.storm.utils)
- Storm-源码分析- Scheduler (backtype.storm.scheduler)
- Storm-源码分析- Messaging (backtype.storm.messaging)
- Storm-源码分析-Streaming Grouping (backtype.storm.daemon.executor)
- Storm-源码分析- spout (backtype.storm.spout)
- Storm-源码分析- timer (backtype.storm.timer)
- Storm-源码分析- hook (backtype.storm.hooks)
- Storm-源码分析-Topology Submit-Task-TopologyContext (backtype.storm.task)
- Storm-源码分析-LocalState (backtype.storm.utils)
- backtype.storm.event [ERROR] Error when processing event
- storm事件管理器EventManager源码分析-event.clj
- storm事件管理器EventManager源码分析-event.clj
- 【libevent】源码分析(4)--与event相关的一些函数和操作
- Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理
- Android触摸屏事件派发机制详解与源码分析一(View篇)onTouch,onClick,ontouchevent
- perf_event源码分析(一)——cmd_record
- Storm【Storm0.9.3】- 源码分析:ACK 框架之 Acker Bolt的实现分析