[Original] Spark Client Source Code Analysis (Part 2)
2016-03-23 20:49
This post continues from the previous one.
The code in DriverClient is fairly simple: it has only a main function. Like AppClient, it also has a ClientEndpoint, but the two serve different purposes.
1.Client
The only method in Client is main, shown below:

    def main(args: Array[String]) {
      if (!sys.props.contains("SPARK_SUBMIT")) {
        println("WARNING: This client is deprecated and will be removed in a future version of Spark")
        println("Use ./bin/spark-submit with \"--master spark://host:port\"")
      }

      val conf = new SparkConf()
      val driverArgs = new ClientArguments(args)

      if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
        conf.set("spark.akka.logLifecycleEvents", "true")
      }
      conf.set("spark.rpc.askTimeout", "10")
      conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
      Logger.getRootLogger.setLevel(driverArgs.logLevel)

      // Create an RPC environment named "driverClient" and obtain
      // remote references to the Master endpoints
      val rpcEnv =
        RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
      val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
        map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))

      // Register the ClientEndpoint under the name "client"
      rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

      // Block until the RPC environment terminates
      rpcEnv.awaitTermination()
    }
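The main method turns each master URL of the form spark://host:port into an RPC address before looking up the Master endpoint. The following is a minimal standalone sketch of that parsing step, not Spark's own implementation. `MasterUrlSketch`, `HostPort`, and `parse` are hypothetical names introduced here for illustration, and the sketch relies only on `java.net.URI`.

```scala
import java.net.URI

object MasterUrlSketch {
  // Hypothetical stand-in for Spark's RpcAddress; not the real class.
  final case class HostPort(host: String, port: Int)

  // Parse "spark://host:port" into host and port, rejecting any other shape.
  def parse(sparkUrl: String): HostPort = {
    val uri = new URI(sparkUrl)
    require(uri.getScheme == "spark", s"Invalid master URL: $sparkUrl")
    require(uri.getHost != null && uri.getPort >= 0, s"Invalid master URL: $sparkUrl")
    HostPort(uri.getHost, uri.getPort)
  }

  def main(args: Array[String]): Unit = {
    // Example host names are made up.
    val masters = Seq("spark://node1:7077", "spark://node2:7077").map(parse)
    masters.foreach(println)
  }
}
```

Mapping over a sequence of URLs mirrors the shape of `driverArgs.masters.map(RpcAddress.fromSparkURL)` in the code above: one address per configured master.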
2.ClientEndpoint
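ClientEndpoint can be viewed as a proxy that passes messages on behalf of the driver. Its fields are simple, so they are skipped here.
(1) The constructor is the primary constructor of ClientEndpoint.
(2) The onStart method is as follows: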
    override def onStart(): Unit = {
      driverArgs.cmd match {
        case "launch" =>
          // Wrapper class that launches the driver so that it shares fate with its Worker
          val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

          // Driver class path
          val classPathConf = "spark.driver.extraClassPath"
          val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }

          // Driver library path
          val libraryPathConf = "spark.driver.extraLibraryPath"
          val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }

          // Driver JVM options
          val extraJavaOptsConf = "spark.driver.extraJavaOptions"
          val extraJavaOpts = sys.props.get(extraJavaOptsConf)
            .map(Utils.splitCommandString).getOrElse(Seq.empty)
          // Convert the properties set in SparkConf into a sequence of Java options
          val sparkJavaOpts = Utils.sparkJavaOpts(conf)
          // All Java options combined
          val javaOpts = sparkJavaOpts ++ extraJavaOpts
          val command = new Command(mainClass,
            Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
            sys.env, classPathEntries, libraryPathEntries, javaOpts)

          // Wrap all of the above in a DriverDescription
          val driverDescription = new DriverDescription(
            driverArgs.jarUrl,
            driverArgs.memory,
            driverArgs.cores,
            driverArgs.supervise,
            command)
          // Asynchronously send the driver description to the Master
          // and forward the reply back to this endpoint
          ayncSendToMasterAndForwardReply[SubmitDriverResponse](
            RequestSubmitDriver(driverDescription))

        case "kill" =>
          val driverId = driverArgs.driverId
          ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
      }
    }
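The class path and library path entries above are both built with the same idiom: read an optional property, then split it on the platform path separator, so that a missing property yields an empty sequence rather than an error. A minimal standalone sketch of that idiom follows. `PathEntriesSketch` and `entries` are hypothetical names used only for illustration, and the sample jar paths are made up.

```scala
import java.io.File

object PathEntriesSketch {
  // Split an optional path-style value into its entries.
  // None becomes an empty Seq, so callers need no special case.
  def entries(value: Option[String]): Seq[String] =
    value.toSeq.flatMap(_.split(File.pathSeparator))

  def main(args: Array[String]): Unit = {
    // Join with the platform separator (":" on Unix, ";" on Windows).
    val sample = Seq("/opt/a.jar", "/opt/b.jar").mkString(File.pathSeparator)
    println(entries(Some(sample)))
    println(entries(None))
  }
}
```

Converting the `Option` to a `Seq` before `flatMap` is what lets the absent case collapse into an empty result without an explicit `match`.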
(3) The onStop method is simple and is skipped here.
(4) The receive method is as follows:
    override def receive: PartialFunction[Any, Unit] = {
      // The Master's reply carrying the driver information; think of the Master
      // as the steward and the Client as the boss it reports to
      case SubmitDriverResponse(master, success, driverId, message) =>
        logInfo(message)
        if (success) {
          // Record the Master that replied as the active Master endpoint
          activeMasterEndpoint = master
          // Look up the driver's status, then exit the JVM
          pollAndReportStatus(driverId.get)
        } else if (!Utils.responseFromBackup(message)) {
          System.exit(-1)
        }

      case KillDriverResponse(master, driverId, success, message) =>
        logInfo(message)
        if (success) {
          activeMasterEndpoint = master
          pollAndReportStatus(driverId) // see ① below
        } else if (!Utils.responseFromBackup(message)) {
          System.exit(-1)
        }
    }
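Both branches of receive follow the same rule: on success, poll the driver's status; on failure, exit only if the failure did not come from a backup Master. The sketch below restates that rule as a pure function so it can be read and tested without Spark. Everything in it is hypothetical: the `SubmitReply` and `KillReply` messages are simplified stand-ins that omit the Master reference, the `Action` type does not exist in Spark, and the `isFromBackup` predicate stands in for `Utils.responseFromBackup`, whose actual check is not shown in this post.

```scala
object ReceiveSketch {
  // Hypothetical, simplified replies; the real messages also carry a Master reference.
  sealed trait Reply
  final case class SubmitReply(success: Boolean, driverId: Option[String], message: String) extends Reply
  final case class KillReply(driverId: String, success: Boolean, message: String) extends Reply

  // What the client should do next, instead of calling System.exit directly.
  sealed trait Action
  final case class Poll(driverId: String) extends Action
  case object Exit extends Action
  case object Ignore extends Action

  // isFromBackup is an assumed stand-in for Utils.responseFromBackup.
  def decide(isFromBackup: String => Boolean): PartialFunction[Reply, Action] = {
    // A successful reply is assumed to carry a driver id.
    case SubmitReply(true, Some(id), _) => Poll(id)
    case KillReply(id, true, _) => Poll(id)
    // A failure from a non-backup Master is fatal.
    case SubmitReply(false, _, msg) if !isFromBackup(msg) => Exit
    case KillReply(_, false, msg) if !isFromBackup(msg) => Exit
    // Failures reported by a backup Master are ignored.
    case _ => Ignore
  }
}
```

Returning an `Action` rather than exiting inside the match keeps the decision logic separate from its side effects, which is a design choice of this sketch and not how the original method is written.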
① The pollAndReportStatus method is shown below. It looks up the driver's status and then exits the JVM.
    def pollAndReportStatus(driverId: String) {
      logInfo("... waiting before polling master for driver state")
      Thread.sleep(5000)
      logInfo("... polling master for driver state")
      // Ask the Master for the driver's status
      val statusResponse =
        activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
      statusResponse.found match {
        case false =>
          logError(s"ERROR: Cluster master did not recognize $driverId")
          System.exit(-1)
        case true =>
          logInfo(s"State of $driverId is ${statusResponse.state.get}")
          // What comes back is actually information about the Worker running the driver
          (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
            case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
              logInfo(s"Driver running on $hostPort ($id)")
            case _ =>
          }
          // Report the exception, if any, and exit with an error code
          statusResponse.exception.map { e =>
            logError(s"Exception from cluster was: $e")
            e.printStackTrace()
            System.exit(-1)
          }
          System.exit(0)
      }
    }
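The reporting logic in pollAndReportStatus can be summarized as: an unknown driver or a reported exception leads to exit code -1, anything else to 0, and the Worker line is logged only when the worker id, its host and port, and a RUNNING state are all present. A minimal sketch of that logic as a pure function is given below. The `Status` case class is a hypothetical, trimmed-down stand-in for `DriverStatusResponse`, the state is modeled as a plain string rather than `DriverState`, and `report` returns the log lines and exit code instead of logging and exiting.

```scala
object DriverStatusSketch {
  // Hypothetical, trimmed-down stand-in for the Master's status reply.
  final case class Status(
      found: Boolean,
      state: Option[String],
      workerId: Option[String],
      workerHostPort: Option[String],
      exception: Option[Exception])

  // Returns the lines that would be logged and the exit code that would be used.
  def report(driverId: String, s: Status): (Seq[String], Int) =
    if (!s.found) {
      (Seq(s"ERROR: Cluster master did not recognize $driverId"), -1)
    } else {
      val stateLine = s.state.map(st => s"State of $driverId is $st").toSeq
      // The Worker line appears only when all three pieces are present and the state is RUNNING.
      val workerLine = (s.workerId, s.workerHostPort, s.state) match {
        case (Some(id), Some(hostPort), Some("RUNNING")) =>
          Seq(s"Driver running on $hostPort ($id)")
        case _ => Seq.empty
      }
      val errorLine = s.exception.map(e => s"Exception from cluster was: $e").toSeq
      (stateLine ++ workerLine ++ errorLine, if (s.exception.isDefined) -1 else 0)
    }
}
```

Matching on a tuple of three `Option` values, as the original method does, avoids nested `if` checks: any missing piece falls through to the default case.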