Spark Source Code Analysis: The Full Lifecycle of a Job
To study a job's lifecycle, we need an action to trigger the job's execution; let's take RDD's count operation as the example:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
The count call actually triggers SparkContext's runJob method, then sums the per-partition results that runJob returns.
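The per-partition function that count hands to runJob is Utils.getIteratorSize, which simply walks an iterator and counts its elements; in the Spark source it looks essentially like this (quoted from memory, so treat it as a sketch):
def getIteratorSize[T](iterator: Iterator[T]): Long = {
  // Count elements without materializing them
  var count = 0L
  while (iterator.hasNext) {
    count += 1L
    iterator.next()
  }
  count
}
So count is just the sum of getIteratorSize applied to each partition's iterator.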
Let's trace runJob layer by layer:
def runJob[T, U: ClassTag](...) {
  // Make sure the DAG scheduler is not null; the DAGScheduler is initialized
  // when the SparkContext is created
  if (dagScheduler == null) {
    throw new SparkException("SparkContext has been shutdown")
  }
  // The real work is delegated to the DAGScheduler's runJob method
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
    resultHandler, localProperties.get)
  ...
}
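The cleanedFunc passed along here is the user function after closure cleaning; the elided part of the method does roughly the following before delegating (a sketch, not the verbatim source):
// ClosureCleaner strips non-serializable references from the closure's
// outer scope so the function can safely be shipped to executors
val cleanedFunc = clean(func)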
Now look at the DAG scheduler's runJob implementation:
def runJob[T, U: ClassTag](...)
{
  val start = System.nanoTime
  // Enter the execution flow: the submitJob method
  val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
  // Wait for the job result and log it
  waiter.awaitResult() match {
    case JobSucceeded => {
      logInfo("Job %d finished: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    }
    case JobFailed(exception: Exception) =>
      logInfo("Job %d failed: %s, took %f s".format(
        waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      throw exception
  }
}
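awaitResult on the JobWaiter is a plain monitor-style wait: it blocks the calling thread until the DAGScheduler marks the job finished. In Spark 1.x it is essentially the following (a from-memory sketch):
def awaitResult(): JobResult = synchronized {
  // Block until resultHandler has been invoked for every partition
  // (or the job has failed), then report JobSucceeded or JobFailed
  while (!_jobFinished) {
    this.wait()
  }
  jobResult
}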
The submitJob implementation:
def submitJob[T, U](...): JobWaiter[U] =
{
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
      "Total number of partitions: " + maxPartitions)
  }
  // Get a new JobId
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  assert(partitions.size > 0)
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  // Send a JobSubmitted message (a case class) to the event processing actor
  eventProcessActor ! JobSubmitted(
    jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
  waiter
}
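JobSubmitted is one of the DAGSchedulerEvent case classes, and its fields line up one-to-one with the arguments above. From the Spark 1.x source, it is roughly (field types quoted from memory):
private[scheduler] case class JobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    allowLocal: Boolean,
    callSite: CallSite,
    listener: JobListener,  // the JobWaiter doubles as the JobListener
    properties: Properties = null)
  extends DAGSchedulerEvent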
Next, look at where the event processing actor is initialized:
private def initializeEventProcessActor() {
  // blocking the thread until supervisor is started, which ensures eventProcessActor is
  // not null before any job is submitted
  implicit val timeout = Timeout(30 seconds)
  val initEventActorReply =
    dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
  eventProcessActor = Await.result(initEventActorReply, timeout.duration).
    asInstanceOf[ActorRef]
}
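The ask (?) here sends the Props to the supervisor actor, which instantiates the child and replies with its ActorRef; that reply is what Await.result unwraps. The supervisor's receive is essentially (a from-memory sketch):
def receive = {
  // Create the requested child actor and hand its ActorRef back to the asker
  case p: Props => sender ! context.actorOf(p)
  case _ => logWarning("received unknown message in DAGSchedulerActorSupervisor")
}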
Further down, look at how dagSchedulerActorSupervisor itself is initialized.
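The supervisor is created on the DAGScheduler's actor system when the DAGScheduler is constructed, and its supervision strategy stops the SparkContext if the event-processing actor throws, rather than silently restarting it. Sketched from the Spark 1.x source (details may differ across minor versions):
private val dagSchedulerActorSupervisor =
  env.actorSystem.actorOf(Props(new DAGSchedulerActorSupervisor(this)))

private[scheduler] class DAGSchedulerActorSupervisor(dagScheduler: DAGScheduler)
  extends Actor with Logging {

  // If the event-processing actor fails, cancel all jobs and shut down
  // the SparkContext instead of restarting the actor
  override val supervisorStrategy =
    OneForOneStrategy() {
      case x: Exception =>
        logError("eventProcessActor failed, shutting down SparkContext", x)
        dagScheduler.doCancelAllJobs()
        dagScheduler.sc.stop()
        Stop
    }

  def receive = {
    case p: Props => sender ! context.actorOf(p)
  }
}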