您的位置:首页 > 运维架构 > Shell

spark源码阅读之spark-shell

2016-06-25 20:48 531 查看
首先我们来看SPARK_HOME/bin/spark-shell脚本,内容:

function main() {
if $cygwin; then
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
"$FWDIR"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
fi
}


可以看到使用的是org.apache.spark.repl.Main为入口。
1.查阅repl.Main源代码,主要设计如下内容

1.1定义私有变量 conf,sparkContext,sparkSession,SparkILoop,hasErrors=false用来标记错误,代码如下:

object Main extends Logging {  val conf = new SparkConf()  var sparkContext: SparkContext = _  var sparkSession: SparkSession = _  // this is a public var because tests reset it.  var interp: SparkILoop = _
  private var hasErrors = false  private def scalaOptionError(msg: String): Unit = {    hasErrors = true    Console.err.println(msg)  }
  def main(args: Array[String]) {    doMain(args, new SparkILoop)  }}
其中main方法中调用了object Main的私有方法doMain,同时创建了新的SparkILoop对象,该类主要提供repl功能,如BufferReader提供Read,JPrintWriter提供Writer,而通过SparkILoop的父类ILoop提供命令的run,processLine方法实现命令循环计算等操作,SparkILoop和ILoop的源代码如下:

class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) extends ILoop(in0, out) {  def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out)
  def initializeSpark() {    intp.beQuietDuring {    //创建sparkSession,使用transient修饰符来标识一个成员变量在序列化子系统中应被忽略    //调用父类ILoop的processLine方法      processLine("""        @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {            org.apache.spark.repl.Main.sparkSession          } else {            org.apache.spark.repl.Main.createSparkSession()          }    //创建sc              @transient val sc = {          val _sc = spark.sparkContext          _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))          println("Spark context available as 'sc' " +            s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")          println("Spark session available as 'spark'.")          _sc        }        """)      processLine("import org.apache.spark.SparkContext._")      processLine("import spark.implicits._")      processLine("import spark.sql")      processLine("import org.apache.spark.sql.functions._")      replayCommandStack = Nil // remove above commands from session history.    }  }
  /** Add repl commands that needs to be blocked. e.g. reset */  private val blockedCommands = Set[String]()
  /** Standard commands */  lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =    standardCommands.filter(cmd => !blockedCommands(cmd.name))
  /** Available commands */  override def commands: List[LoopCommand] = sparkStandardCommands}
object SparkILoop {
  /**   * Creates an interpreter loop with default settings and feeds   * the given code to it as input.   */  def run(code: String, sets: Settings = new Settings): String = {    import java.io.{ BufferedReader, StringReader, OutputStreamWriter }
    stringFromStream { ostream =>      Console.withOut(ostream) {        val input = new BufferedReader(new StringReader(code))        val output = new JPrintWriter(new OutputStreamWriter(ostream), true)        val repl = new SparkILoop(input, output)
        if (sets.classpath.isDefault) {          sets.classpath.value = sys.props("java.class.path")        }        //调用父类ILoop的process方法        repl process sets      }    }  }  def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString)}   
class ILoop(in0 : scala.Option[java.io.BufferedReader], protected val out : scala.tools.nsc.interpreter.JPrintWriter) extends scala.AnyRef with scala.tools.nsc.interpreter.LoopCommands {  def this(in0 : java.io.BufferedReader, out : scala.tools.nsc.interpreter.JPrintWriter) = { /* compiled code */ }
  def helpCommand(line : scala.Predef.String) : ILoop.super[LoopCommands/*scala.tools.nsc.interpreter.LoopCommands*/].Result = { /* compiled code */ }  val historyCommand : ILoop.super[LoopCommands/*scala.tools.nsc.interpreter.LoopCommands*/].LoopCommand {    def defaultLines : scala.Int  } = { /* compiled code */ }
  def searchHistory(_cmdline : scala.Predef.String) : scala.Unit = { /* compiled code */ }
 //枚组,分组命令状态  object LineResults extends scala.Enumeration {    type LineResult = LineResults.Value    val EOF : LineResults.Value = { /* compiled code */ }    val ERR : LineResults.Value = { /* compiled code */ }    val OK : LineResults.Value = { /* compiled code */ }  }  def processLine(line : scala.Predef.String) : scala.Boolean = { /* compiled code */ }
  @scala.annotation.tailrec  final def loop() : ILoop.this.LineResults.LineResult = { /* compiled code */ }
  def reset() : scala.Unit = { /* compiled code */ }  def lineCommand(what : scala.Predef.String) : ILoop.super[LoopCommands/*scala.tools.nsc.interpreter.LoopCommands*/].Result = { /* compiled code */ }
  def command(line : scala.Predef.String) : ILoop.super[LoopCommands/*scala.tools.nsc.interpreter.LoopCommands*/].Result = { /* compiled code */ }  def pasteCommand(arg : scala.Predef.String) : ILoop.super[LoopCommands/*scala.tools.nsc.interpreter.LoopCommands*/].Result = { /* compiled code */ }
  def process(settings : scala.tools.nsc.Settings) : scala.Boolean = { /* compiled code */ }  @scala.deprecated("Use `process` instead")  def main(settings : scala.tools.nsc.Settings) : scala.Unit = { /* compiled code */ }}object ILoop extends scala.AnyRef {  implicit def loopToInterpreter(repl : scala.tools.nsc.interpreter.ILoop) : scala.tools.nsc.interpreter.IMain = { /* compiled code */ }  def runForTranscript(code : scala.Predef.String, settings : scala.tools.nsc.Settings, inSession : scala.Boolean = { /* compiled code */ }) : scala.Predef.String = { /* compiled code */ }  def run(code : scala.Predef.String, sets : scala.tools.nsc.Settings = { /* compiled code */ }) : scala.Predef.String = { /* compiled code */ }  def run(lines : scala.List[scala.Predef.String]) : scala.Predef.String = { /* compiled code */ }}可以看出ILoop是对shell命令的方法抽象,并没有进行实现,如scala> :help:cp <path>                 add a jar or directory to the classpath:help [command]            print this summary or command-specific help:history [num]             show the history (optional num is commands to show):h? <string>               search the history:imports [name name ...]   show import history, identifying sources of names:implicits [-v]            show the implicits in scope:javap <path|class>        disassemble a file or class name:load <path>               load and interpret a Scala file:paste                     enter paste mode: all input up to ctrl-D compiled together:quit                      exit the repl:replay                    reset execution and replay all previous commands:reset                     reset the repl to its initial state, forgetting all session entries:sh <command line>         run a shell command (result is implicitly => List[String]):silent                    disable/enable automatic printing of results:fallback                  disable/enable advanced repl changes, these fix some issues but may introduce others. This mode will be removed once these fixes stablize:type [-v] <expr>          display the type of an expression without evaluating it:warnings                  show the suppressed warnings from the most recent line which had any

这里使用了scala.tools.nsc.interpreter.LoopCommands特质中的LoopCommands进行命令封装,然后执行命令并返回结果,代码如下:

trait LoopCommands extends scala.AnyRef {  abstract class LoopCommand(val name : scala.Predef.String, val help : scala.Predef.String) extends scala.AnyRef with scala.Function1[scala.Predef.String, LoopCommands.this.Result] {    def usage : scala.Predef.String = { /* compiled code */ }    def usageMsg : scala.Predef.String = { /* compiled code */ }    def apply(line : scala.Predef.String) : LoopCommands.this.Result    //返回执行结果    def showUsage() : LoopCommands.this.Result = { /* compiled code */ }  }  object LoopCommand extends scala.AnyRef {    def nullary(name : scala.Predef.String, help : scala.Predef.String, f : scala.Function0[LoopCommands.this.Result]) : LoopCommands.this.LoopCommand = { /* compiled code */ }    //执行命令    def cmd(name : scala.Predef.String, usage : scala.Predef.String, help : scala.Predef.String, f : scala.Function1[scala.Predef.String, LoopCommands.this.Result]) : LoopCommands.this.LoopCommand = { /* compiled code */ }  }  //封装返回结果  case class Result(val keepRunning : scala.Boolean, val lineToRecord : scala.Option[scala.Predef.String]) extends scala.AnyRef with scala.Product with scala.Serializable {  }
  object Result extends scala.AnyRef with scala.Serializable {    val default : LoopCommands.this.Result = { /* compiled code */ }    def recording(line : scala.Predef.String) : LoopCommands.this.Result = { /* compiled code */ }  }}主要使用LoopCommands.this.LoopCommand和LoopCommands.this.Result伴生类和对象,对命令和结果进行封装

1.2在repl.Main中涉及两个主要方法,分别是doMain和createSparkSession,代码如下:

  private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = {    val settings = new GenericRunnerSettings(scalaOptionError)    settings.processArguments(interpArguments, true)  }
  def createSparkSession(): SparkSession = {    val builder = SparkSession.builder.config(conf) //获取sparkSession的实现方式并创建    if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") {      if (SparkSession.hiveClassesArePresent) {        sparkSession = builder.enableHiveSupport().getOrCreate()        logInfo("Created Spark session with Hive support")      } else {        builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")        sparkSession = builder.getOrCreate()        logInfo("Created Spark session")      }    } else {      sparkSession = builder.getOrCreate()      logInfo("Created Spark session")    } //通过sparkSession创建sparkContext    sparkContext = sparkSession.sparkContext    //Signaling中途退出    Signaling.cancelOnInterrupt(sparkContext)    sparkSession  }主要涉及scala.tools.nsc.GenericRunnerSettings,通过他的父类scala.tools.nsc.settings.MutableSettings提供的方法,如processArgumentString函数进行参数封装通过org.apache.spark.internal.config加载配置信息,代码如:  package object config {//Driver的一些信息,包括driver_class_path,driver_java_options,driver_library_path,driver_user_library_path_first,driver_memory  private[spark] val DRIVER_JAVA_OPTIONS =    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional
  private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")    .bytesConf(ByteUnit.MiB)    .createWithDefaultString("1g")//Executor的一些信息,executor_class_path,executor_java_options,executor_library_path,executor_user_library_path_first,executor_memory  private[spark] val EXECUTOR_JAVA_OPTIONS =    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional
  private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")    .bytesConf(ByteUnit.MiB)    .createWithDefaultString("1g")//CUP信息:spark.task.cpus,默认是1//动态的Executors信息,如spark.dynamicAllocation.minExecutors/initialExecutors/maxExecutors//spark.shuffle.service.enabled,默认值是false,设置为true//spark.executor.instances executor的执行接口???//sql实现  private[spark] val CATALOG_IMPLEMENTATION = ConfigBuilder("spark.sql.catalogImplementation")    .internal()    .stringConf    .checkValues(Set("hive", "in-memory"))    .createWithDefault("in-memory") }
在conf中调用了org.apache.spark.launcher.SparkLauncher,SparkLauncher中封装了application所需要的配置信息,并启动一个application,代码如:public class SparkLauncher {
  public static final String SPARK_MASTER = "spark.master";  public static final String DEPLOY_MODE = "spark.submit.deployMode";... static final Map<String, String> launcherConfig = new HashMap<>(); //其他配置信息加载方法: setConfig setJavaHome setSparkHome setPropertiesFile setAppName //设置application名称 setMaster //设置master setDeployMode //运行模式 setMainClass //运行类 addSparkArg //添加spark的参数 addAppArgs //application的参数 addJar/addFile/setVerbose startApplication //主要方法,启动application,同时该方法返回SparkAppHandle,SparkAppHandle会实例化单一的SparkContext,同时在SparkContext的整个生命周期,报告SparkContext的状态,比如SparkContext停止,不能检测到SparkContext的状态,同时运行的是一个子进程,则会SparkAppHandle#kill() 通过CHILD_PROCESS_LOGGER_NAME这只application的日志名称,如果该项没有设置,则会org.apache.spark.launcher.app 开始命名日志开头。

3.Signaling主要是用来中途退出,如:quit或者ctrl + c,代码如下:

private[repl] object Signaling extends Logging {  def cancelOnInterrupt(ctx: SparkContext): Unit = SignalUtils.register("INT") {    if (!ctx.statusTracker.getActiveJobIds().isEmpty) {      logWarning("Cancelling all active jobs, this can take a while. " +        "Press Ctrl+C again to exit now.")      ctx.cancelAllJobs()      true    } else {      false    }  }}
类之间的调用关系图如下:

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark 源码