Spark's Communication Mechanism (1): Spark 1.3 vs Spark 1.6 Source Code Analysis
2017-01-06 09:56
A while ago I read through the Spark 1.3 source code, whose RPC communication is built on Akka. Spark 1.6, however, provides two implementations: Netty (the default) and Akka.
Below, using Spark 1.3's Akka-based mechanism as the point of comparison, let's look at how the Master communicates with Workers in Spark 1.6.
First, the Master class in Spark 1.6:
```scala
private[deploy] class Master(        // v1.6
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable
```
```scala
private[spark] class Master(         // v1.3
    host: String,
    port: Int,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends Actor with ActorLogReceive with Logging with LeaderElectable
```
In Spark 1.3, Master extends Akka's Actor directly, whereas in Spark 1.6 Master extends the trait ThreadSafeRpcEndpoint. Let's look at ThreadSafeRpcEndpoint:
```scala
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint  // v1.6
```
ThreadSafeRpcEndpoint in turn extends the trait RpcEndpoint, which is the important one here. Part of its code:
```scala
private[spark] trait RpcEndpoint {   // v1.6
  // only the code we care about is kept

  /**
   * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
   */
  val rpcEnv: RpcEnv

  /**
   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
   * called. And `self` will become `null` when `onStop` is called.
   *
   * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is no
   * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
   */
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  /**
   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
   */
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }
}
```
RpcEndpoint has a key member, rpcEnv: an RpcEndpoint registers a name with an RpcEnv in order to send and receive messages (the RpcEnv trait is covered in detail below). The methods of RpcEndpoint were designed to be compatible with both Akka and Netty, and they map directly onto Akka's Actor:
preStart <----> onStart: invoked when the actor/endpoint starts
receiveWithLogging <----> receive or receiveAndReply: invoked to handle incoming messages
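To make this mapping concrete, here is a self-contained toy model (these classes are illustrative inventions, not Spark's actual RpcEndpoint or RpcEnv) showing the same lifecycle contract: onStart runs once before any message is dispatched, then receive handles each message, just like Akka's preStart/receive pair:

```scala
import scala.collection.mutable

// Toy stand-ins (NOT Spark classes) for the RpcEndpoint lifecycle.
trait ToyEndpoint {
  def onStart(): Unit = {}                 // analogue of Actor.preStart
  def receive: PartialFunction[Any, Unit]  // analogue of Actor.receive
}

class ToyWorker extends ToyEndpoint {
  var started = false
  val seen = mutable.Buffer.empty[Any]
  override def onStart(): Unit = { started = true }
  override def receive: PartialFunction[Any, Unit] = {
    case msg => seen += msg
  }
}

// A minimal dispatch loop standing in for the RpcEnv's message loop:
// start the endpoint first, then feed it the messages it can handle.
object ToyDispatcher {
  def run(endpoint: ToyEndpoint, messages: Seq[Any]): Unit = {
    endpoint.onStart()
    messages.foreach { m =>
      if (endpoint.receive.isDefinedAt(m)) endpoint.receive(m)
    }
  }
}
```

The real RpcEnv enforces the same ordering guarantee: no message reaches receive before onStart has completed.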
Now back to Master's main method:
```scala
def main(argStrings: Array[String]) {  // v1.6
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}
```
main calls startRpcEnvAndEndpoint to create an RpcEnv instance. Let's look at Master.startRpcEnvAndEndpoint:
```scala
/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
```
It calls RpcEnv.create to build the RpcEnv. Following into RpcEnv.create:
```scala
def create(  // v1.6
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean = false): RpcEnv = {
  // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
  val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
  getRpcEnvFactory(conf).create(config)
}
```
It first calls getRpcEnvFactory, which reads the configuration to decide which RpcEnvFactory will create the RpcEnv. The code of getRpcEnvFactory:
```scala
private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
  val rpcEnvNames = Map(
    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
  val rpcEnvName = conf.get("spark.rpc", "netty")
  val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
  Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}
```
As you can see, the factory has two implementations, akka and netty, and the configuration specifies which communication mechanism to use; the chosen RpcEnvFactory (in fact, one of its subclasses) is then instantiated via reflection. Next, the RpcEnvFactory trait itself:
```scala
private[spark] trait RpcEnvFactory {
  def create(config: RpcEnvConfig): RpcEnv
}
```
Its design is very simple, with a single method. Below are its two implementations, AkkaRpcEnvFactory and NettyRpcEnvFactory:
```scala
private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {
  def create(config: RpcEnvConfig): RpcEnv = {
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
      config.name, config.host, config.port, config.conf, config.securityManager)
    actorSystem.actorOf(Props(classOf[ErrorMonitor]), "ErrorMonitor")
    new AkkaRpcEnv(actorSystem, config.securityManager, config.conf, boundPort)
  }
}
```
```scala
private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv = new NettyRpcEnv(sparkConf, javaSerializerInstance,
      config.host, config.securityManager)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}
```
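Which of these two factories gets picked is driven entirely by the spark.rpc setting read in getRpcEnvFactory above. A minimal example (assuming a SparkConf instance named conf is in scope):

```scala
// Switch back to the Akka-based RpcEnv; Spark 1.6 defaults to "netty".
conf.set("spark.rpc", "akka")
```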
With this abstraction layer in mind, let's compare how Master and Worker communicate in Spark 1.6 against Spark 1.3's Akka.Actor approach.
In Spark 1.3, Worker's preStart method ultimately calls tryRegisterAllMasters to register with the Master. Its code:
```scala
private def tryRegisterAllMasters() {  // v1.3
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
```
On the Master side, the registration message is received in receiveWithLogging, and a registration-success acknowledgement is sent back to the Worker:
```scala
override def receiveWithLogging = {  // v1.3
  // only the code we care about is kept ......
  case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      // notify the worker that registration succeeded
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    }
  }
}
```
In Spark 1.6, Worker's onStart method calls registerWithMaster to register with the Master. Let's look at registerWithMaster:
```scala
private def registerWithMaster() {  // v1.6
  // only the code we care about is kept ......
  registrationRetryTimer match {
    case None =>
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
```
It uses a thread-pool-backed scheduler to periodically send registration messages to the candidate Masters. The self used here is a method:
```scala
final def self: RpcEndpointRef = {
  require(rpcEnv != null, "rpcEnv has not been initialized")
  rpcEnv.endpointRef(this)
}
```
self obtains the RpcEndpointRef of this endpoint, the handle through which RPC messages are sent; the send call in registerWithMaster plays the same role as actor ! RegisterWorker in Spark 1.3.
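The difference between the two messaging styles can be sketched with a self-contained toy model (invented names, not Spark's API): send is fire-and-forget like Akka's !, while ask-style calls expect an answer, which is exactly what Master's receiveAndReply serves through context.reply:

```scala
import scala.collection.mutable

// Toy reference (NOT Spark's RpcEndpointRef): `handler` stands in for
// the remote endpoint's message-processing logic.
class ToyRef(handler: Any => Option[Any]) {
  val log = mutable.Buffer.empty[Any]

  // Fire-and-forget, like Akka's `!` / RpcEndpointRef.send:
  // any reply the remote side produces is ignored.
  def send(msg: Any): Unit = { log += msg; handler(msg) }

  // Request-reply, like ask/askWithRetry: the caller gets back
  // whatever the remote side replied with.
  def ask(msg: Any): Option[Any] = { log += msg; handler(msg) }
}

object SendAskDemo {
  def main(args: Array[String]): Unit = {
    val master = new ToyRef({
      case "RegisterWorker" => Some("RegisteredWorker") // context.reply(...)
      case _                => None                     // no reply sent
    })
    master.send("Heartbeat")               // no answer expected
    println(master.ask("RegisterWorker"))  // prints Some(RegisteredWorker)
  }
}
```

In the real code the Worker's registration goes through the reply-carrying path, because the Worker needs to learn whether registration succeeded.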
Now let's see how the Master receives and handles this registration request, in Master.receiveAndReply:
```scala
// receive only processes a message; receiveAndReply processes it and sends a reply
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerUiPort, publicAddress)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        // notify the worker that registration succeeded
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed(
          "Attempted to re-register worker at same address: " + workerAddress))
      }
    }
  }
  // .....
}
```
Closing note: Spark 1.6 merely lifted the communication mechanism to a higher level of abstraction; the core DAGScheduler and TaskScheduler did not change in any fundamental way. If you are familiar with the Spark 1.3 source code, you will still be able to read the Spark 1.6, or even Spark 2.0, source code.