您的位置:首页 > 其它

Spark技术内幕:Shuffle Map Task运算结果的处理

2015-01-18 21:41 351 查看
这个结果的处理,分为两部分,一个是在Executor端是如何直接处理Task的结果的;还有就是Driver端,如果在接到Task运行结束的消息时,如何对Shuffle Write的结果进行处理,从而在调度下游的Task时,下游的Task可以得到其需要的数据。

Executor端的处理

在解析BasicShuffle Writer时,我们知道ShuffleMap Task在Executor上运行时,最终会调用org.apache.spark.scheduler.ShuffleMapTask的runTask:

[java]
view plaincopyprint?





override def runTask(context: TaskContext): MapStatus = { 

   // 反序列化广播变量taskBinary得到RDD 

   val ser = SparkEnv.get.closureSerializer.newInstance() 

   val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])]( 

     ByteBuffer.wrap(taskBinary.value),Thread.currentThread.getContextClassLoader) 

//省略一些非核心代码 
val manager =SparkEnv.get.shuffleManager
//获得Shuffle Manager 
    //获得Shuffle Writer 
    writer= manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) 

//首先调用rdd .iterator,如果该RDD已经cache了或者checkpoint了,那么直接读取 

//结果,否则开始计算计算的结果将调用Shuffle Writer写入本地文件系统 

writer.write(rdd.iterator(partition,context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) 

// 返回数据的元数据信息,包括location和size 

returnwriter.stop(success = true).get 

override def runTask(context: TaskContext): MapStatus = {
// 反序列化广播变量taskBinary得到RDD
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value),Thread.currentThread.getContextClassLoader)
//省略一些非核心代码
val manager =SparkEnv.get.shuffleManager //获得Shuffle Manager
//获得Shuffle Writer
writer= manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
//首先调用rdd .iterator,如果该RDD已经cache了或者checkpoint了,那么直接读取
//结果,否则开始计算计算的结果将调用Shuffle Writer写入本地文件系统
writer.write(rdd.iterator(partition,context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
// 返回数据的元数据信息,包括location和size
returnwriter.stop(success = true).get


那么这个结果最终是如何处理的呢?特别是下游的Task如何获取这些Shuffle的数据呢?还要从Task是如何开始执行开始讲起。在Worker上接收Task执行命令的是org.apache.spark.executor.CoarseGrainedExecutorBackend。它在接收到LaunchTask的命令后,通过在Driver创建SparkContext时已经创建的org.apache.spark.executor.Executor的实例的launchTask,启动Task:

[java]
view plaincopyprint?





def launchTask( 
    context:ExecutorBackend, taskId: Long, taskName: String,serializedTask: ByteBuffer) { 

  val tr = newTaskRunner(context, taskId, taskName, serializedTask) 

runningTasks.put(taskId, tr) 
threadPool.execute(tr) // 开始在executor中运行 



def launchTask(
context:ExecutorBackend, taskId: Long, taskName: String,serializedTask: ByteBuffer) {
val tr = newTaskRunner(context, taskId, taskName, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr) // 开始在executor中运行
}


最终Task的执行是在org.apache.spark.executor.Executor.TaskRunner#run。
在Executor运行Task时,得到计算结果会存入org.apache.spark.scheduler.DirectTaskResult。

[java]
view plaincopyprint?





//开始执行Task,最终得到的是org.apache.spark.scheduler.ShuffleMapTask#runTask 

//返回的org.apache.spark.scheduler.MapStatus 

val value = task.run(taskId.toInt) 
val resultSer = env.serializer.newInstance() //获得序列化工具 

val valueBytes = resultSer.serialize(value)
//序列化结果 
//首先将结果直接放入org.apache.spark.scheduler.DirectTaskResult 

val directResult = new DirectTaskResult(valueBytes,accumUpdates, task.metrics.orNull) 

val ser = env.closureSerializer.newInstance() 
val serializedDirectResult = ser.serialize(directResult)//序列化结果 

val resultSize = serializedDirectResult.limit //序列化结果的大小 

//开始执行Task,最终得到的是org.apache.spark.scheduler.ShuffleMapTask#runTask
//返回的org.apache.spark.scheduler.MapStatus
val value = task.run(taskId.toInt)
val resultSer = env.serializer.newInstance() //获得序列化工具
val valueBytes = resultSer.serialize(value) //序列化结果
//首先将结果直接放入org.apache.spark.scheduler.DirectTaskResult
val directResult = new DirectTaskResult(valueBytes,accumUpdates, task.metrics.orNull)
val ser = env.closureSerializer.newInstance()
val serializedDirectResult = ser.serialize(directResult)//序列化结果
val resultSize = serializedDirectResult.limit //序列化结果的大小


在将结果回传到Driver时,会根据结果的大小有不同的策略:

1)       如果结果大于1GB,那么直接丢弃这个结果。这个是Spark1.2中新加的策略。可以通过spark.driver.maxResultSize来进行设置。
2)       对于“较大”的结果,将其以taskid为key存入org.apache.spark.storage.BlockManager;如果结果不大,那么直接回传给Driver。那么如何判定这个阈值呢?
这里的回传是直接通过akka的消息传递机制。因此这个大小首先不能超过这个机制设置的消息的最大值。这个最大值是通过
bf2a
spark.akka.frameSize设置的,单位是MBytes,默认值是10MB。除此之外,还有200KB的预留空间。因此这个阈值就是conf.getInt("spark.akka.frameSize",10) * 1024 *1024 – 200*1024。
3)       其他的直接通过AKKA回传到Driver。
实现源码解析如下:

