SPARK task执行与资源分配的关系
2016-11-14 14:52
369 查看
SPARK task执行与资源分配的关系
问题背景
执行spark某个sparkjob申请的资源是150个Executor,但是直至job执行结束(大约30多分钟),分配给该job的Executor个数是49个;该job正常试行时间大约是14分钟,但由于此次分配该job的Executor个数少于150导致job执行过长大约30多分钟;
问题:为什么没有获得足够的资源,job就开始执行了?
问题分析
执行该job时候集群资源不足,导致执行job结束也没有分配足够的资源分配了部分Executor,该job就开始执行task,应该是task的调度线程和Executor资源申请是异步的;
该问题相关的conf
spark.scheduler.maxRegisteredResourcesWaitingTime :Maximum amount of time to wait for resources to register before scheduling begins. default:30s
spark.scheduler.minRegisteredResourcesRatio:
The minimum ratio of registered resources (registered resources / total expected resources) (resources are executors in yarn mode, CPU cores in standalone mode and Mesos coarsed-grained mode ['spark.cores.max' value is total expected resources for Mesos coarse-grained mode] ) to wait for before scheduling begins. Specified as a double between 0.0 and 1.0. Regardless of whether the minimum ratio of resources has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config spark.scheduler.maxRegisteredResourcesWaitingTime. default : 0.8 for YARN mode; 0.0 for standalone mode and Mesos coarse-grained mode
关键代码分析
1._taskScheduler.postStartHook()->taskScheduler.waitBackendReady()
SparkContext 初始化的时候回调用_taskScheduler.postStartHook() postStartHook() : override def postStartHook() { waitBackendReady() }
2.taskScheduler.waitBackendReady()->SchedulerBackend.isReady()即CoarseGrainedSchedulerBackend.isReady()
override def isReady(): Boolean = { if (sufficientResourcesRegistered) { logInfo("SchedulerBackend is ready for scheduling beginning after " + s"reached minRegisteredResourcesRatio: $minRegisteredRatio") return true } //时间判断逻辑: //System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs;maxRegisteredWaitingTimeMs默认是30s if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) { logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)") return true } false }
上述方方法isReady中的sufficientResourcesRegistered,调用的是是其子类YarnSchedulerBackend.sufficientResourcesRegistered():
判断逻辑是:如果现在分配的Executor的个数>需要申请的Executor个数*spark.scheduler.minRegisteredResourcesRatio ;其中spark.scheduler.minRegisteredResourcesRatio 默认是0.8
override def sufficientResourcesRegistered(): Boolean = { totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio } //----------------------------------------------- override val minRegisteredRatio = if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) { 0.8 } else { super.minRegisteredRatio }
结论
如果想等待申请完所有的资源再执行job的:需要将spark.scheduler.maxRegisteredResourcesWaitingTime设置的很大;spark.scheduler.minRegisteredResourcesRatio 设置为1但设置成上述配置的的话有一个问题:
如果设置spark.scheduler.maxRegisteredResourcesWaitingTime为1个小时,假设期待的Executor个数是300个,则由于集群资源紧张,job在一个小时之内申请并占有200个Executor,但是最后还是在maxRegisteredResourcesWaitingTime结束没有达到预期的300个Executor,等待期间会浪费这些资源;
还是应该综合考虑具体情况设置这个两个参数!
Reference
https://issues.apache.org/jira/browse/SPARK-2635https://issues.apache.org/jira/browse/SPARK-1946
http://spark.apache.org/docs/latest/configuration.html
SPARK-2.0.0源码
相关文章推荐
- Apache Spark源码走读之3 -- Task运行期之函数调用关系分析
- 理解storm的并行执行,workder,executor,task的关系以及调度算法
- Spark动态资源分配-Dynamic Resource Allocation
- Spark任务执行函数调用关系解析
- 第三十七课 Spark之Task执行原理及结果
- Spark 源码分析 -- task实际执行过程
- spark源码阅读3-Task运行期之函数调用关系分析
- Apache Spark源码走读之3 -- Task运行期之函数调用关系分析
- Spark Streaming揭秘 Day17 资源动态分配
- spark&yarn&storm的资源管理分配对并发性的考量
- 【Spark Core】任务执行机制和Task源码浅析1
- finally不管有没有错都会运行 finally 块用于清除 try 块中分配的任何资源,以及运行任何即使在发生异常时也必须执行的代码
- Spark 动态资源分配(Dynamic Resource Allocation) 解析
- Spark执行样例报警告:WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources
- Spark资源分配异常闪Bug
- Apache Spark源码走读之3 -- Task运行期之函数调用关系分析
- [Spark源码剖析] Task的调度与执行源码剖析
- 大数据IMF传奇行动绝密课程第31课:Spark资源调度分配内幕天机彻底解密
- spark集群无法分配资源
- Spark在Yarn上的动态资源分配