JobScheduler, Job, JobSet 详解
2017-11-12 22:24
232 查看
前面在 Spark Streaming 实现思路与模块概述 和 DStream 生成 RDD 实例详解 里我们分析了
的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。
在 Spark Streaming 程序的入口,我们都会定义一个
RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是
Spark Streaming 程序在
start() 运行起来。
Spark Streaming 的 Job 总调度者。
batch 的 RDD DAG 具体生成工作委托给
batch 生成 RDD DAG 的实例。 具体的,根据我们在 DStream 生成 RDD 实例详解 中的解析,
此时,
JobSet(如上图
JobScheduler(如上图 (4) )。
那么
这里最重要的处理逻辑是
先来看 JobHandler 针对 Job 的主要处理逻辑:
也就是说,
DStream 生成 RDD 实例详解 里分析的对应起来了: 在
上面
具体的,
也就是,
这里
进一步说,这里
batch 产生一个
所以,确切的,*有几个 *output 操作,就调用几次
为了验证这个结果,我们做一个简单的小测试:先设置
在上面的设定下,我们很容易知道,能够同时在处理的 batch 有
下面的就是刚才测试代码的运行结果,验证了我们前面的分析和计算:
最后,我们专门拿出一个小节,辨别一下这 Spark Streaming 的 JobSet, Job,与 Spark Core 的 Job, Stage, TaskSet, Task 这几个概念。
Spark Core 的 Job, Stage, Task 就是我们“日常”谈论 Spark 任务时所说的那些含义,而且在 Spark 的 WebUI 上有非常好的体现,比如下图就是 1 个
3 个
个
8, 2, 4 个
Spark Core 的内部代码里用的类,是
而 Spark Streaming 里也有一个
Streaming 里的
可以:
直接调用
1 个或多个 Spark Core 的
先打印一行表头;然后调用
—— 这正是
也可以是任何用户定义的 code,甚至整个 Spark Streaming 执行过程都不产生任何 Spark Core 的
如上一小节所展示的测试代码,其
:)
最后,Spark Streaming 的
如果对上面 5 个概念做一个层次划分的话(上一层与下一层多是一对多的关系,但不完全准确),就应该是下表的样子:
转自:GitHub
https://github.com/proflin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/2.1%20JobScheduler,%20Job,%20JobSet%20%E8%AF%A6%E8%A7%A3.md
转载请注明:人人都是数据咖 » JobScheduler, Job, JobSet 详解
DStreamGraph和
DStream具有能够实例化
RDD和
RDDDAG
的能力,下面我们来看 Spark Streaming 是如何将其动态调度的。
在 Spark Streaming 程序的入口,我们都会定义一个
batchDuration,就是需要每隔多长时间就比照静态的
DStreamGraph来动态生成一个
RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是
JobScheduler,在
Spark Streaming 程序在
ssc.start()开始运行时,会生成一个
JobScheduler的实例,并被
start() 运行起来。
// 来自 StreamingContext def start(): Unit = synchronized { ... ThreadUtils.runInNewThread("streaming-start") { sparkContext.setCallSite(startSite.get) sparkContext.clearJobGroup() sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") scheduler.start() // 【这里调用了 JobScheduler().start()】 } state = StreamingContextState.ACTIVE ... }
Spark
Streaming 的 Job 总调度者 JobScheduler
JobScheduler是
Spark Streaming 的 Job 总调度者。
JobScheduler有两个非常重要的成员:
JobGenerator和
ReceiverTracker。
JobScheduler将每个
batch 的 RDD DAG 具体生成工作委托给
JobGenerator,而将源头输入数据的记录工作委托给
ReceiverTracker。
JobScheduler 的全限定名是:org.apache.spark.streaming.scheduler.JobScheduler JobGenerator 的全限定名是:org.apache.spark.streaming.scheduler.JobGenerator ReceiverTracker 的全限定名是:org.apache.spark.streaming.scheduler.ReceiverTracker
JobGenerator维护了一个定时器,周期就是我们刚刚提到的
batchDuration,定时为每个
batch 生成 RDD DAG 的实例。 具体的,根据我们在 DStream 生成 RDD 实例详解 中的解析,
DStreamGraph.generateJobs(time)将返回一个
Seq[Job],其中的每个
Job是一个
ForEachDStream实例的
generateJob(time)返回的结果。
此时,
JobGenerator拿到了
Seq[Job]后(如上图
(2)),就将其包装成一个
JobSet(如上图
(3)),然后就调用
JobScheduler.submitJobSet(jobSet)来交付回
JobScheduler(如上图 (4) )。
那么
JobScheduler收到
jobSet后是具体如何处理的呢?我们看其实现:
// 来自 JobScheduler.submitJobSet(jobSet: JobSet) if (jobSet.jobs.isEmpty) { logInfo("No jobs added for time " + jobSet.time) } else { listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo)) jobSets.put(jobSet.time, jobSet) // 【下面这行是最主要的处理逻辑:将每个 job 都在 jobExecutor 线程池中、用 new JobHandler 来处理】 jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) }
这里最重要的处理逻辑是
job => jobExecutor.execute(new JobHandler(job)),也就是将每个 job 都在 jobExecutor 线程池中、用 new JobHandler 来处理。
JobHandler
先来看 JobHandler 针对 Job 的主要处理逻辑:// 来自 JobHandler def run() { ... // 【发布 JobStarted 消息】 _eventLoop.post(JobStarted(job)) PairRDDFunctions.disableOutputSpecValidation.withValue(true) { // 【主要逻辑,直接调用了 job.run()】 job.run() } _eventLoop = eventLoop if (_eventLoop != null) { // 【发布 JobCompleted 消息】 _eventLoop.post(JobCompleted(job)) } ... }
也就是说,
JobHandler除了做一些状态记录外,最主要的就是调用
job.run()!这里就与我们在
DStream 生成 RDD 实例详解 里分析的对应起来了: 在
ForEachDStream.generateJob(time)时,是定义了
Job的运行逻辑,即定义了
Job.func。而在
JobHandler这里,是真正调用了
Job.run()、将触发
Job.func的真正执行!
Job
运行的线程池 jobExecutor
上面 JobHandler是解决了做什么的问题,本节
jobExecutor是解决
Job在哪里做。
具体的,
jobExecutor是
JobScheduler的成员:
// 来自 JobScheduler private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging { ... private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) private val jobExecutor = ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor") ... }
也就是,
ThreadUtils.newDaemonFixedThreadPool()调用将产生一个名为
"streaming-job-executor"的线程池,所以,
Job将在这个线程池的线程里,被实际执行
func。
spark.streaming.concurrentJobs
参数
这里 jobExecutor的线程池大小,是由
spark.streaming.concurrentJobs参数来控制的,当没有显式设置时,其取值为
1。
进一步说,这里
jobExecutor的线程池大小,就是能够并行执行的
Job数。而回想前文讲解的
DStreamGraph.generateJobs(time)过程,一次
batch 产生一个
Seq[Job},里面可能包含多个
Job——
所以,确切的,*有几个 *output 操作,就调用几次
ForEachDStream.generatorJob(time),就产生出几个
Job**。
为了验证这个结果,我们做一个简单的小测试:先设置
spark.streaming.concurrentJobs = 10,然后在每个 batch 里做
2次
foreachRDD()这样的 output 操作:
// 完整代码可见本文最后的附录 val BLOCK_INTERVAL = 1 // in seconds val BATCH_INTERVAL = 5 // in seconds val CURRENT_JOBS = 10 // in seconds ... // DStream DAG 定义开始 val inputStream = ssc.receiverStream(...) inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 1 inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 2 // DStream DAG 定义结束 ...
在上面的设定下,我们很容易知道,能够同时在处理的 batch 有
10 / 2 = 5个,其余的 batch 的
Job只能处于等待处理状态。
下面的就是刚才测试代码的运行结果,验证了我们前面的分析和计算:
Spark
Streaming 的 JobSet, Job,与 Spark Core 的 Job, Stage, TaskSet, Task
最后,我们专门拿出一个小节,辨别一下这 Spark Streaming 的 JobSet, Job,与 Spark Core 的 Job, Stage, TaskSet, Task 这几个概念。[Spark Streaming] JobSet 的全限定名是:org.apache.spark.streaming.scheduler.JobSet Job 的全限定名是:org.apache.spark.streaming.scheduler.Job [Spark Core] Job 没有一个对应的实体类,主要是通过 jobId:Int 来表示一个具体的 job Stage 的全限定名是:org.apache.spark.scheduler.Stage TaskSet 的全限定名是:org.apache.spark.scheduler.TaskSet Task 的全限定名是:org.apache.spark.scheduler.Task
Spark Core 的 Job, Stage, Task 就是我们“日常”谈论 Spark 任务时所说的那些含义,而且在 Spark 的 WebUI 上有非常好的体现,比如下图就是 1 个
Job包含
3 个
Stage;3
个
Stage各包含
8, 2, 4 个
Task。而
TaskSet则是
Spark Core 的内部代码里用的类,是
Task的集合,和
Stage是同义的。
而 Spark Streaming 里也有一个
Job,但此
Job非彼
Job。Spark
Streaming 里的
Job更像是个
Java里的
Runnable,可以
run()一个自定义的
func函数。而这个
func,
可以:
直接调用
RDD的 action,从而产生
1 个或多个 Spark Core 的
Job
先打印一行表头;然后调用
firstTen = RDD.collect(),再打印
firstTen的内容;最后再打印一行表尾
—— 这正是
DStream.print()的
Job实现
也可以是任何用户定义的 code,甚至整个 Spark Streaming 执行过程都不产生任何 Spark Core 的
Job——
如上一小节所展示的测试代码,其
Job的
func实现就是:
Thread.sleep(Int.MaxValue),仅仅是为了让这个
Job一直跑在
jobExecutor线程池里,从而测试
jobExecutor的并行度
:)
最后,Spark Streaming 的
JobSet就是多个
Job的集合了。
如果对上面 5 个概念做一个层次划分的话(上一层与下一层多是一对多的关系,但不完全准确),就应该是下表的样子:
Spark Core | Spark Streaming | |
lv 5 | RDD DAGs | DStreamGraph |
lv 4 | RDD DAG | JobSet |
lv 3 | Job | Job |
lv 2 | Stage | ← |
lv 1 | Task | ← |
https://github.com/proflin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/2.1%20JobScheduler,%20Job,%20JobSet%20%E8%AF%A6%E8%A7%A3.md
转载请注明:人人都是数据咖 » JobScheduler, Job, JobSet 详解
相关文章推荐
- 2.1 JobScheduler, Job, JobSet 详解
- sparkstreaming-JobScheduler, Job, JobSet 详解
- Oracle中job的使用详解
- Linux cron job 详解
- Java定时任务调度工具详解(4)— Quartz 之 Job/JobDetail/JobExecutionContext/JobDataMap
- Oracle中job使用详解
- JOB中的定时频率repeat_interval详解
- JobService和JobScheduler机制在Android5.0以上保活
- oracle中的job使用详解
- oracle job详解
- Oracle_JOB参数详解
- Job流程:Shuffle详解
- elastic-job详解(三):Job的手动触发功能
- Oracle 使用Job详解
- JobScheduler和JobService机制保活进程
- scikit-learn使用joblib.dump()持久化模型过程中的问题详解--python
- oracle job使用详解及job不运行的检查方法(1)
- SparkStream源码分析:JobScheduler的JobStarted、JobCompleted是怎么被调用的
- Oracle_JOB参数详解
- Jenkins学习(四)job界面详解