您的位置:首页 > 运维架构 > Apache

spark-core_09: org.apache.spark.deploy.master.Master源码解析1

2018-03-26 21:19 579 查看
1,先是start-all.sh调用start-master.sh(查看spark-core_05:$SPARK_HOME/sbin/start-all.sh、start-master.sh脚本分析)。而start-master.sh 使用如下脚本调用spark-deamon.sh
spark-daemon.sh start org.apache.spark.deploy.master.Master 1luyl152 --port 7077 --webui-port 8080
2,而spark-deamon.sh使用如下脚本调用spark-class(查看spark-core_06:$SPARK_HOME/sbin/spark-daemon.sh脚本分析)
$SPARK_HOME/bin/spark-classorg.apache.spark.deploy.master.Master luyl152 --port 7077 --webui-port 8080
3,而spark-class会使用如下脚本去调用Master(查看spark-core_02: spark-submit、spark-class脚本分析)
java -cp spark_home/lib/spark-assembly-1.6.0-hadoop2.6.0.jarorg.apache.spark.launcher.Main  org.apache.spark.deploy.master.Masterluyl152 --port 7077 --webui-port 8080
4,对于launcher.Main是如何调用的,参看(spark-core_03: org.apache.spark.launcher.Main源码分析
一、Master在main,可以看出初始化了SparkConf、MasterArguments,RpcEnv/**
  * 如果debugger是Master在这个master的spark-env.sh文件中添加如下参数
    exportSPARK_MASTER_OPTS="-Xdebug-Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"
    如果debugger是worker:则在它节点上spark-env.sh:
    exportSPARK_WORKER_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"
  */
private[deploy] object Masterextends Logging {
  val SYSTEM_NAME= "sparkMaster"
  val ENDPOINT_NAME= "Master"
  //spark-class传进来的参数是 --port 7077 --webui-port 8080
  def main(ar
4000
gStrings: Array[String]) {
    SignalLogger.register(log)
    val conf= new SparkConf
    //RpcEnv所需要的host:"本机主机名",port:7077,webUIport:8080
    //MasterArguments会解析参数并
    val args= new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }
1,在初始化new SparkConf时会去环境变量中,以spark.*开始的配制class SparkConf(loadDefaults: Boolean) extendsCloneable with Logging{
  import SparkConf._
  /** Create a SparkConf that loads defaults from systemproperties and the classpath */
  def this() = this(true)
  private val settings = new ConcurrentHashMap[String, String]()
  if (loadDefaults){
    // Load any spark.* system properties
    /**会加载所有以spark.*开始的环境变,如在spark-env.sh中配制的HA配制,就会初始化到SparkConf中
      * exportSPARK_DAEMON_JAVA_OPTS="-Dsun.io.serialization.extendedDebugInfo=true
  *-Dspark.deploy.recoveryMode=ZOOKEEPER-Dspark.deploy.zookeeper.url=luyl153:2181,luyl154:2181,luyl155:2181
      * -Dspark.deploy.zookeeper.dir=/spark"
      * 当在ha的模式下:Master成员RECOVERY_MODE对应的值ZOOKEEPER
      */
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value)
    }
  }….
2,分析一下MasterArguments(argStrings,conf),这个类就是将传进来的参数及环境中默认配制参数,对应值解析到自己的成员中,方便Master使用/**
 * Command-line parser for the master.
  * args 成员: --port 7077 --webui-port 8080
 */
private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
  //当前的主机名
  var host = Utils.localHostName()
  var port = 7077
  var webUiPort = 8080
  var propertiesFile: String = null

 
// Check for settings in environment variables
  //取得脚本中export对应的环境变量值,在spark_env.sh脚本中环境变量值是SPARK_MASTER_IP
  if (System.getenv("SPARK_MASTER_HOST") != null) {
    host = System.getenv("SPARK_MASTER_HOST")
  }
  if (System.getenv("SPARK_MASTER_PORT") != null) {
    port = System.getenv("SPARK_MASTER_PORT").toInt
  }
  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
  }
  //spark-class传进来的参数是 --port 7077 --webui-port 8080
  parse(args.toList)

  // This mutates the SparkConf, so all accesses to it must be made after this line
  //可变的SparkConf,所以需要先执行,并且spark-class 在启动master时并没有指定--properties-file,所以propertiesFile是null
  /** 从给定filePath加载默认的Spark属性。 如果未提供文件,使用默认文件/conf/spark-defaults.conf或--properties-file指定的属性文件。
并返回使用的属性文件的路径。
  *spark-defaults.conf文件的内容:
  * # apache.spark.master                     spark://master:7077
  # apache.spark.eventLog.enabled           true
  # apache.spark.eventLog.dir               hdfs://namenode:8021/directory
  # apache.spark.serializer                 org.apache.spark.serializer.KryoSerializer
  # apache.spark.driver.memory              5g
  # apache.spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
  */
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  if (conf.contains("spark.master.ui.port")) {
    webUiPort = conf.get("spark.master.ui.port").toInt
  }
  //spark-class传进来的参数是 --port 7077 --webui-port 8080
  private def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
      host = value
      parse(tail)

    case ("--host" | "-h") :: value :: tail =>
      Utils.checkHost(value, "Please use hostname " + value)
      host = value
      parse(tail)

    case ("--port" | "-p") :: IntParam(value) :: tail =>
      //IntParam("7077")解析成Some(Int值)
      port = value
      parse(tail)

    case "--webui-port" :: IntParam(value) :: tail =>
      //第二次匹配
      webUiPort = value
      parse(tail)

    case ("--properties-file") :: value :: tail =>
      propertiesFile = value
      parse(tail)

    case ("--help") :: tail =>
      printUsageAndExit(0)

    case Nil => {}

    case _ =>
      printUsageAndExit(1)
  }
