Spark Context
2016-05-18 10:33
1. Create the LiveListenerBus, used to build Spark's event-processing model
private[spark] val listenerBus = new LiveListenerBus
2. Create the JobProgressListener and register it with the listenerBus
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)
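Step 2 registers Spark's own UI listener; user code can hang extra listeners on the same bus. A small hedged sketch (the listener body is my own illustration and assumes an existing SparkContext sc, e.g. in spark-shell):
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerStageCompleted}
val auditListener = new SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"job ${jobStart.jobId} started")
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit =
    println(s"stage ${stage.stageInfo.stageId} finished: ${stage.stageInfo.name}")
}
sc.addSparkListener(auditListener)  // ends up on the same LiveListenerBus as the JobProgressListener above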
3. Initialize SparkEnv
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
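A small aside (usage is my assumption, not from the post): once SparkEnv.set(_env) has run, driver-side code can reach the same environment through SparkEnv.get, e.g. in spark-shell:
import org.apache.spark.SparkEnv
val env = SparkEnv.get
val bm  = env.blockManager   // the same BlockManager that step 12 initializes
val ser = env.serializer     // the configured Serializer instance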
4. Create the MetadataCleaner
_metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
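The cleaner periodically drops old entries from SparkContext's internal maps. As far as I recall (treat the key as an assumption, it is not quoted in the post), its TTL comes from SparkConf:
import org.apache.spark.SparkConf
// Hypothetical setting: clean driver metadata older than one hour.
val conf = new SparkConf().set("spark.cleaner.ttl", "3600")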
5. Create the SparkStatusTracker, which pulls Job and Stage information for the Spark app from the JobProgressListener
_statusTracker = new SparkStatusTracker(this)
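A usage sketch (my own, assuming an existing SparkContext sc): the tracker is exposed through the public accessor sc.statusTracker and can be polled for live progress.
val tracker = sc.statusTracker
for (jobId <- tracker.getActiveJobIds()) {
  tracker.getJobInfo(jobId).foreach { job =>
    println(s"job $jobId status=${job.status()} stages=${job.stageIds().mkString(",")}")
  }
}
for (stageId <- tracker.getActiveStageIds()) {
  tracker.getStageInfo(stageId).foreach { s =>
    println(s"stage $stageId: ${s.numCompletedTasks()}/${s.numTasks()} tasks done")
  }
}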
6. Create the ConsoleProgressBar, which reads stage progress from the SparkStatusTracker and displays it on the console
_progressBar =
  if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
    Some(new ConsoleProgressBar(this))
  } else {
    None
  }
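As the snippet shows, the bar only appears when the driver logs above INFO; it can also be switched off explicitly (a one-line SparkConf example of my own):
import org.apache.spark.SparkConf
val conf = new SparkConf().set("spark.ui.showConsoleProgress", "false")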
7. Create the HeartbeatReceiver
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
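Executors report task metrics to this endpoint on a timer. The settings below are my assumption of the usual knobs, not something the post states:
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.executor.heartbeatInterval", "10s")  // how often executors ping the driver
  .set("spark.network.timeout", "120s")            // executors silent past this are treated as lost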
8. Create the TaskScheduler and, along with it, the SchedulerBackend
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
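createTaskScheduler pattern-matches on the master URL to decide which scheduler/backend pair to build. The sketch below is purely illustrative (it returns descriptive strings instead of the real classes) and only shows the shape of that dispatch:
// Illustrative only; not the real SparkContext.createTaskScheduler.
def describeScheduler(master: String): String = master match {
  case "local"                       => "TaskSchedulerImpl + single-threaded local backend"
  case m if m.startsWith("local[")   => "TaskSchedulerImpl + multi-threaded local backend"
  case m if m.startsWith("spark://") => "TaskSchedulerImpl + standalone cluster backend"
  case m if m.startsWith("mesos://") => "TaskSchedulerImpl + Mesos backend"
  case m if m.startsWith("yarn")     => "YARN scheduler and backend"
  case other                         => s"unsupported master URL: $other"
}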
9. Create the DAGScheduler
_dagScheduler = new DAGScheduler(this)
10. Start the TaskScheduler
_taskScheduler.start()
11. Obtain the applicationId from the TaskScheduler
_applicationId = _taskScheduler.applicationId()
12. Initialize the blockManager
_env.blockManager.initialize(_applicationId)
13. Start the MetricsSystem
metricsSystem.start()
14. Attach the driver metrics servlet handler to the web UI
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
15. Create and start the EventLoggingListener, and register it with the listenerBus
val logger = new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
  _conf, _hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
_eventLogger = Some(logger)
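For completeness, the event logger is only created when event logging is switched on. The keys below are standard SparkConf settings (the directory is a placeholder of my own):
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")  // hypothetical event-log directory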
16. Check whether dynamic allocation is enabled
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
17. Create and start the ExecutorAllocationManager
_executorAllocationManager = new ExecutorAllocationManager(this, listenerBus, _conf)
_executorAllocationManager.start()
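A hedged configuration example (keys assumed from standard Spark settings, not from the post): dynamic allocation normally also requires the external shuffle service.
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")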
18. Create and start the ContextCleaner
_cleaner = if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
  Some(new ContextCleaner(this))
} else {
  None
}
_cleaner.foreach(_.start())
19. Set up and start the listenerBus
20. postEnvironmentUpdate
21. postApplicationStart
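Steps 19-21 carry no snippet in the post; my reading of the SparkContext source of that era (so treat the exact names as an assumption) is that they map to these private helpers:
setupAndStartListenerBus()   // registers any listeners named in spark.extraListeners, then starts the bus
postEnvironmentUpdate()      // posts a SparkListenerEnvironmentUpdate event
postApplicationStart()       // posts a SparkListenerApplicationStart event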
22. Post-init: register metrics sources and install a shutdown hook
// Post init
_taskScheduler.postStartHook()
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
  _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  stop()
}
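To close the walkthrough, a minimal driver program that exercises the initialization above (the app name, master and job are placeholders of my own, not from the post):
import org.apache.spark.{SparkConf, SparkContext}

object SparkContextDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("spark-context-init-demo")  // hypothetical app name
      .setMaster("local[2]")                  // picks the local backend from step 8
    val sc = new SparkContext(conf)           // runs all 22 steps described above
    try {
      println(s"applicationId = ${sc.applicationId}")  // value obtained in step 11
      val sum = sc.parallelize(1 to 100).map(_ * 2).reduce(_ + _)
      println(s"sum = $sum")
    } finally {
      sc.stop()  // also triggered by the shutdown hook installed in step 22
    }
  }
}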