spark源码解析-从提交任务到jar的加载运行(基于2.1.0版本)
2017-05-19 13:30
731 查看
本文分析的源码基于Spark2.1.0版本,如果有理解不当的地方欢迎批评指正。
在之前的一篇文章中我们分析了Spark-submit脚本,发现该脚本会调用spark-class脚本检查参数设置,以及提交任务。最后发现,提交任务的入口类是org.apache.spark.deploy.SparkSubmit 我们接下来深入这个类,看看从提交任务到执行用户jar包之间都发生了什么;
首先找到org.apache.spark.deploy.SparkSubmit类的main方法:
def
main(args: Array[String]):
Unit = {
val appArgs =
new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match
{
case SparkSubmitAction.SUBMIT
=> submit(appArgs)
case SparkSubmitAction.KILL
=> kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS
=> requestStatus(appArgs)
}
}
main方法很简单,首先利用参数创建了一个SparkSubmitArguments 这个类是SparkSubmitArgumentsParser的子类,主要工作就是对Spark应用的参数进行解析,以及加载当前和Spark相关的环境变量。
在SparkSubmitArguments中有一个action成员,用于表示spark-submit的动作,一般来说使用spark-submit有三个目的,第一是提交应用(这也是最常用的),第二是可以通过spark-submit杀死某个任务,第三是获取某个正在执行的任务的状态。这个action是通过参数指定的,默认值为submit即提交一个任务。
我们可以跳到submit方法看看,该方法定义如下:
仅接受一个SparkSubmitArguments实例作为参数,这个方法执行两个步骤,首先是基于集群管理器和部署模式设置合适的classpath、系统属性和应用程序参数,以此为运行用户的main方法做环境准备。
然后,使用第一步准备好的环境来启动main方法,这是通过反射完成的,我们下面再看。
Submit方法最终执行了其内部定义的doRunMain,而doRunMain方法会调用runMain(line 169)
在runMain方法中可以看到这三行代码:分别是line 695:
这行代码利用反射加载main方法所在类,Utils.classForName方法最终还是调用的Class.forName方法;
line 722:
以及line 738:
上述两行分别获取main方法,然后执行main方法。这些动作都是在driver端完成的。
再理一下思路,spark提交jar的过程如下:
Spark-submit -> Spark-class -> org.apache.spark.deploy.SparkSubmit-> { main -> submit -> doRunMan -> runMain}
org.apache.spark.deploy.SparkSubmit主要负责准备运行环境以及通过反射获取app的main方法并执行。
为了方便理解,接下来我们对Spark利用反射加载运行用户应用程序的main方法做一个简易实现:
新建一个scala工程,在com.load包下新建一个Main类:
这个用来模拟我们自己开发的Spark应用,然后打jar包,生成ScalaTest.jar
再创建另一个工程,将刚才的jar作为依赖添加进来,并写如下类:
这个用来模拟Spark本身,运行即可发现输出如下:
这就是Spark提交jar的机制,我们也可以发现这里的main方法仅作为普通方法执行,只不过Spark会检查该mian方法是不是静态的,如果不是静态就抛出异常拒绝执行,如果修改722行的“main”字串也可以实现以任意方法名为Spark app执行入口。
在之前的一篇文章中我们分析了Spark-submit脚本,发现该脚本会调用spark-class脚本检查参数设置,以及提交任务。最后发现,提交任务的入口类是org.apache.spark.deploy.SparkSubmit 我们接下来深入这个类,看看从提交任务到执行用户jar包之间都发生了什么;
首先找到org.apache.spark.deploy.SparkSubmit类的main方法:
def
main(args: Array[String]):
Unit = {
val appArgs =
new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
appArgs.action match
{
case SparkSubmitAction.SUBMIT
=> submit(appArgs)
case SparkSubmitAction.KILL
=> kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS
=> requestStatus(appArgs)
}
}
main方法很简单,首先利用参数创建了一个SparkSubmitArguments 这个类是SparkSubmitArgumentsParser的子类,主要工作就是对Spark应用的参数进行解析,以及加载当前和Spark相关的环境变量。
在SparkSubmitArguments中有一个action成员,用于表示spark-submit的动作,一般来说使用spark-submit有三个目的,第一是提交应用(这也是最常用的),第二是可以通过spark-submit杀死某个任务,第三是获取某个正在执行的任务的状态。这个action是通过参数指定的,默认值为submit即提交一个任务。
我们可以跳到submit方法看看,该方法定义如下:
private def submit(args: SparkSubmitArguments): Unit
仅接受一个SparkSubmitArguments实例作为参数,这个方法执行两个步骤,首先是基于集群管理器和部署模式设置合适的classpath、系统属性和应用程序参数,以此为运行用户的main方法做环境准备。
然后,使用第一步准备好的环境来启动main方法,这是通过反射完成的,我们下面再看。
Submit方法最终执行了其内部定义的doRunMain,而doRunMain方法会调用runMain(line 169)
runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
在runMain方法中可以看到这三行代码:分别是line 695:
mainClass = Utils.classForName(childMainClass)
这行代码利用反射加载main方法所在类,Utils.classForName方法最终还是调用的Class.forName方法;
line 722:
valmainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
以及line 738:
mainMethod.invoke(null, childArgs.toArray)
上述两行分别获取main方法,然后执行main方法。这些动作都是在driver端完成的。
再理一下思路,spark提交jar的过程如下:
Spark-submit -> Spark-class -> org.apache.spark.deploy.SparkSubmit-> { main -> submit -> doRunMan -> runMain}
org.apache.spark.deploy.SparkSubmit主要负责准备运行环境以及通过反射获取app的main方法并执行。
为了方便理解,接下来我们对Spark利用反射加载运行用户应用程序的main方法做一个简易实现:
新建一个scala工程,在com.load包下新建一个Main类:
package com.load /** * Created by hunan on 2017/5/19. */ object Main { def main(args: Array[String]): Unit = { println("Run main method success!") } }
这个用来模拟我们自己开发的Spark应用,然后打jar包,生成ScalaTest.jar
再创建另一个工程,将刚才的jar作为依赖添加进来,并写如下类:
package com.example /** * Created by hunan on 2017/5/19. */ object Test { def main(args: Array[String]): Unit = { var mainClass:Class[_]=null try { mainClass = Class.forName("com.load.Main", true, Thread.currentThread().getContextClassLoader) }catch{ case e:Exception=> e.printStackTrace() } if(mainClass!=null){ val mainMethod = mainClass.getMethod("main",new Array[String](0).getClass) mainMethod.invoke(null,Array[String]()) } } }
这个用来模拟Spark本身,运行即可发现输出如下:
这就是Spark提交jar的机制,我们也可以发现这里的main方法仅作为普通方法执行,只不过Spark会检查该mian方法是不是静态的,如果不是静态就抛出异常拒绝执行,如果修改722行的“main”字串也可以实现以任意方法名为Spark app执行入口。
相关文章推荐
- Spark源码解析之任务提交(spark-submit)篇
- 版本定制第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码
- Spark 源码解析:彻底理解TaskScheduler的任务提交和task最佳位置算法
- 版本定制第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码
- Spark 源码解析:TaskScheduler的任务提交和task最佳位置算法
- [spark] TaskScheduler 任务提交与调度源码解析
- (版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码
- Apache Spark源码走读之2:Job的提交与运行
- Apache Spark源码分析-- Job的提交与运行
- 基于HDFS的SparkStreaming案例实战和内幕源码解析
- spark源码阅读2-Job的提交与运行
- Spark源码分析之Job提交运行总流程概述
- Android 源码解析: 图片加载库Picasso 4 任务调度 Dispatcher
- Spark内核介绍:Spark在运行时会把Stage包装成任务提交 (二)
- spark源码学习(八):spark具体是如何使用集群的资源去运行任务
- spark源码分析之Executor启动与任务提交篇
- spark core源码分析1 集群启动及任务提交过程
- Spark技术内幕: Task向Executor提交的源码解析
- spark下使用submit提交任务后报jar包已存在错误
- Spark源码阅读笔记:任务提交流程整理