
Spark Communication Mechanism (1): Spark 1.3 vs Spark 1.6 Source Code Analysis

2017-01-06 09:56
A while ago I read the Spark 1.3 source code, where the RPC communication mechanism is based on Akka. In Spark 1.6, two implementations are provided: Netty (the default) and Akka.

Below, comparing against Spark 1.3's Akka-based mechanism, let's see how the Master communicates with Workers in Spark 1.6.

First, look at the Master class in Spark 1.6:

private[deploy] class Master(      // v1.6
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable


private[spark] class Master(       // v1.3
    host: String,
    port: Int,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends Actor with ActorLogReceive with Logging with LeaderElectable
In Spark 1.3 the Master extends Akka's Actor directly, whereas in Spark 1.6 the Master extends the trait ThreadSafeRpcEndpoint. Let's look at ThreadSafeRpcEndpoint:
private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint //v1.6
ThreadSafeRpcEndpoint in turn extends the trait RpcEndpoint. RpcEndpoint is the important abstraction here; part of its code is shown below:

private[spark] trait RpcEndpoint { // v1.6
  // only the code we care about is kept

  /**
   * The [[RpcEnv]] that this [[RpcEndpoint]] is registered to.
   */
  val rpcEnv: RpcEnv

  /**
   * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
   * called. And `self` will become `null` when `onStop` is called.
   *
   * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
   * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
   */
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  /**
   * Process messages from [[RpcEndpointRef.send]] or [[RpcCallContext.reply]]. If receiving an
   * unmatched message, [[SparkException]] will be thrown and sent to `onError`.
   */
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  /**
   * Invoked before [[RpcEndpoint]] starts to handle any message.
   */
  def onStart(): Unit = {
    // By default, do nothing.
  }
}

RpcEndpoint has a key member, rpcEnv: an RpcEndpoint must register itself under a name with an RpcEnv in order to send and receive messages (this trait is described in more detail later). The methods of RpcEndpoint were designed so that both the Akka and Netty backends can be plugged in, and they map onto Akka's Actor roughly as follows (a minimal endpoint sketch follows the mapping):

preStart (Akka, 1.3)  <---->  onStart (RpcEndpoint, 1.6): invoked when the endpoint starts

receiveWithLogging (Akka, 1.3)  <---->  receive / receiveAndReply (RpcEndpoint, 1.6): used to receive messages
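
To make the mapping concrete, here is a minimal endpoint written against the 1.6 abstraction. This is only an illustrative sketch: EchoEndpoint and its messages are made up, and the org.apache.spark.rpc package is private to Spark, so real endpoints live inside Spark itself.

// Hypothetical example, not a Spark class: shows the 1.6 lifecycle/receive hooks.
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

case class Echo(text: String)
case object Ping

class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // Analogous to Akka's preStart: runs before any message is handled.
  override def onStart(): Unit = {
    println("EchoEndpoint started, ref = " + self)
  }

  // One-way messages delivered via RpcEndpointRef.send land here
  // (in 1.3 they went through receiveWithLogging).
  override def receive: PartialFunction[Any, Unit] = {
    case Echo(text) => println("got: " + text)
  }

  // Request/response messages delivered via ask land here and must be replied to.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Ping => context.reply("pong")
  }
}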

Now back to the Master's main method:

def main(argStrings: Array[String]) { // v1.6
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}
It calls startRpcEnvAndEndpoint to create an RpcEnv instance. Let's look at the startRpcEnvAndEndpoint method in Master:
/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}
It calls RpcEnv.create to create the RpcEnv. Next, into RpcEnv.create:

def create(                        // v1.6
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager,
    clientMode: Boolean = false): RpcEnv = {
  // Using Reflection to create the RpcEnv to avoid to depend on Akka directly
  val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)
  getRpcEnvFactory(conf).create(config)
}
It first obtains an RpcEnvFactory via getRpcEnvFactory, which reads the configuration, and then uses that factory to create the RpcEnv. The getRpcEnvFactory code:
private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
  val rpcEnvNames = Map(
    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
  val rpcEnvName = conf.get("spark.rpc", "netty")
  val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
  Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}
As you can see, the factory has two implementations, akka and netty; which communication backend is used can be specified in the configuration, and the chosen RpcEnvFactory (in fact, one of its subclasses) is then instantiated via reflection. A small configuration sketch follows; after that we look at the RpcEnvFactory trait itself.
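
For reference, the selection is driven by the spark.rpc key that getRpcEnvFactory reads above. A minimal sketch, assuming Spark 1.6 where this key still exists (the application name is an arbitrary example):

import org.apache.spark.SparkConf

// "spark.rpc" is the key read by getRpcEnvFactory; "netty" is the default in 1.6.
// It can also be set in spark-defaults.conf or passed via --conf to spark-submit.
val conf = new SparkConf()
  .setAppName("rpc-backend-demo")  // arbitrary example name
  .set("spark.rpc", "akka")        // force the legacy Akka backend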

private[spark] trait RpcEnvFactory {

  def create(config: RpcEnvConfig): RpcEnv
}
Its design is simple: a single create method. Below are its two implementations, AkkaRpcEnvFactory and NettyRpcEnvFactory.
private[spark] class AkkaRpcEnvFactory extends RpcEnvFactory {

  def create(config: RpcEnvConfig): RpcEnv = {
    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
      config.name, config.host, config.port, config.conf, config.securityManager)
    actorSystem.actorOf(Props(classOf[ErrorMonitor]), "ErrorMonitor")
    new AkkaRpcEnv(actorSystem, config.securityManager, config.conf, boundPort)
  }
}
private[netty] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }
}
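
Note the shape of startNettyRpcEnv: the factory hands Utils.startServiceOnPort a closure that starts the server on a candidate port and returns the actually bound port, and the utility retries on the next port if binding fails. A simplified sketch of that retry pattern (this is not Spark's actual Utils code; maxRetries and the error handling are illustrative):

// Simplified, illustrative version of the "start service on a port, retry on failure"
// pattern that Utils.startServiceOnPort implements; not Spark's actual code.
def startOnPort[T](startPort: Int, maxRetries: Int)(start: Int => (T, Int)): (T, Int) = {
  var lastError: Throwable = null
  for (attempt <- 0 to maxRetries) {
    val candidate = startPort + attempt   // try startPort, startPort + 1, ...
    try {
      return start(candidate)             // the closure binds and returns (service, boundPort)
    } catch {
      case e: Exception => lastError = e  // bind failed, try the next port
    }
  }
  throw new RuntimeException(s"could not bind a port after ${maxRetries + 1} attempts", lastError)
}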


With this abstraction layer in mind, and in comparison with Spark 1.3's Akka Actor approach, let's see how the Master communicates with Workers in Spark 1.6.

In Spark 1.3, the Worker's preStart method eventually calls tryRegisterAllMasters to register with the Master(s):

private def tryRegisterAllMasters() { // v1.3
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
On the Master side, the Worker's registration message is received in receiveWithLogging, and a registration-success message is sent back to the Worker:
override def receiveWithLogging = { // v1.3
  // only the code we care about is kept ...
  case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      // notify the worker that registration succeeded
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    }
  }
}


In Spark 1.6, the Worker's onStart method calls registerWithMaster to register with the Master. Let's look at registerWithMaster:
private def registerWithMaster() { // v1.6
  // only the code we care about is kept ...
  registrationRetryTimer match {
    case None =>
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            Option(self).foreach(_.send(ReregisterWithMaster))
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    case Some(_) =>
      logInfo("Not spawning another attempt to register with the master, since there is an" +
        " attempt scheduled already.")
  }
}
It uses a scheduled thread pool to keep sending registration messages to the possible Masters. The self here is a method:
final def self: RpcEndpointRef = {
  require(rpcEnv != null, "rpcEnv has not been initialized")
  rpcEnv.endpointRef(this)
}
self returns the RpcEndpointRef of this endpoint, i.e. the object used for RPC communication. The send used during registration plays the same role as Spark 1.3's actor ! RegisterWorker: a one-way, fire-and-forget message.
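
Side by side, the one-way message looks like this in the two versions (masterActor and masterRef are placeholders for references obtained elsewhere in the Worker):

// Spark 1.3 (Akka): "!" posts a one-way message to the remote actor.
// masterActor is a placeholder for context.actorSelection(masterAkkaUrl).
masterActor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

// Spark 1.6 (RpcEnv): RpcEndpointRef.send is the one-way equivalent.
// masterRef is a placeholder RpcEndpointRef pointing at the Master endpoint;
// note that the 1.6 message also carries the Worker's own ref (self).
masterRef.send(RegisterWorker(
  workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))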

Now let's see how the Master receives and handles this registration request in Spark 1.6, specifically in Master.receiveAndReply:
// receive only handles a message, while receiveAndReply handles it and sends a reply back
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerUiPort, publicAddress)
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        // notify the worker that registration succeeded
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }
  }
  // .....
}
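
Because the Master answers through context.reply (RegisteredWorker, MasterInStandby, or RegisterWorkerFailed), the 1.6 registration is really a request/response exchange: the sending side uses ask/askWithRetry on the RpcEndpointRef rather than send. A hedged sketch of what the caller side looks like (masterRef is again a placeholder; the reply classes are the ones matched in the Master code above):

// Illustrative caller side of the exchange handled by receiveAndReply above.
// masterRef is a placeholder RpcEndpointRef for the Master endpoint.
val reply = masterRef.askWithRetry[Any](RegisterWorker(
  workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))

reply match {
  case RegisteredWorker(masterEndpointRef, masterWebUiUrl) =>
    logInfo("Successfully registered with master " + masterEndpointRef.address)
  case RegisterWorkerFailed(message) =>
    logError("Worker registration failed: " + message)
  case MasterInStandby =>
    logInfo("Master is in standby; will retry with another master")
}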


Closing note: Spark 1.6 only lifts the communication mechanism onto a higher level of abstraction; the core DAGScheduler and TaskScheduler have not fundamentally changed. If you are familiar with the Spark 1.3 source code, you can still read the Spark 1.6 or Spark 2.0 source without trouble.
Tags: SPARK