Apache Storm 2.0.0-SNAPSHOT Source Code Analysis Notes (Part 1)

2016-04-27 19:25
Topology submission process

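The Storm client submits the user-defined topology to Nimbus.

Nimbus receives the submitted topology through its Thrift interface.

The Supervisor is notified through ZooKeeper, downloads the latest task assignments, and is responsible for starting workers.

A worker runs tasks; each task is either a spout or a bolt.

An executor is a running thread. All tasks in one executor are of the same type, that is, either spout or bolt.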

Starting the analysis from WordCountTopology

First, look at the following code from WordCountTopology.java under the examples/storm-starter directory:

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("spout", new RandomSentenceSpout(), 5);
    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
    Config conf = new Config();
    conf.setDebug(true);
    if (args != null && args.length > 0) {
        // A topology name was given: submit to the cluster
        conf.setNumWorkers(3);
        StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
    } else {
        // No arguments: run in local mode for ten seconds
        conf.setMaxTaskParallelism(3);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());
        Thread.sleep(10000);
        cluster.shutdown();
    }
}


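The Storm client submits a topology through the bin/storm command; see the jar function and the exec_storm_class function in bin/storm.py. After parsing the command line, the script assembles a "java -cp" command that runs the main function of the topology.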
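The command assembly can be pictured with a small sketch. This is written in Java for consistency with the rest of these notes, although the real logic is Python code in bin/storm.py. The class name StormJarCommandSketch, the helper buildCommand, and the sample classpath are hypothetical, and the JVM options that the real script adds are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StormJarCommandSketch {
    // Hypothetical helper: builds a simplified "java -cp" launch command.
    // The real bin/storm.py passes additional JVM options that are omitted here.
    static List<String> buildCommand(String classpath, String mainClass, List<String> args) {
        List<String> cmd = new ArrayList<>();
        cmd.add("java");
        cmd.add("-cp");
        cmd.add(classpath);
        cmd.add(mainClass);   // class whose main function builds and submits the topology
        cmd.addAll(args);     // arguments forwarded to that main function
        return cmd;
    }

    public static void main(String[] argv) {
        List<String> cmd = buildCommand(
                "storm-core.jar:storm-starter.jar",  // assumed classpath, for illustration only
                "org.apache.storm.starter.WordCountTopology",
                Arrays.asList("word-count"));
        System.out.println(String.join(" ", cmd));
    }
}
```

The first forwarded argument ends up as args[0] in the main function shown earlier, which is why a non-empty argument list selects cluster submission.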

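The first step is to instantiate TopologyBuilder. The TopologyBuilder class lives in the org.apache.storm.topology package. It uses a HashMap to record each spout or bolt under a unique id (for example, "spout" above) and collects the configuration of each spout and bolt (parallelism and so on).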
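A minimal sketch of the unique-id bookkeeping follows. It is not the real TopologyBuilder: the class MiniTopologyBuilder is hypothetical, it stores only a parallelism hint per id instead of the component objects, and keeping one map per component kind is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class MiniTopologyBuilder {
    // Parallelism hint per component id; a stand-in for the real component objects.
    private final Map<String, Integer> spouts = new HashMap<>();
    private final Map<String, Integer> bolts = new HashMap<>();

    void setSpout(String id, int parallelism) {
        validateUnusedId(id);
        spouts.put(id, parallelism);
    }

    void setBolt(String id, int parallelism) {
        validateUnusedId(id);
        bolts.put(id, parallelism);
    }

    // An id may be used only once across spouts and bolts together.
    private void validateUnusedId(String id) {
        if (spouts.containsKey(id) || bolts.containsKey(id)) {
            throw new IllegalArgumentException("Component id already declared: " + id);
        }
    }

    int componentCount() {
        return spouts.size() + bolts.size();
    }

    public static void main(String[] args) {
        MiniTopologyBuilder builder = new MiniTopologyBuilder();
        builder.setSpout("spout", 5);
        builder.setBolt("split", 8);
        builder.setBolt("count", 12);
        System.out.println("components: " + builder.componentCount());
        try {
            builder.setBolt("spout", 1);  // reuses the spout's id, so it is rejected
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```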

StormSubmitter submits the topology through its submitTopologyWithProgressBar method. The StormSubmitter class in the org.apache.storm package shows that submitTopologyWithProgressBar calls submitTopologyAs, whose main work is as follows:

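It puts the command-line options into stormConf, reads the configuration file conf/storm.yaml into conf, and then puts stormConf into conf (so command-line options take precedence). It then converts stormConf to JSON and sends it to the server.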
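The precedence rule follows directly from the order of the map merges, which the sketch below reproduces with plain Java maps. The class ConfMergeSketch and its merge helper are hypothetical, and the JSON conversion step is not shown.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfMergeSketch {
    // Hypothetical helper mirroring the merge order described above.
    static Map<String, Object> merge(Map<String, Object> yamlConf,
                                     Map<String, Object> topologyConf,
                                     Map<String, Object> commandLineOpts) {
        Map<String, Object> stormConf = new HashMap<>(topologyConf);
        stormConf.putAll(commandLineOpts);  // command-line options go into stormConf
        Map<String, Object> conf = new HashMap<>(yamlConf);
        conf.putAll(stormConf);             // later putAll wins, so stormConf overrides storm.yaml
        return conf;
    }

    public static void main(String[] args) {
        Map<String, Object> yamlConf = new HashMap<>();
        yamlConf.put("topology.workers", 1);
        yamlConf.put("topology.debug", false);

        Map<String, Object> topologyConf = new HashMap<>();
        topologyConf.put("topology.debug", true);

        Map<String, Object> commandLineOpts = new HashMap<>();
        commandLineOpts.put("topology.workers", 3);

        System.out.println(merge(yamlConf, topologyConf, commandLineOpts));
    }
}
```

Because putAll overwrites existing keys, whichever map is merged last wins for any key the maps share.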

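It calls submitJarAs to upload the jar file. StormSubmitter is essentially a Thrift client and Nimbus is the Thrift server; every operation goes through Thrift RPC. submitJarAs first creates a client, then calls beginFileUpload on the Nimbus Thrift server to obtain the directory where Nimbus stores the jar file, and uploads the file with uploadChunk.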
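The upload handshake can be sketched without Thrift by putting an in-memory fake behind a small interface. Everything here is an assumption for illustration: the UploadService interface models only the two calls named above with simplified signatures, and the InMemoryServer class, the location format, and the chunk size are made up.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ChunkedUploadSketch {
    /** Hypothetical stand-in for the upload calls of the Nimbus Thrift interface. */
    interface UploadService {
        String beginFileUpload();
        void uploadChunk(String location, byte[] chunk);
    }

    /** In-memory fake server that appends chunks to a per-location buffer. */
    static class InMemoryServer implements UploadService {
        final Map<String, ByteArrayOutputStream> files = new HashMap<>();
        private int counter = 0;

        @Override
        public String beginFileUpload() {
            String location = "upload-" + (counter++);  // made-up location format
            files.put(location, new ByteArrayOutputStream());
            return location;
        }

        @Override
        public void uploadChunk(String location, byte[] chunk) {
            files.get(location).write(chunk, 0, chunk.length);
        }
    }

    // Asks the server for a location, then sends the bytes in fixed-size chunks.
    static String submitJar(UploadService server, byte[] jarBytes, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        String location = server.beginFileUpload();
        for (int offset = 0; offset < jarBytes.length; offset += chunkSize) {
            int end = Math.min(offset + chunkSize, jarBytes.length);
            server.uploadChunk(location, Arrays.copyOfRange(jarBytes, offset, end));
        }
        return location;
    }

    public static void main(String[] args) {
        InMemoryServer server = new InMemoryServer();
        byte[] fakeJar = new byte[2500];
        String location = submitJar(server, fakeJar, 1024);
        System.out.println(location + " holds " + server.files.get(location).size() + " bytes");
    }
}
```

The returned location is the value that is later passed along when the topology itself is submitted.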

It creates a Thrift client and calls submitTopologyWithOpts on the Nimbus Thrift server (in nimbus.clj under the org.apache.storm.daemon package) to upload the topology. The arguments include topology-name, uploadJarLocation, serConf, and thrift-topology. The topology structure is defined in storm.thrift:

struct StormTopology {
  //ids must be unique across maps
  // #workers to use is in conf
  1: required map<string, SpoutSpec> spouts;
  2: required map<string, Bolt> bolts;
  3: required map<string, StateSpoutSpec> state_spouts;
  4: optional list<binary> worker_hooks;
}