===》很好奇,看一下Utils.loadDefaultSparkProperties(conf,propertiesFile)做了什么事def loadDefaultSparkProperties(conf: SparkConf, filePath:String = null): String = {
 //因为filePath没有值,所以加载getDefaultPropertiesFile(),从环境变量加载对应的值$SPARK_CONF_DIR,即spark_home/conf目录,然后使用spark-defaults.conf,做为spark的属性文件
  val path= Option(filePath).getOrElse(getDefaultPropertiesFile())
  Option(path).foreach { confFile=>
   //只有以spark.开始配制。从spark-defaults.conf配制文件,可以先把spark.serializer打开,提高系列化的效率
    getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      conf.setIfMissing(k, v)
      //此处也更新了环境变量中的相关属性
      sys.props.getOrElseUpdate(k, v)
    }
  }
  path
}
===》就是加载Properties文件使用的/** Loadproperties present in the given file. */
def getPropertiesFromFile(filename: String):Map[String, String] = {
  val file= new File(filename)
  require(file.exists(), s"Properties file $file does not exist")
  require(file.isFile(), s"Properties file $file is not a normal file")

  val inReader= new InputStreamReader(new FileInputStream(file), "UTF-8")
  try {
    val properties= new Properties()
    properties.load(inReader)
    //加载的属性文件不一定非得是key=value格式, key后面空格或多个空格或冒号或tab都可以形成kv的properties.当前spark-defaults.conf就按多个空格分开的
    //java集合变成Scala需要导入importscala.collection.JavaConverters._相关隐式转换
    properties.stringPropertyNames().asScala.map(
      k => (k, properties.getProperty(k).trim)).toMap
  } catch {
    case e:IOException =>
      throw new SparkException(s"Failedwhen loading Spark properties from $filename", e)
  } finally {
    inReader.close()
  }
}
二、加载SparkConf, MasterArguments完成了,开始启动Master的RpcEnv(后续章节分析RpcEnv、RpcEndpoint相关源码)/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
  *   将master注册到RpcEnv中,并向masterEndpoint发送case object BoundPortsRequest,并取得receiveAndReplyl
回复的元组(RpcEnv, Int, Option[Int])
 */
def startRpcEnvAndEndpoint(
    host: String,  //luyl152,即master的节点
    port: Int,   //7077
    webUiPort: Int,//8080
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  //SecurityManager主要对权限、账号进行设置,也通过它来设置是jetty否启ssl协议(即https)
  //    val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
  //.map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
  val securityMgr = new SecurityManager(conf)
  //SYSTEM_NAME=sparkMaster,luyl152, 7077,conf,使用NettyRpcEnvFactory,创建一个rpcEnv容器,这个容器的id是sparkMaster
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  //向rpcEnv容器注入Master的rpcEndpoint,id的值是Master
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  //给master发送BoundPortsRequest,因为要回复信息会进入receiveAndReply去处理
  //BoundPortsResponsed成员: rpcEndpointPort是7077, webUi的port是8080,restPort是6066
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
下面分析一下Master这个RpcEndPoint是如何初始化Master的
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