
Spark Customization Course, Lesson 7: Spark Streaming Source Code Analysis: JobScheduler Internals and In-Depth Thinking

2016-06-07 22:07
In the last course, we learned "How the Spark Streaming Job Is Generated Dynamically".

From that course, we know there are 3 key classes for the Spark Streaming Job:

JobScheduler: schedules Spark Streaming Jobs
JobGenerator: generates Spark Streaming Jobs
ReceiverTracker: manages the execution of the receivers of the ReceiverInputDStreams

In this course, we will learn more about the internals of the Spark Streaming JobScheduler.

Always remember that the JobScheduler class is the center of Spark Streaming Job scheduling; you can regard it as the Spark Streaming counterpart of Spark Core's DAGScheduler.

Let's check some important member variables, inner classes and member functions of the JobScheduler class.

Here is the whole picture of the JobScheduler class:



The JobScheduler class has the following important member variables:

jobGenerator
Once started, at each batchDuration interval it calls into DStreamGraph to generate the RDD graph and finally generate the Jobs for the batch

receiverTracker
Starting receiverTracker launches the Receivers on the Executors of the Spark cluster and makes them ready to accept the input streams

inputInfoTracker
A tracker that tracks all input stream information (a.k.a. metadata) as well as the number of processed records

jobSets
A map from batch time to the set of Jobs belonging to that batch. For each batch it stores information such as the time of the batch, the jobs of the batch and a map from input stream id to its input info

jobExecutor
A thread pool with a fixed number of threads that executes all of the jobs in a JobSet
The number of threads can be configured by the spark.streaming.concurrentJobs configuration parameter; the default is 1
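As a rough, self-contained sketch of such a configurable fixed-size pool (only the property name spark.streaming.concurrentJobs comes from Spark; the surrounding code is illustrative, not Spark's actual implementation):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Read the desired parallelism from a system property, falling back to the
// documented default of 1 when spark.streaming.concurrentJobs is not set.
val concurrentJobs = sys.props.getOrElse("spark.streaming.concurrentJobs", "1").toInt

// A fixed-size pool: at most `concurrentJobs` jobs run at the same time.
val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)

// Submit a few dummy "jobs" and wait for the pool to drain.
val completed = new AtomicInteger(0)
(1 to 4).foreach { _ =>
  jobExecutor.execute(new Runnable {
    def run(): Unit = completed.incrementAndGet()
  })
}
jobExecutor.shutdown()
jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
```

With the default of 1 thread, jobs of a batch run strictly one after another; raising the value lets output operations of different batches overlap.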

eventLoop
An event loop that receives events from the caller and processes all of them in the event thread
The events are all instances of the JobSchedulerEvent trait

listenerBus
An instance of the StreamingListenerBus class
It asynchronously passes StreamingListenerEvents to the registered StreamingListeners

The JobScheduler class has the following important inner classes:

JobSchedulerEvent
A private sealed trait, which can only be extended by classes in the same source file as the JobScheduler class
Here it is used as an alternative to an enum of event types for the JobScheduler
The parameter of the JobScheduler eventLoop's onReceive function is declared as JobSchedulerEvent

JobStarted
A private case class which extends JobSchedulerEvent
Its constructor takes 2 parameters: the job and the start time
This event type leads to a call of the handleJobStart function

JobCompleted
A private case class which extends JobSchedulerEvent
Its constructor takes 2 parameters: the job and the completion time
This event type leads to a call of the handleJobCompletion function

ErrorReported
A private case class which extends JobSchedulerEvent
Its constructor takes 2 parameters: the error message and the Throwable exception
This event type leads to a call of the handleError function
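The sealed-trait-as-enum pattern described above can be sketched as follows (simplified stand-ins, not Spark's actual private classes; the job is reduced to a name for brevity):

```scala
// A sealed trait can only be extended in the same source file, so the
// compiler knows the complete set of event types and can warn when a
// pattern match over them is not exhaustive.
sealed trait JobSchedulerEvent
case class JobStarted(jobName: String, startTime: Long) extends JobSchedulerEvent
case class JobCompleted(jobName: String, completedTime: Long) extends JobSchedulerEvent
case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent

// Dispatch in the style of JobScheduler#processEvent: each event type
// maps to one handler (here we just return the call we would make).
def processEvent(event: JobSchedulerEvent): String = event match {
  case JobStarted(job, t)    => s"handleJobStart($job, $t)"
  case JobCompleted(job, t)  => s"handleJobCompletion($job, $t)"
  case ErrorReported(msg, _) => s"handleError($msg)"
}
```

Compared with a plain enum, this gives each event type its own typed payload while keeping the dispatch exhaustive.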



JobHandler
A private class which implements the Runnable interface
Its constructor takes 1 parameter: the job
In its run() function, it sets the job description, posts a JobStarted event to the eventLoop, calls the job's run() function (which invokes the func passed in the job's constructor), and finally posts a JobCompleted event to the eventLoop
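A minimal sketch of this run() sequence, with a plain list standing in for the eventLoop and the job description elided (simplified, not Spark's actual JobHandler):

```scala
import scala.collection.mutable.ListBuffer

// Stand-in for eventLoop.post(...): just record the posted events in order.
val events = ListBuffer[String]()

// A minimal Job: it wraps the func passed to its constructor.
class Job(val name: String, func: () => Unit) {
  def run(): Unit = func()
}

// Sketch of JobHandler: post JobStarted, run the job (which calls its func),
// then post JobCompleted. Error handling is elided.
class JobHandler(job: Job) extends Runnable {
  def run(): Unit = {
    events += s"JobStarted(${job.name})"
    job.run()
    events += s"JobCompleted(${job.name})"
  }
}

var output = 0
new JobHandler(new Job("batch-0", () => output = 42)).run()
```

Because JobHandler is a Runnable, the jobExecutor thread pool can execute it directly, and the start/completion events reach the scheduler asynchronously through the event loop.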



The JobScheduler class has the following important member functions:

start()
It is called only by a new thread named "streaming-start" in the StreamingContext#start function
Here is the StreamingContext#start function:



In the JobScheduler#start function, the 2 key things are to start an EventLoop that listens for JobSchedulerEvents and calls the related handler functions, and to start the JobGenerator that generates the Spark Streaming Jobs

Here is the JobScheduler#start function:


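The original screenshot of the source is not reproduced here; below is a self-contained sketch of the startup order described above, with hypothetical stubs in place of the real EventLoop, ReceiverTracker and JobGenerator:

```scala
import scala.collection.mutable.ListBuffer

// Record the order in which components are started.
val startOrder = ListBuffer[String]()

// Hypothetical stubs for the real components.
class EventLoopStub       { def start(): Unit = startOrder += "eventLoop" }
class ReceiverTrackerStub { def start(): Unit = startOrder += "receiverTracker" }
class JobGeneratorStub    { def start(): Unit = startOrder += "jobGenerator" }

// Sketch of JobScheduler#start: the event loop must be running before the
// components that post events to it are started.
def start(): Unit = {
  val eventLoop = new EventLoopStub;             eventLoop.start()
  val receiverTracker = new ReceiverTrackerStub; receiverTracker.start()
  val jobGenerator = new JobGeneratorStub;       jobGenerator.start()
}
start()
```

Starting the event loop first ensures that no JobStarted/JobCompleted/ErrorReported event can be posted before there is a thread to process it.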

processEvent(JobSchedulerEvent)
In the JobScheduler#processEvent function, the different incoming JobSchedulerEvents posted by JobHandler, such as JobStarted/JobCompleted/ErrorReported, lead to calls of the corresponding handler functions handleJobStart/handleJobCompletion/handleError
Here is the JobScheduler#processEvent function:



submitJobSet(JobSet)
In the JobScheduler#submitJobSet function, the JobSet is put into the jobSets map under its batch time, and each Job in the JobSet is executed by a thread in the jobExecutor thread pool, which actually calls JobHandler's run function
It also posts a StreamingListenerBatchSubmitted streaming event to the streaming listener bus
Here is the JobScheduler#submitJobSet function:


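The screenshot is not reproduced here; the following is a simplified, self-contained sketch of the submitJobSet behaviour described above (hypothetical minimal Job/JobSet types; the JobHandler wrapper and listener-bus posting are elided):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Minimal stand-ins for Spark's Job and JobSet.
case class Job(run: () => Unit)
case class JobSet(time: Long, jobs: Seq[Job])

val jobSets = new ConcurrentHashMap[Long, JobSet]()
val jobExecutor = Executors.newFixedThreadPool(1)

// Sketch of submitJobSet: register the JobSet under its batch time, then
// hand each Job to the pool.
def submitJobSet(jobSet: JobSet): Unit = {
  if (jobSet.jobs.nonEmpty) {
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach { job =>
      jobExecutor.execute(new Runnable { def run(): Unit = job.run() })
    }
  }
}

val ran = new AtomicBoolean(false)
submitJobSet(JobSet(1000L, Seq(Job(() => ran.set(true)))))
jobExecutor.shutdown()
jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
```

The JobSet stays in the map until handleJobCompletion observes that every job of the batch has finished.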

handleJobStart(Job, Long)
Sets the processingStartTime of the JobSet if it is not set yet
Sets the startTime of the job
Also posts StreamingListenerBatchStarted and StreamingListenerOutputOperationStarted streaming events to the streaming listener bus
Here is the JobScheduler#handleJobStart function:



handleJobCompletion(Job, Long)
Sets the processingEndTime of the JobSet if it is not set yet
Sets the completedTime of the job
If the JobSet has completed, removes it from the jobSets map and calls jobGenerator#onBatchCompletion to clear the related metadata
Also posts StreamingListenerOutputOperationCompleted and StreamingListenerBatchCompleted streaming events to the streaming listener bus
Here is the JobScheduler#handleJobCompletion function:


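The screenshot is not reproduced here; as a hedged sketch of the completion bookkeeping described above (a minimal hypothetical JobSet; the listener-bus and jobGenerator#onBatchCompletion calls are elided):

```scala
import scala.collection.mutable

// Minimal JobSet that just counts completed jobs.
class JobSet(val time: Long, total: Int) {
  private var completed = 0
  def handleJobCompletion(): Unit = completed += 1
  def hasCompleted: Boolean = completed == total
}

val jobSets = mutable.Map[Long, JobSet]()

// Sketch of handleJobCompletion: mark one job of the batch as done and,
// once every job in the batch has finished, drop the batch from the map.
def handleJobCompletion(time: Long): Unit = {
  jobSets.get(time).foreach { jobSet =>
    jobSet.handleJobCompletion()
    if (jobSet.hasCompleted) jobSets.remove(time)
  }
}

jobSets(2000L) = new JobSet(2000L, 2)
handleJobCompletion(2000L)
val stillThere = jobSets.contains(2000L) // batch not complete after 1 of 2 jobs
handleJobCompletion(2000L)               // second job completes the batch
```

Removing the completed JobSet keeps the jobSets map bounded to the batches currently in flight.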

handleError(String, Throwable)
Here is the JobScheduler#handleError function:



stop(Boolean)
In the JobScheduler#stop function, please note the order of stopping/shutting down: receiverTracker#stop -> jobGenerator#stop -> jobExecutor#shutdown -> listenerBus#stop -> eventLoop#stop
Here is the JobScheduler#stop function:


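The screenshot is not reproduced here; the shutdown order can be sketched as follows (toy components that only record when they are stopped; only jobExecutor is a real thread pool):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ListBuffer

val stopOrder = ListBuffer[String]()
val jobExecutor = Executors.newFixedThreadPool(1)

// Toy components standing in for the real ones.
def stopReceiverTracker(): Unit = stopOrder += "receiverTracker"
def stopJobGenerator(): Unit    = stopOrder += "jobGenerator"
def stopListenerBus(): Unit     = stopOrder += "listenerBus"
def stopEventLoop(): Unit       = stopOrder += "eventLoop"

// Sketch of JobScheduler#stop: stop the data source first so no new data
// arrives, then stop generating jobs, drain the jobs already running, and
// only then tear down the listener bus and the event loop.
def stop(): Unit = {
  stopReceiverTracker()
  stopJobGenerator()
  jobExecutor.shutdown()
  jobExecutor.awaitTermination(2, TimeUnit.SECONDS)
  stopOrder += "jobExecutor"
  stopListenerBus()
  stopEventLoop()
}
stop()
```

Stopping in the reverse direction of the data flow avoids losing in-flight batches: nothing new enters the pipeline while the tail of it is still allowed to finish.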



