Spark source code analysis: RPC in spark-core
2017-07-09 00:42
I have recently been reading the Spark 2.1.1 source code and think it is worth keeping some notes, so let's get straight to it.
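Since Spark 1.6, Netty has replaced Akka as the default implementation of Spark RPC. Netty is a message-driven networking framework built on NIO, and early versions of Spark already used it to solve the problem of transferring large files. Netty supports many protocols (HTTP, FTP, SSL and others), provides reliable and efficient network communication, and gives finer-grained control over the threading model.
RpcEndpoint: can be thought of as a node such as the Master or a Worker. Each endpoint defines its own messages and handling logic according to what it needs, and the messages it sends or asks are routed through the Dispatcher.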
private[spark] trait RpcEndpoint {

  /**
   * An RpcEndpoint is a communication endpoint, e.g. the Master or a Worker. At startup an
   * [[RpcEnv]] is created and the current [[RpcEndpoint]] is registered with it.
   * To communicate with another RpcEndpoint you must obtain that endpoint's RpcEndpointRef,
   * and an RpcEndpointRef can only be obtained through an RpcEnv.
   */
  val rpcEnv: RpcEnv

  /**
   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
   * called. And `self` will become `null` when `onStop` is called.
   * (The RpcEndpointRef is what messages are sent through: it becomes usable with onStart()
   * and goes away with onStop().)
   *
   * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
   * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
   */
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  /**
   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply)]]. If receiving a
   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
   * Messages handled here need no reply; they are only processed inside the RpcEndpoint.
   */
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  /**
   * Process messages from [[RpcEndpointRef.ask]]. If receiving a unmatched message,
   * [[SparkException]] will be thrown and sent to `onError`.
   * After processing, the RpcEndpoint must answer the caller of RpcEndpointRef.ask
   * through `context.reply`.
   */
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  /**
   * Invoked when any exception is thrown during handling messages.
   */
  def onError(cause: Throwable): Unit = {
    // By default, throw e and let RpcEnv handle it
    throw cause
  }

  /**
   * Invoked when `remoteAddress` is connected to the current node.
   */
  def onConnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when `remoteAddress` is lost.
   */
  def onDisconnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when some network error happens in the connection between the current node and
   * `remoteAddress`.
   */
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }

  /**
   * Invoked when [[RpcEndpoint]] is stopping. `self` will be `null` in this method and you cannot
   * use it to send or ask messages.
   */
  def onStop(): Unit = {
    // By default, do nothing.
  }

  /**
   * A convenient method to stop [[RpcEndpoint]].
   */
  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}

/**
 * A trait that requires RpcEnv thread-safely sending messages to it.
 *
 * Thread-safety means processing of one message happens before processing of the next message by
 * the same [[ThreadSafeRpcEndpoint]]. In the other words, changes to internal fields of a
 * [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
 * [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
 *
 * However, there is no guarantee that the same thread will be executing the same
 * [[ThreadSafeRpcEndpoint]] for different messages.
 */
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
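You can try out how `receive` and `receiveAndReply` behave as partial functions without Spark. The sketch below is minimal. `MiniEndpoint`, `MiniCallContext` and `Deliver` are hypothetical names that only mimic the shape of `RpcEndpoint`, `RpcCallContext` and the `applyOrElse` calls made in `Inbox.process`. One-way messages go to `receive`, asks go to `receiveAndReply`, and an unmatched message becomes an error. Spark additionally forwards that error to `onError`, which this sketch leaves out.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for RpcCallContext: records the reply or the failure.
final class MiniCallContext {
  var replied: Option[Any] = None
  var failure: Option[Throwable] = None
  def reply(response: Any): Unit = replied = Some(response)
  def sendFailure(e: Throwable): Unit = failure = Some(e)
}

// Hypothetical stand-in for RpcEndpoint with the same defaults:
// unmatched one-way messages throw, unmatched asks report a failure.
trait MiniEndpoint {
  def receive: PartialFunction[Any, Unit] = {
    case msg => throw new IllegalStateException(s"does not implement 'receive': $msg")
  }
  def receiveAndReply(context: MiniCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new IllegalStateException("won't reply anything"))
  }
}

// Mimics the dispatch in Inbox.process via applyOrElse.
object Deliver {
  def oneWay(endpoint: MiniEndpoint, content: Any): Unit =
    endpoint.receive.applyOrElse[Any, Unit](content, msg =>
      throw new IllegalArgumentException(s"Unsupported message $msg"))

  def ask(endpoint: MiniEndpoint, content: Any): MiniCallContext = {
    val ctx = new MiniCallContext
    endpoint.receiveAndReply(ctx).applyOrElse[Any, Unit](content, msg =>
      ctx.sendFailure(new IllegalArgumentException(s"Unsupported message $msg")))
    ctx
  }
}

case object Ping
final case class Log(line: String)

// Example endpoint: handles Log one-way and answers Ping with "Pong".
class EchoEndpoint extends MiniEndpoint {
  val logged = ArrayBuffer.empty[String]
  override def receive: PartialFunction[Any, Unit] = {
    case Log(line) => logged += line
  }
  override def receiveAndReply(context: MiniCallContext): PartialFunction[Any, Unit] = {
    case Ping => context.reply("Pong")
  }
}
```

Overriding `receive` or `receiveAndReply` replaces the catch-all default. Any message the new cases do not list therefore falls through to the `applyOrElse` fallback.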
RpcEndpointRef: a remote reference to an RpcEndpoint. Through it you can send messages to the remote RpcEndpoint and communicate with it.
/**
 * A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
 * Through an RpcEndpointRef you can send messages to the (possibly remote) RpcEndpoint it refers to.
 */
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {

  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  /**
   * return the address for the [[RpcEndpointRef]]
   */
  def address: RpcAddress

  def name: String

  /**
   * Sends a one-way asynchronous message. Fire-and-forget semantics.
   * send() does not wait for a response. In Spark it is implemented on top of Netty,
   * in NettyRpcEndpointRef.
   */
  def send(message: Any): Unit

  /**
   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within the specified timeout.
   *
   * This method only sends the message once and never retries.
   * Unlike send(), ask() expects the peer to respond, and the response is obtained asynchronously
   * through the Future. It is also implemented in NettyRpcEndpointRef.
   */
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  /**
   * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
   * receive the reply within a default timeout.
   *
   * This method only sends the message once and never retries.
   */
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  /**
   * Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
   * timeout, or throw a SparkException if this fails even after the default number of retries.
   * The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
   * method retries, the message handling in the receiver side should be idempotent.
   *
   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
   * loop of [[RpcEndpoint]].
   *
   * @param message the message to send
   * @tparam T type of the reply message
   * @return the reply message from the corresponding [[RpcEndpoint]]
   */
  def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)

  /**
   * Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
   * specified timeout, throw a SparkException if this fails even after the specified number of
   * retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
   * retries, the message handling in the receiver side should be idempotent.
   *
   * Note: this is a blocking action which may cost a lot of time, so don't call it in a message
   * loop of [[RpcEndpoint]].
   *
   * @param message the message to send
   * @param timeout the timeout duration
   * @tparam T type of the reply message
   * @return the reply message from the corresponding [[RpcEndpoint]]
   */
  def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    // TODO: Consider removing multiple attempts
    var attempts = 0
    var lastException: Exception = null
    while (attempts < maxRetries) {
      attempts += 1
      try {
        val future = ask[T](message, timeout)
        val result = timeout.awaitResult(future)
        if (result == null) {
          throw new SparkException("RpcEndpoint returned null")
        }
        return result
      } catch {
        case ie: InterruptedException => throw ie
        case e: Exception =>
          lastException = e
          logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
      }

      if (attempts < maxRetries) {
        Thread.sleep(retryWaitMs)
      }
    }

    throw new SparkException(
      s"Error sending message [message = $message]", lastException)
  }

}
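You can exercise the retry logic of `askWithRetry` in isolation. The sketch below is a simplified, hypothetical re-implementation, and `RetrySketch.askWithRetry` is not a Spark API. It makes three substitutions:

- `ask` becomes a caller-supplied function that returns a `Future`.
- `Await.result` replaces `RpcTimeout.awaitResult`.
- `maxRetries` and `retryWaitMs` are plain parameters. In Spark, `RpcUtils` reads them from `spark.rpc.numRetries` and `spark.rpc.retry.wait`.

The rules stay the same: rethrow `InterruptedException`, treat a `null` reply as a failure, sleep between attempts, and wrap the last error once all attempts are used up.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.FiniteDuration

object RetrySketch {
  // Simplified model of RpcEndpointRef.askWithRetry; `ask` is called once per attempt.
  def askWithRetry[T](ask: () => Future[T], maxRetries: Int,
                      retryWaitMs: Long, timeout: FiniteDuration): T = {
    var attempts = 0
    var lastException: Exception = null
    while (attempts < maxRetries) {
      attempts += 1
      try {
        val result = Await.result(ask(), timeout)
        if (result == null) throw new IllegalStateException("endpoint returned null")
        return result
      } catch {
        case ie: InterruptedException => throw ie   // never swallow interrupts
        case e: Exception => lastException = e      // Spark logs a warning here
      }
      if (attempts < maxRetries) Thread.sleep(retryWaitMs)
    }
    throw new RuntimeException(s"Failed after $attempts attempts", lastException)
  }
}
```

Each retry sends the message again, so the receiver may process it several times. This is why the scaladoc asks for idempotent message handling.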
send() is implemented on top of Netty, in NettyRpcEndpointRef:
// Sends a RequestMessage through NettyRpcEnv. The message carries the sender's address
// (nettyEnv.address), the receiver (this NettyRpcEndpointRef) and the content, so the other side
// knows both where the message comes from and which endpoint it is for.
override def send(message: Any): Unit = {
  require(message != null, "Message is null")
  nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
}
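The message is sent through NettyRpcEnv after being wrapped, together with the sender's address and the target NettyRpcEndpointRef, into a RequestMessage. ask, by contrast, returns a Future and obtains the result asynchronously.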
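The difference between `send` and `ask` comes down to fire-and-forget versus a request whose reply completes a `Future`. Below is a minimal in-memory sketch that assumes nothing from Spark. `MiniRef`, `Envelope` and the reply callback are hypothetical names. In `NettyRpcEnv` the reply arrives through a Netty response callback instead of this in-process function. Here, `ask` creates a `Promise`, ships the message together with a callback that completes the promise, and hands the promise's future back to the caller.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{Future, Promise}

// What travels to the receiver: the payload plus an optional way to answer.
final case class Envelope(content: Any, replyTo: Option[Either[Throwable, Any] => Unit])

final class MiniRef(mailbox: LinkedBlockingQueue[Envelope]) {
  // Fire-and-forget: enqueue and return immediately, no reply expected.
  def send(message: Any): Unit = mailbox.put(Envelope(message, None))

  // Request/response: the returned Future completes when the receiver calls replyTo.
  def ask[T](message: Any): Future[T] = {
    val promise = Promise[T]()
    val callback: Either[Throwable, Any] => Unit = {
      case Right(value) => promise.trySuccess(value.asInstanceOf[T])
      case Left(error)  => promise.tryFailure(error)
    }
    mailbox.put(Envelope(message, Some(callback)))
    promise.future
  }
}
```

The caller never blocks in either method. Only a caller that explicitly waits on the returned future, as `askWithRetry` does, turns `ask` into a blocking call.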
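RpcEnv: the RPC context. The runtime environment that every RPC endpoint depends on is called its RpcEnv. In NettyRpcEnv an RpcEndpoint (identified by its name) can be registered only once: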
// An RpcEndpoint can be registered only once (uniqueness is checked by its name).
// The Dispatcher registers the endpoint and maintains the binding between RpcEndpoint and RpcEndpointRef.
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data)  // for the OnStart message
  }
  endpointRef
}
// NettyRpcEnv delegates endpoint registration to its internal Dispatcher
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}
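The "register once per name" rule rests on `ConcurrentHashMap.putIfAbsent`, which returns the previous value, or `null` if the key was free. The sketch below is stripped down and uses hypothetical names: `MiniDispatcher`, plus an `EndpointData` that is a local case class rather than Spark's. The first registration wins and a second one with the same name fails. Every registered entry is also offered to a `receivers` queue, which in Spark is how the endpoint's initial `OnStart` message gets processed.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

final case class EndpointData(name: String, endpoint: AnyRef)

final class MiniDispatcher {
  private val endpoints = new ConcurrentHashMap[String, EndpointData]()
  val receivers = new LinkedBlockingQueue[EndpointData]()
  private var stopped = false

  def register(name: String, endpoint: AnyRef): EndpointData = synchronized {
    if (stopped) throw new IllegalStateException("RpcEnv has been stopped")
    // putIfAbsent returns the previous value, or null if the name was free.
    if (endpoints.putIfAbsent(name, EndpointData(name, endpoint)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    receivers.offer(data) // schedule the endpoint so its first message (OnStart) is processed
    data
  }

  def stop(): Unit = synchronized { stopped = true }
}
```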
/**
 * A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
 * so that it can be created via Reflection.
 * RpcEnv is the RPC context, i.e. the runtime environment every RPC endpoint depends on. It
 * manages RpcEndpoint registration and how an RpcEndpointRef is obtained for an RpcEndpoint.
 */
private[spark] object RpcEnv {
  // Companion object of RpcEnv; create() delegates to the Netty factory, NettyRpcEnvFactory.
  def create(
      name: String,
      host: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean = false): RpcEnv = {
    create(name, host, host, port, conf, securityManager, clientMode)
  }

  def create(
      name: String,
      bindAddress: String,
      advertiseAddress: String,
      port: Int,
      conf: SparkConf,
      securityManager: SecurityManager,
      clientMode: Boolean): RpcEnv = {
    val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
      clientMode)
    new NettyRpcEnvFactory().create(config)
  }
}


/**
 * An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
 * receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
 * nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
 * [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
 * sender, or logging them if no such sender or `NotSerializableException`.
 *
 * [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
 */
private[spark] abstract class RpcEnv(conf: SparkConf) {

  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

  /**
   * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
   * [[RpcEndpoint.self]]. Return `null` if the corresponding [[RpcEndpointRef]] does not exist.
   */
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef

  /**
   * Return the address that [[RpcEnv]] is listening to.
   */
  def address: RpcAddress

  /**
   * Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
   * guarantee thread-safety.
   * The RpcEnv manages the binding from each RpcEndpoint (identified by a unique name)
   * to its RpcEndpointRef.
   */
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

  /**
   * Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.
   */
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]

  /**
   * Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
   */
  def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
  }

  /**
   * Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName`.
   * This is a blocking action.
   */
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
    setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
  }

  /**
   * Stop [[RpcEndpoint]] specified by `endpoint`.
   */
  def stop(endpoint: RpcEndpointRef): Unit

  /**
   * Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully,
   * call [[awaitTermination()]] straight after [[shutdown()]].
   */
  def shutdown(): Unit

  /**
   * Wait until [[RpcEnv]] exits.
   *
   * TODO do we need a timeout parameter?
   */
  def awaitTermination(): Unit

  /**
   * [[RpcEndpointRef]] cannot be deserialized without [[RpcEnv]]. So when deserializing any object
   * that contains [[RpcEndpointRef]]s, the deserialization codes should be wrapped by this method.
   */
  def deserialize[T](deserializationAction: () => T): T

  /**
   * Return the instance of the file server used to serve files. This may be `null` if the
   * RpcEnv is not operating in server mode.
   */
  def fileServer: RpcEnvFileServer

  /**
   * Open a channel to download a file from the given URI. If the URIs returned by the
   * RpcEnvFileServer use the "spark" scheme, this method will be called by the Utils class to
   * retrieve the files.
   *
   * @param uri URI with location of the file.
   */
  def openChannel(uri: String): ReadableByteChannel
}

/**
 * A server used by the RpcEnv to server files to other processes owned by the application.
 *
 * The file server can return URIs handled by common libraries (such as "http" or "hdfs"), or
 * it can return "spark" URIs which will be handled by `RpcEnv#fetchFile`.
 */
private[spark] trait RpcEnvFileServer {

  /**
   * Adds a file to be served by this RpcEnv. This is used to serve files from the driver
   * to executors when they're stored on the driver's local file system.
   *
   * @param file Local file to serve.
   * @return A URI for the location of the file.
   */
  def addFile(file: File): String

  /**
   * Adds a jar to be served by this RpcEnv. Similar to `addFile` but for jars added using
   * `SparkContext.addJar`.
   *
   * @param file Local file to serve.
   * @return A URI for the location of the file.
   */
  def addJar(file: File): String

  /**
   * Adds a local directory to be served via this file server.
   *
   * @param baseUri Leading URI path (files can be retrieved by appending their relative
   *                path to this base URI). This cannot be "files" nor "jars".
   * @param path Path to the local directory.
   * @return URI for the root of the directory in the file server.
   */
  def addDirectory(baseUri: String, path: File): String

  /** Validates and normalizes the base URI for directories. */
  protected def validateDirectoryUri(baseUri: String): String = {
    val fixedBaseUri = "/" + baseUri.stripPrefix("/").stripSuffix("/")
    require(fixedBaseUri != "/files" && fixedBaseUri != "/jars",
      "Directory URI cannot be /files nor /jars.")
    fixedBaseUri
  }

}

private[spark] case class RpcEnvConfig(
    conf: SparkConf,
    name: String,
    bindAddress: String,
    advertiseAddress: String,
    port: Int,
    securityManager: SecurityManager,
    clientMode: Boolean)
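`setupEndpointRef(address, endpointName)` turns the pair into a URI string with `RpcEndpointAddress(...).toString` and delegates to `setupEndpointRefByURI`. In Spark 2.x that string has the form `spark://<name>@<host>:<port>`, with the endpoint name in the user-info part. The sketch below formats and parses such URIs with `java.net.URI`. `EndpointUri` is a hypothetical helper, not Spark's `RpcEndpointAddress`, and it ignores the `spark-client://<name>` form that Spark uses when the env has no server address.

```scala
import java.net.URI

final case class EndpointUri(name: String, host: String, port: Int) {
  // Same textual shape as RpcEndpointAddress.toString for a server-mode RpcEnv.
  override def toString: String = s"spark://$name@$host:$port"
}

object EndpointUri {
  def parse(uri: String): EndpointUri = {
    val u = new URI(uri)
    require(u.getScheme == "spark", s"Invalid scheme in $uri")
    // The endpoint name travels in the user-info part of the URI.
    require(u.getUserInfo != null && u.getHost != null && u.getPort > 0,
      s"Invalid endpoint URI: $uri")
    EndpointUri(u.getUserInfo, u.getHost, u.getPort)
  }
}
```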
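Dispatcher: a message dispatcher. Messages that an RPC endpoint sends, or that arrive from remote RPC endpoints, are delivered to the right mailbox. If the receiver is a local endpoint, the message goes into that endpoint's inbox; otherwise it goes into an outbox. In Spark 2.1 the local-or-remote check itself happens in NettyRpcEnv, which hands local messages to the Dispatcher and queues remote ones in the Outbox for the target address.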
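The local-versus-remote decision fits in a few lines. The model below is simplified, single-threaded and hypothetical; `Router`, `Address` and `Message` are illustrative names. Messages addressed to this process go to the target endpoint's inbox. Everything else is appended to a per-address outbox that is created on first use, mirroring how `NettyRpcEnv` keeps one `Outbox` per remote `RpcAddress`.

```scala
import scala.collection.mutable

final case class Address(host: String, port: Int)
final case class Message(receiverAddress: Address, receiverName: String, content: Any)

// Single-threaded model; Spark uses concurrent maps and per-inbox locking instead.
final class Router(localAddress: Address) {
  // One inbox per local endpoint name.
  val inboxes = mutable.Map.empty[String, mutable.Queue[Any]]
  // One outbox per remote address, created lazily.
  val outboxes = mutable.Map.empty[Address, mutable.Queue[Message]]

  def register(name: String): Unit = inboxes(name) = mutable.Queue.empty[Any]

  def route(msg: Message): Unit =
    if (msg.receiverAddress == localAddress) {
      val inbox = inboxes.getOrElse(msg.receiverName,
        throw new IllegalStateException(s"Could not find ${msg.receiverName}"))
      inbox += msg.content
    } else {
      outboxes.getOrElseUpdate(msg.receiverAddress, mutable.Queue.empty[Message]) += msg
    }
}
```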
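Inbox: each local endpoint has one inbox. Whenever the Dispatcher stores a message in an Inbox, it also puts the corresponding EndpointData on its internal receivers queue. When the Dispatcher is created, it starts a thread pool of MessageLoop threads, sized by spark.rpc.netty.dispatcher.numThreads. These threads take entries off the receivers queue and consume the messages in the corresponding inbox.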
/**
 * An inbox that stores messages for an [[RpcEndpoint]] and posts messages to it thread-safely.
 */
private[netty] class Inbox(
    val endpointRef: NettyRpcEndpointRef,
    val endpoint: RpcEndpoint)
  extends Logging {

  inbox =>  // Give this an alias so we can use it more clearly in closures.

  @GuardedBy("this")
  protected val messages = new java.util.LinkedList[InboxMessage]()

  /** True if the inbox (and its associated endpoint) is stopped. */
  @GuardedBy("this")
  private var stopped = false

  /** Allow multiple threads to process messages at the same time. */
  @GuardedBy("this")
  private var enableConcurrent = false

  /** The number of threads processing messages for this inbox. */
  @GuardedBy("this")
  private var numActiveThreads = 0

  // OnStart should be the first message to process
  inbox.synchronized {
    messages.add(OnStart)
  }

  /**
   * Process stored messages.
   */
  def process(dispatcher: Dispatcher): Unit = {
    var message: InboxMessage = null
    inbox.synchronized {
      if (!enableConcurrent && numActiveThreads != 0) {
        return
      }
      message = messages.poll()
      if (message != null) {
        numActiveThreads += 1
      } else {
        return
      }
    }
    while (true) {
      safelyCall(endpoint) {
        message match {
          case RpcMessage(_sender, content, context) =>
            try {
              endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
                throw new SparkException(s"Unsupported message $message from ${_sender}")
              })
            } catch {
              case NonFatal(e) =>
                context.sendFailure(e)
                // Throw the exception -- this exception will be caught by the safelyCall function.
                // The endpoint's onError function will be called.
                throw e
            }

          case OneWayMessage(_sender, content) =>
            endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
              throw new SparkException(s"Unsupported message $message from ${_sender}")
            })

          case OnStart =>
            endpoint.onStart()
            if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
              inbox.synchronized {
                if (!stopped) {
                  enableConcurrent = true
                }
              }
            }

          case OnStop =>
            val activeThreads = inbox.synchronized { inbox.numActiveThreads }
            assert(activeThreads == 1,
              s"There should be only a single active thread but found $activeThreads threads.")
            dispatcher.removeRpcEndpointRef(endpoint)
            endpoint.onStop()
            assert(isEmpty, "OnStop should be the last message")

          case RemoteProcessConnected(remoteAddress) =>
            endpoint.onConnected(remoteAddress)

          case RemoteProcessDisconnected(remoteAddress) =>
            endpoint.onDisconnected(remoteAddress)

          case RemoteProcessConnectionError(cause, remoteAddress) =>
            endpoint.onNetworkError(cause, remoteAddress)
        }
      }

      inbox.synchronized {
        // "enableConcurrent" will be set to false after `onStop` is called, so we should check it
        // every time.
        if (!enableConcurrent && numActiveThreads != 1) {
          // If we are not the only one worker, exit
          numActiveThreads -= 1
          return
        }
        message = messages.poll()
        if (message == null) {
          numActiveThreads -= 1
          return
        }
      }
    }
  }

  def post(message: InboxMessage): Unit = inbox.synchronized {
    if (stopped) {
      // We already put "OnStop" into "messages", so we should drop further messages
      onDrop(message)
    } else {
      messages.add(message)
      false
    }
  }

  def stop(): Unit = inbox.synchronized {
    // The following codes should be in `synchronized` so that we can make sure "OnStop" is the last
    // message
    if (!stopped) {
      // We should disable concurrent here. Then when RpcEndpoint.onStop is called, it's the only
      // thread that is processing messages. So `RpcEndpoint.onStop` can release its resources
      // safely.
      enableConcurrent = false
      stopped = true
      messages.add(OnStop)
      // Note: The concurrent events in messages will be processed one by one.
    }
  }

  def isEmpty: Boolean = inbox.synchronized { messages.isEmpty }

  /**
   * Called when we are dropping a message. Test cases override this to test message dropping.
   * Exposed for testing.
   */
  protected def onDrop(message: InboxMessage): Unit = {
    logWarning(s"Drop $message because $endpointRef is stopped")
  }

  /**
   * Calls action closure, and calls the endpoint's onError function in the case of exceptions.
   */
  private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
    try action catch {
      case NonFatal(e) =>
        try endpoint.onError(e) catch {
          case NonFatal(ee) => logError(s"Ignoring error", ee)
        }
    }
  }

}
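The Dispatcher's message loop drives the Inbox. Posting a message stores it in the endpoint's inbox and puts that inbox on a shared `receivers` queue. Worker threads then take entries off the queue and call `process`. Below is a compact model of that loop with hypothetical names (`MiniInbox`, `MessageLoopSketch`). A poison-pill entry stops the threads, as Spark's `Dispatcher` also does with its `PoisonPill`. Unlike Spark's `Inbox`, this one always processes messages one thread at a time, which is the `ThreadSafeRpcEndpoint` behaviour, and it does not handle exceptions thrown by the handler.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Hypothetical inbox: the lock guards only polling and the `active` flag, so at most one
// thread drains a given inbox at a time and messages are handled in FIFO order.
final class MiniInbox(handler: AnyRef => Unit) {
  private val messages = new java.util.LinkedList[AnyRef]()
  private var active = false

  def post(msg: AnyRef): Unit = synchronized { messages.add(msg) }

  def process(): Unit = {
    var msg: AnyRef = synchronized {
      if (active) null // another thread is draining; it will pick our message up
      else {
        val m = messages.poll()
        if (m != null) active = true
        m
      }
    }
    while (msg != null) {
      handler(msg)
      msg = synchronized {
        val m = messages.poll()
        if (m == null) active = false
        m
      }
    }
  }
}

final class MessageLoopSketch(numThreads: Int) {
  private val PoisonPill = new MiniInbox(_ => ())
  private val receivers = new LinkedBlockingQueue[MiniInbox]()
  private val pool = Executors.newFixedThreadPool(numThreads)

  // Each thread plays the role of Spark's MessageLoop.
  for (_ <- 1 to numThreads) pool.execute(new Runnable {
    def run(): Unit = {
      var running = true
      while (running) {
        val inbox = receivers.take()
        if (inbox eq PoisonPill) {
          receivers.offer(PoisonPill) // put it back so the other loops see it too
          running = false
        } else inbox.process()
      }
    }
  })

  // Like Dispatcher.postMessage: store the message first, then schedule the inbox.
  def post(inbox: MiniInbox, msg: AnyRef): Unit = {
    inbox.post(msg)
    receivers.offer(inbox)
  }

  def stop(): Unit = {
    receivers.offer(PoisonPill)
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Storing the message before scheduling the inbox matters. Whichever thread later takes that `receivers` entry is guaranteed to find the message, either directly or through the thread that is already draining the inbox.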
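Outbox: there is one outbox per remote address (NettyRpcEnv keeps them in a map keyed by RpcAddress). As soon as a message is put into an Outbox, it is sent out through a TransportClient, normally in the same thread. If no connection exists yet, the Outbox first connects on a separate executor and sends from there. The reason for sending right away is that remote messages come in two kinds, RpcOutboxMessage and OneWayOutboxMessage, and messages that expect a reply are better sent directly.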
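The Outbox behaviour described above can be modeled as a queue plus a `draining` flag. Callers keep enqueuing, while at most one thread at a time pushes messages to the connection. In the sketch below, `MiniOutbox` and the `transmit` function are hypothetical stand-ins. The real `Outbox` also connects lazily on a separate executor and completes replies for `RpcOutboxMessage`. Only the queue and flag discipline is illustrated here, and an exception from `transmit` is not handled.

```scala
import scala.collection.mutable

final class MiniOutbox(transmit: String => Unit) {
  private val messages = mutable.Queue.empty[String]
  private var draining = false
  private var stopped = false

  def send(message: String): Unit = {
    val dropped = synchronized {
      if (stopped) true else { messages += message; false }
    }
    if (dropped) throw new IllegalStateException(s"Outbox stopped, dropping $message")
    drain()
  }

  // If another caller is already draining, return at once: that caller will also send
  // the message just enqueued, so messages leave in FIFO order from a single thread.
  private def drain(): Unit = {
    var next: Option[String] = synchronized {
      if (draining || messages.isEmpty) None
      else { draining = true; Some(messages.dequeue()) }
    }
    while (next.isDefined) {
      transmit(next.get) // in Spark: message.sendWith(client)
      next = synchronized {
        if (messages.isEmpty) { draining = false; None } else Some(messages.dequeue())
      }
    }
  }

  def stop(): Unit = synchronized { stopped = true; messages.clear() }
}
```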
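TransportClient: the Netty communication client. Based on the receiver of an Outbox message, it sends the request to the corresponding remote TransportServer.
TransportServer: the Netty communication server. Each RpcEnv running in server mode has one TransportServer, not one per endpoint. After receiving a remote message, it calls the Dispatcher to deliver the message to the corresponding inbox.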
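Creating the NettyRpcEnv:
A NettyRpcEnv is created through NettyRpcEnvFactory. Its main components are:
Dispatcher: routes RPC messages to the RpcEndpoint that should handle them.
NettyStreamManager: provides file serving (files, JAR files, directories).
NettyRpcHandler: handles network I/O events, receives RPC requests and dispatches the messages through the Dispatcher.
TransportContext: manages the network transport context and creates the MessageEncoder, MessageDecoder, TransportClientFactory and TransportServer.
TransportServer: configures and starts the RPC server.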
Analysis of the message-routing process:
The Master implementation in standalone mode:
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable { }

/**
 * A trait that requires RpcEnv thread-safely sending messages to it.
 *
 * Thread-safety means processing of one message happens before processing of the next message by
 * the same [[ThreadSafeRpcEndpoint]]. In the other words, changes to internal fields of a
 * [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
 * [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
 *
 * However, there is no guarantee that the same thread will be executing the same
 * [[ThreadSafeRpcEndpoint]] for different messages.
 */
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
This shows that the Master class extends ThreadSafeRpcEndpoint, which in turn extends the RpcEndpoint trait.
When the Master starts:
/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 * RpcEnv.create builds the RpcEnv through a NettyRpcEnvFactory, so the object actually created
 * is a NettyRpcEnv. setupEndpoint then registers an RpcEndpoint, here the Master itself, and the
 * returned masterEndpoint is the Master's RpcEndpointRef.
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
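In the Master's receiveAndReply, the first case handles worker registration. A Worker registers by sending the Master a RegisterWorker message that carries its own RpcEndpointRef (workerRef). The Master keeps that ref in a WorkerInfo and later uses it to send messages back to the Worker, which is how the Master and the Worker communicate.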
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }

  case RequestSubmitDriver(description) =>
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only accept driver submissions in ALIVE state."
      context.reply(SubmitDriverResponse(self, false, None, msg))
    } else {
      logInfo("Driver submitted " + description.command.mainClass)
      val driver = createDriver(description)
      persistenceEngine.addDriver(driver)
      waitingDrivers += driver
      drivers.add(driver)
      schedule()

      // TODO: It might be good to instead have the submission client poll the master to determine
      //       the current status of the driver. For now it's simply "fire and forget".

      context.reply(SubmitDriverResponse(self, true, Some(driver.id),
        s"Driver successfully submitted as ${driver.id}"))
    }

  case RequestKillDriver(driverId) =>
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        s"Can only kill drivers in ALIVE state."
      context.reply(KillDriverResponse(self, driverId, success = false, msg))
    } else {
      logInfo("Asked to kill driver " + driverId)
      val driver = drivers.find(_.id == driverId)
      driver match {
        case Some(d) =>
          if (waitingDrivers.contains(d)) {
            waitingDrivers -= d
            self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
          } else {
            // We just notify the worker to kill the driver here. The final bookkeeping occurs
            // on the return path when the worker submits a state change back to the master
            // to notify it that the driver was successfully killed.
            d.worker.foreach { w =>
              w.endpoint.send(KillDriver(driverId))
            }
          }
          // TODO: It would be nice for this to be a synchronous response
          val msg = s"Kill request for $driverId submitted"
          logInfo(msg)
          context.reply(KillDriverResponse(self, driverId, success = true, msg))
        case None =>
          val msg = s"Driver $driverId has already finished or does not exist"
          logWarning(msg)
          context.reply(KillDriverResponse(self, driverId, success = false, msg))
      }
    }

  case RequestDriverStatus(driverId) =>
    if (state != RecoveryState.ALIVE) {
      val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
        "Can only request driver status in ALIVE state."
      context.reply(
        DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))))
    } else {
      (drivers ++ completedDrivers).find(_.id == driverId) match {
        case Some(driver) =>
          context.reply(DriverStatusResponse(found = true, Some(driver.state),
            driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception))
        case None =>
          context.reply(DriverStatusResponse(found = false, None, None, None, None))
      }
    }

  case RequestMasterState =>
    context.reply(MasterStateResponse(
      address.host, address.port, restServerBoundPort,
      workers.toArray, apps.toArray, completedApps.toArray,
      drivers.toArray, completedDrivers.toArray, state))

  case BoundPortsRequest =>
    context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))

  case RequestExecutors(appId, requestedTotal) =>
    context.reply(handleRequestExecutors(appId, requestedTotal))

  case KillExecutors(appId, executorIds) =>
    val formattedExecutorIds = formatExecutorIds(executorIds)
    context.reply(handleKillExecutors(appId, formattedExecutorIds))
}
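Once persistence, scheduling and logging are removed, the `RegisterWorker` case reduces to a small decision table:

- Reply `MasterInStandby` when the Master is in standby.
- Reply `RegisterWorkerFailed` for a duplicate ID or address.
- Otherwise, record the worker and reply `RegisteredWorker`.

The self-contained sketch below covers just that logic. The message classes are simplified, hypothetical look-alikes; Spark's versions carry more fields, such as the worker's `RpcEndpointRef`. `ReplyContext` replaces `RpcCallContext`. The address check is also simplified: Spark's `registerWorker` additionally replaces a same-address worker whose state is UNKNOWN.

```scala
import scala.collection.mutable

sealed trait RegisterWorkerResponse
case object MasterInStandby extends RegisterWorkerResponse
final case class RegisteredWorker(masterWebUiUrl: String) extends RegisterWorkerResponse
final case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse

final case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memoryMb: Int)

// Hypothetical stand-in for RpcCallContext.
final class ReplyContext {
  var replied: Option[Any] = None
  def reply(response: Any): Unit = replied = Some(response)
}

final class MiniMaster(masterWebUiUrl: String, var standby: Boolean = false) {
  private val idToWorker = mutable.Map.empty[String, RegisterWorker]
  private val addressToWorker = mutable.Map.empty[(String, Int), RegisterWorker]

  def receiveAndReply(context: ReplyContext): PartialFunction[Any, Unit] = {
    case w @ RegisterWorker(id, host, port, _, _) =>
      if (standby) context.reply(MasterInStandby)
      else if (idToWorker.contains(id)) context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      else if (addressToWorker.contains((host, port)))
        context.reply(RegisterWorkerFailed(s"Attempted to re-register worker at same address: $host:$port"))
      else {
        idToWorker(id) = w
        addressToWorker((host, port)) = w
        context.reply(RegisteredWorker(masterWebUiUrl)) // Spark also persists and calls schedule()
      }
  }

  def workerCount: Int = idToWorker.size
}
```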
For the Worker node:
private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging {

  // Registers this Worker with the Master: sends a RegisterWorker message to masterEndpoint,
  // expects a RegisterWorkerResponse back, and handles the response accordingly.
  private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }

  /**
   * Send a message to the current master. If we have not yet registered successfully with any
   * master, the message will be dropped.
   * This is used mainly for heartbeats, state-change notifications and the like.
   */
  private def sendToMaster(message: Any): Unit = {
    master match {
      case Some(masterRef) => masterRef.send(message)
      case None =>
        logWarning(
          s"Dropping $message because the connection to master has not yet been established")
    }
  }
}
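`registerWithMaster` is the client half of the registration exchange. `ask` returns a `Future`, and the callback passed to `onComplete` handles either the response or the failure. The sketch below reproduces that pattern with plain `scala.concurrent`. `askMaster` is a hypothetical stand-in for `masterEndpoint.ask[RegisterWorkerResponse](...)`, and a hand-written same-thread `ExecutionContext` plays the role of `ThreadUtils.sameThread`. Instead of calling `System.exit(1)` on failure, as the real Worker does, it only records the error. The message classes are simplified look-alikes.

```scala
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// Runs callbacks on the thread that completes the future (or registers the callback).
object SameThread extends ExecutionContext {
  override def execute(runnable: Runnable): Unit = runnable.run()
  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

sealed trait RegisterWorkerResponse
case object RegisteredWorker extends RegisterWorkerResponse
final case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
case object MasterInStandby extends RegisterWorkerResponse
final case class RegisterWorker(workerId: String)

final class MiniWorker(askMaster: RegisterWorker => Future[RegisterWorkerResponse]) {
  @volatile var registered = false
  @volatile var failure: Option[String] = None

  def registerWithMaster(workerId: String): Unit =
    askMaster(RegisterWorker(workerId)).onComplete {
      case Success(RegisteredWorker)             => registered = true
      case Success(RegisterWorkerFailed(reason)) => failure = Some(reason)
      case Success(MasterInStandby)              => () // ignored: master not ready yet
      case Failure(e)                            => failure = Some(e.toString) // real Worker exits
    }(SameThread)
}
```

Running the callback on the completing thread avoids a thread hop. This is acceptable only because the handler is cheap, which is exactly what the comment in the original code says.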
Worker likewise extends ThreadSafeRpcEndpoint, which in turn extends RpcEndpoint:
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
When the Worker starts:
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): RpcEnv = {

  // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
  val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
  val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
  rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
    masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
  rpcEnv
}
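`masterUrls.map(RpcAddress.fromSparkURL(_))` turns each master URL, such as `spark://host:7077`, into a host/port address. Below is a hypothetical re-implementation with `java.net.URI`. `HostPort.fromSparkUrl` is not Spark's code, and it only approximates the validation Spark's helper performs: it requires the `spark` scheme, a host and a port, and no path, query, fragment or user info. Spark reports invalid URLs as a `SparkException`; this sketch throws `IllegalArgumentException`.

```scala
import java.net.URI

final case class HostPort(host: String, port: Int)

object HostPort {
  // Accepts only URLs of the form spark://host:port.
  def fromSparkUrl(url: String): HostPort = {
    val uri = new URI(url)
    val valid = uri.getScheme == "spark" && uri.getHost != null && uri.getPort >= 0 &&
      (uri.getPath == null || uri.getPath.isEmpty) &&
      uri.getQuery == null && uri.getFragment == null && uri.getUserInfo == null
    if (!valid) throw new IllegalArgumentException(s"Invalid master URL: $url")
    HostPort(uri.getHost, uri.getPort)
  }
}
```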
Message handling on the Worker side works much like it does on the Master side.