您的位置:首页 > 移动开发

spark源码学习(十五)--- application注册机制分析

2017-10-15 21:16 507 查看
首先看master的定义,继承自actor,说明是一个线程。

private[spark] class Master(
host: String,
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends Actor with ActorLogReceive with Logging with LeaderElectable {

找到处理appliction注册的请求:
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
//如果当前是standbymaster,那么application来请求注册什么都不做
// ignore, don't send response
} else {
//active状态
logInfo("Registering app " + description.name)
//使用description创建appliction对象
val app = createApplication(description, sender)
//注册appliction,将applictionInfo加入缓存,讲appliction加入等待调度的队列
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
//使用持久化引擎持久化appliction信息
persistenceEngine.addApplication(app)
//actor之间发信息的方法:反向向sparkdeployschedulerbackend的appclint的Clientactor
//发送消息,注意方法是RegisteredApplication,Registered而不是Register
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
}
}

上面调用了createappliction,里面的参数descrption封装了appliction需要的资源信息,core,mem等。。。

private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[String] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
val eventLogCodec: Option[String] = None)
extends Serializable {

还是createappliction方法:
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
}使用时间,descrption创建了一个applictionInfo对象。返回付给app对象。然后调用注册方法:registerApplication(app)。
def registerApplication(app: ApplicationInfo): Unit = {
//首先拿到dirver的地址
val appAddress = app.driver.path.address
if (addressToApp.contains(appAddress)) {
//如果该driver地址已经存在,认为是重复注册,那么就直接返回
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}

applicationMetricsSystem.registerSource(app.appSource)
//------------将driver信息加入到内存缓存中
//HashSet,去重app
apps += app
//hashmap
idToApp(app.id) = app
//HashMap
actorToApp(app.driver) = app
addressToApp(appAddress) = app
//------------将app加入等待调度的缓存队列,waitingApps是ArrayBuffer,fifo的一个线程队列
waitingApps += app
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: