
[Original] Spark Client Source Code Analysis (Part 2)

2016-03-23 20:49
This post continues from the previous one:


Spark Client Source Code Analysis (Part 1): http://www.cnblogs.com/yourarebest/p/5313006.html


The code in DriverClient is fairly simple: it consists of a single main function. Like AppClient, it also has a ClientEndpoint, but the two serve different purposes.

1. Client

The only method in Client, main, is as follows:

def main(args: Array[String]) {
  if (!sys.props.contains("SPARK_SUBMIT")) {
    println("WARNING: This client is deprecated and will be removed in a future version of Spark")
    println("Use ./bin/spark-submit with \"--master spark://host:port\"")
  }
  val conf = new SparkConf()
  val driverArgs = new ClientArguments(args)
  if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
    conf.set("spark.akka.logLifecycleEvents", "true")
  }
  conf.set("spark.rpc.askTimeout", "10")
  conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
  Logger.getRootLogger.setLevel(driverArgs.logLevel)
  // Create an RpcEnv named "driverClient" and obtain a remote reference to every Master
  val rpcEnv =
    RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
  val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
    map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))
  // Register the ClientEndpoint under the name "client"; its onStart sends the request
  rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
  // Block until the RPC environment terminates (ClientEndpoint exits the JVM itself)
  rpcEnv.awaitTermination()
}
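main converts every entry of driverArgs.masters with RpcAddress.fromSparkURL before calling setupEndpointRef, so it ends up with one endpoint reference per Master. The sketch below imitates that conversion, assuming the user passed a comma-separated HA list such as spark://h1:7077,h2:7078 that is split into one spark:// URL per Master. MasterUrls and HostPort are hypothetical names, and java.net.URI stands in for Spark's own URL parsing.

```scala
import java.net.URI

// Hypothetical stand-in for the host/port pair that an RpcAddress holds.
final case class HostPort(host: String, port: Int)

object MasterUrls {
  private val Prefix = "spark://"

  // Assumed HA format: split "spark://h1:7077,h2:7078" into one spark:// URL per Master.
  def splitMasters(masterUrl: String): Seq[String] = {
    require(masterUrl.startsWith(Prefix), s"Invalid master URL: $masterUrl")
    masterUrl.stripPrefix(Prefix).split(",").toSeq.map(Prefix + _)
  }

  // Extract host and port from a single "spark://host:port" URL,
  // rejecting URLs without the spark scheme, a host, or an explicit port.
  def hostPort(sparkUrl: String): HostPort = {
    val uri = new URI(sparkUrl)
    require(uri.getScheme == "spark" && uri.getHost != null && uri.getPort != -1,
      s"Invalid master URL: $sparkUrl")
    HostPort(uri.getHost, uri.getPort)
  }
}
```

Each resulting HostPort would correspond to one setupEndpointRef call, which is why masterEndpoints is a collection rather than a single reference.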


2. ClientEndpoint

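ClientEndpoint can be regarded as a proxy that carries the client's driver requests (launch or kill) to the Master and handles the Master's replies.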

Its fields are simple, so we skip them.
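The proxy role rests on one pattern, used by ayncSendToMasterAndForwardReply in onStart: send the request to every configured Master and forward whatever replies arrive back to the endpoint itself, where receive handles them. The sketch below models that fan-out with plain Scala Futures. FanOut, askMaster, the inbox queue and the string messages are hypothetical stand-ins for RpcEndpointRef.ask and self.send; it is not Spark's implementation.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{ExecutionContext, Future}

object FanOut {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Send `message` to every master; each successful reply is pushed into `inbox`
  // (playing the role of self.send), each failure is only reported on stderr.
  // The returned Future completes once every ask has finished either way.
  def sendToAllAndForward(
      masters: Seq[String],
      message: String,
      askMaster: (String, String) => Future[String],
      inbox: ConcurrentLinkedQueue[String]): Future[Unit] = {
    val asks = masters.map { master =>
      askMaster(master, message)
        .map { reply => inbox.add(reply); () }
        .recover { case e: Throwable =>
          System.err.println(s"Error sending message to master $master: ${e.getMessage}")
        }
    }
    Future.sequence(asks).map(_ => ())
  }
}
```

A plausible reason for forwarding replies to the endpoint instead of handling them inside the callback is that state changes such as setting activeMasterEndpoint then happen on the endpoint's own message loop rather than on a Future callback thread.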

(1) Constructor: ClientEndpoint has only its primary constructor.

(2) The onStart method is as follows:

override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // The driver is started through DriverWrapper so that it shares fate with the Worker that launches it
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
      // Extra classpath entries for the driver
      val classPathConf = "spark.driver.extraClassPath"
      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }
      // Extra native library path entries for the driver
      val libraryPathConf = "spark.driver.extraLibraryPath"
      val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }
      // Extra JVM options for the driver
      val extraJavaOptsConf = "spark.driver.extraJavaOptions"
      val extraJavaOpts = sys.props.get(extraJavaOptsConf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)
      // Turn every property set in SparkConf into a -Dkey=value Java option
      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      // All Java options for the driver JVM
      val javaOpts = sparkJavaOpts ++ extraJavaOpts
      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)
      // Wrap all of the above in a DriverDescription
      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      // Send the request to every Master asynchronously and forward the reply to this endpoint
      // (the misspelling "ayncSend..." is in Spark's own method name)
      ayncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))
    case "kill" =>
      val driverId = driverArgs.driverId
      ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}
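The launch branch of onStart is mostly string plumbing. The sketch below reproduces it over plain Maps instead of sys.props and SparkConf: path-list properties are split on File.pathSeparator, configuration entries become -Dkey=value options, and the DriverWrapper arguments carry the {{WORKER_URL}} and {{USER_JAR}} placeholders that are filled in later on the Worker side (in Spark 1.x, by the Worker's DriverRunner). DriverCommandSketch and its methods are hypothetical, and the sorting in sparkJavaOpts is added only to make the output deterministic.

```scala
import java.io.File

object DriverCommandSketch {
  // Split a path-list property (e.g. spark.driver.extraClassPath) on the
  // platform path separator; an unset property yields no entries.
  def pathEntries(props: Map[String, String], key: String): Seq[String] =
    props.get(key).toSeq.flatMap(_.split(File.pathSeparator))

  // Turn SparkConf-style settings into -Dkey=value JVM options (sorted by key).
  def sparkJavaOpts(conf: Map[String, String]): Seq[String] =
    conf.toSeq.sortBy(_._1).map { case (k, v) => s"-D$k=$v" }

  // Arguments passed to DriverWrapper; the two placeholders are left for the Worker.
  def wrapperArgs(userMainClass: String, driverOptions: Seq[String]): Seq[String] =
    Seq("{{WORKER_URL}}", "{{USER_JAR}}", userMainClass) ++ driverOptions

  // Hypothetical Worker-side step: replace the placeholders with real values.
  def substitute(args: Seq[String], workerUrl: String, localJar: String): Seq[String] =
    args.map {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}"   => localJar
      case other            => other
    }
}
```

Leaving placeholders in the command makes sense because the client cannot know which Worker will be chosen or where that Worker will store the downloaded jar.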


(3) The onStop method is simple, so we skip it.

(4) The receive method is as follows:

override def receive: PartialFunction[Any, Unit] = {
  // The Master replies with the outcome of the driver request
  // (in this analogy the Master is the steward and the Client is the boss)
  case SubmitDriverResponse(master, success, driverId, message) =>
    logInfo(message)
    if (success) {
      // Make the Master that replied the active Master endpoint
      activeMasterEndpoint = master
      // Poll the driver's status, report it, then exit the JVM (see ① below)
      pollAndReportStatus(driverId.get)
    } else if (!Utils.responseFromBackup(message)) {
      System.exit(-1)
    }
  case KillDriverResponse(master, driverId, success, message) =>
    logInfo(message)
    if (success) {
      activeMasterEndpoint = master
      pollAndReportStatus(driverId) // see ① below
    } else if (!Utils.responseFromBackup(message)) {
      System.exit(-1)
    }
}
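The decision logic in receive can be isolated from the RPC machinery as a pure function. In this sketch a reply is reduced to its fields, the Action type and the ResponseHandling object are hypothetical, and Utils.responseFromBackup is modelled as a prefix check on the reply text; the exact prefix string used here is an assumption.

```scala
// Hypothetical outcome of handling one SubmitDriverResponse / KillDriverResponse.
sealed trait Action
final case class PollStatus(master: String, driverId: String) extends Action
case object ExitWithError extends Action
case object IgnoreBackupReply extends Action

object ResponseHandling {
  // Assumed marker that a standby (non-ALIVE) Master puts at the start of its reply.
  val BackupPrefix = "Current state is not alive"

  def responseFromBackup(message: String): Boolean = message.startsWith(BackupPrefix)

  // Mirrors receive: success -> adopt this Master and poll the driver status;
  // failure from a standby Master -> ignore it; any other failure -> exit with an error.
  def handle(master: String, success: Boolean, driverId: Option[String], message: String): Action =
    if (success) PollStatus(master, driverId.getOrElse(sys.error("missing driverId")))
    else if (!responseFromBackup(message)) ExitWithError
    else IgnoreBackupReply
}
```

The middle case matters with several Masters configured: the standby Masters also answer every request, and ignoring their refusals instead of exiting lets the client wait for the answer from the ALIVE Master.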


① pollAndReportStatus, shown below, waits five seconds, asks the active Master for the driver's status, logs it, and then exits the JVM:

def pollAndReportStatus(driverId: String) {
  logInfo("... waiting before polling master for driver state")
  Thread.sleep(5000)
  logInfo("... polling master for driver state")
  // Ask the active Master for the driver's status
  val statusResponse =
    activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
  statusResponse.found match {
    case false =>
      logError(s"ERROR: Cluster master did not recognize $driverId")
      System.exit(-1)
    case true =>
      logInfo(s"State of $driverId is ${statusResponse.state.get}")
      // The response also identifies the Worker running the driver, if there is one
      (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
        case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
          logInfo(s"Driver running on $hostPort ($id)")
        case _ =>
      }
      // Report any exception from the cluster and exit with an error
      statusResponse.exception.map { e =>
        logError(s"Exception from cluster was: $e")
        e.printStackTrace()
        System.exit(-1)
      }
      System.exit(0)
  }
}
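The reporting rules of pollAndReportStatus can likewise be checked without a cluster by separating them from the sleep, the RPC call and System.exit. In the sketch below DriverStatus is a hypothetical, simplified mirror of the DriverStatusResponse fields used above (the state is a plain String instead of DriverState), and StatusReport returns the log lines and the exit code instead of acting on them. Unlike the original, it uses getOrElse("UNKNOWN") rather than state.get.

```scala
// Hypothetical, simplified mirror of the DriverStatusResponse fields used above.
final case class DriverStatus(
    found: Boolean,
    state: Option[String],
    workerId: Option[String],
    workerHostPort: Option[String],
    exception: Option[Exception])

object StatusReport {
  // Returns the lines that would be logged and the JVM exit code.
  def report(driverId: String, s: DriverStatus): (Seq[String], Int) =
    if (!s.found) (Seq(s"ERROR: Cluster master did not recognize $driverId"), -1)
    else {
      val stateLine = s"State of $driverId is ${s.state.getOrElse("UNKNOWN")}"
      // Only a RUNNING driver placed on a known Worker gets the extra line.
      val workerLine = (s.workerId, s.workerHostPort, s.state) match {
        case (Some(id), Some(hostPort), Some("RUNNING")) => Seq(s"Driver running on $hostPort ($id)")
        case _ => Seq.empty[String]
      }
      s.exception match {
        case Some(e) => ((Seq(stateLine) ++ workerLine) :+ s"Exception from cluster was: $e", -1)
        case None    => (Seq(stateLine) ++ workerLine, 0)
      }
    }
}
```

Written this way the rule is easy to see: an unrecognized driver or a reported exception exits with -1, while every other outcome, including a driver that is not yet RUNNING, exits with 0 after a single poll.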