SparkSubmit object分析
2021-04-22 14:54
1196 查看
org.apache.spark.deploy.SparkSubmit分析
-
Object SparkSubmit类图分析
结合spark的log4j来说,printStream为error,log4j的配置也是System.error,导致做日志收集的时候莫名其妙。。。。
解决方案如下,采用重新构造log4j的appender:Logger.getRootLogger.removeAllAppenders() Logger.getRootLogger.setLevel(Level.INFO) val console = new ConsoleAppender() console.setTarget("System.out") val patternLayout = new PatternLayout() patternLayout.setConversionPattern("%c{1}: %m%n") console.setLayout(patternLayout) console.setEncoding("UTF-8") console.setName("console") console.activateOptions() Logger.getRootLogger.addAppender(console)
-
main方法执行
-
创建org.apache.spark.deploy.SparkSubmit类,重写parseArguments、logInfo、logWarning、doSubmit方法,执行doSubmit方法。
- 调用duSubmit方法,实际调用org.apache.spark.deploy.SparkSubmit类的submit对象。
-
执行doSubmit方法
-
初始化log4j,查找classloader中org/apache/spark/log4j-defaults.properties,如果存在,PropertyConfigurator.configure("org/apache/spark/log4j-defaults.properties")。如果不在输出Spark was unable to load org/apache/spark/log4j-defaults.properties。
- 解析提交参数,返回SparkSubmitArguments对象,具体实现方式如图:
- 如果verbose=true,输出参数信息
- 通过action,执行不同的启动方法,这里以SparkSubmitAction.SUBMIT为例。
- 执行submit方法。首先判断master.startsWith("spark://") && deployMode == "cluster",这里我们应该是false,执行doRun方法。查看是否设置了proxyUser,如果是设置hdfs的security(一些大数据平台修改了底层认证,如tbds、fi等,应该相对应在这里的源码做了处理)。执行runMain方法。
-
runMain方法
-
执行prepareSubmitEnvironment
- 设置classloader,如果设置了spark.driver.userClassPathFirst=true,创建出ChildFirstURLClassLoader,否则创建出MutableURLClassLoader。
- 把childClasspath加载到当前的classloader
- 利用反射初始化mainclass,根据mainclass的类型创建出不同的SparkApplication,scala.App的是JavaMainApplication,其他的则用SparkApplication
- 执行application start()方法。
-
执行prepareSubmitEnvironment
-
创建SparkConf!!!,将system的spark开头的env参数加载。
- 根据args.master设置clusterManager=1(YARN),根据args.deployMode,如果没有填,默认是client,如果填了cluster,默认是cluster。如果master写了yarn-cluster,默认为yarn,cluster,测试org.apache.spark.deploy.yarn.YarnClusterApplication是否可以被加载。默认情况下可能找不到这个类,因为默认没有加入到maven编译内,尴尬,只能手动加入!!!
- args.sparkProperties加入到sparkConf。
- 创建org.apache.hadoop.conf.Configuration,将sparkConf中以spark.hadoop.开头的参数截取,添加到configuration中,设置默认的io.file.buffer.size为65536。
- 创建临时目录,地址在System.getProperty("java.io.tmpdir"),对于linux系统一般为/tmp目录。目录名称为spark-#{UUID},添加ShutdownHookManager,当程序结束时清理目录。
- 设置principal,keytab。在sparkconf中存入,并且在hdfs登录。
- 如果deployMode是client,移动--jars,--files,--pyFiles,--archives到第5步创建好的目录内,并且将这些参数的地址修改为临时目录下的真是地址。
- 以下就是添加相关参数。
- childMainClass如果是client模式,最终为我们所写的main函数,如果是cluster模式,最终为org.apache.spark.deploy.yarn.YarnClusterApplication。
-
sparkconf内容变化
-
加载系统配置,加载spark.开头的参数
- set命令行加载的参数
- 设置spark.yarn.keytab,spark.yarn.principal参数
- 加载SparkSubmit496-555行的参数,参数的具体值根据命令行设置
- 重复2操作,设置不存在的命令行参数??why??
- 将附件做文件路径解析后的路径set
-
childClasspath变化过程
-
如果是client模式,把提交的jar和--jars的jar加入classpath
- 如果是yarn模式且deploy为cluster时,把提交的jar和--jars的jar加入classpath
-
childArgs变化
-
如果是client模式,把提交命令行的args参数加入到childArgs
- 把所有的OptionAssigner加入childArgs
- 如果是yarn-cluster模式,加入--class参数,--jar参数,把提交命令行的args参数加入到childArgs
相关文章推荐
- Spark源码分析-以wordcount为入口进行spark-submit源码分析
- spark-core_02: $SPARK_HOME/bin/spark-submit、spark-class脚本分析
- Spark源码分析之Spark-submit和Spark-class
- spark 2.2 源码分析 Spark-submit 篇
- spark-core_04: org.apache.spark.deploy.SparkSubmit源码分析:
- spark-submit后出现错误 NoSuchMethodError: scala.runtime.VolatileObjectRef.zero()Lscala/runtime/Volatile..
- Spark1.3从创建到提交:2)spark-submit和SparkContext源码分析
- 5-Spark高级数据分析-第五章 基于K均值聚类的网络流量异常检测
- spark-submit
- Spark 源码阅读(2)——spark-submit 提交流程
- Spark SQL Catalyst源码分析之Analyzer
- Spark源码分析之五:Task调度(一)
- Spark底层通信RPC源码分析
- 运维系列:07、spark-submit
- Cassandra联手Spark 大数据分析将迎来哪些改变?
- 大数据IMF传奇行动绝密课程第98-99课:使用Spark Streaming实战对论坛网站动态行为的多维度分析
- Spark源码分析之Rpc
- 【转】分析.net中的object sender与EventArgs e
- Spark项目之电商用户行为分析大数据平台之(二)CentOS7集群搭建
- Spark1.4.0-SparkSQL与Hive整合-支持窗口分析函数