Reading the Spark Source: spark-shell
2016-06-25 20:48
First, let's look at the SPARK_HOME/bin/spark-shell script. Its core is the main function:
function main() {
  if $cygwin; then
    stty -icanon min 1 -echo > /dev/null 2>&1
    export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
    "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
    stty icanon echo > /dev/null 2>&1
  else
    export SPARK_SUBMIT_OPTS
    "$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
  fi
}
As you can see, spark-shell is a thin wrapper around spark-submit, with org.apache.spark.repl.Main as the entry class.
1. Reading the repl.Main source, which mainly involves the following.
1.1 It defines the fields conf, sparkContext, sparkSession, and interp (a SparkILoop), plus a private hasErrors flag used to record errors:
object Main extends Logging {
  val conf = new SparkConf()
  var sparkContext: SparkContext = _
  var sparkSession: SparkSession = _
  // this is a public var because tests reset it.
  var interp: SparkILoop = _

  private var hasErrors = false

  private def scalaOptionError(msg: String): Unit = {
    hasErrors = true
    Console.err.println(msg)
  }

  def main(args: Array[String]) {
    doMain(args, new SparkILoop)
  }
}
The main method simply calls object Main's private doMain, passing a newly created SparkILoop. SparkILoop is what provides the REPL: a BufferedReader supplies the read side and a JPrintWriter the write side, while its parent class ILoop contributes the command machinery (run, processLine, and the read-eval-print loop itself). The SparkILoop and ILoop sources are as follows:
class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
    extends ILoop(in0, out) {
  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)

  def initializeSpark() {
    intp.beQuietDuring {
      // Create the SparkSession; @transient marks a member that the
      // serialization subsystem should skip. processLine is inherited
      // from the parent class ILoop.
      processLine("""
        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
            org.apache.spark.repl.Main.sparkSession
          } else {
            org.apache.spark.repl.Main.createSparkSession()
          }
        // create sc from the session
        @transient val sc = {
          val _sc = spark.sparkContext
          _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
          println("Spark context available as 'sc' " +
            s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
          println("Spark session available as 'spark'.")
          _sc
        }
        """)
      processLine("import org.apache.spark.SparkContext._")
      processLine("import spark.implicits._")
      processLine("import spark.sql")
      processLine("import org.apache.spark.sql.functions._")
      replayCommandStack = Nil // remove above commands from session history.
    }
  }

  /** Add repl commands that need to be blocked, e.g. reset */
  private val blockedCommands = Set[String]()

  /** Standard commands */
  lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =
    standardCommands.filter(cmd => !blockedCommands(cmd.name))

  /** Available commands */
  override def commands: List[LoopCommand] = sparkStandardCommands
}
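Because initializeSpark pre-binds spark and sc and runs those imports, a fresh shell session can use them immediately. A minimal sketch of what this wiring gives you at the scala> prompt (the sample data is made up for illustration):

  // spark, sc, and the implicits are already in scope in spark-shell
  val df = Seq((1, "a"), (2, "b")).toDF("id", "name") // toDF via spark.implicits._
  df.filter($"id" > 1).show()                         // $-column syntax, same import
  sql("SELECT count(*) FROM range(10)").show()        // sql imported from the session
  sc.parallelize(1 to 100).sum()                      // sc bound by initializeSpark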
Its companion object makes the interpreter scriptable:

object SparkILoop {

  /**
   * Creates an interpreter loop with default settings and feeds
   * the given code to it as input.
   */
  def run(code: String, sets: Settings = new Settings): String = {
    import java.io.{BufferedReader, StringReader, OutputStreamWriter}

    stringFromStream { ostream =>
      Console.withOut(ostream) {
        val input = new BufferedReader(new StringReader(code))
        val output = new JPrintWriter(new OutputStreamWriter(ostream), true)
        val repl = new SparkILoop(input, output)

        if (sets.classpath.isDefault) {
          sets.classpath.value = sys.props("java.class.path")
        }
        // delegates to the parent ILoop's process method
        repl process sets
      }
    }
  }

  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)
}
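You hand run some source text and get back everything the REPL printed; this is how Spark's own REPL tests drive the interpreter. A minimal sketch (the exact transcript varies by environment):

  // feed a short session to the interpreter and capture its output
  val transcript: String = SparkILoop.run(List(
    "val xs = 1 to 10",
    "xs.sum"
  ))
  println(transcript) // contains the usual "xs: ... = ..." REPL echoes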
And the parent class ILoop (shown here as decompiled stubs):

class ILoop(in0: Option[java.io.BufferedReader], protected val out: JPrintWriter)
    extends LoopCommands {
  def this(in0: java.io.BufferedReader, out: JPrintWriter) = { /* compiled code */ }

  def helpCommand(line: String): Result = { /* compiled code */ }
  val historyCommand: LoopCommand { def defaultLines: Int } = { /* compiled code */ }

  def searchHistory(_cmdline: String): Unit = { /* compiled code */ }

  // enumeration of the possible outcomes of processing a line
  object LineResults extends Enumeration {
    type LineResult = Value
    val EOF: Value = { /* compiled code */ }
    val ERR: Value = { /* compiled code */ }
    val OK: Value = { /* compiled code */ }
  }

  def processLine(line: String): Boolean = { /* compiled code */ }

  @scala.annotation.tailrec
  final def loop(): LineResults.LineResult = { /* compiled code */ }

  def reset(): Unit = { /* compiled code */ }
  def lineCommand(what: String): Result = { /* compiled code */ }
  def command(line: String): Result = { /* compiled code */ }
  def pasteCommand(arg: String): Result = { /* compiled code */ }

  def process(settings: scala.tools.nsc.Settings): Boolean = { /* compiled code */ }

  @deprecated("Use `process` instead")
  def main(settings: scala.tools.nsc.Settings): Unit = { /* compiled code */ }
}

object ILoop {
  implicit def loopToInterpreter(repl: ILoop): IMain = { /* compiled code */ }
  def runForTranscript(code: String, settings: Settings, inSession: Boolean = { /* compiled code */ }): String = { /* compiled code */ }
  def run(code: String, sets: Settings = { /* compiled code */ }): String = { /* compiled code */ }
  def run(lines: List[String]): String = { /* compiled code */ }
}

As this shows, ILoop is where the shell commands are defined as methods (the bodies are elided as /* compiled code */ in the decompiled view). :help lists them:

scala> :help
:cp <path>                 add a jar or directory to the classpath
:help [command]            print this summary or command-specific help
:history [num]             show the history (optional num is commands to show)
:h? <string>               search the history
:imports [name name ...]   show import history, identifying sources of names
:implicits [-v]            show the implicits in scope
:javap <path|class>        disassemble a file or class name
:load <path>               load and interpret a Scala file
:paste                     enter paste mode: all input up to ctrl-D compiled together
:quit                      exit the repl
:replay                    reset execution and replay all previous commands
:reset                     reset the repl to its initial state, forgetting all session entries
:sh <command line>         run a shell command (result is implicitly => List[String])
:silent                    disable/enable automatic printing of results
:fallback                  disable/enable advanced repl changes, these fix some issues but may introduce others. This mode will be removed once these fixes stablize
:type [-v] <expr>          display the type of an expression without evaluating it
:warnings                  show the suppressed warnings from the most recent line which had any
Each command is modeled by the LoopCommand abstraction from the scala.tools.nsc.interpreter.LoopCommands trait, which wraps the command, executes it, and returns a Result:
trait LoopCommands {
  abstract class LoopCommand(val name: String, val help: String)
      extends (String => Result) {
    def usage: String = { /* compiled code */ }
    def usageMsg: String = { /* compiled code */ }
    // executes the command and returns its result
    def apply(line: String): Result
    def showUsage(): Result = { /* compiled code */ }
  }

  object LoopCommand {
    def nullary(name: String, help: String, f: () => Result): LoopCommand = { /* compiled code */ }
    // builds a command that receives the rest of the input line as its argument
    def cmd(name: String, usage: String, help: String, f: String => Result): LoopCommand = { /* compiled code */ }
  }

  // wraps the outcome: whether the loop keeps running, and the line to record in history
  case class Result(keepRunning: Boolean, lineToRecord: Option[String])

  object Result {
    val default: Result = { /* compiled code */ }
    def recording(line: String): Result = { /* compiled code */ }
  }
}

The LoopCommand and Result companion class/object pairs are the core here: they encapsulate a command and the outcome of running it.
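To make the shape of this API concrete, here is a hedged sketch of registering an extra command in an ILoop subclass, using only the members visible above; the :greet command is hypothetical:

  import scala.tools.nsc.interpreter.ILoop

  class GreetingLoop extends ILoop {
    // hypothetical command built with LoopCommand.cmd; echo is ILoop's output helper
    private val greetCommand: LoopCommand = LoopCommand.cmd(
      "greet", "<name>", "print a greeting",
      (line: String) => { echo(s"hello, $line"); Result.default })

    // expose it alongside the standard :help, :load, ... commands
    override def commands: List[LoopCommand] = standardCommands :+ greetCommand
  }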
1.2 repl.Main involves two key methods, doMain and createSparkSession:
private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {
  interp = _interp
  // (abridged) interpArguments -- the REPL compiler options plus the
  // user-supplied args -- is assembled here
  val settings = new GenericRunnerSettings(scalaOptionError)
  settings.processArguments(interpArguments, true)
  if (!hasErrors) {
    interp.process(settings) // hands control to the read-eval-print loop
  }
}
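The settings plumbing comes from the Scala compiler: GenericRunnerSettings takes an error callback (here scalaOptionError, which flips hasErrors), and processArguments, inherited from scala.tools.nsc.settings.MutableSettings, parses the option list. A standalone sketch of that behavior:

  import scala.tools.nsc.GenericRunnerSettings

  // parse errors are routed to the callback, mirroring how Main
  // routes them to scalaOptionError
  val settings = new GenericRunnerSettings(msg => Console.err.println(msg))
  val (success, residual) = settings.processArguments(List("-usejavacp", "-deprecation"), true)
  println(s"parsed ok: $success, leftover args: $residual")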
And createSparkSession, which picks the catalog implementation and wires everything up:

def createSparkSession(): SparkSession = {
  val builder = SparkSession.builder.config(conf)
  // choose the catalog implementation and create the session accordingly
  if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") {
    if (SparkSession.hiveClassesArePresent) {
      sparkSession = builder.enableHiveSupport().getOrCreate()
      logInfo("Created Spark session with Hive support")
    } else {
      builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
      sparkSession = builder.getOrCreate()
      logInfo("Created Spark session")
    }
  } else {
    sparkSession = builder.getOrCreate()
    logInfo("Created Spark session")
  }
  // the SparkContext comes from the session
  sparkContext = sparkSession.sparkContext
  // Signaling: allow interrupting jobs mid-run (see below)
  Signaling.cancelOnInterrupt(sparkContext)
  sparkSession
}

The configuration entries referenced above (such as CATALOG_IMPLEMENTATION) are defined in the org.apache.spark.internal.config package object:

package object config {
  // driver settings: driver class path, java options, library path,
  // user-library-path-first, driver memory
  private[spark] val DRIVER_JAVA_OPTIONS =
    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional

  private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("1g")

  // executor settings: executor class path, java options, library path,
  // user-library-path-first, executor memory
  private[spark] val EXECUTOR_JAVA_OPTIONS =
    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional

  private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("1g")

  // CPU settings: spark.task.cpus, default 1
  // dynamic executor allocation: spark.dynamicAllocation.minExecutors /
  //   initialExecutors / maxExecutors
  // spark.shuffle.service.enabled, default false (dynamic allocation needs it true)
  // spark.executor.instances: the number of executor instances to request
  // SQL catalog implementation:
  private[spark] val CATALOG_IMPLEMENTATION =
    ConfigBuilder("spark.sql.catalogImplementation")
      .internal()
      .stringConf
      .checkValues(Set("hive", "in-memory"))
      .createWithDefault("in-memory")
}
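Note how the declared default and the lookup in createSparkSession interact: the entry defaults to in-memory, but the shell reads it with a fallback of "hive", so an unset key means "try Hive first". A small sketch of that decision in isolation (restating the branch above, not new Spark API):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
  // spark-shell treats a missing spark.sql.catalogImplementation as "hive"
  val requested = conf.get("spark.sql.catalogImplementation", "hive").toLowerCase
  // "hive" only sticks when SparkSession.hiveClassesArePresent; otherwise
  // createSparkSession quietly falls back to the "in-memory" catalog
  println(s"catalog requested by spark-shell: $requested")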
conf references org.apache.spark.launcher.SparkLauncher, which encapsulates the configuration an application needs and can launch that application as a child process:

public class SparkLauncher {
  public static final String SPARK_MASTER = "spark.master";
  public static final String DEPLOY_MODE = "spark.submit.deployMode";
  ...
  static final Map<String, String> launcherConfig = new HashMap<>();

  // configuration setters:
  //   setConfig, setJavaHome, setSparkHome, setPropertiesFile
  //   setAppName     -- application name
  //   setMaster      -- master URL
  //   setDeployMode  -- deploy mode
  //   setMainClass   -- class to run
  //   addSparkArg    -- extra Spark argument
  //   addAppArgs     -- application arguments
  //   addJar / addFile / setVerbose
  //   startApplication -- see below
}

startApplication is the key method: it launches the application and returns a SparkAppHandle. The handle assumes the application instantiates a single SparkContext, and it reports the application's state over that context's lifecycle; once the SparkContext stops, its state can no longer be detected, but because the application runs in a child process it can still be terminated via SparkAppHandle#kill(). CHILD_PROCESS_LOGGER_NAME sets the logger name used for the application's output; if it is not set, logger names begin with org.apache.spark.launcher.app.
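As a usage illustration, here is a hedged sketch of driving SparkLauncher from Scala; the Spark home, jar path, and main class are hypothetical placeholders:

  import org.apache.spark.launcher.SparkLauncher

  // launch an application as a child process and poll its state
  val handle = new SparkLauncher()
    .setSparkHome("/opt/spark")               // hypothetical path
    .setAppResource("/path/to/my-job.jar")    // hypothetical jar
    .setMainClass("com.example.MyJob")        // hypothetical class
    .setMaster("local[2]")
    .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
    .startApplication()

  // the handle reports state transitions until a final state is reached
  while (!handle.getState.isFinal) {
    println(s"state: ${handle.getState}")
    Thread.sleep(1000)
  }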
2. Signaling handles interrupting execution mid-run (e.g. Ctrl+C): the first interrupt cancels any active jobs instead of exiting the shell:
private[repl] object Signaling extends Logging {
  def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {
    if (!ctx.statusTracker.getActiveJobIds().isEmpty) {
      logWarning("Cancelling all active jobs, this can take a while. " +
        "Press Ctrl+C again to exit now.")
      ctx.cancelAllJobs()
      true
    } else {
      false
    }
  }
}
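The Boolean returned by the block is the contract: true tells SignalUtils the interrupt was handled (the shell keeps running), while false lets the signal fall through to the previously installed handler, which exits. A rough sketch of that mechanism, assuming a simplified single-action handler rather than Spark's real SignalUtils:

  import sun.misc.{Signal, SignalHandler}

  // simplified stand-in for SignalUtils.register: run `action` on SIGINT and
  // fall back to the previous handler when it returns false
  def onSigInt(action: => Boolean): Unit = {
    var previous: SignalHandler = null
    previous = Signal.handle(new Signal("INT"), new SignalHandler {
      override def handle(sig: Signal): Unit =
        if (!action && previous != null) previous.handle(sig)
    })
  }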
The original post concluded with a diagram of the call relationships among these classes (figure not reproduced here).