Spark Source Code Analysis: spark-submit, with WordCount as the Entry Point
2017-03-14 17:57
1. Starting the Spark source analysis with WordCount
1) Starting in Standalone mode
Start the Master with start-master.sh, which outputs:
/home/weiw/spark-1.6.3-bin-hadoop2.6/sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip ubuntu --port 7077 --webui-port 8080
Start the Worker with ./start-slave.sh spark://ubuntu:7077; start-slave.sh outputs:
/home/weiw/spark-1.6.3-bin-hadoop2.6/sbin/spark-config.sh
/home/weiw/spark-1.6.3-bin-hadoop2.6/bin/load-spark-env.sh
/home/weiw/spark-1.6.3-bin-hadoop2.6/sbin/spark-daemon.sh start org.apache.spark.deploy.worker.Worker 1 --webui-port 8081 spark://ubuntu:7077
From these commands we can see that, in the end, it is the Master and Worker classes that actually start the Spark services.
2) Running WordCount from IDEA
First, let's look at the parameters spark-submit needs when submitting a job, and which core classes it invokes. Submit the wordcount.jar job with spark-submit.sh:
./bin/spark-submit --class main.scala.com.bbd.WordCount --master spark://192.168.1.20:7077 /home/weiw/wordcount.jar /home/weiw/project_spark/hello /home/weiw/project_spark/out_hello_count
The core command that spark-submit.sh ultimately executes:
/home/weiw/spark-1.6.3-bin-hadoop2.6/bin/spark-class org.apache.spark.deploy.SparkSubmit --master=spark://ubuntu:7077 /home/weiw/wordcount.jar
The core command that spark-class.sh ultimately executes:
/home/weiw/jdk1.7.0_80/bin/java -cp /home/weiw/spark-1.6.3-bin-hadoop2.6/conf/:/home/weiw/spark-1.6.3-bin-hadoop2.6/lib/spark-assembly-1.6.3-hadoop2.6.0.jar:/home/weiw/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar:/home/weiw/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar:/home/weiw/spark-1.6.3-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar -Xms1g -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master spark://ubuntu:7077 /home/weiw/wordcount.jar
Following the command path of the submitted WordCount example, the job submission ultimately executes the org.apache.spark.deploy.SparkSubmit class, so in the IDE we can submit jobs directly through this class.
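Since everything funnels into SparkSubmit.main, one way to debug from the IDE is to call that method directly with the same arguments the shell scripts would pass. Below is a minimal, hypothetical launcher sketch, assuming the Spark 1.6.3 assembly jar is on the project classpath and that SparkSubmit is accessible from your code (the paths and master address are the example values from above):

```scala
// Hypothetical IDE launcher (not part of Spark): drives SparkSubmit.main
// with the arguments spark-submit.sh would otherwise construct.
object IdeSubmit {
  def main(args: Array[String]): Unit = {
    org.apache.spark.deploy.SparkSubmit.main(Array(
      "--class", "main.scala.com.bbd.WordCount",   // the application's main class
      "--master", "spark://192.168.1.20:7077",     // standalone master URL
      "/home/weiw/wordcount.jar",                  // primary resource (application jar)
      "/home/weiw/project_spark/hello",            // input path forwarded to WordCount
      "/home/weiw/project_spark/out_hello_count"   // output path forwarded to WordCount
    ))
  }
}
```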
Starting Master and Worker in IDEA:
Entry point for the Spark Master: org.apache.spark.deploy.master.Master
Entry point for the Spark Worker: org.apache.spark.deploy.worker.Worker --webui-port 8081 spark://192.168.1.9:7077
Starting the Worker requires some parameter configuration:
Run configuration for submitting the WordCount job in IDEA:
Source code of the WordCount example:
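The original post showed the program only as a screenshot, so here is a minimal sketch of a WordCount matching the class name main.scala.com.bbd.WordCount and the two path arguments used above; the actual code may differ in detail:

```scala
package main.scala.com.bbd

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // args(0) = input path, args(1) = output path, as passed on the
    // spark-submit command line after the application jar
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    sc.textFile(args(0))
      .flatMap(_.split(" "))   // split each line into words
      .map(word => (word, 1))  // pair every word with a count of 1
      .reduceByKey(_ + _)      // sum the counts per word
      .saveAsTextFile(args(1))
    sc.stop()
  }
}
```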
2. SparkSubmit source analysis
After spark-submit submits the job, the org.apache.spark.deploy.SparkSubmit.main() method executes. SparkSubmit.main():
```scala
object SparkSubmit {
  def main(args: Array[String]): Unit = {
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      printStream.println(appArgs)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
}
```
args is the command-line argument array passed to SparkSubmit.main. Below are the arguments after parsing by the SparkSubmitArguments class; the action value then determines that the submit function runs:
```
appArgs (Parsed arguments):
  master                   spark://192.168.1.20:7077
  deployMode               null
  executorMemory           null
  executorCores            null
  totalExecutorCores       null
  propertiesFile           null
  driverMemory             null
  driverCores              null
  driverExtraClassPath     null
  driverExtraLibraryPath   null
  driverExtraJavaOptions   null
  supervise                false
  queue                    null
  numExecutors             null
  files                    null
  pyFiles                  null
  archives                 null
  mainClass                main.scala.com.bbd.WordCount
  primaryResource          file:/home/weiw/wordcount.jar
  name                     main.scala.com.bbd.WordCount
  childArgs                [/home/weiw/project_spark/hello /home/weiw/project_spark/out_hello_count2]
  jars                     null
  packages                 null
  packagesExclusions       null
  repositories             null
  verbose                  false
  action                   SUBMIT
```
SparkSubmit.submit():
```scala
private def submit(args: SparkSubmitArguments): Unit = {
  // Resolves deployMode, classpath, mainClass, clusterManager, files, jars, etc.
  // For this run:
  //   childArgs      = /home/weiw/project_spark/hello, /home/weiw/project_spark/out_hello_count2
  //   childClasspath = file:/home/weiw/wordcount.jar
  //   sysProps       = [(SPARK_SUBMIT,true), (spark.app.name,main.scala.com.ttt.WordCount),
  //                     (spark.jars,file:/home/user/wordcount.jar),
  //                     (spark.submit.deployMode,client), (spark.master,spark://ip:7077)]
  //   childMainClass = main.scala.com.bbd.WordCount
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) { // false in this example
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          if (e.getStackTrace().length == 0) {
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      // Loads mainClass through the class loader; see the reference material
      // if ClassLoader is unfamiliar
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  if (args.isStandaloneCluster && args.useRest) { // false in this example
    try {
      printStream.println("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  } else {
    doRunMain()
  }
}
```
SparkSubmit.runMain():
Finally, the main method of the mainClass (WordCount) is invoked through reflection; that is where the SparkContext is initialized and the RDDs are built, scheduled, and computed. SparkContext initialization and job computation and submission will be analyzed in the next post.
```scala
private def runMain(
    childArgs: Seq[String],
    childClasspath: Seq[String],
    sysProps: Map[String, String],
    childMainClass: String,
    verbose: Boolean): Unit = {
  val loader =
    if (sysProps.getOrElse("spark.driver.userClassPathFirst", "false").toBoolean) {
      new ChildFirstURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    } else {
      new MutableURLClassLoader(new Array[URL](0),
        Thread.currentThread.getContextClassLoader)
    }
  Thread.currentThread.setContextClassLoader(loader)
  mainClass = Utils.classForName(childMainClass)
  val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
  // ... (remainder of the method elided) mainMethod is then invoked with
  // childArgs, which is the reflective call into the user's WordCount.main
}
```
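For readers unfamiliar with this reflection pattern, here is a small self-contained sketch (illustrative only, not Spark source) of the same idea: resolve a class by name through the context class loader, fetch its static main(String[]) method, and invoke it. This is how runMain finally hands control to the user's WordCount:

```scala
// A top-level Scala object compiles to a class with a static main forwarder,
// so Hello can stand in for the user's main class here.
object Hello {
  def main(args: Array[String]): Unit =
    println("Hello invoked with args: " + args.mkString(", "))
}

object ReflectiveLaunch {
  def main(args: Array[String]): Unit = {
    // Resolve the class by name, as runMain does with childMainClass.
    val mainClass = Class.forName("Hello", true,
      Thread.currentThread.getContextClassLoader)
    // Look up main(String[]) and invoke it; the receiver is null because the
    // method is static, and the whole array is passed as a single argument.
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    val childArgs = Array("input/path", "output/path")
    mainMethod.invoke(null, childArgs)
  }
}
```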