spark-core_09: org.apache.spark.deploy.master.Master Source Code Analysis (Part 1)
2018-03-26 21:19
1. First, start-all.sh calls start-master.sh (see spark-core_05: analysis of the $SPARK_HOME/sbin/start-all.sh and start-master.sh scripts). start-master.sh then invokes spark-daemon.sh like this:
spark-daemon.sh start org.apache.spark.deploy.master.Master 1 luyl152 --port 7077 --webui-port 8080
2. spark-daemon.sh in turn invokes spark-class (see spark-core_06: analysis of the $SPARK_HOME/sbin/spark-daemon.sh script):
$SPARK_HOME/bin/spark-class org.apache.spark.deploy.master.Master luyl152 --port 7077 --webui-port 8080
3. spark-class then launches the Master with a command like the following (see spark-core_02: analysis of the spark-submit and spark-class scripts):
java -cp spark_home/lib/spark-assembly-1.6.0-hadoop2.6.0.jar org.apache.spark.launcher.Main org.apache.spark.deploy.master.Master luyl152 --port 7077 --webui-port 8080
4. For how launcher.Main performs the actual invocation, see spark-core_03: source code analysis of org.apache.spark.launcher.Main.
I. In Master.main we can see that a SparkConf, MasterArguments and the RpcEnv are initialized.
/**
 * To debug the Master, add the following to spark-env.sh on the master node:
 *   export SPARK_MASTER_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"
 * To debug a Worker, add the following to spark-env.sh on that worker's node:
 *   export SPARK_WORKER_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005"
 */
private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  // arguments passed in from spark-class: --port 7077 --webui-port 8080
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    // MasterArguments parses the arguments the RpcEnv needs:
    // host = the local hostname, port = 7077, webUiPort = 8080
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }
1. When new SparkConf is constructed, it loads every system property whose key starts with spark.* into the conf:
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
  import SparkConf._

  /** Create a SparkConf that loads defaults from system properties and the classpath */
  def this() = this(true)

  private val settings = new ConcurrentHashMap[String, String]()

  if (loadDefaults) {
    // Load any spark.* system properties
    /** All system properties starting with spark.* are loaded here. For example, the HA settings
     * exported in spark-env.sh end up in this SparkConf:
     *   export SPARK_DAEMON_JAVA_OPTS="-Dsun.io.serialization.extendedDebugInfo=true
     *     -Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=luyl153:2181,luyl154:2181,luyl155:2181
     *     -Dspark.deploy.zookeeper.dir=/spark"
     * In HA mode, the Master's RECOVERY_MODE member therefore resolves to ZOOKEEPER.
     */
    for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) {
      set(key, value)
    }
  }
  ...
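As a quick illustration of this behavior, here is a minimal sketch (not part of the Master source; the object name and property values below are made up): it injects a spark.* JVM system property the same way SPARK_DAEMON_JAVA_OPTS would, and shows that a freshly constructed SparkConf picks it up while non-spark.* keys are ignored.

import org.apache.spark.SparkConf

object SparkConfSystemPropsDemo {
  def main(args: Array[String]): Unit = {
    // Simulate what -Dspark.deploy.recoveryMode=ZOOKEEPER on the JVM command line would do.
    sys.props("spark.deploy.recoveryMode") = "ZOOKEEPER"
    sys.props("not.a.spark.key") = "ignored"

    val conf = new SparkConf()  // loadDefaults = true, so all spark.* system properties are copied in
    println(conf.get("spark.deploy.recoveryMode"))  // ZOOKEEPER
    println(conf.contains("not.a.spark.key"))       // false: only spark.* keys are loaded
  }
}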
2. Now look at MasterArguments(argStrings, conf). This class simply parses the incoming arguments and the default settings found in the environment into its own members, so the Master can use them directly.
/**
 * Command-line parser for the master.
 * args: --port 7077 --webui-port 8080
 */
private[master] class MasterArguments(args: Array[String], conf: SparkConf) {
  // the local hostname by default
  var host = Utils.localHostName()
  var port = 7077
  var webUiPort = 8080
  var propertiesFile: String = null

  // Check for settings in environment variables
  // (these are the values exported in spark-env.sh, e.g. SPARK_MASTER_IP / SPARK_MASTER_HOST)
  if (System.getenv("SPARK_MASTER_HOST") != null) {
    host = System.getenv("SPARK_MASTER_HOST")
  }
  if (System.getenv("SPARK_MASTER_PORT") != null) {
    port = System.getenv("SPARK_MASTER_PORT").toInt
  }
  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
  }

  // arguments passed in from spark-class: --port 7077 --webui-port 8080
  parse(args.toList)

  // This mutates the SparkConf, so all accesses to it must be made after this line.
  // The SparkConf is mutable, which is why this runs first; spark-class does not pass
  // --properties-file when starting the master, so propertiesFile is still null here.
/** Loads default Spark properties from the given filePath. If no file is provided, it uses the
 * default conf/spark-defaults.conf or the file given with --properties-file, and returns the path
 * of the properties file that was used.
 * Sample contents of spark-defaults.conf:
 *   # spark.master                     spark://master:7077
 *   # spark.eventLog.enabled           true
 *   # spark.eventLog.dir               hdfs://namenode:8021/directory
 *   # spark.serializer                 org.apache.spark.serializer.KryoSerializer
 *   # spark.driver.memory              5g
 *   # spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
 */
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  if (conf.contains("spark.master.ui.port")) {
    webUiPort = conf.get("spark.master.ui.port").toInt
  }

  // arguments passed in from spark-class: --port 7077 --webui-port 8080
  private def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
      host = value
      parse(tail)

    case ("--host" | "-h") :: value :: tail =>
      Utils.checkHost(value, "Please use hostname " + value)
      host = value
      parse(tail)

    case ("--port" | "-p") :: IntParam(value) :: tail =>
      // IntParam("7077") extracts Some(Int value)
      port = value
      parse(tail)

    case "--webui-port" :: IntParam(value) :: tail =>
      // matched on the second recursive call
      webUiPort = value
      parse(tail)

    case ("--properties-file") :: value :: tail =>
      propertiesFile = value
      parse(tail)

    case ("--help") :: tail =>
      printUsageAndExit(0)

    case Nil => {}

    case _ =>
      printUsageAndExit(1)
  }

===》 Out of curiosity, let's see what Utils.loadDefaultSparkProperties(conf, propertiesFile) actually does:

def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
  // filePath has no value here, so getDefaultPropertiesFile() is used: it reads $SPARK_CONF_DIR from
  // the environment (i.e. spark_home/conf) and takes spark-defaults.conf as the Spark properties file
  val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
  Option(path).foreach { confFile =>
    // only keys starting with "spark." are taken from the spark-defaults.conf file
    // (tip: enabling spark.serializer there can improve serialization efficiency)
    getPropertiesFromFile(confFile).filter { case (k, v) =>
      k.startsWith("spark.")
    }.foreach { case (k, v) =>
      conf.setIfMissing(k, v)
      // the corresponding system properties are also updated here
      sys.props.getOrElseUpdate(k, v)
    }
  }
  path
}
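Because loadDefaultSparkProperties uses setIfMissing rather than set, values that already exist in the SparkConf (for example ones injected via -Dspark.* system properties or set earlier on the command line) take precedence over spark-defaults.conf. The following is a minimal sketch of that precedence rule using only the public SparkConf API; the keys and values are illustrative, not taken from the Master source:

import org.apache.spark.SparkConf

object SetIfMissingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)                    // start empty, ignoring system properties
    conf.set("spark.master.ui.port", "9090")           // pretend this was set earlier (higher priority)
    conf.setIfMissing("spark.master.ui.port", "8080")  // pretend this came from spark-defaults.conf
    conf.setIfMissing("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    println(conf.get("spark.master.ui.port"))  // 9090 -- the existing value is kept
    println(conf.get("spark.serializer"))      // KryoSerializer -- filled in because it was missing
  }
}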
===》 getPropertiesFromFile is just a helper for loading a Properties file:
/** Load properties present in the given file. */
def getPropertiesFromFile(filename: String): Map[String, String] = {
  val file = new File(filename)
  require(file.exists(), s"Properties file $file does not exist")
  require(file.isFile(), s"Properties file $file is not a normal file")

  val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
  try {
    val properties = new Properties()
    properties.load(inReader)
    // The properties file does not have to use the key=value format: one or more spaces, a tab or a
    // colon after the key also produces a key/value pair. The current spark-defaults.conf separates
    // keys and values with multiple spaces.
    // Converting the Java collection to Scala requires the implicits from
    // import scala.collection.JavaConverters._
    properties.stringPropertyNames().asScala.map(
      k => (k, properties.getProperty(k).trim)).toMap
  } catch {
    case e: IOException =>
      throw new SparkException(s"Failed when loading Spark properties from $filename", e)
  } finally {
    inReader.close()
  }
}
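To see the separator behavior described above in isolation, here is a small, self-contained sketch (not from the Spark source; object name and sample keys are made up) that feeds java.util.Properties three key/value pairs separated by whitespace, '=' and ':' respectively, mirroring how spark-defaults.conf is laid out:

import java.io.StringReader
import java.util.Properties
import scala.collection.JavaConverters._

object PropertiesSeparatorDemo {
  def main(args: Array[String]): Unit = {
    val text =
      """spark.master            spark://luyl152:7077
        |spark.eventLog.enabled=true
        |spark.serializer: org.apache.spark.serializer.KryoSerializer
        |""".stripMargin

    val props = new Properties()
    props.load(new StringReader(text))  // all three separators produce key/value pairs

    val asMap = props.stringPropertyNames().asScala.map(k => (k, props.getProperty(k).trim)).toMap
    asMap.foreach { case (k, v) => println(s"$k -> $v") }
  }
}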
II. With SparkConf and MasterArguments loaded, the Master's RpcEnv is started (the RpcEnv and RpcEndpoint sources are analyzed in later chapters).
/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 * It registers the Master with the RpcEnv, sends the case object BoundPortsRequest to the
 * masterEndpoint, and obtains the (RpcEnv, Int, Option[Int]) tuple replied from receiveAndReply.
 */
def startRpcEnvAndEndpoint(
    host: String,     // luyl152, i.e. the master node
    port: Int,        // 7077
    webUiPort: Int,   // 8080
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  // SecurityManager handles permissions and accounts; it is also used to decide whether Jetty
  // enables SSL (https):
  //   val connector = securityManager.fileServerSSLOptions.createJettySslContextFactory()
  //     .map(new SslSocketConnector(_)).getOrElse(new SocketConnector)
  val securityMgr = new SecurityManager(conf)
  // SYSTEM_NAME = sparkMaster, host = luyl152, port = 7077. Using NettyRpcEnvFactory this creates
  // an RpcEnv container whose id is sparkMaster.
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  // Register the Master RpcEndpoint with the RpcEnv under the id "Master".
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  // Send BoundPortsRequest to the master; since a reply is expected it is handled in receiveAndReply.
  // BoundPortsResponse members: the rpcEndpointPort is 7077, the web UI port is 8080, restPort is 6066.
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}

Next, let's look at how this Master RpcEndpoint initializes the Master itself.
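As a closing aside, the register-then-ask pattern used above (setupEndpoint, then askWithRetry, answered in receiveAndReply) can be sketched with a toy endpoint. Note that Spark's rpc package is private[spark], so the sketch below cheats by declaring itself inside org.apache.spark.rpc; the PingRequest/PingResponse messages, names and port are all made up for illustration and are not part of the Master source:

package org.apache.spark.rpc  // the rpc API is private[spark]; this placement is for illustration only

import org.apache.spark.{SecurityManager, SparkConf}

// Hypothetical stand-ins for BoundPortsRequest / BoundPortsResponse.
case object PingRequest
case class PingResponse(boundPort: Int)

class PingEndpoint(override val rpcEnv: RpcEnv, boundPort: Int) extends RpcEndpoint {
  // Messages sent with ask/askWithRetry (i.e. expecting a reply) are dispatched here.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case PingRequest => context.reply(PingResponse(boundPort))
  }
}

object PingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val securityMgr = new SecurityManager(conf)
    // Same three steps as startRpcEnvAndEndpoint: create the env, register the endpoint, ask it.
    val rpcEnv = RpcEnv.create("pingSystem", "localhost", 9999, conf, securityMgr)
    val pingRef = rpcEnv.setupEndpoint("Ping", new PingEndpoint(rpcEnv, 9999))
    val response = pingRef.askWithRetry[PingResponse](PingRequest)
    println(response)  // PingResponse(9999)
    rpcEnv.shutdown()
    rpcEnv.awaitTermination()
  }
}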