您的位置:首页 > 其它

SparkStreaming的运行流程解析(源码)

2017-02-06 14:06 507 查看
个人GitHub地址 :https://github.com/LinMingQiang

ssc.start()

启动receiverTracker.start去获取数据

启动jobGenerator.start去计算数据



receiverTracker.start



在这里的receiverinputStreams其实已经实例化了。在inputDStream的时候已经往ssc.graph中添加了实例(具体去看下我的Sparkstreaming中InputDStream的详解(源码)那篇博客)



receiverExecutor.start()

实例化ReceiverTrackerActor,它负责RegisterReceiver(注册Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销Receiver)等事件的处理。

启动receiverExecutor.start(实际类是ReceiverLauncher继承线程,调用run),它主要负责启动Receiver



使用sc.makeRDD把receiver分发到各个work上去,看上面的英文解释

看上面英文注释,看上面英文注释,看上面英文注释,看上面英文注释





分发各个receiver,然后启动receiver来接收数据,然后在receiver里面将数据存储在blockManage里面

启动计算任务

启动jobGenerator。



第一次启动会调用startFirstTime()



启动一个定时器Timer。



timer定时器的定义

这里定时器传入了一个方法:

longTime => eventActor ! GenerateJobs(new Time(longTime)), “JobGenerator”

(定时器每隔一定时间向eventActor发送一个GenerateJobs。那个!是eventActor的一个方法,就是发送)



timer.start()



下面是线程的方法,循环调用了callback(也就是)上面的向eventActor发消息

线程的定义



thread.start()



如果ecentActor接到消息就会调用下面的方法



如果事件触发了,就调用下面对应的方法



启动generateJobs



1、DStreamGraph生成jobs。

2、从stream那里获取接收到的Block信息。

3、调用submitJobSet方法提交作业。(其实只是作业的完成情况,在 graph.generateJobs(time)里面其实把作业完成了)

4、4、提交完作业之后,做一个CheckPoint。

先看DStreamGraph是怎么生成的jobs:(不同的DStream调用的outputStrem.generateJob方法不一样。看看DStream里面的union和print等方法。print调用的是ForEachDStream的generateJob。union(UnionDStream)调用的是DStream自带的)



union(UnionDStream)调用的是:大部分是调用的下面的这个:(DStream原生的方法)



print调用的是:



生成一个RDD。调用重写的compute。我们可以看到下面的

generatedRDDs.get(time).orElse 。如果这个时间time段的rdd已经准备好了。就不用去执行compute了。而是直接取。这个generatedRDDs 是一个hashmap是每个时间段的rdd。



返回头看:



总结

到这里就算结束了,最后来个总结吧,图例在下一章补上,这一章只是过程分析:

一个Receiver监控一个ip和port,如果要多机子并行监控的话,肯定每台机子都是一样的,所以这边是一个Receiver的Array,里面是每个Receiver,如果在开始的时候只定义了一个inputstream,那其实是启动一个机子在监控ip和port,如果要多机子并行监控,只能是通过不同的ip或不同的port来创建多个inputstream来实现并行。(源码里面是把Receiver做成一个RDD来分发到不同的机子上的,这个RDD里面的Receiver就是你多个Receiver的集合)

1、可以有多个输入,我们可以通过StreamingContext定义多个输入,比如我们监听多个(host,ip),可以给它们定义各自的处理逻辑和输出,输出方式不仅限于print方法,还可以有别的方法,saveAsTextFiles和saveAsObjectFiles。这块的设计是支持共享StreamingContext的。

(可以从InputDStream源码的 ssc.graph.addInputStream(this))看出,可以有多个输入

所以一个sparkstreaming程序,可以有多个监控输入。只要增加一个生成inputDStream就可以

2、StreamingContext启动了JobScheduler,JobScheduler启动ReceiverTracker和JobGenerator。

(ReceiverTracker用于分发接收器,在work上实时接收数据;JobGenerator启动一个线程(这个线程在driver端)在每隔一段时间就去使用Receiver接收的数据(使用数据的方法是启动一个job,job操作的rdd是从inputDStream的compute里面产生的))

3、ReceiverTracker是通过把Receiver包装成RDD的方式,发送到Executor端运行起来的,Receiver起来之后向ReceiverTracker发送RegisterReceiver消息。

3、Receiver把接收到的数据,通过ReceiverSupervisor保存。

4、ReceiverSupervisorImpl把数据写入到BlockGenerator的一个ArrayBuffer当中。

5、BlockGenerator内部每个一段时间(默认是200毫秒)就把这个ArrayBuffer构造成Block添加到blocksForPushing当中。

6、BlockGenerator的另外一条线程则不断的把加入到blocksForPushing当中的Block写入到BlockManager当中,并向ReceiverTracker发送AddBlock消息。

7、JobGenerator内部有个定时器,定期生成Job,通过DStream的id,把ReceiverTracker接收到的Block信息从BlockManager上抓取下来进行处理,这个间隔时间是我们在实例化StreamingContext的时候传进去的那个时间,在这个例子里面是Seconds(1)。

GitHub地址:https://github.com/EatMedicineFeelLovely
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: