Spark Source Code Analysis: What Does SparkContext Do?
2017-02-09 23:35
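SparkContext is the entry point through which a Spark application connects to a Spark cluster, so studying it is an important step toward understanding how Spark runs and how it works internally.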
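Setting whether multiple SparkContexts may coexist
Cloning SparkConf and reading related settings
Getting the JobProgressListener
Creating the SparkEnv
Creating the UI
Creating the TaskScheduler
Initializing the BlockManager
Starting the metricsSystem
Instantiating the ExecutorAllocationManager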
SparkContext initialization sequence diagram
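Below is the sequence diagram of SparkContext initialization that I drew after reading the source code:
[Figure: SparkContext initialization sequence diagram]
Walking through the source: how SparkContext is initialized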
Only part of the SparkContext initialization source code is listed here.
Setting whether multiple SparkContexts may coexist:
// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
private val allowMultipleContexts: Boolean =
  config.getBoolean("spark.driver.allowMultipleContexts", false)
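To make the effect of this flag concrete, here is a minimal sketch of the two behaviours it selects between. It is a simplified model, not Spark's implementation: `ContextRegistryModel`, `register` and the message text are hypothetical names chosen for illustration. With the flag at its default `false`, a second active context is rejected with an exception; with `true`, only a warning is recorded.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of what spark.driver.allowMultipleContexts controls.
// Not Spark's code: the registry, method name and message are illustrative.
object ContextRegistryModel {
  // Id of the currently active "context", if any.
  private var activeContext: Option[String] = None
  // Warnings are collected here so the behaviour can be inspected.
  val warnings: ArrayBuffer[String] = ArrayBuffer.empty

  def register(id: String, allowMultipleContexts: Boolean): Unit = {
    activeContext.foreach { existing =>
      val msg = s"Another context ($existing) is already active"
      if (allowMultipleContexts) warnings += msg   // true: only warn
      else throw new IllegalStateException(msg)    // false (default): fail fast
    }
    // Reached only if no exception was thrown above.
    activeContext = Some(id)
  }
}
```

Failing fast by default is the safer choice, because two contexts in one JVM would share process-wide state.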
Cloning SparkConf and reading related settings:
/**
 * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
 * changed at runtime.
 */
def getConf: SparkConf = conf.clone()
def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def appName: String = _conf.get("spark.app.name")
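Why does `getConf` return `conf.clone()` rather than the configuration itself? The sketch below models the idea with a hypothetical `SimpleConf` class standing in for SparkConf (an assumption, not Spark's class). Callers receive a defensive copy, so modifying it cannot change the settings the running context already uses. It also reproduces the `deployMode` fallback to `"client"` from the excerpt.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SparkConf, used only to show the defensive copy.
class SimpleConf(private val settings: mutable.Map[String, String] = mutable.Map.empty)
    extends Cloneable {
  def set(key: String, value: String): SimpleConf = { settings(key) = value; this }
  def get(key: String): String =
    settings.getOrElse(key, throw new NoSuchElementException(key))
  def getOption(key: String): Option[String] = settings.get(key)
  // Copy the underlying map so the clone is independent of the original.
  override def clone(): SimpleConf = new SimpleConf(settings.clone())
}

object GetConfDemo {
  private val _conf =
    new SimpleConf().set("spark.master", "local[2]").set("spark.app.name", "demo")

  // Like SparkContext.getConf: hand out a copy, never the live object.
  def getConf: SimpleConf = _conf.clone()
  def master: String = _conf.get("spark.master")
  // Mirrors the excerpt: fall back to "client" when deployMode is unset.
  def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
}
```

Calling `GetConfDemo.getConf.set("spark.master", "yarn")` changes only the copy; `GetConfDemo.master` still returns `"local[2]"`.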
Getting the JobProgressListener
private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
Creating the SparkEnv
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}
private[spark] def env: SparkEnv = _env
```
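The comment on `createSparkEnv` describes a common testing pattern: the context builds its environment through an overridable factory method, so a test subclass can return a fake. Here is a minimal sketch of that pattern. All names (`Env`, `DriverContext`, `createEnv`) are hypothetical and are not Spark APIs.

```scala
// Hypothetical illustration of an overridable factory used for mocking.
trait Env { def name: String }

class RealEnv extends Env { val name: String = "real" }

class DriverContext {
  // Factory hook; a test subclass overrides this to inject a mock.
  protected def createEnv(isLocal: Boolean): Env = new RealEnv

  // Built once, during construction, through the hook.
  val env: Env = createEnv(isLocal = true)
}

// In a unit test, override the hook instead of building the real environment.
class TestDriverContext extends DriverContext {
  override protected def createEnv(isLocal: Boolean): Env =
    new Env { val name: String = "mock" }
}
```

The hook is called from the superclass constructor, so an override like this one must not depend on fields of the subclass, which are not yet initialized at that point.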
Creating the UI
_progressBar =
  if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
    Some(new ConsoleProgressBar(this))
  } else {
    None
  }
_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
      _env.securityManager, appName, startTime = startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
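The two decisions in this excerpt can be isolated as plain predicates. The sketch below uses an ordinary `Map[String, String]` in place of SparkConf, which is an assumption made for illustration. The console progress bar requires `spark.ui.showConsoleProgress` (default `true`) and also requires INFO logging to be disabled. The UI depends only on `spark.ui.enabled` (default `true`).

```scala
// Minimal sketch of the progress-bar and UI conditions; Map stands in for SparkConf.
object UiDecisions {
  // Read a boolean setting, falling back to a default when the key is absent.
  private def getBoolean(conf: Map[String, String], key: String, default: Boolean): Boolean =
    conf.get(key).map(_.toBoolean).getOrElse(default)

  // Both the setting and the log level must allow the progress bar.
  def showProgressBar(conf: Map[String, String], infoEnabled: Boolean): Boolean =
    getBoolean(conf, "spark.ui.showConsoleProgress", default = true) && !infoEnabled

  // The web UI is on unless explicitly disabled (the source notes this is for tests).
  def uiEnabled(conf: Map[String, String]): Boolean =
    getBoolean(conf, "spark.ui.enabled", default = true)
}
```

For example, with an empty configuration, `showProgressBar(Map.empty, infoEnabled = true)` is `false`, because the INFO check alone suppresses the bar.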
Creating the TaskScheduler
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
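The last comment in this excerpt describes an ordering constraint. The DAGScheduler's constructor hands a reference to itself to the TaskScheduler, and only after that may the TaskScheduler be started. The sketch below is a simplified model of that dependency. `ModelTaskScheduler` and `ModelDAGScheduler` are hypothetical classes, not Spark's, and treating an early `start()` as an error is a choice made for this model.

```scala
// Simplified model of "set the DAGScheduler reference first, then start()".
class ModelTaskScheduler {
  private var dagScheduler: Option[ModelDAGScheduler] = None
  var started = false

  def setDAGScheduler(d: ModelDAGScheduler): Unit = { dagScheduler = Some(d) }

  def start(): Unit = {
    // In this model, starting without the back-reference is an error.
    require(dagScheduler.isDefined, "DAGScheduler must be set before start()")
    started = true
  }
}

class ModelDAGScheduler(ts: ModelTaskScheduler) {
  // Mirrors the source comment: the reference is set in the constructor.
  ts.setDAGScheduler(this)
}
```

Constructing `new ModelDAGScheduler(ts)` before `ts.start()` corresponds to the order in the excerpt: `new DAGScheduler(this)` first, then `_taskScheduler.start()`.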
Initializing the BlockManager
_env.blockManager.initialize(_applicationId)
Starting the metricsSystem
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
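The final line relies on a Scala `Option` idiom. `ui` is an `Option`, so `ui.foreach(_.attachHandler(handler))` attaches each handler when a UI exists and does nothing when the UI was disabled (`None`). This sketch reproduces the idiom with a hypothetical `FakeUI` class and handlers modelled as plain strings. Both are assumptions made for illustration.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical UI that just records the handlers attached to it.
class FakeUI {
  val handlers: ArrayBuffer[String] = ArrayBuffer.empty
  def attachHandler(handler: String): Unit = { handlers += handler }
}

object AttachDemo {
  // Same shape as the excerpt: no UI (None) means nothing is attached, and no error.
  def attachAll(servletHandlers: Seq[String], ui: Option[FakeUI]): Unit =
    servletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
}
```

Because the UI is an `Option`, no null checks are needed. The "UI disabled" case from the previous step is handled for free.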
Instantiating the ExecutorAllocationManager
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
      case _ =>
        None
    }
  } else {
    None
  }
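An ExecutorAllocationManager is created only when two conditions hold. Dynamic allocation must be enabled, and the scheduler backend must implement the allocation-client trait. Otherwise the field stays `None`. The sketch below models that selection. `AllocationClient`, `Backend`, `AllocationManager` and the `requestMore` method are hypothetical stand-ins for Spark's classes.

```scala
// Hypothetical stand-ins for ExecutorAllocationClient, the scheduler backend
// and ExecutorAllocationManager; only the selection logic matters here.
trait AllocationClient { def requestMore(n: Int): Boolean }
trait Backend
class ElasticBackend extends Backend with AllocationClient {
  def requestMore(n: Int): Boolean = true
}
class FixedBackend extends Backend

final case class AllocationManager(client: AllocationClient)

object AllocationSelection {
  def select(dynamicAllocationEnabled: Boolean, backend: Backend): Option[AllocationManager] =
    if (dynamicAllocationEnabled) {
      backend match {
        // Using the bound variable `b` avoids a separate asInstanceOf cast.
        case b: AllocationClient => Some(AllocationManager(b))
        case _                   => None
      }
    } else {
      None
    }
}
```

The Spark excerpt binds `b` but then casts `schedulerBackend` again. Using `b` directly, as in the sketch, behaves the same and removes the redundant cast.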