Spark Source Code Analysis: RPC in spark-core

2017-07-09 00:42
I have been reading the Spark 2.1.1 source code recently and thought it was worth writing down some notes, so let's get straight to it.

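Since version 1.6, the default implementation of Spark RPC has been Netty, replacing the earlier Akka-based one. Netty is a message-driven framework that is widely used for network programming, and its core is built on NIO. Early versions of Spark already used Netty to solve the problem of transferring large files. Netty also supports many protocols (such as HTTP, FTP, and SSL), provides reliable and efficient network communication, and allows finer-grained control over the threading model.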

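RpcEndpoint: can be thought of as a communication endpoint such as the Master or a Worker. Each endpoint defines its own messages and handling logic according to its needs; when it needs to send or ask a message, the call goes through the Dispatcher.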

private[spark] trait RpcEndpoint {

/**
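* An RpcEndpoint is a communication endpoint, for example the Master or a Worker. On startup an [[RpcEnv]] is created and the current [[RpcEndpoint]] is registered with it.
* To communicate with another RpcEndpoint you must first obtain that endpoint's RpcEndpointRef, and an RpcEndpointRef can only be obtained through an RpcEnv.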
*/
val rpcEnv: RpcEnv

/**
* The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
* called. And `self` will become `null` when `onStop` is called.
* The RpcEndpointRef is used to send messages; it becomes valid when onStart() is called and becomes null when onStop() is called.
* Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
* valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
*/
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}

/**
* Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving a
* unmatched message, [[SparkException]] will be thrown and sent to `onError`.
* Messages of this kind need no reply; they are only processed by the RpcEndpoint.
*
*/
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}

/**
* Process messages from [[RpcEndpointRef.ask]]. If receiving a unmatched message,
* [[SparkException]] will be thrown and sent to `onError`.
* After processing the message, the RpcEndpoint must reply to the caller of RpcEndpointRef.ask through the given [[RpcCallContext]].
*/
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}

/**
* Invoked when any exception is thrown during handling messages.
*/
def onError(cause: Throwable): Unit = {
// By default, throw e and let RpcEnv handle it
throw cause
}

/**
* Invoked when `remoteAddress` is connected to the current node.
*/
def onConnected(remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}

/**
* Invoked when `remoteAddress` is lost.
*/
def onDisconnected(remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}

/**
* Invoked when some network error happens in the connection between the current node and
* `remoteAddress`.
*/
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
// By default, do nothing.
}

/**
* Invoked before [[RpcEndpoint]] starts to handle any message.
*/
def onStart(): Unit = {
// By default, do nothing.
}

/**
* Invoked when [[RpcEndpoint]] is stopping. `self` will be `null` in this method and you cannot
* use it to send or ask messages.
*/
def onStop(): Unit = {
// By default, do nothing.
}

/**
* A convenient method to stop [[RpcEndpoint]].
*/
final def stop(): Unit = {
val _self = self
if (_self != null) {
rpcEnv.stop(_self)
}
}
}

/**
* A trait that requires RpcEnv thread-safely sending messages to it.
*
* Thread-safety means processing of one message happens before processing of the next message by
* the same [[ThreadSafeRpcEndpoint]]. In the other words, changes to internal fields of a
* [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
* [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
*
* However, there is no guarantee that the same thread will be executing the same
* [[ThreadSafeRpcEndpoint]] for different messages.
*/
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint
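
To make the difference between receive and receiveAndReply concrete, here is a minimal sketch. Spark's RPC types are private[spark], so the Endpoint and CallContext types below are simplified stand-ins invented for illustration, as are the Heartbeat and CountRequest messages; they are not Spark's classes.

```scala
// Simplified stand-ins for Spark's private[spark] RPC types (hypothetical, for illustration only).
object EndpointSketch {
  // Records replies locally; Spark's RpcCallContext would send them back to the caller.
  final class CallContext {
    var replies: List[Any] = Nil
    var failures: List[Throwable] = Nil
    def reply(response: Any): Unit = { replies = replies :+ response }
    def sendFailure(e: Throwable): Unit = { failures = failures :+ e }
  }

  trait Endpoint {
    // One-way messages: no reply is expected.
    def receive: PartialFunction[Any, Unit] = {
      case msg => throw new IllegalStateException(s"receive not implemented for $msg")
    }
    // Request/response messages: the handler answers through the context.
    def receiveAndReply(context: CallContext): PartialFunction[Any, Unit] = {
      case _ => context.sendFailure(new IllegalStateException("won't reply anything"))
    }
  }

  case class Heartbeat(workerId: String)
  case object CountRequest

  // An endpoint that counts heartbeats and reports the count on request.
  final class CounterEndpoint extends Endpoint {
    private var heartbeats = 0
    override def receive: PartialFunction[Any, Unit] = {
      case Heartbeat(_) => heartbeats += 1
    }
    override def receiveAndReply(context: CallContext): PartialFunction[Any, Unit] = {
      case CountRequest => context.reply(heartbeats)
    }
  }

  // Delivers a one-way message, failing loudly when the endpoint does not handle it.
  def deliverOneWay(endpoint: Endpoint, msg: Any): Unit =
    endpoint.receive.applyOrElse[Any, Unit](msg, m =>
      throw new IllegalArgumentException(s"Unsupported message $m"))

  def main(args: Array[String]): Unit = {
    val endpoint = new CounterEndpoint
    deliverOneWay(endpoint, Heartbeat("w1"))
    deliverOneWay(endpoint, Heartbeat("w2"))
    val ctx = new CallContext
    endpoint.receiveAndReply(ctx)(CountRequest)
    println(ctx.replies)
  }
}
```

Modelling both handlers as PartialFunction lets the caller detect an unhandled message with applyOrElse instead of relying on a catch-all case inside every endpoint.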


RpcEndpointRef: a reference to a remote RpcEndpoint, through which messages can be sent to that endpoint.

/**
* A reference for a remote [[RpcEndpoint]]. [[RpcEndpointRef]] is thread-safe.
* Messages are sent to the remote RpcEndpoint through this reference.
*/
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {

private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

/**
* return the address for the [[RpcEndpointRef]]
*/
def address: RpcAddress

def name: String

/**
* Sends a one-way asynchronous message. Fire-and-forget semantics.
* send does not wait for a response after sending the message (fire-and-forget). In Spark it is implemented on top of Netty, in NettyRpcEndpointRef.
*/
def send(message: Any): Unit

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
* receive the reply within the specified timeout.
*
* This method only sends the message once and never retries.
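* ask, in contrast, expects a response from the remote peer; the reply is obtained asynchronously through the returned Future. It is also implemented in NettyRpcEndpointRef.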
*/
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

/**
* Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
* receive the reply within a default timeout.
*
* This method only sends the message once and never retries.
*/
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

/**
* Send a message to the corresponding [[RpcEndpoint]] and get its result within a default
* timeout, or throw a SparkException if this fails even after the default number of retries.
* The default `timeout` will be used in every trial of calling `sendWithReply`. Because this
* method retries, the message handling in the receiver side should be idempotent.
*
* Note: this is a blocking action which may cost a lot of time,  so don't call it in a message
* loop of [[RpcEndpoint]].
*
* @param message the message to send
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)

/**
* Send a message to the corresponding [[RpcEndpoint.receive]] and get its result within a
* specified timeout, throw a SparkException if this fails even after the specified number of
* retries. `timeout` will be used in every trial of calling `sendWithReply`. Because this method
* retries, the message handling in the receiver side should be idempotent.
*
* Note: this is a blocking action which may cost a lot of time, so don't call it in a message
* loop of [[RpcEndpoint]].
*
* @param message the message to send
* @param timeout the timeout duration
* @tparam T type of the reply message
* @return the reply message from the corresponding [[RpcEndpoint]]
*/
def askWithRetry[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
// TODO: Consider removing multiple attempts
var attempts = 0
var lastException: Exception = null
while (attempts < maxRetries) {
attempts += 1
try {
val future = ask[T](message, timeout)
val result = timeout.awaitResult(future)
if (result == null) {
throw new SparkException("RpcEndpoint returned null")
}
return result
} catch {
case ie: InterruptedException => throw ie
case e: Exception =>
lastException = e
logWarning(s"Error sending message [message = $message] in $attempts attempts", e)
}

if (attempts < maxRetries) {
Thread.sleep(retryWaitMs)
}
}

throw new SparkException(
s"Error sending message [message = $message]", lastException)
}

}
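
The retry loop in askWithRetry can be tried out in isolation. The sketch below uses only the Scala standard library; the `ask` function parameter is a hypothetical stand-in for RpcEndpointRef.ask, and the flaky responder in `demo` is made up to exercise the loop.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RetrySketch {
  // `ask` stands in for RpcEndpointRef.ask: each call sends the message once and returns a Future.
  def askWithRetry[T](ask: () => Future[T], maxRetries: Int,
      retryWait: FiniteDuration, timeout: FiniteDuration): T = {
    var attempts = 0
    var lastException: Throwable = null
    while (attempts < maxRetries) {
      attempts += 1
      try {
        // Block until the reply arrives or the timeout expires.
        return Await.result(ask(), timeout)
      } catch {
        case ie: InterruptedException => throw ie // never swallow interruption
        case NonFatal(e) => lastException = e     // remember the failure and retry
      }
      if (attempts < maxRetries) Thread.sleep(retryWait.toMillis)
    }
    throw new RuntimeException(s"Error sending message after $attempts attempts", lastException)
  }

  // Fails twice, then succeeds; returns the reply and how many times `ask` was invoked.
  def demo(): (String, Int) = {
    var calls = 0
    val flaky = () => {
      calls += 1
      if (calls < 3) Future.failed[String](new RuntimeException("boom"))
      else Future.successful("pong")
    }
    val reply = askWithRetry(flaky, 3, 10.millis, 1.second)
    (reply, calls)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Because the message may be delivered more than once, the receiving side has to handle it idempotently, which is exactly what the Scaladoc above warns about.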


The send() method is implemented on top of Netty, in the NettyRpcEndpointRef class:

// Sends a RequestMessage through NettyRpcEnv, wrapping the current NettyRpcEndpointRef inside the RequestMessage,
// so that the remote peer can identify where the message came from
override def send(message: Any): Unit = {
require(message != null, "Message is null")
nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
}


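The message is sent through NettyRpcEnv, with the NettyRpcEndpointRef wrapped in a RequestMessage object. ask, in contrast, returns a Future through which the result is obtained asynchronously.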

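RpcEnv: the RPC context. The runtime environment that every RPC endpoint depends on is called the RpcEnv. In NettyRpcEnv, an RpcEndpoint can be registered only once:

// An RpcEndpoint can be registered only once (uniqueness is checked by the RpcEndpoint's name).
// The Dispatcher registers the endpoint and maintains the binding between RpcEndpoint and RpcEndpointRef.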
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data)  // for the OnStart message
}
endpointRef
}


// Registers the RpcEndpoint through the Dispatcher held by NettyRpcEnv
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}


/**
* A RpcEnv implementation must have a [[RpcEnvFactory]] implementation with an empty constructor
* so that it can be created via Reflection.
* RPC context: the runtime environment that every RPC endpoint depends on is called the RpcEnv.
* It manages the registration of RpcEndpoints and how to obtain an RpcEndpointRef for an RpcEndpoint.
*/
private[spark] object RpcEnv {
// Companion object of RpcEnv; its create methods delegate to the Netty factory, NettyRpcEnvFactory
def create(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv = {
create(name, host, host, port, conf, securityManager, clientMode)
}

def create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean): RpcEnv = {
val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
clientMode)
new NettyRpcEnvFactory().create(config)
}
}

/**
* An RPC environment. [[RpcEndpoint]]s need to register itself with a name to [[RpcEnv]] to
* receives messages. Then [[RpcEnv]] will process messages sent from [[RpcEndpointRef]] or remote
* nodes, and deliver them to corresponding [[RpcEndpoint]]s. For uncaught exceptions caught by
* [[RpcEnv]], [[RpcEnv]] will use [[RpcCallContext.sendFailure]] to send exceptions back to the
* sender, or logging them if no such sender or `NotSerializableException`.
*
* [[RpcEnv]] also provides some methods to retrieve [[RpcEndpointRef]]s given name or uri.
*/
private[spark] abstract class RpcEnv(conf: SparkConf) {

private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

/**
* Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
* [[RpcEndpoint.self]]. Return `null` if the corresponding [[RpcEndpointRef]] does not exist.
*/
private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef

/**
* Return the address that [[RpcEnv]] is listening to.
*/
def address: RpcAddress

/**
* Register a [[RpcEndpoint]] with a name and return its [[RpcEndpointRef]]. [[RpcEnv]] does not
* guarantee thread-safety.
* The RpcEnv manages the binding between each RpcEndpoint and its RpcEndpointRef, and every
* RpcEndpoint is registered under a unique name.
*/
def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

/**
* Retrieve the [[RpcEndpointRef]] represented by `uri` asynchronously.
*/
def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]

/**
* Retrieve the [[RpcEndpointRef]] represented by `uri`. This is a blocking action.
*/
def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
}

/**
* Retrieve the [[RpcEndpointRef]] represented by `address` and `endpointName`.
* This is a blocking action.
*/
def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
}

/**
* Stop [[RpcEndpoint]] specified by `endpoint`.
*/
def stop(endpoint: RpcEndpointRef): Unit

/**
* Shutdown this [[RpcEnv]] asynchronously. If need to make sure [[RpcEnv]] exits successfully,
* call [[awaitTermination()]] straight after [[shutdown()]].
*/
def shutdown(): Unit

/**
* Wait until [[RpcEnv]] exits.
*
* TODO do we need a timeout parameter?
*/
def awaitTermination(): Unit

/**
* [[RpcEndpointRef]] cannot be deserialized without [[RpcEnv]]. So when deserializing any object
* that contains [[RpcEndpointRef]]s, the deserialization codes should be wrapped by this method.
*/
def deserialize[T](deserializationAction: () => T): T

/**
* Return the instance of the file server used to serve files. This may be `null` if the
* RpcEnv is not operating in server mode.
*/
def fileServer: RpcEnvFileServer

/**
* Open a channel to download a file from the given URI. If the URIs returned by the
* RpcEnvFileServer use the "spark" scheme, this method will be called by the Utils class to
* retrieve the files.
*
* @param uri URI with location of the file.
*/
def openChannel(uri: String): ReadableByteChannel
}

/**
* A server used by the RpcEnv to serve files to other processes owned by the application.
*
* The file server can return URIs handled by common libraries (such as "http" or "hdfs"), or
* it can return "spark" URIs which will be handled by `RpcEnv#fetchFile`.
*/
private[spark] trait RpcEnvFileServer {

/**
* Adds a file to be served by this RpcEnv. This is used to serve files from the driver
* to executors when they're stored on the driver's local file system.
*
* @param file Local file to serve.
* @return A URI for the location of the file.
*/
def addFile(file: File): String

/**
* Adds a jar to be served by this RpcEnv. Similar to `addFile` but for jars added using
* `SparkContext.addJar`.
*
* @param file Local file to serve.
* @return A URI for the location of the file.
*/
def addJar(file: File): String

/**
* Adds a local directory to be served via this file server.
*
* @param baseUri Leading URI path (files can be retrieved by appending their relative
*                path to this base URI). This cannot be "files" nor "jars".
* @param path Path to the local directory.
* @return URI for the root of the directory in the file server.
*/
def addDirectory(baseUri: String, path: File): String

/** Validates and normalizes the base URI for directories. */
protected def validateDirectoryUri(baseUri: String): String = {
val fixedBaseUri = "/" + baseUri.stripPrefix("/").stripSuffix("/")
require(fixedBaseUri != "/files" && fixedBaseUri != "/jars",
"Directory URI cannot be /files nor /jars.")
fixedBaseUri
}

}

private[spark] case class RpcEnvConfig(
conf: SparkConf,
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
securityManager: SecurityManager,
clientMode: Boolean)


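Dispatcher: acts as a message dispatcher. Messages that an RPC endpoint needs to send, or that are received from a remote RPC endpoint, are routed to the corresponding inbox or outbox: if the recipient is a local endpoint the message goes into its inbox, and if the recipient is not a local endpoint it goes into an outbox.

// An RpcEndpoint can be registered only once (uniqueness is checked by the RpcEndpoint's name).
// The Dispatcher registers the endpoint and maintains the binding between RpcEndpoint and RpcEndpointRef.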
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
val data = endpoints.get(name)
endpointRefs.put(data.endpoint, data.ref)
receivers.offer(data)  // for the OnStart message
}
endpointRef
}
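
The name-uniqueness check in registerRpcEndpoint relies on ConcurrentHashMap.putIfAbsent, which returns the previous value, or null when the key was free. A minimal sketch of just that part, with a hypothetical Registry and EndpointRef standing in for Dispatcher and NettyRpcEndpointRef:

```scala
import java.util.concurrent.ConcurrentHashMap

object RegistrySketch {
  // Hypothetical stand-in for NettyRpcEndpointRef.
  final case class EndpointRef(name: String)

  final class Registry {
    private val endpoints = new ConcurrentHashMap[String, EndpointRef]()
    private var stopped = false

    def register(name: String): EndpointRef = {
      val ref = EndpointRef(name)
      synchronized {
        if (stopped) throw new IllegalStateException("registry has been stopped")
        // putIfAbsent returns the existing value when the name is already taken.
        if (endpoints.putIfAbsent(name, ref) != null) {
          throw new IllegalArgumentException(s"There is already an endpoint called $name")
        }
      }
      ref
    }

    def stop(): Unit = synchronized { stopped = true }
  }

  // Registers "Master" twice and reports whether the second attempt was rejected.
  def demo(): (String, Boolean) = {
    val registry = new Registry
    val first = registry.register("Master")
    val duplicateRejected =
      try { registry.register("Master"); false }
      catch { case _: IllegalArgumentException => true }
    (first.name, duplicateRejected)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```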


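Inbox: each local endpoint has one inbox. Every time the Dispatcher puts a message into an Inbox, it also adds the corresponding EndpointData to its internal queue of pending receivers. When the Dispatcher is created it starts a thread pool whose message loops take entries from this receiver queue and consume the messages in the corresponding inbox.

/**
* An inbox that stores messages for an RpcEndpoint and posts messages to it thread-safely.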
*/
private[netty] class Inbox(
val endpointRef: NettyRpcEndpointRef,
val endpoint: RpcEndpoint)
extends Logging {

inbox =>  // Give this an alias so we can use it more clearly in closures.

@GuardedBy("this")
protected val messages = new java.util.LinkedList[InboxMessage]()

/** True if the inbox (and its associated endpoint) is stopped. */
@GuardedBy("this")
private var stopped = false

/** Allow multiple threads to process messages at the same time. */
@GuardedBy("this")
private var enableConcurrent = false

/** The number of threads processing messages for this inbox. */
@GuardedBy("this")
private var numActiveThreads = 0

// OnStart should be the first message to process
inbox.synchronized {
messages.add(OnStart)
}

/**
* Process stored messages.
*/
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
if (!enableConcurrent && numActiveThreads != 0) {
return
}
message = messages.poll()
if (message != null) {
numActiveThreads += 1
} else {
return
}
}
while (true) {
safelyCall(endpoint) {
message match {
case RpcMessage(_sender, content, context) =>
try {
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case NonFatal(e) =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
throw e
}

case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})

case OnStart =>
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}

case OnStop =>
val activeThreads = inbox.synchronized { inbox.numActiveThreads }
assert(activeThreads == 1,
s"There should be only a single active thread but found $activeThreads threads.")
dispatcher.removeRpcEndpointRef(endpoint)
endpoint.onStop()
assert(isEmpty, "OnStop should be the last message")

case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)

case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)

case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
}

inbox.synchronized {
// "enableConcurrent" will be set to false after `onStop` is called, so we should check it
// every time.
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
}
}

def post(message: InboxMessage): Unit = inbox.synchronized {
if (stopped) {
// We already put "OnStop" into "messages", so we should drop further messages
onDrop(message)
} else {
messages.add(message)
false
}
}

def stop(): Unit = inbox.synchronized {
// The following codes should be in `synchronized` so that we can make sure "OnStop" is the last
// message
if (!stopped) {
// We should disable concurrent here. Then when RpcEndpoint.onStop is called, it's the only
// thread that is processing messages. So `RpcEndpoint.onStop` can release its resources
// safely.
enableConcurrent = false
stopped = true
messages.add(OnStop)
// Note: The concurrent events in messages will be processed one by one.
}
}

def isEmpty: Boolean = inbox.synchronized { messages.isEmpty }

/**
* Called when we are dropping a message. Test cases override this to test message dropping.
* Exposed for testing.
*/
protected def onDrop(message: InboxMessage): Unit = {
logWarning(s"Drop $message because $endpointRef is stopped")
}

/**
* Calls action closure, and calls the endpoint's onError function in the case of exceptions.
*/
private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
try action catch {
case NonFatal(e) =>
try endpoint.onError(e) catch {
case NonFatal(ee) => logError(s"Ignoring error", ee)
}
}
}

}
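
The ordering guarantees of the Inbox (OnStart is always the first message, OnStop is always the last, and anything posted after stop() is dropped) can be seen in a stripped-down model. MiniInbox below is a hypothetical simplification that leaves out the concurrency bookkeeping (enableConcurrent, numActiveThreads) of the real class.

```scala
import scala.collection.mutable

object InboxSketch {
  sealed trait InboxMessage
  case object OnStart extends InboxMessage
  case object OnStop extends InboxMessage
  final case class OneWay(content: Any) extends InboxMessage

  final class MiniInbox {
    // OnStart is enqueued at construction time, so it is always processed first.
    private val messages = mutable.Queue[InboxMessage](OnStart)
    private var stopped = false
    val dropped = mutable.ListBuffer.empty[InboxMessage]

    def post(message: InboxMessage): Unit = synchronized {
      // After stop(), OnStop is already queued, so later messages are dropped.
      if (stopped) dropped += message else messages.enqueue(message)
    }

    def stop(): Unit = synchronized {
      if (!stopped) {
        stopped = true
        messages.enqueue(OnStop)
      }
    }

    // Returns the queued messages in the order they would be handled, emptying the queue.
    def drain(): List[InboxMessage] = synchronized {
      val out = messages.toList
      messages.clear()
      out
    }
  }

  def demo(): (List[InboxMessage], List[InboxMessage]) = {
    val inbox = new MiniInbox
    inbox.post(OneWay("a"))
    inbox.stop()
    inbox.post(OneWay("late"))
    (inbox.drain(), inbox.dropped.toList)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```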


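Outbox: each remote endpoint address has one outbox. Once a message is placed in the Outbox, it is sent out through the TransportClient right away, in the same thread. Remote messages come in two kinds, RpcOutboxMessage and OneWayOutboxMessage, and for messages that expect a reply, sending directly is the better fit.

TransportClient: the Netty communication client. Based on the receiver of a message in the outbox, it sends the request to the corresponding remote TransportServer.

TransportServer: the Netty communication server, one per RPC node (RpcEnv). After receiving a remote message it calls the Dispatcher to route the message to the corresponding inbox or outbox.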
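
As a rough model of the Outbox described above, the sketch below queues messages per remote address and pushes them to the wire in the calling thread. It assumes, beyond what this post states, that messages posted before the connection is ready are held back and flushed once it is; MiniOutbox and its `sendOverWire` callback are hypothetical names, not Spark's API.

```scala
import scala.collection.mutable

object OutboxSketch {
  // Hypothetical stand-in for an outbox bound to one remote address.
  final class MiniOutbox(sendOverWire: String => Unit) {
    private val pending = mutable.Queue.empty[String]
    private var connected = false

    // Queue the message, then send immediately in the caller's thread if connected.
    def send(message: String): Unit = synchronized {
      pending.enqueue(message)
      if (connected) drain()
    }

    // Called once the client connection is available; flushes anything held back.
    def onConnected(): Unit = synchronized {
      connected = true
      drain()
    }

    private def drain(): Unit = {
      while (pending.nonEmpty) sendOverWire(pending.dequeue())
    }
  }

  // Returns what reached the wire before and after the connection was established.
  def demo(): (List[String], List[String]) = {
    val wire = mutable.ListBuffer.empty[String]
    val outbox = new MiniOutbox(msg => { wire += msg; () })
    outbox.send("RegisterWorker")
    val beforeConnect = wire.toList
    outbox.onConnected()
    outbox.send("Heartbeat")
    (beforeConnect, wire.toList)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```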

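Creating the NettyRpcEnv object

A NettyRpcEnv object has to be created through NettyRpcEnvFactory.

Dispatcher is responsible for routing RPC messages; it routes each message to the RpcEndpoint that should handle it.

NettyStreamManager provides the file service (files, JAR files, directories).

NettyRpcHandler handles network IO events, receives RPC call requests, and dispatches the messages through the Dispatcher.

TransportContext manages the network transport context: it creates the MessageEncoder, MessageDecoder, TransportClientFactory, and TransportServer.

TransportServer configures and starts an RPC server.

Analysis of the message routing process:

The implementation of the master node in standalone mode: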

private[deploy] class Master(
override val rpcEnv: RpcEnv,
address: RpcAddress,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
}


/**
* A trait that requires RpcEnv thread-safely sending messages to it.
*
* Thread-safety means processing of one message happens before processing of the next message by
* the same [[ThreadSafeRpcEndpoint]]. In the other words, changes to internal fields of a
* [[ThreadSafeRpcEndpoint]] are visible when processing the next message, and fields in the
* [[ThreadSafeRpcEndpoint]] need not be volatile or equivalent.
*
* However, there is no guarantee that the same thread will be executing the same
* [[ThreadSafeRpcEndpoint]] for different messages.
*/
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint


As the code shows, the Master class extends ThreadSafeRpcEndpoint, and ThreadSafeRpcEndpoint in turn extends the RpcEndpoint trait.

When the master node starts:

/**
* Start the Master and return a three tuple of:
*   (1) The Master RpcEnv
*   (2) The web UI bound port
*   (3) The REST server bound port, if any
*   An RpcEnv is created through a NettyRpcEnvFactory, so the object actually created is a NettyRpcEnv.
*   Then setupEndpoint registers an RpcEndpoint; here the Master itself is the RpcEndpoint,
*   and the returned masterEndpoint is the Master's RpcEndpointRef.
*/
def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
(rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}


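In the master's receiveAndReply, the first case handles a worker's registration request. The worker registers by sending a RegisterWorker message that carries its own RpcEndpointRef (workerRef); the master stores this reference in a WorkerInfo and uses it to communicate with the worker.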

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
context.reply(MasterInStandby)
} else if (idToWorker.contains(id)) {
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
schedule()
} else {
val workerAddress = worker.endpoint.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress))
}
}

case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only accept driver submissions in ALIVE state."
context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
logInfo("Driver submitted " + description.command.mainClass)
val driver = createDriver(description)
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
schedule()

// TODO: It might be good to instead have the submission client poll the master to determine
//       the current status of the driver. For now it's simply "fire and forget".

context.reply(SubmitDriverResponse(self, true, Some(driver.id),
s"Driver successfully submitted as ${driver.id}"))
}

case RequestKillDriver(driverId) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
s"Can only kill drivers in ALIVE state."
context.reply(KillDriverResponse(self, driverId, success = false, msg))
} else {
logInfo("Asked to kill driver " + driverId)
val driver = drivers.find(_.id == driverId)
driver match {
case Some(d) =>
if (waitingDrivers.contains(d)) {
waitingDrivers -= d
self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
} else {
// We just notify the worker to kill the driver here. The final bookkeeping occurs
// on the return path when the worker submits a state change back to the master
// to notify it that the driver was successfully killed.
d.worker.foreach { w =>
w.endpoint.send(KillDriver(driverId))
}
}
// TODO: It would be nice for this to be a synchronous response
val msg = s"Kill request for $driverId submitted"
logInfo(msg)
context.reply(KillDriverResponse(self, driverId, success = true, msg))
case None =>
val msg = s"Driver $driverId has already finished or does not exist"
logWarning(msg)
context.reply(KillDriverResponse(self, driverId, success = false, msg))
}
}

case RequestDriverStatus(driverId) =>
if (state != RecoveryState.ALIVE) {
val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
"Can only request driver status in ALIVE state."
context.reply(
DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))))
} else {
(drivers ++ completedDrivers).find(_.id == driverId) match {
case Some(driver) =>
context.reply(DriverStatusResponse(found = true, Some(driver.state),
driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception))
case None =>
context.reply(DriverStatusResponse(found = false, None, None, None, None))
}
}

case RequestMasterState =>
context.reply(MasterStateResponse(
address.host, address.port, restServerBoundPort,
workers.toArray, apps.toArray, completedApps.toArray,
drivers.toArray, completedDrivers.toArray, state))

case BoundPortsRequest =>
context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))

case RequestExecutors(appId, requestedTotal) =>
context.reply(handleRequestExecutors(appId, requestedTotal))

case KillExecutors(appId, executorIds) =>
val formattedExecutorIds = formatExecutorIds(executorIds)
context.reply(handleKillExecutors(appId, formattedExecutorIds))
}


For the worker node:

private[deploy] class Worker(
override val rpcEnv: RpcEnv,
webUiPort: Int,
cores: Int,
memory: Int,
masterRpcAddresses: Array[RpcAddress],
endpointName: String,
workDirPath: String = null,
val conf: SparkConf,
val securityMgr: SecurityManager)
extends ThreadSafeRpcEndpoint with Logging {
// The worker registers with the master: it sends a RegisterWorker message to masterEndpoint and expects a RegisterWorkerResponse,
// then handles the response accordingly
private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Utils.tryLogNonFatalError {
handleRegisterResponse(msg)
}
case Failure(e) =>
logError(s"Cannot register with master: ${masterEndpoint.address}", e)
System.exit(1)
}(ThreadUtils.sameThread)
}
/**
* Send a message to the current master. If we have not yet registered successfully with any
* master, the message will be dropped.
* The messages sent this way are mainly heartbeats, state changes, and so on.
*/
private def sendToMaster(message: Any): Unit = {
master match {
case Some(masterRef) => masterRef.send(message)
case None =>
logWarning(
s"Dropping $message because the connection to master has not yet been established")
}
}
}
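
The ask-then-onComplete pattern in registerWithMaster can be reproduced with plain Scala futures. In the sketch below, FakeMaster, the response types, and the URL spark://master:7077 are all hypothetical placeholders; the master simply rejects a worker ID it has already seen, mirroring the "Duplicate worker ID" branch in Master.receiveAndReply.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RegisterSketch {
  sealed trait RegisterResponse
  final case class Registered(masterUrl: String) extends RegisterResponse
  final case class RegisterFailed(reason: String) extends RegisterResponse

  // Hypothetical master that answers registration requests with a Future.
  final class FakeMaster {
    private var known = Set.empty[String]
    def ask(workerId: String): Future[RegisterResponse] = synchronized {
      if (known.contains(workerId)) {
        Future.successful(RegisterFailed("Duplicate worker ID"))
      } else {
        known += workerId
        Future.successful(Registered("spark://master:7077")) // placeholder URL
      }
    }
  }

  // Sends the request once and reacts to the outcome in a callback, without blocking.
  def register(master: FakeMaster, workerId: String)(
      implicit ec: ExecutionContext): Future[String] = {
    val outcome = Promise[String]()
    master.ask(workerId).onComplete {
      case Success(Registered(url)) => outcome.success(s"registered with $url")
      case Success(RegisterFailed(reason)) => outcome.success(s"rejected: $reason")
      case Failure(e) => outcome.failure(e)
    }
    outcome.future
  }

  def demo(): (String, String) = {
    import ExecutionContext.Implicits.global
    val master = new FakeMaster
    val first = Await.result(register(master, "worker-1"), 1.second)
    val second = Await.result(register(master, "worker-1"), 1.second)
    (first, second)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

The blocking Await calls appear only in the demo; the registration path itself stays asynchronous, which is the reason the worker uses ask with a callback rather than askWithRetry.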


The Worker likewise extends ThreadSafeRpcEndpoint, which in turn extends RpcEndpoint:

private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint


When the worker node starts:

def startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf): RpcEnv = {

// The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
val securityMgr = new SecurityManager(conf)
val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
rpcEnv
}


The worker node handles messages in much the same way as the master node.
