Apache Storm 2.0.0-SNAPSHOT源码分析笔记(一)
2016-04-27 19:25
645 查看
Topology提交过程
Storm client负责将用户创建的topology提交到nimbus
Nimbus通过thrift接口接收用户提交的topology
Supervisor通过zookeeper提示的消息下载最新的任务安排,并负责启动worker
Worker内可以运行task,task类型为spout或bolt
Executor是一个个运行的线程,同一个executor运行同一种类型的task,即为spout或者bolt
从WordCountTopology入手分析
先来看一段位于example/storm-starter目录下的WordCountTopology.java文件中的代码
Storm client通过bin/storm命令提交topology,具体可以查看bin/storm.py中的jar函数以及exec_storm_class函数,命令解析完后拼接一个”java -cp”命令开始运行topology中的main函数
首先需要实例化TopologyBuilder, TopologyBuilder类位于包org.apache.storm.topology下,用一个HashMap记录具有唯一id的spout或者bolt(例如上面的“spout”就是唯一id),获得spout以及bolt的相关配置(并行度等)
StormSubmitter调用submitTopologyWithProgressBar方法提交topology,通过查看位于org.apache.storm包下的StormSubmitter类可以发现,submitTopologyWithProgressBar函数调用submitTopologyAs函数,而submitTopologyAs函数主要的工作如下:
将命令行参数放入到stormConf, 从conf/storm.yaml读取配置文件到conf中,然后再把stormConf放入到conf中(由此可见,命令行参数的优先级更高),将stormConf转化为Json, 并发送到服务器
调用submitJarAs方法上传Jar文件,StormSubmitter本质上是一个Thrift client, 而Nimbus则是Thrift server, 所有操作都是通过Thrift RPC来完成,submitJarAs首先创建client, 然后调用nimbus thrift server的beginFileUpload方法获得nimbus存法Jar文件的目录,通过uploadChunk方法上传文件
生成thrift client, 并调用nimbus thrift server的submitTopologyWithOpts方法(位于org.apache.storm.daemon包下的nimbus.clj文件)上传topology(参数包括toplogy-name, uploadJarLocatio, serConf, thrift-topology), topology结构定义在storm.thrift文件中
Storm client负责将用户创建的topology提交到nimbus
Nimbus通过thrift接口接收用户提交的topology
Supervisor通过zookeeper提示的消息下载最新的任务安排,并负责启动worker
Worker内可以运行task,task类型为spout或bolt
Executor是一个个运行的线程,同一个executor运行同一种类型的task,即为spout或者bolt
从WordCountTopology入手分析
先来看一段位于example/storm-starter目录下的WordCountTopology.java文件中的代码
public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } }
Storm client通过bin/storm命令提交topology,具体可以查看bin/storm.py中的jar函数以及exec_storm_class函数,命令解析完后拼接一个”java -cp”命令开始运行topology中的main函数
首先需要实例化TopologyBuilder, TopologyBuilder类位于包org.apache.storm.topology下,用一个HashMap记录具有唯一id的spout或者bolt(例如上面的“spout”就是唯一id),获得spout以及bolt的相关配置(并行度等)
StormSubmitter调用submitTopologyWithProgressBar方法提交topology,通过查看位于org.apache.storm包下的StormSubmitter类可以发现,submitTopologyWithProgressBar函数调用submitTopologyAs函数,而submitTopologyAs函数主要的工作如下:
将命令行参数放入到stormConf, 从conf/storm.yaml读取配置文件到conf中,然后再把stormConf放入到conf中(由此可见,命令行参数的优先级更高),将stormConf转化为Json, 并发送到服务器
调用submitJarAs方法上传Jar文件,StormSubmitter本质上是一个Thrift client, 而Nimbus则是Thrift server, 所有操作都是通过Thrift RPC来完成,submitJarAs首先创建client, 然后调用nimbus thrift server的beginFileUpload方法获得nimbus存法Jar文件的目录,通过uploadChunk方法上传文件
生成thrift client, 并调用nimbus thrift server的submitTopologyWithOpts方法(位于org.apache.storm.daemon包下的nimbus.clj文件)上传topology(参数包括toplogy-name, uploadJarLocatio, serConf, thrift-topology), topology结构定义在storm.thrift文件中
struct StormTopology { //ids must be unique across maps // #workers to use is in conf 1: required map<string, SpoutSpec> spouts; 2: required map<string, Bolt> bolts; 3: required map<string, StateSpoutSpec> state_spouts; 4: optional list<binary> worker_hooks; }
相关文章推荐
- Release Notes - Apache Storm - Version 0.9.2-incub
- C/C++实现对STORM运行信息查看及控制的方法
- 基于HBase Thrift接口的一些使用问题及相关注意事项的详解
- Thrift框架学习整理
- 基于Storm的Nginx log实时监控系统
- Storm配置属性和操作命令
- Storm集群的搭建
- storm topology优化之lib库分离
- 从storm-jdbc谈谈component的生命周期
- Storm 实时云计算 学习使用 包括基本api 以及 高层次api trident 的基本使用
- 整合Kafka到Spark Streaming——代码示例和挑战
- sparksql与hive整合
- 大白话storm
- tns cluster 简介
- kafka+storm初探
- storm集群 + kafka单机性能测试
- flume、kafka、storm常用命令
- Storm DRPC实现机制分析剖析
- storm
- Storm配置项详解