
[Original] Spark Client Source Code Analysis (Part 1)

2016-03-23 20:47
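In Spark Standalone mode, what we call the "Client" is actually implemented jointly by AppClient and DriverClient. AppClient is an intermediary that lets an application (the client) communicate with a Spark cluster. It takes the master URL, the application's description, a listener for cluster events, and the callbacks invoked when those events occur, and it mainly exchanges application-related information with the Master. DriverClient mainly exchanges driver-related information with the Master, such as launching and stopping a driver and tracking its status. This article covers AppClient first.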
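To make the "listener plus callbacks" idea concrete, here is a minimal sketch. The trait `AppListener` and the class `RecordingListener` are hypothetical; the method names simply mirror the listener calls that appear in the AppClient code walked through in this article. Spark's own trait for this role is `AppClientListener`, so check its exact signatures in your Spark version.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical callback interface; method names mirror the listener calls
// made by AppClient in the code discussed in this article.
trait AppListener {
  def connected(appId: String): Unit
  def disconnected(): Unit
  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
}

// A listener that records each callback as a string, so the order of events can be inspected.
class RecordingListener extends AppListener {
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def connected(appId: String): Unit = events += s"connected $appId"
  def disconnected(): Unit = events += "disconnected"
  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit =
    events += s"executorAdded $fullId on $workerId ($hostPort) cores=$cores memory=$memory"
  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit =
    events += s"executorRemoved $fullId: $message"
}
```

The point of the design is that AppClient never needs to know who consumes these events: it only drives the callbacks, and the scheduler side decides what to do with them.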

1. The main fields and methods of the AppClient class are as follows:



From the figure above, we can see that ClientEndpoint exists as a private class inside AppClient.

(1) The stop method, shown below, sends a message to the Master to stop the app and unregister it.

def stop() {
  if (endpoint != null) {
    try {
      // Ask timeout for RPCs (spark.rpc.askTimeout, 120s by default)
      val timeout = RpcUtils.askRpcTimeout(conf)
      // Ask the endpoint to stop the app, which unregisters it with the Master.
      // If no reply arrives within the timeout, an RpcTimeoutException (a TimeoutException) is thrown
      timeout.awaitResult(endpoint.ask[Boolean](StopAppClient))
    } catch {
      case e: TimeoutException =>
        logInfo("Stop request to Master timed out; it may already be shut down.")
    }
    endpoint = null
  }
}
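The stop logic follows an "ask with a deadline, and treat a timeout as already shut down" pattern. The sketch below reproduces it with plain Scala futures instead of Spark's RPC layer. `StopSketch` is hypothetical: calling the stored function stands in for `endpoint.ask[Boolean](StopAppClient)`, and the timeout is passed in directly rather than read from the Spark configuration.

```scala
import java.util.concurrent.TimeoutException
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// `endpoint` stands in for the RPC endpoint reference; None plays the role of `endpoint = null`.
// Calling the function models `endpoint.ask[Boolean](StopAppClient)` (hypothetical stand-in).
class StopSketch(var endpoint: Option[() => Future[Boolean]]) {
  val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

  def stop(timeoutMillis: Long): Unit = {
    endpoint.foreach { askStop =>
      try {
        // Wait for the reply, or throw TimeoutException once the deadline passes.
        Await.result(askStop(), timeoutMillis.millis)
      } catch {
        case _: TimeoutException =>
          log += "Stop request to Master timed out; it may already be shut down."
      }
      // Drop the reference whether or not a reply arrived, so a second stop() is a no-op.
      endpoint = None
    }
  }
}
```

A timeout is logged rather than rethrown because, during shutdown, a silent Master most likely means it is already gone.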


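Next, we focus on ClientEndpoint. It extends ThreadSafeRpcEndpoint, so it is thread-safe: its messages are processed one at a time.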

2. ClientEndpoint

2.1 Fields

(1) A boolean flag that prevents listener.disconnected() from being called more than once:

private var alreadyDisconnected = false
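A minimal sketch of the "notify at most once" guard that this flag supports. `DisconnectGuard` and its `onDisconnected` parameter are hypothetical stand-ins (the parameter plays the role of `listener.disconnected()`); `reset()` models the MasterChanged handler, which sets the flag back to false.

```scala
// Sketch of the "notify at most once" guard built around alreadyDisconnected.
// `onDisconnected` is a hypothetical stand-in for listener.disconnected().
// No locking: like ClientEndpoint, this assumes calls arrive one at a time.
class DisconnectGuard(onDisconnected: () => Unit) {
  private var alreadyDisconnected = false

  // Fires the callback only for the first disconnect since the last reset.
  def markDisconnected(): Unit = {
    if (!alreadyDisconnected) {
      onDisconnected()
      alreadyDisconnected = true
    }
  }

  // Models the MasterChanged handler, which sets alreadyDisconnected back to false.
  def reset(): Unit = alreadyDisconnected = false
}
```

A plain `var` is enough here only because the endpoint processes messages sequentially; shared across threads, it would need an `AtomicBoolean`.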

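(2) The thread pool used to register the app with the Masters. Registering with a Master is a blocking operation, so the pool's maximum size is masterRpcAddresses.size; this way the app can register with all Masters at the same time: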

private val registerMasterThreadPool = new ThreadPoolExecutor(
  0,
  masterRpcAddresses.size, // Make sure we can register with all masters at the same time
  60L, TimeUnit.SECONDS,
  new SynchronousQueue[Runnable](),
  ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
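Why a core size of 0, a maximum of `masterRpcAddresses.size`, and a `SynchronousQueue`? A `SynchronousQueue` has no capacity, so each submitted task either gets a thread right away or is rejected. Up to "max" blocking registrations can therefore run in parallel, and idle threads die after 60 s. The sketch below checks this with the JDK alone; `RegisterPoolSketch` and the latch-based "blocking registration" tasks are hypothetical.

```scala
import java.util.concurrent.{CountDownLatch, RejectedExecutionException, SynchronousQueue, ThreadPoolExecutor, TimeUnit}

object RegisterPoolSketch {
  // Submits `tasks` blocking tasks to a pool shaped like registerMasterThreadPool
  // (core 0, max `masters`, SynchronousQueue) and returns (accepted, rejected).
  def submitBlocking(masters: Int, tasks: Int): (Int, Int) = {
    val pool = new ThreadPoolExecutor(
      0, masters, 60L, TimeUnit.SECONDS, new SynchronousQueue[Runnable]())
    val release = new CountDownLatch(1)
    var accepted = 0
    var rejected = 0
    for (_ <- 1 to tasks) {
      try {
        // Each task blocks until released, like a registration waiting for a Master's reply.
        pool.execute(new Runnable { def run(): Unit = release.await() })
        accepted += 1
      } catch {
        // Default AbortPolicy: no idle thread, no queue capacity and no room to grow, so reject.
        case _: RejectedExecutionException => rejected += 1
      }
    }
    release.countDown()
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    (accepted, rejected)
  }
}
```

With three Masters, three simultaneous registrations are all accepted, and a fourth would be rejected. Since AppClient submits exactly one task per Master, the pool is always just large enough.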

(3) A single daemon thread that schedules registration retries:

private val registrationRetryThread =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("appclient-registration-retry-thread")
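`ThreadUtils.newDaemonSingleThreadScheduledExecutor` is a Spark helper. Roughly, it yields a one-thread scheduled executor whose thread is a named daemon, so it never keeps the JVM alive on its own. The sketch below approximates that with the JDK only. The name `daemonScheduler` is hypothetical, and Spark's real helper may configure the executor differently.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledThreadPoolExecutor, ThreadFactory}

object DaemonSchedulerSketch {
  // Approximation of a single-thread daemon scheduler: one named daemon thread.
  def daemonScheduler(name: String): ScheduledExecutorService = {
    val factory = new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true) // daemon: the JVM may exit even while this thread is waiting
        t
      }
    }
    val executor = new ScheduledThreadPoolExecutor(1, factory)
    // Remove cancelled tasks from the queue immediately instead of when their delay expires.
    executor.setRemoveOnCancelPolicy(true)
    executor
  }
}
```

One thread is enough because the retry checks are tiny and never overlap in a meaningful way; the real work happens in the registration pool.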

2.2 Methods

(1) The constructor is ClientEndpoint's primary constructor.

(2) onStart, which registers the app with all Masters:

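override def onStart(): Unit = {
  try {
    // The argument is the attempt number (1 = first attempt);
    // at most REGISTRATION_RETRIES (3) attempts are made. See ① below
    registerWithMaster(1)
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      // Notify the listener of the disconnect (only once) and set alreadyDisconnected to true
      markDisconnected()
      // Stop this RpcEndpoint
      stop()
  }
}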


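① registerWithMaster, shown below, registers with all Masters asynchronously and then checks the outcome every REGISTRATION_TIMEOUT_SECONDS (20 s). If the app has been registered, all outstanding registration work and futures are cancelled. If the retry limit (REGISTRATION_RETRIES, 3 attempts) has been reached, the app is marked dead. Otherwise, the current round is cancelled and the method calls itself again for the next attempt.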

private def registerWithMaster(nthRetry: Int) {
  // Fire off asynchronous registration attempts to every Master
  registerMasterFutures = tryRegisterAllMasters()
  // Check the outcome every REGISTRATION_TIMEOUT_SECONDS (20 s)
  registrationRetryTimer = registrationRetryThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      Utils.tryOrExit {
        if (registered) {
          // Registered: cancel the remaining attempts and release the pool
          registerMasterFutures.foreach(_.cancel(true))
          registerMasterThreadPool.shutdownNow()
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          // Not registered yet: cancel this round and start the next attempt
          registerMasterFutures.foreach(_.cancel(true))
          registerWithMaster(nthRetry + 1)
        }
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)
}
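The retry loop reduces to "attempt, wait, then check: registered, out of retries, or try again". Here is a minimal, self-contained sketch of that control flow. `RetryingRegistrar` and its parameters are hypothetical. As a simplification, `tryRegister` reports success synchronously, whereas in AppClient the acceptance arrives later as a RegisteredApplication message. The interval is in milliseconds so the example runs quickly.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical retry loop modeled on registerWithMaster.
// tryRegister(n) fires attempt n and returns true if a Master accepted it.
class RetryingRegistrar(
    tryRegister: Int => Boolean,
    maxRetries: Int,      // plays the role of REGISTRATION_RETRIES
    intervalMillis: Long, // plays the role of REGISTRATION_TIMEOUT_SECONDS
    onDead: String => Unit) {

  val registered = new AtomicBoolean(false)
  val attempts = new AtomicInteger(0)
  private val scheduler: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

  def start(): Unit = registerWithRetry(1)

  private def registerWithRetry(nthRetry: Int): Unit = {
    attempts.incrementAndGet()
    if (tryRegister(nthRetry)) registered.set(true)
    // Arm exactly one check per attempt (one-shot schedule, not a fixed-rate timer).
    scheduler.schedule(new Runnable {
      def run(): Unit = {
        if (registered.get()) {
          scheduler.shutdown() // success: stop retrying
        } else if (nthRetry >= maxRetries) {
          onDead("All masters are unresponsive! Giving up.")
          scheduler.shutdown() // give up
        } else {
          registerWithRetry(nthRetry + 1) // next attempt
        }
      }
    }, intervalMillis, TimeUnit.MILLISECONDS)
  }

  // Blocks until the loop has finished (registered or given up), up to 5 seconds.
  def awaitDone(): Boolean = scheduler.awaitTermination(5, TimeUnit.SECONDS)
}
```

One design note: in the Spark snippet above, each retry arms a new fixed-rate timer without cancelling the previous one, so earlier timers appear to keep firing. The sketch sidesteps this by scheduling a single one-shot check per attempt.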


(3) onStop, shown below, releases resources:

override def onStop(): Unit = {
  if (registrationRetryTimer != null) {
    registrationRetryTimer.cancel(true)
  }
  registrationRetryThread.shutdownNow()
  registerMasterFutures.foreach(_.cancel(true))
  registerMasterThreadPool.shutdownNow()
}
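onStop tears things down in a fixed order: the pending retry timer, the retry thread, the in-flight registration futures, and finally the registration pool. The sketch below reproduces that order with plain JDK executors and reports whether each piece ended up cancelled or terminated. `ShutdownSketch` and its sleeping tasks are hypothetical. `cancel(true)` and `shutdownNow()` both work by interrupting threads, so a blocking task only stops early if it responds to interruption, as `Thread.sleep` does here.

```scala
import java.util.concurrent.{CountDownLatch, Executors, Future, ScheduledFuture, TimeUnit}

object ShutdownSketch {
  // Releases resources in the same order as onStop: timer, retry thread, futures, pool.
  // Returns (timer cancelled, all futures cancelled, both executors terminated).
  def stopAll(): (Boolean, Boolean, Boolean) = {
    val retryThread = Executors.newSingleThreadScheduledExecutor()
    val pool = Executors.newFixedThreadPool(2)
    val started = new CountDownLatch(2)
    // A pending periodic "retry" timer, like registrationRetryTimer.
    val timer: ScheduledFuture[_] = retryThread.scheduleAtFixedRate(
      new Runnable { def run(): Unit = () }, 1, 1, TimeUnit.HOURS)
    // Two in-flight "registration" tasks that block until interrupted.
    val futures: Seq[Future[_]] = (1 to 2).map { _ =>
      pool.submit(new Runnable {
        def run(): Unit = { started.countDown(); Thread.sleep(60000) }
      })
    }
    started.await() // make sure both tasks are actually running
    timer.cancel(true)
    retryThread.shutdownNow()
    futures.foreach(_.cancel(true)) // interrupts the sleeping threads
    pool.shutdownNow()
    val terminated = retryThread.awaitTermination(5, TimeUnit.SECONDS) &&
      pool.awaitTermination(5, TimeUnit.SECONDS)
    (timer.isCancelled(), futures.forall(_.isCancelled()), terminated)
  }
}
```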


(4) receive. The messages it receives fall into five kinds:

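(1) RegisteredApplication: the Master has registered the app. Record the app ID and the Master reference, mark the app as registered, and notify the listener via connected(appId):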

case RegisteredApplication(appId_, masterRef) =>
  appId = appId_
  registered = true
  master = Some(masterRef)
  listener.connected(appId)


(2) ApplicationRemoved: the Master has removed the app. Mark the app dead and stop this RpcEndpoint:

case ApplicationRemoved(message) =>
  markDead("Master removed our application: %s".format(message))
  stop()


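(3) ExecutorAdded: the Master has added an executor for the app. Report the executor's RUNNING state back to the Master and notify the listener via executorAdded: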

case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
  val fullId = appId + "/" + id
  logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
  sendToMaster(ExecutorStateChanged(appId, id, ExecutorState.RUNNING, None, None))
  listener.executorAdded(fullId, workerId, hostPort, cores, memory)


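(4) ExecutorUpdated: an executor's state has changed. Log the change, and if the executor has finished, notify the listener via executorRemoved:
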
case ExecutorUpdated(id, state, message, exitStatus) =>
  val fullId = appId + "/" + id
  val messageText = message.map(s => " (" + s + ")").getOrElse("")
  logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
  if (ExecutorState.isFinished(state)) {
    listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
  }


(5) MasterChanged: part of the high-availability (HA) mechanism; the app switches to a new Master:

case MasterChanged(masterRef, masterWebUiUrl) =>
  logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
  master = Some(masterRef)
  alreadyDisconnected = false
  masterRef.send(MasterChangeAcknowledged(appId))
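Together, the five cases form a small state machine over the endpoint's fields (appId, registered, master, alreadyDisconnected). The sketch below models that dispatch with simplified, hypothetical message classes: Master references become plain strings, and listener calls become entries in an `events` buffer. Which executor states count as "finished" is an assumption in this sketch; in Spark that decision belongs to `ExecutorState.isFinished`.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified versions of the five messages ClientEndpoint.receive handles.
sealed trait ClientMessage
case class RegisteredApplication(appId: String, master: String) extends ClientMessage
case class ApplicationRemoved(message: String) extends ClientMessage
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) extends ClientMessage
case class ExecutorUpdated(id: Int, state: String, message: Option[String], exitStatus: Option[Int]) extends ClientMessage
case class MasterChanged(master: String) extends ClientMessage

class ClientState {
  var appId: String = ""
  var registered = false
  var master: Option[String] = None
  var alreadyDisconnected = false
  var dead: Option[String] = None
  val events: ArrayBuffer[String] = ArrayBuffer.empty[String] // stands in for listener callbacks

  // Assumption: the set of executor states treated as "finished".
  private val finishedStates = Set("KILLED", "FAILED", "LOST", "EXITED")

  def receive(msg: ClientMessage): Unit = msg match {
    case RegisteredApplication(id, m) =>
      appId = id
      registered = true
      master = Some(m)
      events += s"connected $id"
    case ApplicationRemoved(message) =>
      dead = Some(s"Master removed our application: $message")
    case ExecutorAdded(id, workerId, hostPort, cores, _) =>
      val fullId = appId + "/" + id
      events += s"executorAdded $fullId on $workerId ($hostPort) with $cores cores"
    case ExecutorUpdated(id, state, message, _) =>
      val fullId = appId + "/" + id
      // Only finished executors are reported to the listener as removed.
      if (finishedStates.contains(state)) events += s"executorRemoved $fullId ${message.getOrElse("")}"
    case MasterChanged(m) =>
      master = Some(m)
      alreadyDisconnected = false
  }
}
```

Because the messages form a sealed trait, the compiler can warn when a message kind has no matching case.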