[java]
view plaincopyprint?





     val serializedResult = { 
          if (maxResultSize >
0 &&resultSize > maxResultSize) { 
// 如果结果的大小大于1GB,那么直接丢弃, 

// 可以在spark.driver.maxResultSize设置 

ser.serialize(newIndirectTaskResult[Any](TaskResultBlockId(taskId), 

    resultSize)) 
          } else
if (resultSize >=akkaFrameSize - AkkaUtils.reservedSizeBytes) { 

// 如果不能通过AKKA的消息传递,那么放入BlockManager 

// 等待调用者以网络的形式来获取。AKKA的消息的默认大小可以通过 

//  spark.akka.frameSize来设置,默认10MB。 

            val blockId =TaskResultBlockId(taskId) 

            env.blockManager.putBytes( 
              blockId, serializedDirectResult,StorageLevel.MEMORY_AND_DISK_SER) 

            ser.serialize(newIndirectTaskResult[Any](blockId, resultSize)) 

          } else { 

            //结果可以直接回传到Driver 

            serializedDirectResult 
          } 
        } 
        // 通过AKKA向Driver汇报本次Task的已经完成 

        execBackend.statusUpdate(taskId,TaskState.FINISHED, serializedResult) 

val serializedResult = {
if (maxResultSize > 0 &&resultSize > maxResultSize) {
// 如果结果的大小大于1GB,那么直接丢弃,
// 可以在spark.driver.maxResultSize设置
ser.serialize(newIndirectTaskResult[Any](TaskResultBlockId(taskId),
resultSize))
} else if (resultSize >=akkaFrameSize - AkkaUtils.reservedSizeBytes) {
// 如果不能通过AKKA的消息传递,那么放入BlockManager
// 等待调用者以网络的形式来获取。AKKA的消息的默认大小可以通过
//  spark.akka.frameSize来设置,默认10MB。
val blockId =TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId, serializedDirectResult,StorageLevel.MEMORY_AND_DISK_SER)
ser.serialize(newIndirectTaskResult[Any](blockId, resultSize))
} else {
//结果可以直接回传到Driver
serializedDirectResult
}
}
// 通过AKKA向Driver汇报本次Task的已经完成
execBackend.statusUpdate(taskId,TaskState.FINISHED, serializedResult)


而execBackend是org.apache.spark.executor.ExecutorBackend的一个实例,它实际上是Executor与Driver通信的接口:

[java]
view plaincopyprint?





private[spark] trait ExecutorBackend { 

  def statusUpdate(taskId:Long, state: TaskState, data: ByteBuffer) 



private[spark] trait ExecutorBackend {
def statusUpdate(taskId:Long, state: TaskState, data: ByteBuffer)
}
TaskRunner会将Task执行的状态汇报给Driver(org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverActor)。 而Driver会转给org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate。

Driver的处理

TaskRunner将Task的执行状态汇报给Driver后,Driver会转给org.apache.spark.scheduler.TaskSchedulerImpl#statusUpdate。而在这里不同的状态有不同的处理:
1)       如果类型是TaskState.FINISHED,那么调用org.apache.spark.scheduler.TaskResultGetter#enqueueSuccessfulTask进行处理。
2)       如果类型是TaskState.FAILED或者TaskState.KILLED或者TaskState.LOST,调用org.apache.spark.scheduler.TaskResultGetter#enqueueFailedTask进行处理。对于TaskState.LOST,还需要将其所在的Executor标记为failed,并且根据更新后的Executor重新调度。
enqueueSuccessfulTask的逻辑也比较简单,就是如果是IndirectTaskResult,那么需要通过blockid来获取结果:sparkEnv.blockManager.getRemoteBytes(blockId);如果是DirectTaskResult,那么结果就无需远程获取了。然后调用
1)       org.apache.spark.scheduler.TaskSchedulerImpl#handleSuccessfulTask
2)       org.apache.spark.scheduler.TaskSetManager#handleSuccessfulTask
3)       org.apache.spark.scheduler.DAGScheduler#taskEnded
4)       org.apache.spark.scheduler.DAGScheduler#eventProcessActor
5)       org.apache.spark.scheduler.DAGScheduler#handleTaskCompletion
进行处理。核心逻辑都在第5个调用栈。
如果task是ShuffleMapTask,那么它需要将结果通过某种机制告诉下游的Stage,以便于其可以作为下游Stage的输入。这个机制是怎么实现的?
实际上,对于ShuffleMapTask来说,其结果实际上是org.apache.spark.scheduler.MapStatus;其序列化后存入了DirectTaskResult或者IndirectTaskResult中。而DAGScheduler#handleTaskCompletion通过下面的方式来获取这个结果:
val status=event.result.asInstanceOf[MapStatus]
通过将这个status注册到org.apache.spark.MapOutputTrackerMaster,就完成了结果处理的漫长过程
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: