您的位置:首页 > 其它

Spark 源码解析之SparkContext都做了些什么

2017-02-09 23:35 155 查看
SparkContext作为Spark应用程序连接Spark集群的入口,对于学习Spark的运行机制与原理有重要的作用。

SparkContext初始化时序图

下面是阅读源码后做的SparkContext初始化时序图:



结合源码,厘清SparkContext的初始化

这里只列举了SparkContext初始化中的部分源码。

设置SparkContext是否允许多个共存:

// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
private val allowMultipleContexts: Boolean =
config.getBoolean("spark.driver.allowMultipleContexts", false)


克隆SparkConf,并获取相关配置等:

/**
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
* changed at runtime.
*/
def getConf: SparkConf = conf.clone()

def jars: Seq[String] = _jars
def files: Seq[String] = _files
def master: String = _conf.get("spark.master")
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
def appName: String = _conf.get("spark.app.name")


获取JobProgressListener

private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener


创建SparkEnv

// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

private[spark] def env: SparkEnv = _env


创建UI

_progressBar =
if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
Some(new ConsoleProgressBar(this))
} else {
None
}

_ui =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
_env.securityManager, appName, startTime = startTime))
} else {
// For tests, do not enable the UI
None
}


创建TaskScheduler

// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()


初始化blockManager

_env.blockManager.initialize(_applicationId)


启动metricsSystem

_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))


实例化ExecutorAllocationManager

_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: