
Spark Source Code Analysis: spark-submit, with WordCount as the Entry Point

2017-03-14 17:57


1. Starting the Spark Source Analysis with WordCount


1) Starting in Standalone Mode

Start the Master. The core command executed by start-master.sh:
/home/weiw/spark-1.6.3-bin-hadoop2.6/sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip ubuntu --port 7077 --webui-port 8080


Start the Worker: ./start-slave.sh spark://ubuntu:7077. The scripts and core command executed by start-slave.sh:
/home/weiw/spark-1.6.3-bin-hadoop2.6/sbin/spark-config.sh
/home/weiw/spark-1.6.3-bin-hadoop2.6/bin/load-spark-env.sh
/home/weiw/spark-1.6.3-bin-hadoop2.6/sbin/spark-daemon.sh start org.apache.spark.deploy.worker.Worker 1 --webui-port 8081 spark://ubuntu:7077


From these commands we can see that, in the end, both scripts start the Spark services by invoking the corresponding Master and Worker classes.
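
For reference, the Master entry point in the Spark 1.6.x source is a short main that parses the arguments and stands up an RpcEnv plus the Master endpoint (a lightly trimmed sketch; the Worker follows the same pattern):

// org.apache.spark.deploy.master.Master, Spark 1.6.x (trimmed sketch)
def main(argStrings: Array[String]) {
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf) // parses --ip / --port / --webui-port
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}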


2) Running WordCount from IDEA

First, let us look at the arguments Spark needs when a job is submitted with spark-submit, and the core classes invoked along the way.

Submitting the wordcount.jar job with spark-submit.sh:
./bin/spark-submit --class main.scala.com.bbd.WordCount --master spark://192.168.1.20:7077 /home/weiw/wordcount.jar  /home/weiw/project_spark/hello /home/weiw/project_spark/out_hello_count


The core command executed by spark-submit.sh:
/home/weiw/spark-1.6.3-bin-hadoop2.6/bin/spark-class org.apache.spark.deploy.SparkSubmit --master=spark://ubuntu:7077 /home/weiw/wordcount.jar


The core command executed by spark-class.sh:
/home/weiw/jdk1.7.0_80/bin/java
-cp /home/weiw/spark-1.6.3-bin-hadoop2.6/conf/:
/home/weiw/spark-1.6.3-bin-hadoop2.6/lib/spark-assembly-1.6.3-hadoop2.6.0.jar:
/home/weiw/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:
/home/weiw/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:
/home/weiw/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar
-Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit
--master spark://ubuntu:7077 /home/weiw/wordcount.jar

Tracing the command chain of this WordCount submission, the job is ultimately submitted by executing the org.apache.spark.deploy.SparkSubmit class, so in the IDE we can submit jobs by running this class directly.

Starting the Master and Worker in IDEA:
Entry point for the Spark Master: org.apache.spark.deploy.master.Master
Entry point for the Spark Worker: org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://192.168.1.9:7077

Starting the Worker requires some argument configuration:
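
In substance the run configuration amounts to (a sketch based on the Worker command line above):

Main class:        org.apache.spark.deploy.worker.Worker
Program arguments: --webui-port 8081 spark://192.168.1.9:7077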



Run configuration for submitting the WordCount job from IDEA:
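
In substance (a sketch derived from the spark-submit command above):

Main class:        org.apache.spark.deploy.SparkSubmit
Program arguments: --class main.scala.com.bbd.WordCount --master spark://192.168.1.20:7077 /home/weiw/wordcount.jar /home/weiw/project_spark/hello /home/weiw/project_spark/out_hello_count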



Source code of the WordCount example:
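
A minimal driver consistent with the submit command above looks like the following sketch (assumed, not the author's exact code; the input and output paths arrive as args(0) and args(1), matching the childArgs shown later):

package main.scala.com.bbd

import org.apache.spark.{SparkConf, SparkContext}

// Minimal WordCount driver (a sketch; the exact original body is assumed).
// args(0) = input path, args(1) = output path, matching the childArgs below.
object WordCount {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("WordCount"))
    sc.textFile(args(0))
      .flatMap(_.split("\\s+"))   // split each line into words
      .map(word => (word, 1))
      .reduceByKey(_ + _)          // sum the counts per word
      .saveAsTextFile(args(1))
    sc.stop()
  }
}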




2. SparkSubmit Source Code Analysis

After spark-submit submits the job, the org.apache.spark.deploy.SparkSubmit.main() method is executed:

SparkSubmit.main():
object SparkSubmit {
  def main(args: Array[String]): Unit = {
    // Parse the command-line arguments (see the parsed listing below)
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      printStream.println(appArgs)
    }
    // Dispatch on the requested action; for a normal submission this is SUBMIT
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
}

args: the arguments received by SparkSubmit.main:
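
Based on the spark-submit command above, the raw args are roughly:

--class main.scala.com.bbd.WordCount --master spark://192.168.1.20:7077 /home/weiw/wordcount.jar /home/weiw/project_spark/hello /home/weiw/project_spark/out_hello_count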



Below are the arguments after parsing by the SparkSubmitArguments class; the action value then selects the submit function.
appArgs (parsed arguments):
master                  spark://192.168.1.20:7077
deployMode              null
executorMemory          null
executorCores           null
totalExecutorCores      null
propertiesFile          null
driverMemory            null
driverCores             null
driverExtraClassPath    null
driverExtraLibraryPath  null
driverExtraJavaOptions  null
supervise               false
queue                   null
numExecutors            null
files                   null
pyFiles                 null
archives                null
mainClass               main.scala.com.bbd.WordCount
primaryResource         file:/home/weiw/wordcount.jar
name                    main.scala.com.bbd.WordCount
childArgs               [/home/weiw/project_spark/hello /home/weiw/project_spark/out_hello_count2]
jars                    null
packages                null
packagesExclusions      null
repositories            null
verbose                 false
action                  SUBMIT


SparkSubmit.submit():
private def submit(args: SparkSubmitArguments): Unit = {
  // Resolves deployMode, classpath, mainClass, clusterManager, files, jars, etc. For this run:
  // childArgs      = [/home/weiw/project_spark/hello, /home/weiw/project_spark/out_hello_count2]
  // childClasspath = [file:/home/weiw/wordcount.jar]
  // sysProps       = [(SPARK_SUBMIT,true), (spark.app.name,main.scala.com.ttt.WordCount),
  //                   (spark.jars,file:/home/user/wordcount.jar), (spark.submit.deployMode,client),
  //                   (spark.master,spark://ip:7077)]
  // childMainClass = main.scala.com.bbd.WordCount
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) { // false in this run
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          if (e.getStackTrace().length == 0) {
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      // Load mainClass via the class loader (see the ClassLoader documentation if unfamiliar)
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  if (args.isStandaloneCluster && args.useRest) { // false in this run (client deploy mode)
    try {
      printStream.println("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  } else {
    doRunMain()
  }
}


SparkSubmit.runMain():

Finally, the main method of mainClass (WordCount) is invoked via reflection. That call initializes the SparkContext and builds, schedules, and computes the RDDs; SparkContext initialization and job computation and submission will be analyzed in the next post.

private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  // Pick the class loader: child-first if spark.driver.userClassPathFirst is set
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)

  // Add the user jar(s) to the loader and publish sysProps as JVM system properties
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  for ((key, value) <- sysProps) {
    System.setProperty(key, value)
  }

  // Look up mainClass (here: WordCount) and invoke its static main with childArgs
  // (exception handling elided)
  val mainClass = Utils.classForName(childMainClass)
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  mainMethod.invoke(null, childArgs.toArray)
}
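
In isolation, the reflection pattern used above looks like this (a standalone sketch, not Spark code; the paths are this example's input/output):

// Invoke a static main(String[]) via reflection
val clazz = Class.forName("main.scala.com.bbd.WordCount")
val main = clazz.getMethod("main", classOf[Array[String]])
// Scala passes the Array[String] as a single varargs element, i.e. the String[] parameter
main.invoke(null, Array("/home/weiw/project_spark/hello", "/home/weiw/project_spark/out_hello_count"))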
Tags: spark, source code analysis