【Apache Storm系列之四】Storm Topology生命周期【翻译】
2014-01-26 15:23
323 查看
前阵子写的文章大部分都是以实践为主,接下来我们来看下Topology生命周期,也就是实现流程这层的东西;
包含:一个Topology从运行"storm jar"命令-->上传Topology到Nimbus-->supervisors启动/停止workers-->workers和tasks的自身设置
同时也会告诉你Nimbus如何监控topologies以及当我们执行kill的时候,topologies是如何shutdown的;
首先先说下关于Topology几个重要的点:
1、实际运行的拓扑结构不同于用户指定的拓扑。实际拓扑结构的隐式流和隐式“acker“ blot添加到acking 框架中进行管理(用于保证数据处理),隐式Topology结构是由 system-topology! 函数创建的;
2、system-topology! 在两个地方使用:
当Nimbus为Topology创建tasks code
当工人知道他需要将消息传递到某处的时候 code
当你代码中使用StormSubmitter.submitTopology,StormSubmitter会执行以下流程:
第一,如果jar包没有上传,StormSubmiiter首先会将jar包上传
jar包的上传是通过Nimbus的Thrift接口
beginUploadFile返回一个文件路径到Nimbus inbox中
15KB以内的文件可以通过uploadChunk一次性上传
当文件上传完毕之后程序会调用finishFileUpload
以下是上面几个方法的实现:
第二,StormSubmitter通过Nimbus thirft接口调用submitTopology code
Topology配置使用JSON进行序列化
通知Thrift接口submitTopology获取在Nimbus inbox中存放jar包的路径
Nimbus接受Topology提交 code
Nimbus规范Topology配置。规范化主要目的是确保每一项工作都会有同样的序列化注册,这是正确序列化工作的关键所在 code
Nimbus设置Topylogy状态为静态的
Jar包以及配置存放在本地系统,原因是比较大,不适合放到zookeeper中;jar包和configs文件会拷贝到{nimbus local dir}/stormdist/{topology id}这个目录下;
setup-storm-static:写任务-->组件映射到Zookeeper
setup-heartbeats:创建一个Zookeeper"directory" 使得任务可以心跳
Nimbus调用mk-assignment 将任务分配给各个节点机器
分配记录定义请点击code查看
作业包含了以下几个步骤:
master-code-dir:由supervisors从Nimbus下载正确的jars/configs
task->note+port:由一个task id决定应该由哪个task来运行任务(一个worker由一个节点/端口 定义)
node->host:节点ID到hostname的一个映射;这是用来使workers知道连接与其他工作人员沟通的机器。节点ID被用来确定supervisor,supervisor可以运行在多个机器
task->start-time-secs:包含一个从任务ID映射到其推出的Nimbus任务的时间戳;这是使用Nimbus监控topology时,当我们第一次启动的时候指定了一个更长的超时 (启动超时由“nimbus.task.launch配置。 秒”配置)
一旦Topologies分配,他们初始化是一个失效模式(deactivated mode)。start-storm写入数据到zookeeper,以便集群知道Topology是活跃的和可以从spouts中emit元组(tuple) code
TODO集群状态图
Supervisor 在后台运行的两个函数:
synchronize-supervisor:当zookeeper分配发生变化会调用,或者固定每10秒调用一次; code
当节点机器不存在代码的时候,从Nimbus下载代码以提供给topologies分配
写入到本地文件系统这个节点应该是运行的。它写入一个port->LocalAssignment的映射。LocalAssignment包含一个Topology ID 以及Worker的任务ID列表
sync-processes:从LFS读取数据看synchronized-supervisor都写了些什么,与实际运行的机器上的数据做比较。然后启动/停止工作进程同步;
worker通过mk-worker函数启动进程
Worker连接到其他workers启动一个线程用于监控变化。因此,如果一个工人被重新分配,工人将自动重新连接到另一个工人的新位置 code
监控Topology是否是active状态和存储storm-active-atom变量的状态。这个变量用于让task确认是否在spouts上调用nextTuple code
worker是以线程形式下发实际任务的 code
Tasks通过mk-task函数启动 code
Task 设置路由函数,它接受一个流和一个输出元组,并返回一个任务id列表发送的元组 code
任务设置spout-specific或bolt-specific代码 code
调度周期性任务的计时器线程检查拓扑 code
Nimbus的这种行为被表示为一个有限状态机 code
监控时间在一个Topology的每一次"nimbus.monitor.freq.secs"时调用,通过reassign-transition调用reassign-topology code
reassign-topology调用mk-assignments,相同的功能能用来分配第一次Topology。mk-assignments还能够不断的更新topology
mk-assignments 检查心跳以及必要时重新分配worker
任何的重新分配将会改变zookeeper中的状态,这将触发supervisor去同步和停止/启动workers。
Nimbus接受kill命令 code
Nimbus将"kill"事务应用到topology上 code
kill转换函数修改Topology的状态为"killed"以及将“remove”事件列入到未来几秒钟的计划中,即未来几秒后会触发remove时间;code
默认kill的等待时间是Topology消息的超时时间,但是可以通过storm kill命令中的-w标志对其进行重写
设置了以上上面的参数之后,topology会在你指定的等待时间停止运行。这样给了Topology一个机会在shutdown workers之后完成当前没有处理完成的任务;
在启动时,如果Topology的状态为“killed”,那么Nimbus将会在等待几秒之后触发remove时间 code
删除Topology以及清理zookeeper中的分配信息和静态信息 code
单独清理线程运行do-cleanup函数将会清理存储在本地的心跳dir和jar/configs code
翻译参考文献:https://github.com/nathanmarz/storm/wiki/Lifecycle-of-a-topology
包含:一个Topology从运行"storm jar"命令-->上传Topology到Nimbus-->supervisors启动/停止workers-->workers和tasks的自身设置
同时也会告诉你Nimbus如何监控topologies以及当我们执行kill的时候,topologies是如何shutdown的;
首先先说下关于Topology几个重要的点:
1、实际运行的拓扑结构不同于用户指定的拓扑。实际拓扑结构的隐式流和隐式“acker“ blot添加到acking 框架中进行管理(用于保证数据处理),隐式Topology结构是由 system-topology! 函数创建的;
(defn system-topology! [storm-conf ^StormTopology topology] (validate-basic! topology) (let [ret (.deepCopy topology)] (add-acker! (storm-conf TOPOLOGY-ACKERS) ret) (add-system-streams! ret) (validate-structure! ret) ret ))
2、system-topology! 在两个地方使用:
当Nimbus为Topology创建tasks code
当工人知道他需要将消息传递到某处的时候 code
启动一个Topology
"Storm jar"命令用于指定使用指定的参数运行的类。唯一特别的是”storm jar“设置了"storm.jar"的环境变量提供StormSubmiiter后续使用;def jar(jarfile, klass, *args): //第一个参数jar文件名称,第二个参数是class名称,第三个是class类所需要的参数 """Syntax: [storm jar topology-jar-path class ...] Runs the main method of class with the specified arguments. The storm jars and configs in ~/.storm are put on the classpath. The process is configured so that StormSubmitter (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html) will upload the jar at topology-jar-path when the topology is submitted. """ exec_storm_class( klass, jvmtype="-client", extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"], args=args, childopts="-Dstorm.jar=" + jarfile)
当你代码中使用StormSubmitter.submitTopology,StormSubmitter会执行以下流程:
第一,如果jar包没有上传,StormSubmiiter首先会将jar包上传
jar包的上传是通过Nimbus的Thrift接口
beginUploadFile返回一个文件路径到Nimbus inbox中
15KB以内的文件可以通过uploadChunk一次性上传
当文件上传完毕之后程序会调用finishFileUpload
以下是上面几个方法的实现:
(beginFileUpload [this] (let [fileloc (str (inbox nimbus) "/stormjar-" (uuid) ".jar")] (.put (:uploaders nimbus) fileloc (Channels/newChannel (FileOutputStream. fileloc))) (log-message "Uploading file from client to " fileloc) fileloc )) (^void uploadChunk [this ^String location ^ByteBuffer chunk] (let [uploaders (:uploaders nimbus) ^WritableByteChannel channel (.get uploaders location)] (when-not channel (throw (RuntimeException. "File for that location does not exist (or timed out)"))) (.write channel chunk) (.put uploaders location channel) )) (^void finishFileUpload [this ^String location] (let [uploaders (:uploaders nimbus) ^WritableByteChannel channel (.get uploaders location)] (when-not channel (throw (RuntimeException. "File for that location does not exist (or timed out)"))) (.close channel) (log-message "Finished uploading file from client: " location) (.remove uploaders location) ))
第二,StormSubmitter通过Nimbus thirft接口调用submitTopology code
Topology配置使用JSON进行序列化
通知Thrift接口submitTopology获取在Nimbus inbox中存放jar包的路径
Nimbus接受Topology提交 code
Nimbus规范Topology配置。规范化主要目的是确保每一项工作都会有同样的序列化注册,这是正确序列化工作的关键所在 code
Nimbus设置Topylogy状态为静态的
Jar包以及配置存放在本地系统,原因是比较大,不适合放到zookeeper中;jar包和configs文件会拷贝到{nimbus local dir}/stormdist/{topology id}这个目录下;
setup-storm-static:写任务-->组件映射到Zookeeper
setup-heartbeats:创建一个Zookeeper"directory" 使得任务可以心跳
Nimbus调用mk-assignment 将任务分配给各个节点机器
分配记录定义请点击code查看
作业包含了以下几个步骤:
master-code-dir:由supervisors从Nimbus下载正确的jars/configs
task->note+port:由一个task id决定应该由哪个task来运行任务(一个worker由一个节点/端口 定义)
node->host:节点ID到hostname的一个映射;这是用来使workers知道连接与其他工作人员沟通的机器。节点ID被用来确定supervisor,supervisor可以运行在多个机器
task->start-time-secs:包含一个从任务ID映射到其推出的Nimbus任务的时间戳;这是使用Nimbus监控topology时,当我们第一次启动的时候指定了一个更长的超时 (启动超时由“nimbus.task.launch配置。 秒”配置)
一旦Topologies分配,他们初始化是一个失效模式(deactivated mode)。start-storm写入数据到zookeeper,以便集群知道Topology是活跃的和可以从spouts中emit元组(tuple) code
TODO集群状态图
Supervisor 在后台运行的两个函数:
synchronize-supervisor:当zookeeper分配发生变化会调用,或者固定每10秒调用一次; code
当节点机器不存在代码的时候,从Nimbus下载代码以提供给topologies分配
写入到本地文件系统这个节点应该是运行的。它写入一个port->LocalAssignment的映射。LocalAssignment包含一个Topology ID 以及Worker的任务ID列表
sync-processes:从LFS读取数据看synchronized-supervisor都写了些什么,与实际运行的机器上的数据做比较。然后启动/停止工作进程同步;
worker通过mk-worker函数启动进程
Worker连接到其他workers启动一个线程用于监控变化。因此,如果一个工人被重新分配,工人将自动重新连接到另一个工人的新位置 code
监控Topology是否是active状态和存储storm-active-atom变量的状态。这个变量用于让task确认是否在spouts上调用nextTuple code
worker是以线程形式下发实际任务的 code
Tasks通过mk-task函数启动 code
Task 设置路由函数,它接受一个流和一个输出元组,并返回一个任务id列表发送的元组 code
任务设置spout-specific或bolt-specific代码 code
Topology 监控
Nimbus监控Topology的整个生命周期调度周期性任务的计时器线程检查拓扑 code
Nimbus的这种行为被表示为一个有限状态机 code
监控时间在一个Topology的每一次"nimbus.monitor.freq.secs"时调用,通过reassign-transition调用reassign-topology code
reassign-topology调用mk-assignments,相同的功能能用来分配第一次Topology。mk-assignments还能够不断的更新topology
mk-assignments 检查心跳以及必要时重新分配worker
任何的重新分配将会改变zookeeper中的状态,这将触发supervisor去同步和停止/启动workers。
Kill 一个Topology
运行“storm kill”这个命令,仅仅只是调用Nimbus的Thirft接口去kill掉相对应的Topology codeNimbus接受kill命令 code
Nimbus将"kill"事务应用到topology上 code
kill转换函数修改Topology的状态为"killed"以及将“remove”事件列入到未来几秒钟的计划中,即未来几秒后会触发remove时间;code
默认kill的等待时间是Topology消息的超时时间,但是可以通过storm kill命令中的-w标志对其进行重写
设置了以上上面的参数之后,topology会在你指定的等待时间停止运行。这样给了Topology一个机会在shutdown workers之后完成当前没有处理完成的任务;
在启动时,如果Topology的状态为“killed”,那么Nimbus将会在等待几秒之后触发remove时间 code
删除Topology以及清理zookeeper中的分配信息和静态信息 code
单独清理线程运行do-cleanup函数将会清理存储在本地的心跳dir和jar/configs code
翻译参考文献:https://github.com/nathanmarz/storm/wiki/Lifecycle-of-a-topology
相关文章推荐
- 远程调试Storm Topology,官方Debugging an Apache Storm topology(翻译)
- Storm编程入门API系列之Storm的Topology多个Workers数目控制实现
- Apache Storm提交Topology时的默认CLASSPATH问题
- apache中http_connection系列api接口功能翻译
- Apache Storm技术实战之1 -- WordCountTopology
- MQ消息队列系列(5)ActiviteMQ Getting Started Guide(apache官网翻译)
- Apache Storm技术实战之2 -- BasicDRPCTopology
- Storm概念学习系列之Topology拓扑
- Storm系列(三)Topology提交过程
- 【Apache Storm系列之五】Stream Grouping:不同组件之间的tuples传递
- Storm编程入门API系列之Storm的Topology多个tasks数目控制实现
- Storm官方文档翻译之在生产环境集群中运行Topology
- 【大数据系列】apache hive 官方文档翻译
- Storm【0.9.3】-官方翻译 系列 3:常用的一些基本命令
- Storm Topology的生命周期——浅析(一)
- Storm应用系列之——Topology部署
- Storm Topology的生命周期过程分析
- Apache Storm 翻译简介
- Apache Storm 之使用JAVA API远程提交Topology(非使用storm命令)
- Apache Shiro官方文档翻译系列