您的位置:首页 > 移动开发 > Objective-C

SparkSubmit object分析

2021-04-22 14:54 1196 查看

org.apache.spark.deploy.SparkSubmit分析

  • Object SparkSubmit类图分析


    结合spark的log4j来说,printStream为error,log4j的配置也是System.error,导致做日志收集的时候莫名其妙。。。。
    解决方案如下,采用重新构造log4j的appender:

    Logger.getRootLogger.removeAllAppenders()
    Logger.getRootLogger.setLevel(Level.INFO)
    val console = new ConsoleAppender()
    console.setTarget("System.out")
    val patternLayout = new PatternLayout()
    patternLayout.setConversionPattern("%c{1}: %m%n")
    console.setLayout(patternLayout)
    console.setEncoding("UTF-8")
    console.setName("console")
    console.activateOptions()
    Logger.getRootLogger.addAppender(console)
  • main方法执行

      创建org.apache.spark.deploy.SparkSubmit类,重写parseArguments、logInfo、logWarning、doSubmit方法,执行doSubmit方法。
    1. 调用duSubmit方法,实际调用org.apache.spark.deploy.SparkSubmit类的submit对象。
  • 执行doSubmit方法

      初始化log4j,查找classloader中org/apache/spark/log4j-defaults.properties,如果存在,PropertyConfigurator.configure("org/apache/spark/log4j-defaults.properties")。如果不在输出Spark was unable to load org/apache/spark/log4j-defaults.properties
    1. 解析提交参数,返回SparkSubmitArguments对象,具体实现方式如图:
    2. 如果verbose=true,输出参数信息
    3. 通过action,执行不同的启动方法,这里以SparkSubmitAction.SUBMIT为例。
    4. 执行submit方法。首先判断master.startsWith("spark://") && deployMode == "cluster",这里我们应该是false,执行doRun方法。查看是否设置了proxyUser,如果是设置hdfs的security(一些大数据平台修改了底层认证,如tbds、fi等,应该相对应在这里的源码做了处理)。执行runMain方法。
  • runMain方法

      执行prepareSubmitEnvironment
    1. 设置classloader,如果设置了spark.driver.userClassPathFirst=true,创建出ChildFirstURLClassLoader,否则创建出MutableURLClassLoader。
    2. 把childClasspath加载到当前的classloader
    3. 利用反射初始化mainclass,根据mainclass的类型创建出不同的SparkApplication,scala.App的是JavaMainApplication,其他的则用SparkApplication
    4. 执行application start()方法。
  • 执行prepareSubmitEnvironment

      创建SparkConf!!!,将system的spark开头的env参数加载。
    1. 根据args.master设置clusterManager=1(YARN),根据args.deployMode,如果没有填,默认是client,如果填了cluster,默认是cluster。如果master写了yarn-cluster,默认为yarn,cluster,测试org.apache.spark.deploy.yarn.YarnClusterApplication是否可以被加载。默认情况下可能找不到这个类,因为默认没有加入到maven编译内,尴尬,只能手动加入!!!
    2. args.sparkProperties加入到sparkConf
    3. 创建org.apache.hadoop.conf.Configuration,将sparkConf中以spark.hadoop.开头的参数截取,添加到configuration中,设置默认的io.file.buffer.size65536
    4. 创建临时目录,地址在System.getProperty("java.io.tmpdir"),对于linux系统一般为/tmp目录。目录名称为spark-#{UUID},添加ShutdownHookManager,当程序结束时清理目录。
    5. 设置principal,keytab。在sparkconf中存入,并且在hdfs登录。
    6. 如果deployMode是client,移动--jars,--files,--pyFiles,--archives到第5步创建好的目录内,并且将这些参数的地址修改为临时目录下的真是地址。
    7. 以下就是添加相关参数。
    8. childMainClass如果是client模式,最终为我们所写的main函数,如果是cluster模式,最终为org.apache.spark.deploy.yarn.YarnClusterApplication
  • sparkconf内容变化

      加载系统配置,加载spark.开头的参数
    1. set命令行加载的参数
    2. 设置spark.yarn.keytab,spark.yarn.principal参数
    3. 加载SparkSubmit496-555行的参数,参数的具体值根据命令行设置
    4. 重复2操作,设置不存在的命令行参数??why??
    5. 将附件做文件路径解析后的路径set
  • childClasspath变化过程

      如果是client模式,把提交的jar和--jars的jar加入classpath
    1. 如果是yarn模式且deploy为cluster时,把提交的jar和--jars的jar加入classpath
  • childArgs变化

      如果是client模式,把提交命令行的args参数加入到childArgs
    1. 把所有的OptionAssigner加入childArgs
    2. 如果是yarn-cluster模式,加入--class参数,--jar参数,把提交命令行的args参数加入到childArgs
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: