
Spark Source Code (1): A Walkthrough of SparkSubmit

1. Starting from spark-submit

The entry point for submitting a Spark application is org.apache.spark.deploy.SparkSubmit (this walkthrough is based on Spark 1.6.3).

Start with its main method:

// Entry point of spark-submit
def main(args: Array[String]): Unit = {

  /*
   * Instantiate SparkSubmitArguments, which encapsulates the arguments
   * passed to spark-submit. Its constructor calls several initialization
   * methods, shown below.
   */
  val appArgs = new SparkSubmitArguments(args)

  // If verbose is true (it defaults to false and is enabled with the --verbose flag),
  // print the full set of parsed arguments.
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    // action defaults to SUBMIT, so submit() is called here
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}


// Methods invoked when SparkSubmitArguments is instantiated:

// Set parameters from command line arguments
// (parse the command-line arguments)
try {
  parse(args.asJava)
} catch {
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}
// Populate `sparkProperties` map from properties file
// (read the defaults file, e.g. spark-defaults.conf, into the sparkProperties HashMap)
mergeDefaultSparkProperties()
// Remove keys that don't start with "spark." from `sparkProperties`.
ignoreNonSparkProperties()
// Use `sparkProperties` map along with env vars to fill in any missing parameters
// (resolve each configuration field from sparkProperties and environment variables)
loadEnvironmentArguments()

validateArguments()

// Body of loadEnvironmentArguments
// Here you can see how the common configuration options are resolved
private def loadEnvironmentArguments(): Unit = {
  master = Option(master)
    .orElse(sparkProperties.get("spark.master"))
    .orElse(env.get("MASTER"))
    .orNull
  driverExtraClassPath = Option(driverExtraClassPath)
    .orElse(sparkProperties.get("spark.driver.extraClassPath"))
    .orNull
  driverExtraJavaOptions = Option(driverExtraJavaOptions)
    .orElse(sparkProperties.get("spark.driver.extraJavaOptions"))
    .orNull
  driverExtraLibraryPath = Option(driverExtraLibraryPath)
    .orElse(sparkProperties.get("spark.driver.extraLibraryPath"))
    .orNull
  driverMemory = Option(driverMemory)
    .orElse(sparkProperties.get("spark.driver.memory"))
    .orElse(env.get("SPARK_DRIVER_MEMORY"))
    .orNull
  driverCores = Option(driverCores)
    .orElse(sparkProperties.get("spark.driver.cores"))
    .orNull
  executorMemory = Option(executorMemory)
    .orElse(sparkProperties.get("spark.executor.memory"))
    .orElse(env.get("SPARK_EXECUTOR_MEMORY"))
    .orNull
  executorCores = Option(executorCores)
    .orElse(sparkProperties.get("spark.executor.cores"))
    .orElse(env.get("SPARK_EXECUTOR_CORES"))
    .orNull
  totalExecutorCores = Option(totalExecutorCores)
    .orElse(sparkProperties.get("spark.cores.max"))
    .orNull
  name = Option(name).orElse(sparkProperties.get("spark.app.name")).orNull
  jars = Option(jars).orElse(sparkProperties.get("spark.jars")).orNull
  ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
  packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
  packagesExclusions = Option(packagesExclusions)
    .orElse(sparkProperties.get("spark.jars.excludes")).orNull
  deployMode = Option(deployMode).orElse(env.get("DEPLOY_MODE")).orNull
  numExecutors = Option(numExecutors)
    .getOrElse(sparkProperties.get("spark.executor.instances").orNull)
  keytab = Option(keytab).orElse(sparkProperties.get("spark.yarn.keytab")).orNull
  principal = Option(principal).orElse(sparkProperties.get("spark.yarn.principal")).orNull

  // Try to set main class from JAR if no --class argument is given
  if (mainClass == null && !isPython && !isR && primaryResource != null) {
    val uri = new URI(primaryResource)
    val uriScheme = uri.getScheme()

    uriScheme match {
      case "file" =>
        try {
          val jar = new JarFile(uri.getPath)
          // Note that this might still return null if no main-class is set; we catch that later
          mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class")
        } catch {
          case e: Exception =>
            SparkSubmit.printErrorAndExit(s"Cannot load main class from JAR $primaryResource")
        }
      case _ =>
        SparkSubmit.printErrorAndExit(
          s"Cannot load main class from JAR $primaryResource with URI $uriScheme. " +
          "Please specify a class through --class.")
    }
  }

  // Global defaults. These should be kept to a minimum to avoid confusing behavior.
  master = Option(master).getOrElse("local[*]")

  // In YARN mode, app name can be set via SPARK_YARN_APP_NAME (see SPARK-5222)
  if (master.startsWith("yarn")) {
    name = Option(name).orElse(env.get("SPARK_YARN_APP_NAME")).orNull
  }

  // Set name from main class if not given
  name = Option(name).orElse(Option(mainClass)).orNull
  if (name == null && primaryResource != null) {
    name = Utils.stripDirectory(primaryResource)
  }

  // Action should be SUBMIT unless otherwise specified
  // (so the default value of action is SUBMIT)
  action = Option(action).getOrElse(SUBMIT)
}
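
Every field above is resolved with the same precedence: a value supplied on the command line wins, then the value from the properties file, then the environment variable, and the field stays null if nothing is set. A minimal, self-contained sketch of that chain (the values here are made up for illustration):

// Hypothetical inputs, for illustration only
val cliValue: String = null                              // nothing passed on the command line
val sparkProperties = Map("spark.driver.memory" -> "4g") // e.g. loaded from spark-defaults.conf
val env = Map("SPARK_DRIVER_MEMORY" -> "2g")

val driverMemory = Option(cliValue)
  .orElse(sparkProperties.get("spark.driver.memory"))
  .orElse(env.get("SPARK_DRIVER_MEMORY"))
  .orNull
// driverMemory == "4g": the properties-file value shadows the environment variable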


/**
 * submit: the method that actually submits the application.
 */
private def submit(args: SparkSubmitArguments): Unit = {
  // prepareSubmitEnvironment does the main preparation before the job is submitted.
  // It returns a tuple of four elements:
  //   (1) the arguments for the child process
  //   (2) a list of classpath entries
  //   (3) a map of system properties
  //   (4) the main class to run (childMainClass)
  // The last one deserves attention: it is chosen from the master URL and deployMode
  // given on the command line. In yarn cluster mode, org.apache.spark.deploy.yarn.Client
  // drives the job (and for R/Python applications some extra entries, such as the user's
  // entry point, are appended to childArgs); in standalone cluster mode,
  // org.apache.spark.deploy.Client handles the submission.
  // The body of prepareSubmitEnvironment is omitted here for brevity.
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  // doRunMain is defined next. There is an if/else further down, but whichever
  // branch is taken, execution eventually goes through this method.
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            // runMain executes childMainClass via reflection
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional Akka gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      // scalastyle:off println
      printStream.println("Running Spark using the REST application submission protocol.")
      // scalastyle:on println
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        printWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
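
To make the choice of childMainClass described above concrete, here is a deliberately simplified sketch of the decision. It is an illustration only, not the real prepareSubmitEnvironment: it covers just the cases mentioned in this post and ignores the REST-based standalone gateway as well as the Mesos, Python and R handling.

// Simplified, illustrative sketch only -- the real logic lives in prepareSubmitEnvironment
def chooseChildMainClass(master: String, deployMode: String, userMainClass: String): String =
  (master, deployMode) match {
    // yarn cluster mode: org.apache.spark.deploy.yarn.Client drives the job
    case (m, "cluster") if m.startsWith("yarn") => "org.apache.spark.deploy.yarn.Client"
    // standalone cluster mode (legacy gateway): org.apache.spark.deploy.Client wraps the submission
    case (m, "cluster") if m.startsWith("spark://") => "org.apache.spark.deploy.Client"
    // client mode: the user's main class runs directly in this JVM
    case _ => userMainClass
  }

Whichever class is chosen, runMain then adds childClasspath to the class loader, applies sysProps, loads childMainClass by reflection, and invokes its static main method with childArgs.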
Tags: spark, source code, 1.6.3