spark1.2源码学习之任务调度1
2015-01-26 10:37
316 查看
写在最前,真的很少写博客,周五答应群里的哥们要写一写,硬着头皮写吧。如果有不对不好的地方,欢迎各种打脸。哈哈下面开始正文
1、任务调度从创建sparkContext开始
2、进入createTaskScheduler方法
注意,这里调用的是TaskSchedulerImpl的start方法
3、先看上面提到的scheduler.initialize(backend)方法
4、下面继续看start方法。
进入SparkDeploySchedulerBackend类找start方法,这个方法比较长,一步步的看吧。
但是中间涉及到很多的调用和代码,所以这个方法将被我从中间断开。
下面看父类的的start方法
进入client的start方法
大家都知道,实例化一个actor首先会自动运行里面的preStart方法,那么看看这个方法
1、任务调度从创建sparkContext开始
class SparkContext(config: SparkConf) // 初始化schedulerBackend和taskScheduler private[spark] var (schedulerBackend, taskScheduler) = SparkContext.createTaskScheduler(this, master)
2、进入createTaskScheduler方法
//此处指追踪standAlone模式 case SPARK_REGEX(sparkUrl) => //scheduler是TaskSchedulerImpl类 val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) //backend是SparkDeploySchedulerBackend val backend = new SparkDeploySchedulerBackend(scheduler, sc,masterUrls) //初始化scheduler,入参是SparkDeploySchedulerBackend的实例 scheduler.initialize(backend) //实例化完毕,返回。 (backend, scheduler)
//实例化dagScheduler dagScheduler = new DAGScheduler(this) // 调用taskScheduler的start方法,初始化运行环境 taskScheduler.start()
注意,这里调用的是TaskSchedulerImpl的start方法
3、先看上面提到的scheduler.initialize(backend)方法
def initialize(backend: SchedulerBackend) { //set方法,不解释了。 this.backend = backend // 以下内容是建立一个任务的调度池,并指定调度规则,默认是FIFO方式 //调度方式由spark.scheduler.mode参数确定 rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) } } schedulableBuilder.buildPools() }
4、下面继续看start方法。
override def start() { 这里的backend就是initialize方法传入的,继续追踪backend的start方法。 这里要啰嗦一句,start方法下面的代码块我还没看懂,所以略过。 backend.start() if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, SPECULATION_INTERVAL milliseconds) { Utils.tryOrExit { checkSpeculatableTasks() } } } }
进入SparkDeploySchedulerBackend类找start方法,这个方法比较长,一步步的看吧。
但是中间涉及到很多的调用和代码,所以这个方法将被我从中间断开。
override def start() { 首先调用的是父类的start方法,继续杀入父类CoarseGrainedSchedulerBackend super.start()
下面看父类的的start方法
override def start() { val properties = new ArrayBuffer[(String, String)] for ((key, value) <- scheduler.sc.conf.getAll) { if (key.startsWith("spark.")) { properties += ((key, value)) } } //这里,实例化了一个driverActor是CoarseGrainedSchedulerBackend中的DriverActor driverActor = actorSystem.actorOf( Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME) }
// 记住这个url,这个是master的akka同学 val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( SparkEnv.driverActorSystemName, conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}", "{{WORKER_URL}}") val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions") .map(Utils.splitCommandString).getOrElse(Seq.empty) val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp => cp.split(java.io.File.pathSeparator) } // Start executors with a few necessary configs for registering with the scheduler val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) val javaOpts = sparkJavaOpts ++ extraJavaOpts val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") //这里实例化一个ApplicationDescription val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,appUIAddress, sc.eventLogDir) //这里实例化一个AppClient client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start()
进入client的start方法
def start() { // 实例化一个actor,当然就在AppClient类里面 actor = actorSystem.actorOf(Props(new ClientActor)) }
大家都知道,实例化一个actor首先会自动运行里面的preStart方法,那么看看这个方法
override def preStart() { context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) try { 重点在这里,向Master注册 registerWithMaster() } catch { case e: Exception => logWarning("Failed to connect to master", e) markDisconnected() context.stop(self) } }
相关文章推荐
- spark源码学习(八):spark具体是如何使用集群的资源去运行任务
- spark源码之Job执行(2)任务调度taskscheduler
- [spark] TaskScheduler 任务提交与调度源码解析
- spark源码学习(十一):资源的调度Schedule
- spark源码之Job执行(2)任务调度taskscheduler
- Spark2.2 任务调度机制schedule()源码剖析
- spark源码之Job执行(2)任务调度taskscheduler
- Spark1.3从创建到提交:3)任务调度初始化源码分析
- spark源码之Job执行(2)任务调度taskscheduler
- spark源码之Job执行(2)任务调度taskscheduler
- TBSchedule源码学习笔记-线程组任务调度
- Spark源码阅读笔记之任务调度(一)
- Spark学习笔记 --- 任务调度的逻辑图
- spark源码之Job执行(2)任务调度taskscheduler
- 【Spark篇】---Spark中资源和任务调度源码分析与资源配置参数应用
- spark[源码]-任务调度源码分析[三]
- spark源码之Job执行(2)任务调度taskscheduler
- spark源码之Job执行(2)任务调度taskscheduler
- spark源码之Job执行(2)任务调度taskscheduler
- Spark的任务调度学习