Reading the Spark Source (6): After Receiving the ClientActor's Registration, the Master Allocates Worker Resources
2018-03-18 21:57
Let's look at the appActor's preStart method:
override def preStart() {
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
  try {
    registerWithMaster()
  } catch {
    case e: Exception =>
      logWarning("Failed to connect to master", e)
      markDisconnected()
      context.stop(self)
  }
}
preStart subscribes to remoting lifecycle events and calls registerWithMaster to register with the Master; if that throws, it logs a warning, marks the client disconnected, and stops itself. Here is registerWithMaster:
def registerWithMaster() {
  tryRegisterAllMasters()
  import context.dispatcher
  var retries = 0
  registrationRetryTimer = Some {
    context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
      Utils.tryOrExit {
        retries += 1
        if (registered) {
          registrationRetryTimer.foreach(_.cancel())
        } else if (retries >= REGISTRATION_RETRIES) {
          markDead("All masters are unresponsive! Giving up.")
        } else {
          tryRegisterAllMasters()
        }
      }
    }
  }
}
registerWithMaster immediately tries every known Master, then schedules a timer that fires every REGISTRATION_TIMEOUT: on each tick it cancels itself if registration has succeeded, gives up after REGISTRATION_RETRIES failed attempts, and otherwise retries. Let's look at tryRegisterAllMasters:
def tryRegisterAllMasters() {
  for (masterAkkaUrl <- masterAkkaUrls) {
    logInfo("Connecting to master " + masterAkkaUrl + "...")
    val actor = context.actorSelection(masterAkkaUrl)
    actor ! RegisterApplication(appDescription)
  }
}
The code above iterates over all the masterAkkaUrls, obtains an actor reference to each Master, and sends it a RegisterApplication message. Now let's find the matching handler in the Master.
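Before moving to the Master side, the client-side flow above can be sketched in plain Scala with no Akka dependency. RegistrationClient, masterResponds, and the URL list below are hypothetical stand-ins for the real actor plumbing, and the scheduled REGISTRATION_TIMEOUT timer is replaced by a synchronous loop; this is a minimal sketch, not the real implementation.

```scala
object RegistrationClient {
  // In the real code these come from the Spark configuration.
  val REGISTRATION_RETRIES = 3
  val masterUrls = Seq("spark://master-a:7077", "spark://master-b:7077")

  // masterResponds simulates a Master acknowledging RegisterApplication.
  // Returns Right(masterUrl) on success, Left(error) after giving up.
  def register(masterResponds: String => Boolean): Either[String, String] = {
    var registered: Option[String] = None
    var retries = 0

    // tryRegisterAllMasters: "send" RegisterApplication to every known Master.
    def tryRegisterAllMasters(): Unit =
      for (url <- masterUrls if registered.isEmpty)
        if (masterResponds(url)) registered = Some(url)

    tryRegisterAllMasters()
    // Stand-in for the retry timer: retry until registered or out of attempts.
    while (registered.isEmpty && retries < REGISTRATION_RETRIES) {
      retries += 1
      tryRegisterAllMasters()
    }
    registered.toRight("All masters are unresponsive! Giving up.")
  }
}
```

For example, RegistrationClient.register(_ => false) exhausts its retries and returns the Left error, mirroring the markDead path above.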
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, sender)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    sender ! RegisteredApplication(app.id, masterUrl)
    schedule()
  }
}
createApplication mainly builds the application's description and keeps it in memory; sender here refers to the ClientActor. The Master then persists the application, acknowledges the ClientActor with a RegisteredApplication message, and calls schedule(), which is where worker resources are actually allocated.
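The standby-check / register / ack pattern of this handler can be sketched in plain Scala. MasterSketch, MasterState, and the simplified ApplicationInfo below are hypothetical stand-ins for the real Master internals; persistence and schedule() are reduced to comments.

```scala
object MasterSketch {
  sealed trait MasterState
  case object Standby extends MasterState
  case object Alive extends MasterState

  // Simplified stand-in for Spark's ApplicationInfo.
  case class ApplicationInfo(id: String, name: String)

  private var nextAppId = 0
  private val apps = scala.collection.mutable.ArrayBuffer.empty[ApplicationInfo]

  // Mirrors the RegisterApplication handler: a STANDBY master ignores the
  // request (returns None, i.e. no response); an active master records the
  // app in memory and returns the app id it would ack back to the client.
  def handleRegisterApplication(state: MasterState, appName: String): Option[String] =
    state match {
      case Standby => None                // ignore, don't send response
      case Alive =>
        val app = ApplicationInfo(s"app-$nextAppId", appName)
        nextAppId += 1
        apps += app                       // registerApplication: keep in memory
        // persistenceEngine.addApplication(app) would go here
        // schedule() would then allocate worker resources
        Some(app.id)                      // RegisteredApplication ack
    }
}
```

The Option return value captures the asymmetry in the real handler: a standby Master deliberately stays silent so the client keeps retrying other Masters, while an active Master always answers.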