您的位置:首页 > 其它

Spark2.2 Task原理分析及源码解析

2017-11-27 17:08 495 查看

Task是什么

task是一个执行单元。我们有两种任务:

org.apache.spark.scheduler.ShuffleMapTask

org.apache.spark.scheduler.ResultTask

Spark作业由一个或多个stages组成。

job的最后一个阶段是由多个ResultTasks组成的,之前的stages由ShuffleMapTasks组成。

ResultTask执行task并将task输出返回给driver Application。

ShuffleMapTask执行task,并将task输出分配给多个bucket(基于task的partitioner个数)。

Task源码解析

前提:Executor接收到LaunchTask(),运行TaskRunner(),执行Executor.run()方法;

updateDependencies()为task运行准备工作,加载配置文件,jars

反序列化task,调用Task.run( )方法

Task.run( )方法,其实是调用Task的抽象方法runTask( )

ShuffleMapTask.runTask( )

ResultTask.runTask( )

伪代码:Executor的run方法

override def run(): Unit = {

//拷贝配置文件 jar 资源 之类 拷贝过来
updateDependencies(taskDescription.addedFiles, taskDescription.addedJars)

/**
* 调用 task的 run()方法
* 返回 task【ShuffleMapTask/ResultTask】结果数据的存储位置
*/
val value = try {
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
res
}

/**
* leen
* 调用executor所在的 CoarseGrainedExecutorBackend 的 statusUpdate()方法
*/
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
}


伪代码:Task的run方法

final def run(taskAttemptId: Long,attemptNumber: Int,metricsSystem: MetricsSystem): T = {
/** 创建一个taskcontext,里边封装了
* task属于哪一个stage
* task要处理的rdd的哪一个partition
* task的重试次数
* ....
*/
context = new TaskContextImpl(stageId,partitionId,taskAttemptId,attemptNumber,
taskMemoryManager,localProperties,metricsSystem,metrics)
try {
/**
* 调用抽象方法,runtask()
* 这个类,只是一个抽象父类
* 仅仅封装了一些子类通用的数据和操作

4000
* task的子类:shuffleMapTask,ResultTask
* 则,要运行子类的runTask(),才能执行我们需要的算子与操作
*/
runTask(context)
}
}


ShuffleMapTask的runTask方法

多个tasks运行在多个executors上,对同一个Rdd进行操作,则通过共享变量的方式获得Rdd;

首先获取shuffleManager,从shuffleManager中获得shuffleWriter ;

调用RDD的iterator()方法,并且传入当前task所要处理的partition,在iterator()的方法中,针对RDD的partition执行自己定义的算子或者函数;

执行完之后,会通过shuffleWriter,经过HashPartitioner进行分区之后,写入自己对应的bucket ;

返回结果 MapStatus,MapStatus里边封装了ShuffleMapTask计算后的 结果数据的存储位置 ;

调用executor所在的 CoarseGrainedExecutorBackend 的 statusUpdate()方法,更新状态。

/**
* 非常重要的一点就是runTask方法
* @param context
* @return MapStatus 返回值 里边封装了ShuffleMapTask计算后的 结果数据的存储位置
*/
override def runTask(context: TaskContext): MapStatus = {
/** Deserialize the RDD using the broadcast variable.
* 使用 广播变量 反序列化RDD。
* RDD是怎么拿到的呢???
* 多个task运行在多个executor上,都是并行或者并发运行的
* 但是都不在一个地方;
* 一个stage的task要处理的RDD是一样的
* 这里是通过broadcast variable 这个广播变量直接拿到的
*/
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L

var writer: ShuffleWriter[Any, Any] = null
try {
// 首先获取shuffleManager
// 从shuffleManager中获得shuffleWriter
val manager = SparkEnv.get.shuffleManager
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)

/**
* 重中之重:
* 调用RDD的iterator()方法,并且传入当前task所要处理的partition
* 在iterator()的方法中,针对RDD的partition执行自己定义的算子或者函数
* -------------------
* 执行完之后;
* 都会通过shuffleWriter,经过HashPartitioner进行分区之后,写入自己对应的bucket
* -------------------
* 最终:返回结果 MapStatus
* MapStatus里边封装了ShuffleMapTask计算后的 结果数据的存储位置 ;
*
* 至于存储在什么地方,是由BlockManager进行管理的,
* BlockManager是spark底层的内存数据、磁盘数据的管理组件
*/
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: