Spark 1.3 from Creation to Submission: 2) spark-submit and SparkContext Source Code Analysis
2016-12-29 21:32
The spark-submit script
Suppose the Spark submission command is as follows (the application jar is passed as the trailing positional argument):

bin/spark-submit --class cn.spark.WordCount --master spark://master:7077 --executor-memory 2g --total-executor-cores 4 lib/spark-examples-1.3.1-hadoop2.6.0.jar
The core statement executed by the shell script is:
exec "$SPARK_HOME"/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"和上一节的Master节点的启动一样,它最最终也调用了spark-class
exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
org.apache.spark.deploy.SparkSubmit
Let's first look at the main method of the SparkSubmit companion object:

def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    printStream.println(appArgs)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}

The arguments identify the action as SUBMIT, so submit(appArgs) is called; inside it, the fairly important runMain function is eventually invoked:
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  /* only the core code is kept */
  // childMainClass is the --class cn.spark.WordCount passed to spark-submit above
  mainClass = Class.forName(childMainClass, true, loader)
  // get the main method of the WordCount class
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  // trigger WordCount.main
  mainMethod.invoke(null, childArgs.toArray)
}
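To make the reflection step concrete, here is a small self-contained sketch of the same pattern (ReflectiveLaunchSketch and the hard-coded class name and empty argument array are mine; runMain gets these from the parsed submit options):

object ReflectiveLaunchSketch {
  def main(args: Array[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader
    // Load the class named by --class, just like Class.forName(childMainClass, true, loader)
    val mainClass = Class.forName("cn.spark.WordCount", true, loader)
    // Look up its static main(String[]) entry point
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // Invoke it: the receiver is null because main is static,
    // and the String array becomes WordCount.main's args
    mainMethod.invoke(null, Array[String]())
  }
}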
After the code above runs, the code we wrote is executed via reflection. For example, my WordCount looks like this:
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("/tmp/word.txt")
    val rdd1 = rdd.flatMap(_.split("\\s")).map((_, 1)).reduceByKey(_ + _)
    rdd1.collect()
    sc.stop()
  }
}
SparkContext
SparkContext is the sole entry point into Spark. When a SparkContext is initialized, it mainly does the following:

private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
private[spark] var (schedulerBackend, taskScheduler) =
  SparkContext.createTaskScheduler(this, master)
@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
taskScheduler.start()
1. Creates the SparkEnv (which holds the ActorSystem), responsible for communication between the driver and the executors
2. Creates the DAGScheduler
3. Creates the TaskScheduler and starts it (a preview sketch of this step follows the list)
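As a preview of step 3 (later sections dig into the details), for a spark://... master URL, SparkContext.createTaskScheduler pattern-matches the master string and wires up a TaskSchedulerImpl with a SparkDeploySchedulerBackend. Roughly, simplified from the Spark 1.3 source and showing only the standalone-master branch:

master match {
  // ...other branches (local, local[N], yarn, mesos) omitted...
  case SPARK_REGEX(sparkUrl) =>
    // standalone cluster: build the scheduler and its backend, then wire them together
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)
}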
SparkEnv
SparkEnv is mainly responsible for communication inside Spark. Spark 1.3 uses Akka for this; from Spark 1.6 on it was switched to Netty (Akka itself is actually built on top of Netty).

private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
}

This eventually calls the create method of the SparkEnv class, where the familiar actorSystem shows up again:
private def create(
    conf: SparkConf,
    executorId: String,
    hostname: String,
    port: Int,
    isDriver: Boolean,
    isLocal: Boolean,
    listenerBus: LiveListenerBus = null,
    numUsableCores: Int = 0,
    mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

  // Listener bus is only used on the driver
  if (isDriver) {
    assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
  }

  val securityManager = new SecurityManager(conf)

  // Create the ActorSystem for Akka and get the port it binds to.
  val (actorSystem, boundPort) = {
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
  }
  // ......
}
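For context, here is a minimal sketch of what AkkaUtils.createActorSystem boils down to, assuming a plain Akka 2.3-style remoting setup (the object and method names below are mine; the real method also merges in spark.akka.* settings, SSL and the SecurityManager, retries on bind failures, and returns the bound port together with the system):

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object AkkaSystemSketch {
  // Build a remoting-enabled ActorSystem bound to the given host and port
  def createActorSystem(name: String, hostname: String, port: Int): ActorSystem = {
    val akkaConf = ConfigFactory.parseString(s"""
      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.netty.tcp.hostname = "$hostname"
      |akka.remote.netty.tcp.port = $port
      |""".stripMargin)
    ActorSystem(name, akkaConf)
  }
}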
Below is the workflow diagram for a Spark submission; some of its contents will be covered in later sections.
A quick aside: I've been busy with my thesis proposal lately and haven't had time to play with Spark, and on top of that I've caught a bit of a cold, so there has been no time for blogging either.