spark内核揭秘-06-TaskSceduler启动源码解析初体验
2015-01-19 18:15
387 查看
TaskScheduler实例对象启动源代码如下所示:
从上面代码可以看出来,taskScheduler的启动是在SparkContext
找到TaskSchedulerImpl实现类中的start方法实现:
1、从上代码看到,先启动CoarseGrainedSchedulerBackend,
从上面CoarseGrainedSchedulerBackend类的代码,可以看出spark启动了DriverActor,名称为CoarseGrainedScheduler,这是一个akka消息通信类,会先运行preStart()方法
从上面代码可以看到,初始化了akka客户端监听,还有最重要的是调用了系统的scheduler调度,参数函数是立即执行调度,间隔1000毫秒,运行ReviveOffers方法
进入makeOffers()方法:
运行launchTask方法:
这段代码是spark序列号任务大小超过akkaFrameSize - AkkaUtils.reservedSizeBytes大小,那就报错为”Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values.“ ,此刻会将该任务终止,并将任务从任务列表中移除,这样推荐使用broadcast广播方式
否则,将获取执行任务数据,并减少空闲cpu数,发送消息执行 LaunchTask(new SerializableBuffer(serializedTask))方法,即CoarsedGrainedExecutorBackend类的LaunchTask方法:
上面代码 会运行executor 的launchTask方法:
TaskRunner就是一个多线程:
代码太多,我就不截图了,其实实际就是根据机器状况,运行task任务
2、然后我们回到TaskSchedulerImpl实现类中的start方法
如果isLocal=false and spark.speculation=true,不是local模式,那就要dispatcher分发任务了,默认是100毫秒后立即启动,并间隔100毫秒循环运行,
CoarseGrainedSchedulerBackend的reviveOffers:
从上面代码可以看出来,taskScheduler的启动是在SparkContext
找到TaskSchedulerImpl实现类中的start方法实现:
1、从上代码看到,先启动CoarseGrainedSchedulerBackend,
从上面CoarseGrainedSchedulerBackend类的代码,可以看出spark启动了DriverActor,名称为CoarseGrainedScheduler,这是一个akka消息通信类,会先运行preStart()方法
从上面代码可以看到,初始化了akka客户端监听,还有最重要的是调用了系统的scheduler调度,参数函数是立即执行调度,间隔1000毫秒,运行ReviveOffers方法
进入makeOffers()方法:
运行launchTask方法:
这段代码是spark序列号任务大小超过akkaFrameSize - AkkaUtils.reservedSizeBytes大小,那就报错为”Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values.“ ,此刻会将该任务终止,并将任务从任务列表中移除,这样推荐使用broadcast广播方式
否则,将获取执行任务数据,并减少空闲cpu数,发送消息执行 LaunchTask(new SerializableBuffer(serializedTask))方法,即CoarsedGrainedExecutorBackend类的LaunchTask方法:
上面代码 会运行executor 的launchTask方法:
TaskRunner就是一个多线程:
代码太多,我就不截图了,其实实际就是根据机器状况,运行task任务
2、然后我们回到TaskSchedulerImpl实现类中的start方法
如果isLocal=false and spark.speculation=true,不是local模式,那就要dispatcher分发任务了,默认是100毫秒后立即启动,并间隔100毫秒循环运行,
CoarseGrainedSchedulerBackend的reviveOffers:
相关文章推荐
- spark内核揭秘-06-TaskSceduler启动源码解析初体验
- spark内核揭秘-05-SparkContext核心源码解析初体验
- spark内核揭秘-05-SparkContext核心源码解析初体验
- [Spark内核] 第31课:Spark资源调度分配内幕天机彻底解密:Driver在Cluster模式下的启动、两种不同的资源调度方式源码彻底解析、资源调度内幕总结
- spark内核揭秘-07-DAGScheduler源码解读初体验
- spark内核揭秘-13-Worker中Executor启动过程源码分析
- spark内核揭秘-07-DAGScheduler源码解读初体验
- spark内核揭秘-13-Worker中Executor启动过程源码分析
- spark内核揭秘-10-RDD源码分析
- spark内核揭秘-11-Driver中AppClient源码分析
- Scala中隐式转换内幕操作规则揭秘、最佳实践及其在Spark中的应用源码解析之Scala学习笔记-55
- Spark Streaming揭秘 Day23 启动关闭源码图解
- Scala深入浅出进阶经典 第65讲:Scala中隐式转换内幕操作规则揭秘、最佳实践及其在Spark中的应用源码解析
- spark内核揭秘-01-spark内核核心术语解析
- 第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析
- Scala深入浅出进阶经典第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析
- 第66讲:Scala并发编程实战初体验及其在Spark源码中的应用解析
- Spark Streaming揭秘 Day25 StreamingContext和JobScheduler启动源码详解
- Scala 深入浅出实战经典 第65讲:Scala中隐式转换内幕揭秘、最佳实践及其在Spark中的应用源码解析
- Scala中隐式转换初体验实战详解以及隐式转换在Spark中的应用源码解析之Scala学习笔记-49