
Spark Context

2016-05-18 10:33
1. Create the LiveListenerBus, which underpins Spark's event-handling model

private[spark] val listenerBus = new LiveListenerBus


2. Create the JobProgressListener and register it with listenerBus

_jobProgressListener = new JobProgressListener(_conf)

listenerBus.addListener(jobProgressListener)
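
Steps 1 and 2 follow the classic listener-bus pattern: a bus holds a list of listeners and forwards every posted event to each of them. The following is a minimal sketch of that pattern in plain Scala, not Spark's implementation. All names in it (`SimpleListenerBus`, `Event`, `JobStart`, `JobEnd`, `RunningJobsListener`) are hypothetical, and the bus is synchronous for simplicity.

```scala
import scala.collection.mutable.ArrayBuffer

object ListenerBusSketch {
  // Hypothetical event types, invented for this sketch.
  sealed trait Event
  final case class JobStart(jobId: Int) extends Event
  final case class JobEnd(jobId: Int) extends Event

  trait Listener {
    def onEvent(event: Event): Unit
  }

  // A synchronous bus: post() calls every registered listener in registration order.
  final class SimpleListenerBus {
    private val listeners = ArrayBuffer.empty[Listener]
    def addListener(listener: Listener): Unit = listeners += listener
    def post(event: Event): Unit = listeners.foreach(_.onEvent(event))
  }

  // Plays the role of a progress listener: tracks the jobs that are still running.
  final class RunningJobsListener extends Listener {
    private var running = Set.empty[Int]
    def runningJobs: Set[Int] = running
    override def onEvent(event: Event): Unit = event match {
      case JobStart(id) => running += id
      case JobEnd(id)   => running -= id
    }
  }

  // Registers one listener, posts three events, and returns the jobs still running.
  def demo(): Set[Int] = {
    val bus = new SimpleListenerBus
    val progress = new RunningJobsListener
    bus.addListener(progress)
    bus.post(JobStart(1))
    bus.post(JobStart(2))
    bus.post(JobEnd(1))
    progress.runningJobs
  }
}
```

Because the listener only reacts to events, other components can read job state from it without touching the scheduler directly, which is how step 5 below uses the JobProgressListener.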


3. Initialize SparkEnv

_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)


4. Create the MetadataCleaner

_metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)


5. Create the SparkStatusTracker, which obtains the job and stage information of the Spark application through the JobProgressListener

_statusTracker = new SparkStatusTracker(this)


6. Create the ConsoleProgressBar, which reads stage progress from the SparkStatusTracker and displays it on the console

_progressBar =
  if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
    Some(new ConsoleProgressBar(this))
  } else {
    None
  }
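
Several components in this walkthrough are optional: a boolean setting with a default decides whether the component exists, and the result is stored as an `Option`. The sketch below illustrates that pattern in isolation. `SimpleConf` and `ProgressBar` are hypothetical stand-ins written for this example, not Spark classes; only the configuration key is taken from the excerpt above.

```scala
object OptionalComponentSketch {
  // Hypothetical stand-in for a configuration object: string settings, typed getter with a default.
  final class SimpleConf(settings: Map[String, String]) {
    def getBoolean(key: String, default: Boolean): Boolean =
      settings.get(key).map(_.toBoolean).getOrElse(default)
  }

  // Hypothetical progress bar that renders a fixed-width text bar.
  final class ProgressBar {
    def render(done: Int, total: Int): String = {
      val width = 20
      val filled = if (total == 0) 0 else done * width / total
      "[" + "=" * filled + " " * (width - filled) + s"] $done/$total"
    }
  }

  // The bar exists only if the setting is on (default true) and INFO logging is off.
  def createProgressBar(conf: SimpleConf, infoLoggingEnabled: Boolean): Option[ProgressBar] =
    if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !infoLoggingEnabled) {
      Some(new ProgressBar)
    } else {
      None
    }
}
```

Callers then use `foreach` or `map` on the `Option` instead of null checks, so a disabled component simply does nothing.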


7. Create the HeartbeatReceiver

_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))


8. Create the TaskScheduler, together with its SchedulerBackend

val (sched, ts) = SparkContext.createTaskScheduler(this, master)
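
createTaskScheduler picks the scheduler and backend from the master URL. A rough sketch of that kind of dispatch, using regular-expression pattern matching, is shown below. It covers only `local`, `local[N]`, `local[*]` and `spark://` URLs; the `Deployment` types and the `parseMaster` function are hypothetical names for this example, and the real method handles more cases and returns actual scheduler objects.

```scala
object MasterUrlSketch {
  // Hypothetical result type describing what would be created for a given master URL.
  sealed trait Deployment
  final case class Local(threads: Int) extends Deployment
  final case class Standalone(masterUrls: Seq[String]) extends Deployment

  // A regex used as a pattern must match the whole string.
  private val LocalN = """local\[([0-9]+|\*)\]""".r
  private val SparkUrl = """spark://(.*)""".r

  def parseMaster(
      master: String,
      cores: Int = Runtime.getRuntime.availableProcessors()): Deployment =
    master match {
      case "local"         => Local(1)
      case LocalN("*")     => Local(cores)    // one thread per available core
      case LocalN(n)       => Local(n.toInt)
      case SparkUrl(hosts) => Standalone(hosts.split(",").toSeq.map("spark://" + _))
      case other =>
        throw new IllegalArgumentException(s"Could not parse master URL: '$other'")
    }
}
```

For example, `MasterUrlSketch.parseMaster("local[4]")` yields `Local(4)`, and a comma-separated `spark://` URL yields one entry per master host.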


9. Create the DAGScheduler

_dagScheduler = new DAGScheduler(this)


10. Start the TaskScheduler

_taskScheduler.start()


11. Obtain the applicationId from the TaskScheduler

_applicationId = _taskScheduler.applicationId()


12. Initialize the blockManager

_env.blockManager.initialize(_applicationId)

13. Start the MetricsSystem

metricsSystem.start()

14. Attach the driver metrics servlet handler to the web UI

metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

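15. Create and start the EventLoggingListener, and register it with listenerBus

// Excerpt: in the source this runs only when event logging is enabled
val logger =
  new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
    _conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
_eventLogger = Some(logger)

16. Check whether dynamic allocation is enabled

val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)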


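17. Create and start the ExecutorAllocationManager (only when dynamic allocation is enabled)

_executorAllocationManager =
  if (dynamicAllocationEnabled) Some(new ExecutorAllocationManager(this, listenerBus, _conf)) else None
_executorAllocationManager.foreach(_.start())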

18. Create and start the ContextCleaner

_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else None
_cleaner.foreach(_.start())

19. Set up and start the ListenerBus
20. postEnvironmentUpdate
21. postApplicationStart
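
The ordering of steps 19 to 21 matters: listeners are registered throughout initialization, but the bus is started only near the end, and the environment-update and application-start events are posted after that. The sketch below shows one way such a bus can behave, under the assumption that events posted before `start()` are held in a queue and delivered once the bus runs. It is a single-threaded simplification with hypothetical names (`BufferedBus`, `demo`); a production bus would typically dispatch on its own thread.

```scala
import scala.collection.mutable

object BufferedBusSketch {
  // Events posted before start() wait in a queue and are delivered in order on start().
  final class BufferedBus[E] {
    private val queue = mutable.Queue.empty[E]
    private val listeners = mutable.ArrayBuffer.empty[E => Unit]
    private var started = false

    def addListener(listener: E => Unit): Unit = listeners += listener

    def post(event: E): Unit =
      if (started) listeners.foreach(_(event)) else queue.enqueue(event)

    def start(): Unit = {
      require(!started, "bus already started")
      started = true
      while (queue.nonEmpty) {
        val event = queue.dequeue()
        listeners.foreach(_(event))
      }
    }
  }

  // Returns the events in the order the listener received them.
  def demo(): List[String] = {
    val received = mutable.ListBuffer.empty[String]
    val bus = new BufferedBus[String]
    bus.addListener(e => received += e)
    bus.post("early event")        // buffered: the bus is not running yet
    bus.start()                    // like step 19: start the bus, draining the queue
    bus.post("EnvironmentUpdate")  // like step 20
    bus.post("ApplicationStart")   // like step 21
    received.toList
  }
}
```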
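22. Post-initialization: run the TaskScheduler's post-start hook, register the metrics sources, and add a shutdown hook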

// Post init
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
  _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}

// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  stop()
}
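
Since the hook above calls stop(), and the user may also call stop() explicitly, stop() can run more than once. The sketch below shows one way to make that safe, using the Scala standard library's `sys.addShutdownHook` rather than Spark's ShutdownHookManager. The `Context` class and `register` function are hypothetical, and the counter only stands in for real cleanup work.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object ShutdownHookSketch {
  // Hypothetical stand-in for a context whose stop() may be called by the user,
  // by the shutdown hook, or by both.
  final class Context {
    private val stopped = new AtomicBoolean(false)
    val cleanups = new AtomicInteger(0)

    def stop(): Unit =
      // compareAndSet succeeds for exactly one caller, so cleanup runs once.
      if (stopped.compareAndSet(false, true)) {
        cleanups.incrementAndGet() // real cleanup (e.g. flushing event logs) would go here
      }
  }

  // Registers a JVM shutdown hook that stops the context; the returned handle
  // lets the caller remove the hook again.
  def register(ctx: Context): sys.ShutdownHookThread =
    sys.addShutdownHook {
      ctx.stop()
    }
}
```

As the comment in the excerpt notes, a hook like this helps only on a clean JVM exit; it does not run if the process is killed.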