spark源码学习(十五)--- application注册机制分析
2017-10-15 21:16
507 查看
首先看master的定义,继承自actor,说明是一个线程。
private[spark] class Master(
host: String,
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends Actor with ActorLogReceive with Logging with LeaderElectable {
找到处理appliction注册的请求:
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
//如果当前是standbymaster,那么application来请求注册什么都不做
// ignore, don't send response
} else {
//active状态
logInfo("Registering app " + description.name)
//使用description创建appliction对象
val app = createApplication(description, sender)
//注册appliction,将applictionInfo加入缓存,讲appliction加入等待调度的队列
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
//使用持久化引擎持久化appliction信息
persistenceEngine.addApplication(app)
//actor之间发信息的方法:反向向sparkdeployschedulerbackend的appclint的Clientactor
//发送消息,注意方法是RegisteredApplication,Registered而不是Register
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
}
}
上面调用了createappliction,里面的参数descrption封装了appliction需要的资源信息,core,mem等。。。
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[String] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
val eventLogCodec: Option[String] = None)
extends Serializable {
还是createappliction方法:
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
}使用时间,descrption创建了一个applictionInfo对象。返回付给app对象。然后调用注册方法:registerApplication(app)。
def registerApplication(app: ApplicationInfo): Unit = {
//首先拿到dirver的地址
val appAddress = app.driver.path.address
if (addressToApp.contains(appAddress)) {
//如果该driver地址已经存在,认为是重复注册,那么就直接返回
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
applicationMetricsSystem.registerSource(app.appSource)
//------------将driver信息加入到内存缓存中
//HashSet,去重app
apps += app
//hashmap
idToApp(app.id) = app
//HashMap
actorToApp(app.driver) = app
addressToApp(appAddress) = app
//------------将app加入等待调度的缓存队列,waitingApps是ArrayBuffer,fifo的一个线程队列
waitingApps += app
}
private[spark] class Master(
host: String,
port: Int,
webUiPort: Int,
val securityMgr: SecurityManager,
val conf: SparkConf)
extends Actor with ActorLogReceive with Logging with LeaderElectable {
找到处理appliction注册的请求:
case RegisterApplication(description) => {
if (state == RecoveryState.STANDBY) {
//如果当前是standbymaster,那么application来请求注册什么都不做
// ignore, don't send response
} else {
//active状态
logInfo("Registering app " + description.name)
//使用description创建appliction对象
val app = createApplication(description, sender)
//注册appliction,将applictionInfo加入缓存,讲appliction加入等待调度的队列
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
//使用持久化引擎持久化appliction信息
persistenceEngine.addApplication(app)
//actor之间发信息的方法:反向向sparkdeployschedulerbackend的appclint的Clientactor
//发送消息,注意方法是RegisteredApplication,Registered而不是Register
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
}
}
上面调用了createappliction,里面的参数descrption封装了appliction需要的资源信息,core,mem等。。。
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[String] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
val eventLogCodec: Option[String] = None)
extends Serializable {
还是createappliction方法:
def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
}使用时间,descrption创建了一个applictionInfo对象。返回付给app对象。然后调用注册方法:registerApplication(app)。
def registerApplication(app: ApplicationInfo): Unit = {
//首先拿到dirver的地址
val appAddress = app.driver.path.address
if (addressToApp.contains(appAddress)) {
//如果该driver地址已经存在,认为是重复注册,那么就直接返回
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
applicationMetricsSystem.registerSource(app.appSource)
//------------将driver信息加入到内存缓存中
//HashSet,去重app
apps += app
//hashmap
idToApp(app.id) = app
//HashMap
actorToApp(app.driver) = app
addressToApp(appAddress) = app
//------------将app加入等待调度的缓存队列,waitingApps是ArrayBuffer,fifo的一个线程队列
waitingApps += app
}
相关文章推荐
- Spark2.X源码学习--从SparkContext创建分析AppClient注册
- Spark2.2源码之Application注册机制
- spark源码学习(二)---Master源码分析(2)-master内组件状态改变机制
- spark源码学习(十二)--- checkpoint机制分析
- spark源码分析之master注册机制篇
- Spark源码分析之Master注册机制原理
- Android开发学习之路-Handler消息派发机制源码分析
- spark源码学习(二)------------spark-shell启动分析
- Spark 消息队列机制源码学习
- Master原理剖析与源码分析:注册机制原理剖析与源码分析
- spark源码学习(八)--- executor启动task分析
- android消息处理机制学习(三)-Handler,Message,MessageQueue,Looper源码分析
- spark源码学习(三):job的提交以及runJob函数的分析
- Vue学习之源码分析--Vue.js事件机制(四)
- ExtJs源码分析与学习—ExtJs事件机制(四)
- spark源码学习(二)---Master源码分析(3)-master对driver、executor的调度
- Spark源码分析之Worker启动通信机制
- Spark组件之GraphX学习5--随机图生成和消息发送aggregateMessages以及mapreduce操作(含源码分析)
- Spark通信机制:1)Spark1.3 vs Spark1.6源码分析
- Ceph学习——Ceph网络通信机制与源码分析