您的位置:首页 > 其它

Spark动态分配资源

2017-10-23 19:43 239 查看

Spark动态分配资源

标签(空格分隔): spark

作业调度(Job Scheduling)

OverView

在一个集群中的每一个spark application(一个sparkContext的实例)维持着一系列独立的executor集合。集群管理着这些spark application之间的资源调度。

在一个spark application之内,可能同时运行着多个job,spark使用公平调度器来为这些job分配资源。

不同application之间的资源调度

spark application维持着一些独立的executor jvms,这些jvm用于applcaiton的数据存储和task的运行。

在不同applicationzh 之间分配资源的最简单的方案是使用静态资源分配,即每一个application给予一个最大的可用资源,在这个applicaiton的生命周期中,这部分资源都不会被释放。

standalone模式:在此模式下,applicaiton会按照FIFO的顺序执行,在执行时,applicaiton会尝试使用所有的可用节点.可以通过参数
spark.core.max
设置,或者通过
spark.deploy.defaultCores
设置使用的核数,最后,对于每个核的内存控制,使用
spark.executor.memeory
进行控制。

Mesos模式:在此模式下,将
spark.mesos.coarse
设置为true来启用这种粗粒度的控制方式,和standalone模式一样,使用
spark.core.max
spark.executor.memory
来分别控制core和memory。

Yarn模式:在这种模式下,使用
--num-executors
来设置分配给这个applicaiton的executor,使用
--executor-memory
来是指每个executor的内存,使用
--executor-cores
来设置每个executor的核数。

在Mesos模式下,spark还可以共享一台机器上不同的core,但是,所有的模式下,内存都是不可共享的,

动态资源分配

Spark1.2之后,有了动态分配资源的特性,每个applicaiton从资源池里申请资源,当不再使用时,放回到资源池,可以继续被其他的applicaiton使用。

Spark的动态资源分配是在executor层面上进行的,也就是说,从资源池申请和放回到资源池的是一个个executor,而不会对executor内的core和memory动态分配和共享。

通过修改spark/conf/spark-default.conf文件,添加
spark.dynamicAllocation.enabled true
来开启。

启用external shuffle service

启用此服务的目的是保护executor所写的shuffle文件,以便shuffle可以安全的删除。

在spark配置文件中,设置
spark.shffle.service.enabled true
在Yarn模式下,这个shuffle服务是通过运行在每个nodemanager上的org.apache.spark.yarn.network.YarnShuffleService实现的。通过以下步骤来启用服务。

在spark的lib下找到spark--yarn-shuffle.jar文件。

将这个文件加入到nodemanager的classpath。

在每个节点的yarn-site.xml中,修改

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>spark.shuffle.service.port</name>
<value>7337</value>
</property>


重启nodeManager。

资源分配策略

请求资源策略

当现有的excutor不足够同时执行已提交的task时,便会向集群申请新的executor。

真正触发申请操作的是,当集群pending的task达到
spark.dynamicAllocation.schedulerBacklogTimeout
时间时,便会触发申请操作。然后每
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
秒重新检测,如果还有pending的task,再次申请资源。每次申请的资源是成指数递增的,比如,1,2,4,8这种。和tcp慢启动的原理很类似。

删除资源策略

spark application 删除那些经过
spark.dynamicAllocation.executorIdleTimeout
时间未使用的executor。

如何释放不使用的executor

在动态分配资源时,当释放掉不使用的executor之后,applicaiton可能还需要访问存储在那个executor上的文件。所以,spark需要在释放executor的时候保留相关的executor状体和文件。

上述的这种情况在shuffle过程中是常见的,shuffle时,每个excutor首先将自己的map输出写到自身的本地硬盘中。当别的executor想要使用时,这个executor作为server向其他executor提供数据。

有可能出现这样一种情况,当shuffle还没完成时,某个executor被释放掉,这时候,如果想要使用他map的输出,就需要重复的进行计算,这是很没有必要的。

解决方案是引入第三方的服务来保存这些文件和相应的状态信息。称之为extern shuffle service。这是一个在每个节点长时间独立运行的服务,独立于spark的application和executor之外。当启用这个服务时,executor通过这个服务来获取相应的文件和状态信息,而不是直接从executor上拿。

Applicaiton内的调度

一个spark application(sparkContext instance)中可能同时运行着多个job。比如,通过thrift执行spark-sql时,thrift的进程是一个applicaiton,thrfit可以接受多个sql进行查询,这一个个的sql就是job。

默认的调度器是FIFO的,每个job被分为不同的stage(如,map阶段和reduce阶段)在队列头的会优先执行,执行完之后才会轮到下一个job执行。

可以配置使用fair scheduler,不同的job会轮转的获取集群上的资源,而不用等到前面的job都执行完毕后才能执行。通过设置
spark.scheduler.mode FAIR
来启用。还可以在配置sparkContext的conf来设置

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)


参考

http://spark.apache.org/docs/1.5.0/job-scheduling.html#graceful-decommission-of-executors

http://lxw1234.com/archives/2015/12/593.htm
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark