Spark 1.3 from Creation to Submission: 10) Task Submission Source Code Analysis
2017-01-05 10:26
Following the previous section, which submitted the frontmost stage and its task set, this section traces how tasks reach the executors, starting from the source of TaskScheduler.submitTasks. The method is concretely implemented by TaskScheduler's subclass, TaskSchedulerImpl.
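Before diving into the scheduler internals, it helps to see what triggers this path: any RDD action does. Below is a minimal driver sketch (the app name and local master are illustrative choices, not anything prescribed by Spark) whose collect() call flows through SparkContext.runJob and the DAGScheduler into TaskSchedulerImpl.submitTasks:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// a minimal job-submission sketch; "submit-demo" and local[2] are
// illustrative values only
object SubmitDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("submit-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // collect() is an action: it calls sc.runJob, which eventually hands
    // the final stage's task set to the scheduler via submitTasks
    val result = sc.parallelize(1 to 100, 4).map(_ * 2).collect()
    println(result.sum)
    sc.stop()
  }
}
```

Here is TaskSchedulerImpl.submitTasks: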
```scala
override def submitTasks(taskSet: TaskSet) {
  // only the code we care about is kept
  // ...
  backend.reviveOffers()
}
```

Here backend is a SparkDeploySchedulerBackend (an implementation of SchedulerBackend), but SparkDeploySchedulerBackend itself does not implement reviveOffers; the implementation lives in its parent class, CoarseGrainedSchedulerBackend:
```scala
override def reviveOffers() {
  driverActor ! ReviveOffers
}
```

This sends a ReviveOffers message to DriverActor (an inner class of CoarseGrainedSchedulerBackend). Next, see how the message is handled in DriverActor's receiveWithLogging method:
```scala
case ReviveOffers => makeOffers()
```
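The `!` is Akka's asynchronous, fire-and-forget send, which Spark 1.3 uses for driver-executor messaging throughout. The following standalone sketch (toy names, not Spark code) shows the same pattern in isolation:

```scala
import akka.actor.{Actor, ActorSystem, Props}

case object ReviveOffers

// a toy stand-in for DriverActor: it reacts to ReviveOffers
// the way receiveWithLogging does
class ToyDriverActor extends Actor {
  def receive = {
    case ReviveOffers => println("makeOffers() would run here")
  }
}

object ActorDemo extends App {
  val system = ActorSystem("demo")
  val driver = system.actorOf(Props[ToyDriverActor], "driver")
  driver ! ReviveOffers   // asynchronous send, just like driverActor ! ReviveOffers
  Thread.sleep(500)       // give the actor time to process before shutting down
  system.shutdown()
}
```

Once the message is delivered, makeOffers runs: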
```scala
// Make fake resource offers on all executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
```

This is where the driver starts dispatching tasks to executors: it builds one WorkerOffer per executor, advertising that executor's host and free cores.
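The map-to-offer conversion is plain Scala collection code. Here is a standalone sketch with simplified stand-ins for ExecutorData and WorkerOffer (toy case classes, not Spark's):

```scala
// toy stand-ins for Spark's ExecutorData and WorkerOffer
case class ToyExecutorData(executorHost: String, var freeCores: Int)
case class ToyWorkerOffer(executorId: String, host: String, cores: Int)

object OfferDemo extends App {
  val executorDataMap = Map(
    "exec-1" -> ToyExecutorData("host-a", 4),
    "exec-2" -> ToyExecutorData("host-b", 2))

  // same shape as makeOffers: one offer per executor, advertising its free cores
  val offers = executorDataMap.map { case (id, data) =>
    ToyWorkerOffer(id, data.executorHost, data.freeCores)
  }.toSeq

  offers.foreach(println)
}
```

scheduler.resourceOffers matches pending tasks against these offers (taking data locality into account) and returns the TaskDescriptions to launch. launchTasks then ships them: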
```scala
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // get the closure serializer of the current SparkEnv
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // serialize the task
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      // ...which exceeds max allowed (error handling elided)
    } else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // send the serialized task to the executor
      executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
}
```
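The frame-size guard exists because each serialized task travels inside a single Akka message. Below is a self-contained sketch of the same check, using plain Java serialization and assumed constants (the real values come from the spark.akka.frameSize setting and AkkaUtils.reservedSizeBytes):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object FrameSizeDemo extends App {
  // measure an object's serialized size, analogous to serializedTask.limit
  def serializedSize(obj: Serializable): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  val akkaFrameSize = 10 * 1024 * 1024   // assumed value for illustration
  val reservedSizeBytes = 200 * 1024     // assumed headroom for message overhead

  val payload = "a stand-in for a serialized task"
  if (serializedSize(payload) >= akkaFrameSize - reservedSizeBytes)
    println("task too large to ship in one Akka frame")
  else
    println("ok to send")
}
```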
launchTasks thus serializes every task in the task set and sends each one to its executor. Next, look at the receiveWithLogging method in CoarseGrainedExecutorBackend:
```scala
// driver -> executor: launch a computation task
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val ser = env.closureSerializer.newInstance()
    // deserialize the received TaskDescription
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // taskDesc.serializedTask is what actually holds the serialized task
    executor.launchTask(this, taskId = taskDesc.taskId,
      attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
  }
```
After deserializing the received TaskDescription, the executor backend calls executor.launchTask to handle it:
```scala
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer) {
  // wrap the task in a TaskRunner, which implements Runnable
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber,
    taskName, serializedTask)
  runningTasks.put(taskId, tr)
  // hand the task to the thread pool for execution
  threadPool.execute(tr)
}
```
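The pattern here is plain java.util.concurrent: track the task in a concurrent map and run the Runnable on a thread pool. A minimal standalone sketch (toy names, not Spark code):

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

// a toy stand-in for TaskRunner: just a Runnable that identifies its task
class ToyTaskRunner(taskId: Long) extends Runnable {
  override def run(): Unit = println(s"running task $taskId")
}

object PoolDemo extends App {
  val threadPool = Executors.newCachedThreadPool()
  val runningTasks = new ConcurrentHashMap[Long, ToyTaskRunner]()

  val tr = new ToyTaskRunner(42L)
  runningTasks.put(42L, tr)   // keep a handle so the task can be killed or queried later
  threadPool.execute(tr)      // run() executes on a pool thread
  threadPool.shutdown()
}
```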
Next, the run method of TaskRunner (which implements the Runnable interface) is invoked:
```scala
override def run() {
  val deserializeStartTime = System.currentTimeMillis()
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  startGCTime = gcTime
  try {
    // first deserialization: extract taskFiles, taskJars and taskBytes
    val (taskFiles, taskJars, taskBytes) =
      Task.deserializeWithDependencies(serializedTask)
    updateDependencies(taskFiles, taskJars)
    // second deserialization: obtain the executable Task
    task = ser.deserialize[Task[Any]](taskBytes,
      Thread.currentThread.getContextClassLoader)
    taskStart = System.currentTimeMillis()
    // execute the task via Task.run
    val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
    // some other code... (catch/finally and result handling elided)
  }
}
```

The code above deserializes in two steps: the first pass extracts the task's file and jar dependencies plus the task bytes, and the second turns those bytes into an executable Task. It then calls Task.run:
```scala
final def run(taskAttemptId: Long, attemptNumber: Int): T = {
  context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
    taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
  TaskContextHelper.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    runTask(context)
  } finally {
    context.markTaskCompleted()
    TaskContextHelper.unset()
  }
}
```

run is declared final, so subclasses cannot override it; it ultimately delegates to runTask(context). runTask is an abstract method whose concrete implementations are ResultTask and ShuffleMapTask. Below is the runTask method of ResultTask:
```scala
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
```

The code above deserializes the task binary held in a broadcast variable to recover the task's RDD and the function func to run on it, then applies func to the iterator over this task's partition to compute the result. For the details of how a partition is computed, see the earlier article: Spark核心RDD:计算函数compute.
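To make the (rdd, func) pair concrete: for an action like collect, the function shipped in each ResultTask essentially just materializes the partition's iterator. A minimal sketch (the TaskContext parameter is dropped in this simplified signature):

```scala
object FuncDemo extends App {
  // for collect(), func is essentially (ctx, iter) => iter.toArray;
  // the TaskContext argument is omitted in this simplified version
  val func: Iterator[Int] => Array[Int] = iter => iter.toArray

  val partitionData = Iterator(1, 2, 3)   // stand-in for rdd.iterator(partition, context)
  val result = func(partitionData)
  println(result.mkString(", "))          // prints: 1, 2, 3
}
```

This completes the path: from submitTasks on the driver, through Akka messaging, to func running over a single partition on an executor.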