
Lesson 43: Inside Spark 1.6 RPC: Runtime Mechanics, Source Code Walkthrough, Netty, Akka, and More

2017-06-09
Spark 1.6 introduced a new RPC architecture built around RpcEnv, RpcEndpoint, and RpcEndpointRef. As of that implementation the layer underneath is still Akka. Akka is an Actor-based distributed messaging system; Spark 1.6 wraps Akka and exposes a higher-level RPC abstraction on top of it, with the goal of removing the dependency on Akka and laying the groundwork for extensible, custom RPC implementations.
RPC changes in Spark 2.0:
[SPARK-6280] - Remove Akka systemName from Spark
[SPARK-7995] - Remove AkkaRpcEnv and remove Akka from the dependencies of Core
[SPARK-7997] - Remove the developer APIs SparkEnv.actorSystem and AkkaUtils
 
RpcEnv is an abstract class that takes a SparkConf. An [[RpcEndpoint]] needs to register itself with an [[RpcEnv]] under a name in order to receive messages. The [[RpcEnv]] processes messages sent from an [[RpcEndpointRef]] or from a remote node and delivers them to the corresponding [[RpcEndpoint]]. For uncaught exceptions, the [[RpcEnv]] uses [[RpcCallContext.sendFailure]] to send the exception back to the sender, or logs it (for instance a `NotSerializableException`) when there is no such sender.
RpcEnv.scala source code:
private[spark] abstract class RpcEnv(conf: SparkConf) {

  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
......
 
RpcCallContext.scala declares the methods reply, sendFailure, and senderAddress, where reply sends a message back to the sender. If the sender is an [[RpcEndpoint]], its [[RpcEndpoint.receive]] will be called.
RpcCallContext.scala source code:
private[spark] trait RpcCallContext {
  /**
   * Reply a message to the sender. If the sender is [[RpcEndpoint]], its [[RpcEndpoint.receive]]
   * will be called.
   */
  def reply(response: Any): Unit

  /**
   * Report a failure to the sender.
   */
  def sendFailure(e: Throwable): Unit

  /**
   * The sender of this message.
   */
  def senderAddress: RpcAddress
}
 
The address held by RpcCallContext, RpcAddress, is a case class with members such as hostPort and toSparkURL.
RpcAddress.scala source code is as follows:
private[spark] case class RpcAddress(host: String, port: Int) {
  def hostPort: String = host + ":" + port
  /** Returns a string in the form of "spark://host:port". */
  def toSparkURL: String = "spark://" + hostPort
  override def toString: String = hostPort
}
The companion object RpcAddress belongs to the package org.apache.spark.rpc. Its fromURIString method extracts an RpcAddress from a URI string, and fromSparkURL likewise extracts an RpcAddress from a spark:// URL. In other words, the case class RpcAddress is constructed through the methods of its companion object, and both fromURIString and fromSparkURL return an RpcAddress.
The source code of the companion object RpcAddress:
private[spark] object RpcAddress {
  /** Return the [[RpcAddress]] represented by `uri`. */
  def fromURIString(uri: String): RpcAddress = {
    val uriObj = new java.net.URI(uri)
    RpcAddress(uriObj.getHost, uriObj.getPort)
  }

  /** Returns the [[RpcAddress]] encoded in the form of "spark://host:port" */
  def fromSparkURL(sparkUrl: String): RpcAddress = {
    val (host, port) = Utils.extractHostPortFromSparkUrl(sparkUrl)
    RpcAddress(host, port)
  }
}
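As a quick standalone illustration of what fromURIString does with java.net.URI (HostPort below is a hypothetical stand-in for the private[spark] RpcAddress, so the snippet runs outside Spark):

import java.net.URI

// Hypothetical stand-in for the private[spark] RpcAddress case class.
case class HostPort(host: String, port: Int)

object HostPort {
  // Mirrors RpcAddress.fromURIString: parse host and port out of a URI string.
  def fromURIString(uri: String): HostPort = {
    val uriObj = new URI(uri)
    HostPort(uriObj.getHost, uriObj.getPort)
  }
}

// Example: HostPort.fromURIString("spark://master-node:7077") == HostPort("master-node", 7077)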
 
An analysis of RpcEnv:
1. RpcEnv is the RPC environment (the analogue of Akka's ActorSystem). Every RpcEndpoint must be registered with an RpcEnv instance under a name; a client can then look up the endpoint's RpcEndpointRef by that name and communicate with it. When an RpcEndpoint receives a message, its receive method is invoked.
2. If an RpcEndpoint receives a message that requires a reply, it hands the message to its receiveAndReply method (the reply goes back to the sender through the reply method of RpcCallContext); messages that need no reply are handled by receive.
3. RpcEnvFactory is responsible for creating the RpcEnv: its create method builds the RpcEnv instance, and the default implementation is Netty-based. A minimal registration sketch follows this list.
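To make the registration flow concrete, here is a minimal sketch. EchoEndpoint and the "echo" name are illustrative; because these APIs are private[spark], the snippet assumes it is compiled inside the org.apache.spark.rpc package against the Spark source, and the RpcEnv.create signature follows the Spark 2.x listing shown later in this lesson:

package org.apache.spark.rpc

import org.apache.spark.{SecurityManager, SparkConf}

// A trivial endpoint: prints whatever it receives (illustrative only).
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"echo: $msg")
  }
}

object EchoDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // name, bindAddress, advertiseAddress, port, conf, securityManager, clientMode
    val env = RpcEnv.create("demo", "localhost", "localhost", 52345, conf,
      new SecurityManager(conf), clientMode = false)
    // Register under a name, get back the RpcEndpointRef proxy, and send.
    val ref: RpcEndpointRef = env.setupEndpoint("echo", new EchoEndpoint(env))
    ref.send("hello")
    env.shutdown()
  }
}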
The RpcEnv diagram is shown below:



Figure 9-1 RpcEnv diagram
 
 
Returning to the RpcEnv.scala source: it first calls RpcUtils.lookupRpcTimeout(conf), which returns Spark's default timeout for looking up a remote RPC endpoint. lookupRpcTimeout constructs an RpcTimeout over the properties "spark.rpc.lookupTimeout" and "spark.network.timeout", with a default of 120 seconds.
The lookupRpcTimeout method in RpcUtils.scala:
def lookupRpcTimeout(conf: SparkConf): RpcTimeout = {
  RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
}
Stepping into RpcTimeout: RpcTimeout associates a timeout with a description of what controls it, so that when a TimeoutException occurs, the extra context about the timeout is included in the exception message.
private[spark] class RpcTimeout(val duration: FiniteDuration, val timeoutProp: String)
  extends Serializable {

  /** Amends the standard message of TimeoutException to include the description */
  private def createRpcTimeoutException(te: TimeoutException): RpcTimeoutException = {
    new RpcTimeoutException(te.getMessage + ". This timeout is controlled by " + timeoutProp, te)
  }
 
 
Here RpcTimeoutException extends TimeoutException:
private[rpc] class RpcTimeoutException(message: String, cause: TimeoutException)
  extends TimeoutException(message) { initCause(cause) }
 
And TimeoutException in turn extends Exception:
public class TimeoutException extends Exception {
......
    public TimeoutException(String message) {
        super(message);
    }
}
 
Back in RpcTimeout.scala, the addMessageIfTimeout method adds this context whenever a timeout occurs.
The addMessageIfTimeout source in RpcTimeout.scala:
def addMessageIfTimeout[T]: PartialFunction[Throwable, T] = {
  // The exception has already been converted to a RpcTimeoutException so just raise it
  case rte: RpcTimeoutException => throw rte
  // Any other TimeoutException get converted to a RpcTimeoutException with modified message
  case te: TimeoutException => throw createRpcTimeoutException(te)
}
 
The key method in RpcTimeout.scala is awaitResult: it waits for the future to complete and returns its result; if no result arrives within the allotted time, it throws an [[RpcTimeoutException]]:
def awaitResult[T](future: Future[T]): T = {
  val wrapAndRethrow: PartialFunction[Throwable, T] = {
    case NonFatal(t) =>
      throw new SparkException("Exception thrown in awaitResult", t)
  }
  try {
    // scalastyle:off awaitresult
    Await.result(future, duration)
    // scalastyle:on awaitresult
  } catch addMessageIfTimeout.orElse(wrapAndRethrow)
}
}
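The same try/catch-with-PartialFunction pattern can be reproduced with nothing but the standard library. The sketch below (names like demoAwait and the "demo.timeout" property are illustrative) shows how addMessageIfTimeout.orElse(wrapAndRethrow) composes two handlers into one catch clause:

import java.util.concurrent.TimeoutException
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object AwaitDemo {
  // Mirrors addMessageIfTimeout: rethrow timeouts with extra context.
  def addMessageIfTimeout[T](prop: String): PartialFunction[Throwable, T] = {
    case te: TimeoutException =>
      throw new TimeoutException(te.getMessage + ". This timeout is controlled by " + prop)
  }

  // Mirrors RpcTimeout.awaitResult with a fixed 1-second duration.
  def demoAwait[T](future: Future[T]): T = {
    val wrapAndRethrow: PartialFunction[Throwable, T] = {
      case NonFatal(t) => throw new RuntimeException("Exception thrown in awaitResult", t)
    }
    try {
      Await.result(future, 1.second)
    } catch addMessageIfTimeout[T]("demo.timeout").orElse(wrapAndRethrow)
  }

  def main(args: Array[String]): Unit = {
    // A promise that is never completed forces the timeout path.
    try demoAwait(Promise[Int]().future)
    catch { case e: TimeoutException => println(e.getMessage) }
  }
}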
 
The future here is of type Future[T], which extends Awaitable.
trait Future[+T] extends Awaitable[T]
 
Awaitable is a trait. Its ready method means that the Awaitable reaches the completed state within the given Duration, i.e. it becomes ready. The result method, invoked through Await.result, is itself blocking.
Awaitable.scala source code:
trait Awaitable[+T] {
......
  def ready(atMost: Duration)(implicit permit: CanAwait): this.type
……
  @throws(classOf[Exception])
  def result(atMost: Duration)(implicit permit: CanAwait): T
}
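A small runnable contrast between the two (pure standard library): Await.ready only waits for completion and returns the future itself, while Await.result additionally extracts the value (and rethrows the failure, if any):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ReadyVsResult {
  def main(args: Array[String]): Unit = {
    val f: Future[Int] = Future { 21 * 2 }
    // ready blocks until completion; the value stays wrapped in the Future.
    val completed: Future[Int] = Await.ready(f, 5.seconds)
    println(completed.value) // Some(Success(42))
    // result blocks and unwraps the value directly.
    println(Await.result(f, 5.seconds)) // 42
  }
}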
 
Back in RpcEnv.scala: the endpointRef method returns the reference of a registered RpcEndpoint, which is the proxy pattern at work; we always use an RpcEndpoint through its RpcEndpointRef. The address method gives the address the RpcEnv listens on; setupEndpoint registers an RpcEndpoint under a name and returns the corresponding RpcEndpointRef; and fileServer returns the file server instance used to serve files, which may be `null` if the RpcEnv is not running in server mode.
RpcEnv.scala source code:
private[spark] abstract class RpcEnv(conf: SparkConf) {

  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
......
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
  def address: RpcAddress
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
…….
  def fileServer: RpcEnvFileServer
……
Also in RpcEnv.scala: RpcEnvFileServer is a trait and RpcEnvConfig is a case class. RpcEnvFileServer source code:
private[spark] trait RpcEnvFileServer {
  def addFile(file: File): String
......
private[spark] case class RpcEnvConfig(
    conf: SparkConf,
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    securityManager: SecurityManager,
    clientMode: Boolean)
 
RpcEnv is an abstract class; its concrete subclass is NettyRpcEnv. Spark 1.6 shipped two implementations, AkkaRpcEnv and NettyRpcEnv; in Spark 2.0 only NettyRpcEnv remains.
Next, look at RpcEnvFactory. RpcEnvFactory is a factory for creating an [[RpcEnv]]; it must have an empty constructor so that it can be created via reflection, and create builds the concrete instance from the given configuration. The RpcEndpoint trait in the same listing defines the receiveAndReply and receive methods. A reflection sketch follows the listing.
RpcEndpoint.scala source code:
private[spark] trait RpcEnvFactory {

  def create(config: RpcEnvConfig): RpcEnv
}
private[spark] trait RpcEndpoint {
......
  val rpcEnv: RpcEnv

  ......
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }
…….

  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }
……
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }
……
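The "empty constructor + reflection" requirement boils down to the standard pattern below. GreeterFactory and HelloFactory are illustrative stand-ins, not Spark classes; the Class.forName call assumes HelloFactory is compiled in the default package:

// A factory trait and a zero-arg implementation, instantiable by name.
trait GreeterFactory {
  def create(name: String): String
}

class HelloFactory extends GreeterFactory { // empty constructor
  def create(name: String): String = s"hello, $name"
}

object ReflectionDemo {
  def main(args: Array[String]): Unit = {
    // This is essentially how a configured factory class would be loaded:
    val clazz = Class.forName("HelloFactory")
    val factory = clazz.getDeclaredConstructor().newInstance().asInstanceOf[GreeterFactory]
    println(factory.create("spark")) // hello, spark
  }
}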
 
Now look at Master. Master extends ThreadSafeRpcEndpoint and receives messages through its receive and receiveAndReply methods.
Master.scala source code:
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
……
override def receive: PartialFunction[Any, Unit] = {
  case ElectedLeader =>
    val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
    state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
      RecoveryState.ALIVE
    } else {
      RecoveryState.RECOVERING
    }
    logInfo("I have been elected leader! New state: " + state)
    if (state == RecoveryState.RECOVERING) {
      beginRecovery(storedApps, storedDrivers, storedWorkers)
      recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CompleteRecovery)
        }
      }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    }
……

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
ThreadSafeRpcEndpoint extends RpcEndpoint. ThreadSafeRpcEndpoint is a trait that requires the RpcEnv to deliver messages to it thread-safely. Thread-safety means one message is fully processed before the same [[ThreadSafeRpcEndpoint]] starts processing the next. In other words, changes to the internal fields of a [[ThreadSafeRpcEndpoint]] are visible when it processes the next message, and its fields do not need to be volatile or equivalent. However, there is no guarantee that different messages to the same [[ThreadSafeRpcEndpoint]] are processed on the same thread.
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
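A sketch of what that guarantee buys you (CounterEndpoint is illustrative; as before, it assumes compilation inside the org.apache.spark.rpc package, since the trait is private[spark]):

package org.apache.spark.rpc

// Because delivery is serialized per endpoint, a plain var is safe here:
// no volatile, no locks, even though different messages may arrive on
// different threads.
class CounterEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  private var count = 0 // visible to the next message without synchronization

  override def receive: PartialFunction[Any, Unit] = {
    case "inc"  => count += 1
    case "show" => println(s"count = $count")
  }
}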
 
 
Back in RpcEndpoint.scala, focus on the receive and receiveAndReply methods. receive handles messages sent via [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]; if an unmatched message arrives, a [[SparkException]] is thrown and routed to `onError`. receiveAndReply handles messages sent via [[RpcEndpointRef.ask]]; likewise, an unmatched message results in a [[SparkException]] routed to `onError`. Both methods return a PartialFunction.
RpcEndpoint.scala source code:
def receive: PartialFunction[Any, Unit] = {
  case _ => throw new SparkException(self + " does not implement 'receive'")
}

......
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
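The "unmatched message" behavior is just PartialFunction composition; a runnable standard-library sketch of the dispatch idea (handler and fallback are illustrative names):

object DispatchDemo {
  def main(args: Array[String]): Unit = {
    // The messages a concrete endpoint actually understands...
    val handler: PartialFunction[Any, Unit] = {
      case s: String => println(s"text message: $s")
    }
    // ...composed with the catch-all default, like RpcEndpoint.receive's `case _`.
    val fallback: PartialFunction[Any, Unit] = {
      case other => throw new IllegalArgumentException(s"does not implement 'receive' for $other")
    }
    val receive = handler.orElse(fallback)

    receive("hello") // matched: prints the message
    try receive(42)  // unmatched: falls through to the default
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}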
 
 
In Master: messages arriving at the receive method need no reply to the sender.
Master.scala receive method source code:
override def receive: PartialFunction[Any, Unit] = {
  case ElectedLeader =>
  .....
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CompleteRecovery)
      }
    }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
  }

  case CompleteRecovery => completeRecovery()

  case RevokedLeadership =>
    logError("Leadership has been revoked -- master shutting down.")
    System.exit(0)

  case RegisterApplication(description, driver) =>
  ......
    schedule()
 
In Master: every message arriving at receiveAndReply is answered through context.reply.
Master.scala receiveAndReply method source code:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
  .......
    if (registerWorker(worker)) {
    ......
      context.reply(RegisteredWorker(self, masterWebUiUrl))

    } else {
    ......
      context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress))
    }
  }

  case RequestSubmitDriver(description) =>
  .....
    context.reply(SubmitDriverResponse(self, true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}"))
  }
......
 
About context.reply in Master's receiveAndReply: when an RpcEndpoint receives a message that requires a reply, it hands the message to its own receiveAndReply (the reply goes back to the sender through the reply method of RpcCallContext); messages that need no reply are handled by receive.
RpcCallContext source code is as follows:
private[spark] trait RpcCallContext {

  /**
   * Reply a message to the sender. If the sender is [[RpcEndpoint]], its [[RpcEndpoint.receive]]
   * will be called.
   */
  def reply(response: Any): Unit

  /**
   * Report a failure to the sender.
   */
  def sendFailure(e: Throwable): Unit

  /**
   * The sender of this message.
   */
  def senderAddress: RpcAddress
}
 
Back in RpcEndpoint.scala: RpcEnvFactory is a trait responsible for creating the RpcEnv; its create method builds the RpcEnv instance, and the default implementation is Netty-based.
RpcEndpoint.scala source code:
private[spark] trait RpcEnvFactory {

  def create(config: RpcEnvConfig): RpcEnv
}
RpcEnvFactory's create method has no concrete implementation, so look at the concrete create in its subclass NettyRpcEnvFactory, which builds a NettyRpcEnv.
The create method source in NettyRpcEnv.scala:
def create(config: RpcEnvConfig): RpcEnv = {
  val sparkConf = config.conf
  // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
  // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
  val javaSerializerInstance =
    new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
      config.securityManager)
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.bindAddress, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    try {
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
    } catch {
      case NonFatal(e) =>
        nettyEnv.shutdown()
        throw e
    }
  }
  nettyEnv
}
}
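Utils.startServiceOnPort repeatedly tries to start the service on successive ports until one binds. A simplified standalone sketch of that retry idea (startOnPort and maxRetries are illustrative, not Spark's implementation):

import java.net.{BindException, ServerSocket}

object PortRetryDemo {
  // Try `startService` on port, port + 1, ... until it binds or retries run out.
  def startOnPort[T](port: Int, maxRetries: Int)(startService: Int => T): (T, Int) =
    try {
      (startService(port), port)
    } catch {
      case _: BindException if maxRetries > 0 =>
        println(s"port $port in use, retrying on ${port + 1}")
        startOnPort(port + 1, maxRetries - 1)(startService)
    }

  def main(args: Array[String]): Unit = {
    val blocker = new ServerSocket(54321) // occupy a port on purpose
    val (socket, bound) = startOnPort(54321, 3)(new ServerSocket(_))
    println(s"bound on port $bound")      // 54322
    socket.close(); blocker.close()
  }
}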
 
Now trace the instantiation of NettyRpcEnv in Spark 2.0. When SparkContext is instantiated, it calls the createSparkEnv method.
SparkContext.scala source code:
……
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
......

  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
  }

.....
 
SparkContext's createSparkEnv calls SparkEnv.createDriverEnv. Looking at the implementation of createDriverEnv, it in turn calls create.
The createDriverEnv source in SparkEnv.scala:
private[spark] def createDriverEnv(
  .......
  create(
    conf,
    SparkContext.DRIVER_IDENTIFIER,
    bindAddress,
    advertiseAddress,
    port,
    isLocal,
    numCores,
    ioEncryptionKey,
    listenerBus = listenerBus,
    mockOutputCommitCoordinator = mockOutputCommitCoordinator
  )
}

private def create(
  ........
  val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port, conf,
    securityManager, clientMode = !isDriver)
……
 
In RpcEnv.scala, the create method directly news up a NettyRpcEnvFactory and calls NettyRpcEnvFactory().create; NettyRpcEnvFactory extends RpcEnvFactory. In Spark 2.0, the RpcEnvFactory used is simply NettyRpcEnvFactory.
RpcEnv.scala source code:
private[spark] object RpcEnv {
…….

  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      clientMode)
    new NettyRpcEnvFactory().create(config)
  }
The NettyRpcEnvFactory().create method is as follows:
NettyRpcEnv.scala source code:
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
  ......
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}
 
NettyRpcEnvFactory().create news up a NettyRpcEnv, passing in the SparkConf; NettyRpcEnv includes methods such as fileServer and startServer.
NettyRpcEnv source code is as follows:
private[netty] class NettyRpcEnv(
    val conf: SparkConf,
    javaSerializerInstance: JavaSerializerInstance,
    host: String,
    securityManager: SecurityManager) extends RpcEnv(conf) with Logging {

……
  override def fileServer: RpcEnvFileServer = streamManager
......
  def startServer(bindAddress: String, port: Int): Unit = {
    val bootstraps: java.util.List[TransportServerBootstrap] =
      if (securityManager.isAuthenticationEnabled()) {
        java.util.Arrays.asList(new SaslServerBootstrap(transportConf, securityManager))
      } else {
        java.util.Collections.emptyList()
      }
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }
 
In NettyRpcEnv.scala's startServer, the server is created through transportContext.createServer, and the dispatcher registers the RpcEndpoint via dispatcher.registerRpcEndpoint. Inside createServer, a TransportServer is newed up.
The createServer method source of TransportContext:
public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
TransportServer.java source code is as follows:
public TransportServer(
    TransportContext context,
    String hostToBind,
    int portToBind,
    RpcHandler appRpcHandler,
    List<TransportServerBootstrap> bootstraps) {
  this.context = context;
  this.conf = context.getConf();
  this.appRpcHandler = appRpcHandler;
  this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));

  try {
    init(hostToBind, portToBind);
  } catch (RuntimeException e) {
    JavaUtils.closeQuietly(this);
    throw e;
  }
}
The key method in TransportServer.java is init, and this is where plain Netty takes over. A minimal Netty sketch follows the listing.
The init source in TransportServer.java:
private void init(String hostToBind, int portToBind) {

  IOMode ioMode = IOMode.valueOf(conf.ioMode());
  EventLoopGroup bossGroup =
    NettyUtils.createEventLoop(ioMode, conf.serverThreads(), conf.getModuleName() + "-server");
  EventLoopGroup workerGroup = bossGroup;
…….
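For readers new to Netty, a minimal Scala sketch of the same bootstrap shape (event-loop group, ServerBootstrap, channel initializer); this is generic Netty 4 usage, not Spark's TransportServer:

import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter, ChannelInitializer}

object MiniNettyServer {
  def main(args: Array[String]): Unit = {
    // One event-loop group serving as both boss and worker, as in init above.
    val group = new NioEventLoopGroup()
    try {
      val bootstrap = new ServerBootstrap()
        .group(group, group)
        .channel(classOf[NioServerSocketChannel])
        .childHandler(new ChannelInitializer[SocketChannel] {
          override def initChannel(ch: SocketChannel): Unit = {
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter {
              // Echo every inbound buffer straight back to the sender.
              override def channelRead(ctx: ChannelHandlerContext, msg: AnyRef): Unit =
                ctx.writeAndFlush(msg)
            })
          }
        })
      val channel = bootstrap.bind(9999).sync().channel()
      channel.closeFuture().sync() // block until the server channel is closed
    } finally {
      group.shutdownGracefully()
    }
  }
}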
 
Next we look at RpcEndpointRef. RpcEndpointRef is an abstract class, and it embodies the proxy pattern.
RpcEndpointRef.scala source code:
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {

  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
......
  def send(message: Any): Unit
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
…..
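From the caller's side, the two methods split the protocol into fire-and-forget and request-response. A hedged sketch of using a ref obtained earlier via setupEndpoint (the ref parameter and the "ping"/"pong" messages are illustrative; the same private[spark] packaging caveat applies):

package org.apache.spark.rpc

import scala.concurrent.Future
import org.apache.spark.SparkConf
import org.apache.spark.util.RpcUtils

object RefUsageSketch {
  def demo(ref: RpcEndpointRef, conf: SparkConf): Unit = {
    // Fire-and-forget: delivered to the endpoint's receive.
    ref.send("ping")

    // Request-response: delivered to receiveAndReply; the endpoint answers
    // with context.reply("pong"), which completes this Future.
    val timeout = RpcUtils.askRpcTimeout(conf)
    val answer: Future[String] = ref.ask[String]("ping", timeout)
    println(timeout.awaitResult(answer)) // blocks up to the configured timeout
  }
}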
 
NettyRpcEndpointRef is the concrete subclass of RpcEndpointRef. Its ask method delivers the message by calling nettyEnv.ask. RequestMessage is a case class.
The NettyRpcEndpointRef source in NettyRpcEnv.scala is as follows:
private[netty] class NettyRpcEndpointRef(
    @transient private val conf: SparkConf,
    endpointAddress: RpcEndpointAddress,
    @transient @volatile private var nettyEnv: NettyRpcEnv)
  extends RpcEndpointRef(conf) with Serializable with Logging {
......
  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    nettyEnv.ask(RequestMessage(nettyEnv.address, this, message), timeout)
  }
......
 
Now let us look at how RPC is applied, from the perspective of a concrete example:
The lifecycle of an RpcEndpoint: construction (constructor) -> startup (onStart) -> message handling (receive, receiveAndReply) -> shutdown (onStop), as the sketch below illustrates.
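A sketch of an endpoint that touches each lifecycle hook (LifecycleEndpoint is illustrative; the same private[spark] packaging caveat applies):

package org.apache.spark.rpc

class LifecycleEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // 1. the constructor runs first, when the object is instantiated

  // 2. called by the RpcEnv once the endpoint is registered and ready
  override def onStart(): Unit = println("onStart: ready to receive")

  // 3a. fire-and-forget messages
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"received: $msg")
  }

  // 3b. messages that expect an answer
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg => context.reply(s"ack: $msg")
  }

  // 4. called when the endpoint is stopped
  override def onStop(): Unit = println("onStop: cleaned up")
}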
Master receives messages in two ways: 1) receive, which takes a message without replying; 2) receiveAndReply, which answers through context.reply. For example, a Worker sends the Master a RegisterWorker message, and once registration succeeds the Master returns a RegisteredWorker message to the Worker.
Master's receiveAndReply source code:
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
  ......
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
    ......
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
 
When a Worker starts, seen from the lifecycle angle, the Worker registers with the Master as it is brought up:
Worker's onStart source code:
override def onStart() {
  .......
  registerWithMaster()

  metricsSystem.registerSource(workerSource)
  metricsSystem.start()
  // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
Stepping into the registerWithMaster method:
Worker's registerWithMaster source code:
private def registerWithMaster() {
  ......
      registerMasterFutures = tryRegisterAllMasters()
   ....
Stepping into tryRegisterAllMasters(): rpcEnv.setupEndpointRef obtains the RpcEndpointRef from the masterAddress and the name Master.ENDPOINT_NAME.
Worker's tryRegisterAllMasters source code:
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  ......
        val masterEndpoint: RpcEndpointRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        registerWithMaster(masterEndpoint)
  ......
Using the masterEndpoint, registerWithMaster performs the registration: it sends a RegisterWorker message through ask, requesting a returned result of type RegisterWorkerResponse, and then pattern-matches on the outcome. On success it calls handleRegisterResponse; on failure it exits.
Worker.scala registerWithMaster source code:
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError {
          handleRegisterResponse(msg)
        }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
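The ask-then-onComplete shape is plain scala.concurrent usage; a runnable standalone version of the same pattern (askMaster stands in for masterEndpoint.ask):

import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object AskPatternDemo {
  // Stand-in for masterEndpoint.ask[RegisterWorkerResponse](...): an async reply.
  def askMaster(): Future[String] =
    Future.successful("RegisteredWorker")

  def main(args: Array[String]): Unit = {
    // Like ThreadUtils.sameThread: run the (cheap) callback on the completing thread.
    val sameThread: ExecutionContext = ExecutionContext.fromExecutor(
      (command: Runnable) => command.run())

    askMaster().onComplete {
      case Success(msg) => println(s"handleRegisterResponse($msg)")
      case Failure(e)   => println(s"Cannot register with master: $e")
    }(sameThread)
  }
}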
Inside handleRegisterResponse the response is pattern-matched, and the RegisteredWorker message is handled accordingly.
Worker.scala handleRegisterResponse source code:
private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
  msg match {
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
  .......
 
 
 
 
 