
Spark 1.3, from Creation to Submission: 10) Task Submission Source Code Analysis

2017-01-05 10:26
Continuing from the previous section, where the earliest stage and its task set were submitted, let's first look at the source of TaskScheduler.submitTasks. This method is concretely implemented by TaskScheduler's subclass TaskSchedulerImpl:

override def submitTasks(taskSet: TaskSet) {
  // only the code of interest is kept ...
  backend.reviveOffers()
}
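For orientation, the elided part of submitTasks (sketched below from memory of the Spark 1.3 source, so details may be off) mainly wraps the TaskSet in a TaskSetManager and registers it with the scheduling pool; only then does it wake up the backend:

// Rough sketch of the elided body of TaskSchedulerImpl.submitTasks (not the exact source)
override def submitTasks(taskSet: TaskSet) {
  this.synchronized {
    // wrap the TaskSet so the scheduler can track its tasks, retries and locality
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    // register the manager with the FIFO/FAIR scheduling pool
    schedulableBuilder.addTaskSetManager(manager, manager.properties)
  }
  // ask the backend to offer resources so the new tasks can actually be launched
  backend.reviveOffers()
}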
Here backend is a SparkDeploySchedulerBackend (an implementation of SchedulerBackend). SparkDeploySchedulerBackend, however, does not implement reviveOffers itself; the implementation lives in its parent class CoarseGrainedSchedulerBackend:
override def reviveOffers() {
  driverActor ! ReviveOffers
}
This sends a ReviveOffers message to the DriverActor (an inner class of CoarseGrainedSchedulerBackend). Next, see how this message is handled in DriverActor's receiveWithLogging method:
case ReviveOffers => makeOffers()
Then look at the makeOffers method:
// Make fake resource offers on all executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
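As a side note, a WorkerOffer is just a small value describing the free resources of one executor; in Spark 1.3 it is essentially the case class below (simplified), and scheduler.resourceOffers takes a Seq of these and decides, per offer, which TaskDescriptions fit on that executor:

// Essentially what Spark 1.3's WorkerOffer looks like (simplified)
case class WorkerOffer(executorId: String, host: String, cores: Int)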
makeOffers is where tasks start being dispatched to executors. Continue with launchTasks:
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // get the closure serializer of the current SparkEnv
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // serialize the task
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      // ... the serialized task exceeds the maximum allowed frame size; the elided code aborts the task set
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // send the serialized task to the executor
      executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
}
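The guard against akkaFrameSize exists because each serialized task is shipped as a single Akka message; a task larger than the frame limit could never be delivered, so it is rejected up front. A minimal, self-contained sketch of that size check (illustrative limits and names, plain Java serialization instead of Spark's closure serializer):

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Minimal sketch of the "does this task fit in one message frame?" check.
// The limits are illustrative; Spark derives the real ones from spark.akka.frameSize.
object FrameSizeCheck {
  val maxFrameBytes: Int = 10 * 1024 * 1024   // assumed 10 MB frame size
  val reservedBytes: Int = 200 * 1024         // assumed overhead reserved for message headers

  def serializedSize(payload: java.io.Serializable): Int = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(payload)
    out.close()
    buffer.size()
  }

  def main(args: Array[String]): Unit = {
    val small = "a tiny closure"
    val huge  = "x" * (20 * 1024 * 1024)   // e.g. a closure that accidentally captured a big object
    println(serializedSize(small) < maxFrameBytes - reservedBytes)  // true: can be sent as one message
    println(serializedSize(huge)  < maxFrameBytes - reservedBytes)  // false: would be rejected
  }
}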

launchTasks serializes each task in the task set and sends it to its executor. Now move to the receiveWithLogging method of CoarseGrainedExecutorBackend on the executor side:

// driver -> executor: launch a computation task
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val ser = env.closureSerializer.newInstance()
    // deserialize the received TaskDescription
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // taskDesc.serializedTask is what actually holds the serialized Task
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
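The TaskDescription deserialized here is only a thin envelope around the real task. From memory, its shape in Spark 1.3 is roughly the following (the field list may differ slightly, and the real class wraps the buffer in a SerializableBuffer because ByteBuffer itself is not serializable):

// Rough shape of Spark 1.3's TaskDescription (simplified from memory, not the exact source)
class TaskDescription(
    val taskId: Long,           // unique id of this task attempt within the application
    val attemptNumber: Int,     // how many times this task has been attempted
    val executorId: String,     // executor chosen by resourceOffers
    val name: String,           // human-readable name for logs and the web UI
    val index: Int,             // partition index within the TaskSet
    val serializedTask: java.nio.ByteBuffer)  // the real Task, still in serialized form
  extends Serializable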


After deserializing the TaskDescription it received, the executor backend calls executor.launchTask to handle the task; the code is as follows:

def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer) {
  // wrap the task in a TaskRunner, which implements the Runnable interface
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  // hand the task over to the thread pool for execution
  threadPool.execute(tr)
}
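The pattern here is just the standard "wrap the work in a Runnable and hand it to a thread pool". Stripped of all Spark specifics, it boils down to something like this (names are made up for the sketch):

import java.util.concurrent.{ConcurrentHashMap, Executors}

// The executor-side pattern in miniature: remember the runner, then let a pool run it.
object LaunchTaskDemo {
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  private val threadPool = Executors.newCachedThreadPool()   // Spark uses a similar cached pool

  def launchTask(taskId: Long, work: () => Unit): Unit = {
    val runner: Runnable = new Runnable {
      override def run(): Unit = work()
    }
    runningTasks.put(taskId, runner)   // tracked so it can be killed or reported on later
    threadPool.execute(runner)         // actual execution happens on a pool thread
  }

  def main(args: Array[String]): Unit = {
    launchTask(1L, () => println("running task 1 on " + Thread.currentThread().getName))
    threadPool.shutdown()
  }
}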

Next, the run method of TaskRunner (which implements the Runnable interface) is invoked:

override def run() {
  val deserializeStartTime = System.currentTimeMillis()
  Thread.currentThread.setContextClassLoader(replClassLoader)
  val ser = env.closureSerializer.newInstance()
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  startGCTime = gcTime
  try {
    // first deserialization: recover taskFiles, taskJars and taskBytes
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    updateDependencies(taskFiles, taskJars)
    // second deserialization: recover the executable Task itself
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    taskStart = System.currentTimeMillis()
    // execute the task by calling Task.run
    val value = task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)
    // some other code ...
  }
}
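The two deserializations reflect how the task was packed on the driver side: an outer envelope carries the file/jar dependencies plus the still-serialized task bytes, and only after the dependencies are in place is the inner Task itself deserialized. A toy illustration of that envelope-then-payload idea (plain Java serialization; the names and format are made up, not Spark's actual wire format):

import java.io._

// Toy version of "envelope first, payload second"
case class Envelope(files: Seq[String], jars: Seq[String], taskBytes: Array[Byte])
case class FakeTask(partition: Int)

object TwoStepDeserialize {
  def serialize(obj: AnyRef): Array[Byte] = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(obj); out.close(); buf.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T =
    new ObjectInputStream(new ByteArrayInputStream(bytes)).readObject().asInstanceOf[T]

  def main(args: Array[String]): Unit = {
    // "driver" side: the task is serialized first, then wrapped together with its dependencies
    val wire = serialize(Envelope(Seq("app.conf"), Seq("app.jar"), serialize(FakeTask(3))))
    // "executor" side: step 1 recovers the dependency lists, step 2 recovers the runnable task
    val envelope = deserialize[Envelope](wire)
    val task     = deserialize[FakeTask](envelope.taskBytes)
    println(s"files=${envelope.files}, jars=${envelope.jars}, task=$task")
  }
}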
TaskRunner.run thus finally deserializes an executable task and calls Task.run to execute it; the code of Task.run is as follows:
final def run(taskAttemptId: Long, attemptNumber: Int): T = {
  context = new TaskContextImpl(stageId = stageId, partitionId = partitionId,
    taskAttemptId = taskAttemptId, attemptNumber = attemptNumber, runningLocally = false)
  TaskContextHelper.setTaskContext(context)
  context.taskMetrics.setHostname(Utils.localHostName())
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    runTask(context)
  } finally {
    context.markTaskCompleted()
    TaskContextHelper.unset()
  }
}
This method is declared final and cannot be overridden; it ultimately calls runTask(context). runTask is an abstract method whose implementations are ResultTask and ShuffleMapTask. Below is the runTask method of ResultTask:
override def runTask(context: TaskContext): U = {
  // Deserialize the RDD and the func using the broadcast variables.
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

  metrics = Some(context.taskMetrics)
  func(context, rdd.iterator(partition, context))
}
The code above deserializes the task from the broadcast variable to recover the RDD and the function func to apply to it, then applies func to this partition's data (produced by rdd.iterator) to obtain the result. For the underlying details, see the earlier article in this series on the RDD compute function (Spark核心RDD:计算函数compute).
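To make func concrete: for an action like collect, the function shipped inside a ResultTask essentially just drains the partition's iterator into an array, roughly as in the sketch below (TaskContextStub and collectFunc are made-up names for illustration, not Spark code):

import scala.reflect.ClassTag

object CollectFuncSketch {
  // Hypothetical stand-in for TaskContext, only so the sketch is self-contained
  class TaskContextStub

  // Roughly the shape of the func that an action like collect ships inside each ResultTask
  def collectFunc[T: ClassTag](context: TaskContextStub, iter: Iterator[T]): Array[T] =
    iter.toArray

  def main(args: Array[String]): Unit = {
    // applied to one "partition", it simply materializes that partition's records
    val partitionResult = collectFunc(new TaskContextStub, Iterator(1, 2, 3))
    println(partitionResult.mkString(", "))   // 1, 2, 3
  }
}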
Tags: SPARK