Spark2.2 Task原理分析及源码解析
2017-11-27 17:08
495 查看
Task是什么
task是一个执行单元。我们有两种任务:org.apache.spark.scheduler.ShuffleMapTask
org.apache.spark.scheduler.ResultTask
Spark作业由一个或多个stages组成。
job的最后一个阶段是由多个ResultTasks组成的,之前的stages由ShuffleMapTasks组成。
ResultTask执行task并将task输出返回给driver Application。
ShuffleMapTask执行task,并将task输出分配给多个bucket(基于task的partitioner个数)。
Task源码解析
前提:Executor接收到LaunchTask(),运行TaskRunner(),执行Executor.run()方法;updateDependencies()为task运行准备工作,加载配置文件,jars
反序列化task,调用Task.run( )方法
Task.run( )方法,其实是调用Task的抽象方法runTask( )
ShuffleMapTask.runTask( )
ResultTask.runTask( )
伪代码:Executor的run方法
override def run(): Unit = { //拷贝配置文件 jar 资源 之类 拷贝过来 updateDependencies(taskDescription.addedFiles, taskDescription.addedJars) /** * 调用 task的 run()方法 * 返回 task【ShuffleMapTask/ResultTask】结果数据的存储位置 */ val value = try { val res = task.run( taskAttemptId = taskId, attemptNumber = taskDescription.attemptNumber, metricsSystem = env.metricsSystem) threwException = false res } /** * leen * 调用executor所在的 CoarseGrainedExecutorBackend 的 statusUpdate()方法 */ execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) }
伪代码:Task的run方法
final def run(taskAttemptId: Long,attemptNumber: Int,metricsSystem: MetricsSystem): T = { /** 创建一个taskcontext,里边封装了 * task属于哪一个stage * task要处理的rdd的哪一个partition * task的重试次数 * .... */ context = new TaskContextImpl(stageId,partitionId,taskAttemptId,attemptNumber, taskMemoryManager,localProperties,metricsSystem,metrics) try { /** * 调用抽象方法,runtask() * 这个类,只是一个抽象父类 * 仅仅封装了一些子类通用的数据和操作 4000 * task的子类:shuffleMapTask,ResultTask * 则,要运行子类的runTask(),才能执行我们需要的算子与操作 */ runTask(context) } }
ShuffleMapTask的runTask方法
多个tasks运行在多个executors上,对同一个Rdd进行操作,则通过共享变量的方式获得Rdd;首先获取shuffleManager,从shuffleManager中获得shuffleWriter ;
调用RDD的iterator()方法,并且传入当前task所要处理的partition,在iterator()的方法中,针对RDD的partition执行自己定义的算子或者函数;
执行完之后,会通过shuffleWriter,经过HashPartitioner进行分区之后,写入自己对应的bucket ;
返回结果 MapStatus,MapStatus里边封装了ShuffleMapTask计算后的 结果数据的存储位置 ;
调用executor所在的 CoarseGrainedExecutorBackend 的 statusUpdate()方法,更新状态。
/** * 非常重要的一点就是runTask方法 * @param context * @return MapStatus 返回值 里边封装了ShuffleMapTask计算后的 结果数据的存储位置 */ override def runTask(context: TaskContext): MapStatus = { /** Deserialize the RDD using the broadcast variable. * 使用 广播变量 反序列化RDD。 * RDD是怎么拿到的呢??? * 多个task运行在多个executor上,都是并行或者并发运行的 * 但是都不在一个地方; * 一个stage的task要处理的RDD是一样的 * 这里是通过broadcast variable 这个广播变量直接拿到的 */ val threadMXBean = ManagementFactory.getThreadMXBean val deserializeStartTime = System.currentTimeMillis() val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime } else 0L val ser = SparkEnv.get.closureSerializer.newInstance() val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader) _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) { threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime } else 0L var writer: ShuffleWriter[Any, Any] = null try { // 首先获取shuffleManager // 从shuffleManager中获得shuffleWriter val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) /** * 重中之重: * 调用RDD的iterator()方法,并且传入当前task所要处理的partition * 在iterator()的方法中,针对RDD的partition执行自己定义的算子或者函数 * ------------------- * 执行完之后; * 都会通过shuffleWriter,经过HashPartitioner进行分区之后,写入自己对应的bucket * ------------------- * 最终:返回结果 MapStatus * MapStatus里边封装了ShuffleMapTask计算后的 结果数据的存储位置 ; * * 至于存储在什么地方,是由BlockManager进行管理的, * BlockManager是spark底层的内存数据、磁盘数据的管理组件 */ writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } catch { case e: Exception => try { if (writer != null) { writer.stop(success = false) } } catch { case e: Exception => log.debug("Could not stop writer", e) } throw e } }
相关文章推荐
- Spark2.2 job触发流程原理剖析与源码分析
- Spark streaming源码分析之Job动态生成原理与源码解析
- Spark2.2-Task序列化源码解析
- Spark2.2 Executor原理剖析及源码分析
- Spark2.2源码之Task任务提交源码分析
- Spark2.2 TaskScheduler原理剖析与源码分析
- Thrift之TProtocol类体系原理及源码详细解析之类继承架构分析
- Spark streaming技术内幕6 : Job动态生成原理与源码解析
- 深入理解Spark 2.1 Core (一):RDD的原理与源码分析
- 深入理解Spark 2.1 Core (四):运算结果处理和容错的原理与源码分析
- Spark2.2 Worker、Driver和Executor向Master注册原理剖析图解及源码
- Spark内核源码深度剖析:SparkContext原理剖析与源码分析
- java ConcurrentSkipListMap原理分析及源码解析
- Android源码解析之新进程中启动自定义服务过程(startService)的原理分析
- spark 2.2 源码分析 Spark-shell 篇
- Spark技术内幕: Task向Executor提交的源码解析
- 深入理解Spark 2.1 Core (二):DAG调度器的原理与源码分析
- Spark内核源码深度剖析:Master主备切换机制原理剖析与源码分析
- Spark源码分析(1) 从WordCount示例看Spark延迟计算原理
- 深入理解Spark 2.1 Core (十二):TimSort 的原理与源码分析