Spark Source Code (1): SparkSubmit Analysis
2018-03-07 14:15
1. Starting from spark-submit
The entry point of a Spark application submission is org.apache.spark.deploy.SparkSubmit.
Find its main method:
// Entry point executed by spark-submit
def main(args: Array[String]): Unit = {
  /*
   * Instantiate SparkSubmitArguments.
   * This class wraps the arguments passed to spark-submit;
   * its constructor calls several methods (shown below).
   */
  val appArgs = new SparkSubmitArguments(args)
  // If verbose is true, the resolved configuration is printed in full.
  // The flag defaults to false.
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    // The default action is SUBMIT, so submit() is called here
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
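For reference, a rough sketch of the kind of argument array that reaches main when an application is submitted. This is purely illustrative; the class, jar, and input names below are made up:

// Hypothetical example of the args array SparkSubmit.main receives for a submission like:
//   spark-submit --master yarn --deploy-mode cluster --class com.example.MyApp my-app.jar input.txt
val exampleArgs = Array(
  "--master", "yarn",
  "--deploy-mode", "cluster",
  "--class", "com.example.MyApp",  // user application entry class (made up)
  "my-app.jar",                    // primary resource (made up)
  "input.txt")                     // argument passed on to the user application (made up)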
// Methods called when SparkSubmitArguments is instantiated

// Set parameters from command line arguments
// Parse the command-line arguments
try {
  parse(args.asJava)
} catch {
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}
// Populate `sparkProperties` map from properties file
// Read the default properties into the sparkProperties HashMap
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
// Load values from the environment and assign them to the corresponding fields
loadEnvironmentArguments()
validateArguments()
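To make the precedence concrete, here is a minimal, self-contained sketch (not Spark's actual code) of what mergeDefaultSparkProperties() and ignoreNonSparkProperties() accomplish: values supplied on the command line win over the defaults file, and keys that do not start with "spark." are dropped.

// Toy illustration of merging defaults and filtering non-spark keys
val fromCommandLine = Map("spark.master" -> "yarn")
val fromDefaultsFile = Map(
  "spark.master"          -> "local[*]",  // overridden by the command-line value
  "spark.executor.memory" -> "2g",
  "some.other.key"        -> "dropped")   // not a spark.* key, removed below

val sparkProperties = (fromDefaultsFile ++ fromCommandLine)
  .filter { case (k, _) => k.startsWith("spark.") }

// sparkProperties == Map("spark.master" -> "yarn", "spark.executor.memory" -> "2g")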
// Body of loadEnvironmentArguments()
// This shows where the commonly used configuration options come from
private def loadEnvironmentArguments(): Unit = {
  master = Option(master)
    .orElse(sparkProperties.get("spark.master"))
    .orElse(env.get("MASTER"))
    .orNull
  driverExtraClassPath = Option(driverExtraClassPath)
    .orElse(sparkProperties.get("spark.driver.extraClassPath"))
    .orNull
  driverExtraJavaOptions = Option(driverExtraJavaOptions)
    .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
    .orNull
  driverExtraLibraryPath = Option(driverExtraLibraryPath)
    .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
    .orNull
  driverMemory = Option(driverMemory)
    .orElse(sparkProperties.get("spark.driver.memory"))
    .orElse(env.get("SPARK_DRIVER_MEMORY"))
    .orNull
  driverCores = Option(driverCores)
    .orElse(sparkProperties.get("spark.driver.cores"))
    .orNull
  executorMemory = Option(executorMemory)
    .orElse(sparkProperties.get("spark.executor.memory"))
    .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
    .orNull
  executorCores = Option(executorCores)
    .orElse(sparkProperties.get("spark.executor.cores"))
    .orElse(env.get("SPARK_EXECUTOR_CORES"))
    .orNull
  totalExecutorCores = Option(totalExecutorCores)
    .orElse(sparkProperties.get("spark.cores.max"))
    .orNull
  name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
  jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
  ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
  packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
  packagesExclusions = Option(packagesExclusions)
    .orElse(sparkProperties.get("spark.jars.excludes")).orNull
  deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
  numExecutors = Option(numExecutors)
    .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
  keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
  principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

  // Try to set main class from JAR if no --class argument is given
  if (mainClass == null && !isPython && !isR && primaryResource != null) {
    val uri = new URI(primaryResource)
    val uriScheme = uri.getScheme()

    uriScheme match {
      case "file" =>
        try {
          val jar = new JarFile(uri.getPath)
          // Note that this might still return null if no main-class is set; we catch that later
          mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
        } catch {
          case e: Exception =>
            SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
        }
      case _ =>
        SparkSubmit.printErrorAndExit(
          s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
          "Please specify a class through --class.")
    }
  }

  // Global defaults. These should be keep to minimum to avoid confusing behavior.
  master = Option(master).getOrElse("local[*]")

  // In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
  if (master.startsWith("yarn")) {
    name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
  }

  // Set name from main class if not given
  name = Option(name).orElse(Option(mainClass)).orNull
  if (name == null && primaryResource != null) {
    name = Utils.stripDirectory(primaryResource)
  }

  // Action should be SUBMIT unless otherwise specified
  // Here you can see that action defaults to SUBMIT
  action = Option(action).getOrElse(SUBMIT)
}
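The repeated Option(...).orElse(...).orNull pattern above defines a clear precedence: an explicitly supplied value wins, then the corresponding spark.* property, then the environment variable, and finally null if nothing is set. A toy illustration (not Spark code), using spark.driver.memory as the example key:

def resolveDriverMemory(cliValue: String,
                        sparkProperties: Map[String, String],
                        env: Map[String, String]): String = {
  Option(cliValue)
    .orElse(sparkProperties.get("spark.driver.memory"))
    .orElse(env.get("SPARK_DRIVER_MEMORY"))
    .orNull
}

resolveDriverMemory("2g", Map("spark.driver.memory" -> "4g"), Map.empty)  // "2g"  (explicit value wins)
resolveDriverMemory(null, Map("spark.driver.memory" -> "4g"), Map.empty)  // "4g"  (falls back to the property)
resolveDriverMemory(null, Map.empty, Map("SPARK_DRIVER_MEMORY" -> "1g"))  // "1g"  (falls back to the env var)
resolveDriverMemory(null, Map.empty, Map.empty)                           // null  (nothing configured)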
/**
 * submit(): the main method for submitting the application
 */
private def submit(args: SparkSubmitArguments): Unit = {
  // prepareSubmitEnvironment() prepares the parameters needed before submission.
  // It returns a tuple of four elements:
  //   (1) the arguments for the child process
  //   (2) the classpath entries for the child
  //   (3) a map of system properties
  //   (4) the main class that actually launches the application
  // That main class is chosen from the cluster manager and deploy mode given on the
  // command line:
  //   - on a YARN cluster, org.apache.spark.deploy.yarn.Client runs the job; for R/Python
  //     applications extra arguments, such as the user application's entry class, are
  //     appended to childArgs
  //   - in standalone cluster mode, org.apache.spark.deploy.Client handles the submission
  // The method body is omitted here for space.
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  // doRunMain() is defined next; whichever branch of the if/else below is taken,
  // execution eventually reaches this method.
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            // Invoke childMainClass via reflection
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
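The "invoke childMainClass via reflection" step inside runMain boils down to loading the class with the current class loader and calling its static main(String[]) method. Below is a simplified sketch of that core step only; the real runMain also sets up the child classpath, system properties, and error handling:

// Simplified sketch of the reflective call at the heart of runMain (not the full method)
def invokeMainByReflection(childMainClass: String, childArgs: Seq[String]): Unit = {
  val loader = Thread.currentThread().getContextClassLoader
  val mainClass = Class.forName(childMainClass, true, loader)
  val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
  // main is static, so the target object is null; the String array is passed as a single argument
  mainMethod.invoke(null, childArgs.toArray)
}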