您的位置:首页 > 其它

spark2.2.0源码学习过程记录:Day7

2017-09-09 18:12 127 查看
1、读《apache spark 源码剖析》第四章第3节、第五章
上面读的这些部分中,暂时只关心第五章部署方式分析中的第4节的内容(5.4 原生集群Standalone Cluster),所以就一直读到了这章结束,下面开始看源码

2、源码学习
Standalone Cluster方式启动集群时,只要有两条线:Master、Worker(其中还有Executor的启动)

Master启动过程分析:
首先看start-master.sh脚本
看脚本发现调用了spark-daemon.sh start "org.apache.spark.deploy.master.Master" 1

看这个脚本,发现调用了"${SPARK_HOME}"/bin/spark-class "$command" "$@"
于是再看spark-class,其中调用了java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
上述中的"$@"为参数列表

下面看launcher.Main,因为最早分析spark-shell时已经看过这个类,这次只看关注的点
首先创建了SparkClassCommandBuilder,然后调用他的buildCommand方法,在其中看到当要启动的类是Master时,为其添加相关配置。在这个方法里也可以看出:Master、Worker、HistoryServer、CoarseGrainedExecutorBackend等进程也是从这里启动

类org.apache.spark.deploy.master.Master
上面的从Main中打印出了执行脚本,在spark-class类中执行,所以Master的最终入口在Master类中,下面分析这个类。

Object Master
创建SparkConf
创建MasterArguments,其中封装了master的host、port、webUiPort等
调用startRpcEnvAndEndpoint方法

这个方法中首先创建SecurityManager(负责安全相关),然后调用RpcEnv.create创建rpcEnv,然后调用rpcEnv.setupEndpoint(在这个方法中创建了Master)创建master 的RpcEndpoint,然后调用masterEndpoint.askSync(具体目的不清楚,后面再分析)

类RpcEnv
create方法首先创建一个RpcEnvConfig用来封装配置信息
然后调用new NettyRpcEnvFactory().create(config)返回一个RpcEnv对象(是一个NettyRpcEnv),如果不是clientMode模式,则这个RpcEnv对象在返回前就启动服务(调用nettyEnv.startServer)
然后生成一个Master对象,并且将其作为参数调用rpcEnv.setupEndpoint方法,返回一个RpcEndpointRef

类NettyRpcEnvFactory
create方法负责创建和启动NettyRpcEnv实例

类NettyRpcEnv
setupEndpoint方法中调用了dispatcher.registerRpcEndpoint,这个方法中创建一个RpcEndpointRef,然后创建EndpointData,并且存下相关的关联信息,然后调用receivers.offer(data)用于触发master的onStart方法
dispathcer是一个消息分发器,将消息分发给相应的endpoint

类Master
onStart方法首先启动webUi
启动检查deadWorker的任务
启动restServer
启动资源监控功能
初始化持久化引擎(persistenceEngine)和leader选举代理(leaderElectionAgent)
receive方法处理各种事件,包括RegisterWorker、RegisterApplication、ExecutorStateChanged、Heartbeat、MasterChangeAcknowledged等

至此,Master的大致启动过程分析完毕

Worker启动过程分析:
类似的,查看start-slave.sh脚本,最后发现启动的类是org.apache.spark.deploy.worker.Worker
大致看了几个类,启动过程和master非常相似,所以就只看一下Worker类

类Worker
receive方法处理各种事件,包括SendHeartbeat、WorkDirCleanup、MasterChanged、ReconnectWorker、LaunchExecutor、KillExecutor、LaunchDriver、KillDriver等
onStart启动shuffleService、webUi、向master注册自己、启动资源监控功能

Executor启动过程分析:
当Worker的receive方法收到LaunchExecutor信息时
创建临时文件夹
创建ExecutorRunner,用于管理executor 进程
调用刚刚创建的ExecutorRunner实例的start方法启动executor
向master汇报ExecutorStateChanged的消息
在ExecutorRunner的start方法中并没有找到具体的执行命令,只是看出是启动的新进程,回到master中寻找,

类Master
receive方法中的RegisterApplication事件调用了registerApplication,registerApplication中调用了createApplication的方法创建ApplicationInfo
但是其中封装了命令行参数的ApplicationDescription类是从外部传来的,还要继续看
用intellij寻找这个类相关的类,查到StandaloneSchedulerBackend类中有初始化过cmd为org.apache.spark.executor.CoarseGrainedExecutorBackend

类CoarseGrainedExecutorBackend
看一下这个类的main方法,首先解析参数,然后执行run方法
在其中创建了一个CoarseGrainedExecutorBackend的env
receive方法中会处理RegisteredExecutor、RegisterExecutorFailed、LaunchTask、KillTask、StopExecutor、Shutdown等
在onStart方法中向driver注册自己,注册成功后会生成一个Executor对象,用于执行任务
LaunchTask事件会调用Executor对象的launchTask方法

至此就和之前看过的spark作业提交的部分联系上了,后面会开始看spark sql的解析,spark streaming会读一下,暂时不会看源码
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: