Hadoop YARNRunner Source Walkthrough: Job Submission, AM Creation, and Job Processing (Part 1)
2016-09-24 16:46
I only understood the whole YARNRunner flow after reading this article:
http://blog.csdn.net/caodaoxi/article/details/12970993
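Many articles online are unclear, and some already go wrong when they reach AM startup. Based on the YARNRunner-related source I have been reading over the past two weeks, I am writing down my understanding in three parts:
Part 1: from submitJob to the scheduler requesting a container for the ApplicationMaster
Part 2: the NodeManager launching the AM container, up to running the AppMaster launch script on the NM
Part 3: launching containers to run the job
There may be misunderstandings along the way that make parts of this post wrong, and I would appreciate corrections. After all, the point of this blog is to record what I learn about various big-data tools.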
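First, some background borrowed from the article above; without it the code is hard to follow:
1. State machines
(1) RMApp: each application has one RMApp object, which holds the application's information.
(2) RMAppAttempt: an RMApp can have several RMAppAttempt objects, depending on whether the earlier attempts succeed. If an attempt fails, another one is started, until one succeeds or the maximum number of attempts is reached. An RMAppAttempt is an "application execution attempt"; the relation between RMApp and RMAppAttempt is similar to the relation between a task and a taskAttempt in MapReduce.
(3) RMNode: holds the information of each node.
(4) RMContainer: holds the information of each container.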
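2. Event dispatchers
(1) AsyncDispatcher
The central event dispatcher. Each state machine's event handler is registered with it as an <event type, handler> pair. It maintains an event queue, keeps taking events off that queue, checks each event's type, and hands the event to the matching handler.
(2) Sub-dispatchers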
| Event type | State machine | Event handler |
|---|---|---|
| RMAppEvent | RMApp | ApplicationEventDispatcher |
| RMAppAttemptEvent | RMAppAttempt | ApplicationAttemptEventDispatcher |
| RMNodeEvent | RMNode | NodeEventDispatcher |
| SchedulerEvent | (none) | SchedulerEventDispatcher |
| AMLauncherEvent | (none) | ApplicationMasterLauncher |
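The dispatcher description above can be made concrete with a minimal, single-threaded sketch. Handlers are registered per event-type enum class, every event goes onto one queue, and a loop takes events off the queue and hands each one to the handler registered for its type. This is an illustration under simplifying assumptions, not Hadoop's AsyncDispatcher, which drains its queue on a separate thread. `MiniDispatcher`, `Event`, `AppEventType`, and `SchedulerEventType` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

class DispatcherDemo {
    public static void main(String[] args) {
        System.out.println(String.join(" -> ", run("app_1")));
    }

    // Wires two handlers and returns the order in which events were handled.
    static List<String> run(String appId) {
        MiniDispatcher d = new MiniDispatcher();
        List<String> log = new ArrayList<>();
        d.register(AppEventType.class, e -> {
            log.add("RMApp:" + e.type);
            // Handlers never call each other; they only post follow-up events.
            if (e.type == AppEventType.START) {
                d.post(new Event(AppEventType.APP_NEW_SAVED, e.appId));
            } else {
                d.post(new Event(SchedulerEventType.APP_ADDED, e.appId));
            }
        });
        d.register(SchedulerEventType.class, e -> log.add("Scheduler:" + e.type));
        d.post(new Event(AppEventType.START, appId));
        d.drain();
        return log;
    }
}

// Hypothetical stand-ins for RMAppEventType and SchedulerEventType.
enum AppEventType { START, APP_NEW_SAVED }
enum SchedulerEventType { APP_ADDED }

final class Event {
    final Enum<?> type;
    final String appId;
    Event(Enum<?> type, String appId) { this.type = type; this.appId = appId; }
}

final class MiniDispatcher {
    // One handler per event-type enum class: the <event type, handler> registration.
    private final Map<Class<?>, Consumer<Event>> handlers = new HashMap<>();
    private final Queue<Event> queue = new ArrayDeque<>();

    void register(Class<? extends Enum<?>> typeClass, Consumer<Event> handler) {
        handlers.put(typeClass, handler);
    }

    void post(Event e) { queue.add(e); }

    // Take events off the queue one by one and route each by its enum class.
    void drain() {
        Event e;
        while ((e = queue.poll()) != null) {
            Consumer<Event> h = handlers.get(e.type.getDeclaringClass());
            if (h == null) throw new IllegalStateException("No handler for " + e.type);
            h.accept(e);
        }
    }
}
```

Because handlers only post events, the call chain in the rest of this post keeps bouncing between RMAppImpl, RMStateStore, and the scheduler through the dispatcher instead of calling them directly.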
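Let's start with job submission. The entry point is job.waitForCompletion. If you follow the calls down, the job is ultimately submitted by JobSubmitter calling submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()). submitClient is declared as the ClientProtocol interface, which has two implementations:
LocalJobRunner
YARNRunner
In an earlier post I started from LocalJobRunner and followed how the job is eventually executed.
On YARN, the job goes through YARNRunner instead.
Let's look at YARNRunner's submitJob method:
It mainly creates an ApplicationSubmissionContext object and submits it through ResourceMgrDelegate.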
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {
addHistoryToken(ts);
// Construct necessary information to start the MR AM
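//appContext is important: it assembles the environment variables and the script that launches the App Master. This object is passed through the classes below until the AM starts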
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);
// Submit to ResourceManager
try {
//Submit appContext through ResourceMgrDelegate, the class that talks to the ResourceManager
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);
//Despite its name, appMaster is an ApplicationReport, not the ApplicationMaster itself; the naming confused me at first
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics =
(appMaster == null ?
"application report is null" : appMaster.getDiagnostics());
if (appMaster == null
|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
diagnostics);
}
return clientCache.getClient(jobId).getJobStatus(jobId);
} catch (YarnException e) {
throw new IOException(e);
}
}
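The failure check at the end of submitJob can be isolated as follows. It is a simplified sketch rather than Hadoop code: the hypothetical `AppState` enum stands in for YarnApplicationState, and passing `null` plays the role of a null ApplicationReport.

```java
import java.io.IOException;

class SubmitCheckDemo {
    public static void main(String[] args) throws IOException {
        SubmitCheck.verify(AppState.ACCEPTED, "");
        System.out.println("ACCEPTED passes the check");
    }
}

// Hypothetical stand-in for YarnApplicationState.
enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

final class SubmitCheck {
    // Same condition as submitJob: a missing report, FAILED, or KILLED aborts the submission.
    static void verify(AppState stateOrNull, String diagnostics) throws IOException {
        String diag = (stateOrNull == null) ? "application report is null" : diagnostics;
        if (stateOrNull == null
                || stateOrNull == AppState.FAILED
                || stateOrNull == AppState.KILLED) {
            throw new IOException("Failed to run job : " + diag);
        }
    }
}
```

Any other state, including ACCEPTED or RUNNING, lets submitJob continue and return the job status.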
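As mentioned above, ApplicationSubmissionContext holds the script that launches the ApplicationMaster class, so let's take a brief look.
The method mostly concatenates variables and command-line pieces, so most of it is omitted below and only the key lines are kept: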
public ApplicationSubmissionContext createApplicationSubmissionContext(
Configuration jobConf,
String jobSubmitDir, Credentials ts) throws IOException {
...
//This is where the AM class is set: MRJobConfig.APPLICATION_MASTER_CLASS = org.apache.hadoop.mapreduce.v2.app.MRAppMaster
//so the command eventually run on the NodeManager executes the main method of MRAppMaster
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDERR);
Vector<String> vargsFinal = new Vector<String>(8);
// Final command
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
}
vargsFinal.add(mergedCommand.toString());
...
// Setup ContainerLaunchContext for AM container
//Build the AM's ContainerLaunchContext from the command assembled above; later the container is launched from this object, which starts MRAppMaster
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(localResources, environment,
vargsFinal, null, securityTokens, acls);
...
//Set the AM container spec
appContext.setAMContainerSpec(amContainer);
...
return appContext;
}
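The command-building part of createApplicationSubmissionContext comes down to appending arguments and joining them with spaces. The sketch below reproduces only that joining step, under stated assumptions. `<LOG_DIR>` is assumed to be the placeholder that the NodeManager later replaces with the container's log directory, which is the role of ApplicationConstants.LOG_DIR_EXPANSION_VAR. The leading `$JAVA_HOME/bin/java` and `-Xmx1024m` arguments in the demo are illustrative, not the exact values YARNRunner computes. `AmCommand` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class AmCommandDemo {
    public static void main(String[] args) {
        System.out.println(AmCommand.build(List.of("$JAVA_HOME/bin/java", "-Xmx1024m")));
    }
}

final class AmCommand {
    // Assumed placeholder for the container log dir (role of LOG_DIR_EXPANSION_VAR).
    static final String LOG_DIR = "<LOG_DIR>";
    // Value of MRJobConfig.APPLICATION_MASTER_CLASS, as quoted in the comment above.
    static final String AM_CLASS = "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";

    static String build(List<String> jvmArgs) {
        List<String> vargs = new ArrayList<>(jvmArgs);
        vargs.add(AM_CLASS);                    // main class run inside the AM container
        vargs.add("1>" + LOG_DIR + "/stdout");  // redirect stdout into the container log dir
        vargs.add("2>" + LOG_DIR + "/stderr");  // redirect stderr likewise
        // Same joining as the original loop: every piece followed by one space.
        StringBuilder merged = new StringBuilder();
        for (CharSequence s : vargs) {
            merged.append(s).append(" ");
        }
        return merged.toString();
    }
}
```

The single merged string is what ends up as the one entry in vargsFinal, which is why the AM launch is a shell command rather than a Java call.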
For ApplicationSubmissionContext, just remember that it carries the amContainer launch command; it will come up again later. Next, ResourceMgrDelegate's submitApplication:
public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
return client.submitApplication(appContext);
}
It submits through client, which is created in ResourceMgrDelegate's constructor and is actually a YarnClientImpl object:
public ResourceMgrDelegate(YarnConfiguration conf) {
super(ResourceMgrDelegate.class.getName());
this.conf = conf;
this.client = YarnClient.createYarnClient();
init(conf);
start();
}
So far everything still runs on the client machine that submits the job; nothing has reached the ResourceManager yet.
Let's look at YarnClientImpl's submitApplication:
public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
if (applicationId == null) {
throw new ApplicationIdNotProvidedException(
"ApplicationId is not provided in ApplicationSubmissionContext");
}
//Wrap appContext in a request
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
// Automatically add the timeline DT into the CLC
// Only when the security and the timeline service are both enabled
if (isSecurityEnabled() && timelineServiceEnabled) {
addTimelineDelegationToken(appContext.getAMContainerSpec());
}
//TODO: YARN-1763:Handle RM failovers during the submitApplication call.
//Submit the request through rmClient, an RPC proxy for ApplicationClientProtocol. On the RM side the call is served by ClientRMService, so this is how the client talks to the RM
rmClient.submitApplication(request);
int pollCount = 0;
long startTime = System.currentTimeMillis();
//Loop until the state is no longer NEW or NEW_SAVING; time out if it stays there too long
while (true) {
try {
YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState();
if (!state.equals(YarnApplicationState.NEW) &&
!state.equals(YarnApplicationState.NEW_SAVING)) {
LOG.info("Submitted application " + applicationId);
break;
}
long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be submitted successfully");
}
// Notify the client through the log every 10 poll, in case the client
// is blocked here too long.
if (++pollCount % 10 == 0) {
LOG.info("Application submission is not finished, " +
"submitted application " + applicationId +
" is still in " + state);
}
try {
Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
LOG.error("Interrupted while waiting for application "
+ applicationId
+ " to be successfully submitted.");
}
} catch (ApplicationNotFoundException ex) {
// FailOver or RM restart happens before RMStateStore saves
// ApplicationState
LOG.info("Re-submit application " + applicationId + "with the " +
"same ApplicationSubmissionContext");
rmClient.submitApplication(request);
}
}
return applicationId;
}
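The wait loop in submitApplication reduces to: poll the state, stop once it is neither NEW nor NEW_SAVING, and give up after a timeout. The sketch below models only that; the `Supplier<String>` state source, the string states, and the names `SubmitPoll` and `waitUntilSubmitted` are hypothetical simplifications. The real loop also re-submits on ApplicationNotFoundException and only enforces the timeout when enforceAsyncAPITimeout() is true, neither of which is modeled here.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class SubmitPollDemo {
    public static void main(String[] args) throws Exception {
        Iterator<String> states = List.of("NEW", "NEW_SAVING", "SUBMITTED").iterator();
        System.out.println(SubmitPoll.waitUntilSubmitted(states::next, 10, 1_000));
    }
}

final class SubmitPoll {
    // Returns the first state past NEW / NEW_SAVING; throws if timeoutMillis elapses first.
    static String waitUntilSubmitted(Supplier<String> state, long pollMillis, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        long start = System.currentTimeMillis();
        while (true) {
            String s = state.get();
            if (!s.equals("NEW") && !s.equals("NEW_SAVING")) {
                return s;  // the RM has saved the app and moved it on
            }
            if (System.currentTimeMillis() - start >= timeoutMillis) {
                throw new TimeoutException("application still in " + s);
            }
            Thread.sleep(pollMillis);
        }
    }
}
```

Leaving NEW_SAVING means the RMStateStore has persisted the application, which is what makes it safe for the client to return the applicationId.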
Now the server side: ClientRMService's submitApplication, which serves the rmClient call:
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException {
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();
// ApplicationSubmissionContext needs to be validated for safety - only
// those fields that are independent of the RM's configuration will be
// checked here, those that are dependent on RM configuration are validated
// in RMAppManager.
//Start validating; if any check fails, the submission is rejected
String user = null;
try {
// Safety
user = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException ie) {
LOG.warn("Unable to get the current user.", ie);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
ie.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw RPCUtil.getRemoteException(ie);
}
//More checks
// Check whether app has already been put into rmContext,
// If it is, simply return the response
if (rmContext.getRMApps().get(applicationId) != null) {
LOG.info("This is an earlier submitted application: " + applicationId);
return SubmitApplicationResponse.newInstance();
}
//Keep checking and fill in defaults
if (submissionContext.getQueue() == null) {
submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
if (submissionContext.getApplicationName() == null) {
submissionContext.setApplicationName(
YarnConfiguration.DEFAULT_APPLICATION_NAME);
}
if (submissionContext.getApplicationType() == null) {
submissionContext
.setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
} else {
if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
submissionContext.setApplicationType(submissionContext
.getApplicationType().substring(0,
YarnConfiguration.APPLICATION_TYPE_LENGTH));
}
}
try {
// call RMAppManager to submit application directly
//Do the actual work: submit through rmAppManager
rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), user);
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
"ClientRMService", applicationId);
} catch (YarnException e) {
LOG.info("Exception in submitting application with id " +
applicationId.getId(), e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw e;
}
SubmitApplicationResponse response = recordFactory
.newRecordInstance(SubmitApplicationResponse.class);
return response;
}
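The defaulting rules applied to the submission context before it reaches RMAppManager are easy to check in isolation. In the sketch below the `Submission` and `Normalizer` classes are hypothetical, and the default values ("default", "N/A", "YARN") and the 20-character type limit are assumed values of the YarnConfiguration constants; check them against your own source tree.

```java
class NormalizeDemo {
    public static void main(String[] args) {
        Submission s = new Submission(null, null, "A_VERY_LONG_APPLICATION_TYPE");
        Normalizer.normalize(s);
        System.out.println(s.queue + " | " + s.name + " | " + s.type);
    }
}

// Hypothetical stand-in for the three ApplicationSubmissionContext fields touched here.
final class Submission {
    String queue, name, type;
    Submission(String queue, String name, String type) {
        this.queue = queue;
        this.name = name;
        this.type = type;
    }
}

final class Normalizer {
    // Assumed values of the YarnConfiguration constants.
    static final String DEFAULT_QUEUE_NAME = "default";
    static final String DEFAULT_APPLICATION_NAME = "N/A";
    static final String DEFAULT_APPLICATION_TYPE = "YARN";
    static final int APPLICATION_TYPE_LENGTH = 20;

    static void normalize(Submission s) {
        if (s.queue == null) s.queue = DEFAULT_QUEUE_NAME;
        if (s.name == null) s.name = DEFAULT_APPLICATION_NAME;
        if (s.type == null) {
            s.type = DEFAULT_APPLICATION_TYPE;
        } else if (s.type.length() > APPLICATION_TYPE_LENGTH) {
            // Over-long types are silently truncated, not rejected.
            s.type = s.type.substring(0, APPLICATION_TYPE_LENGTH);
        }
    }
}
```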
Next, what rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user) does:
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
String user) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
//Create an RMAppImpl object, i.e. an RMApp state machine that starts in the NEW state and is driven by RMAppEvents
RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
ApplicationId appId = submissionContext.getApplicationId();
//This branch is taken when security (e.g. Kerberos) is enabled; to keep things simple I skip it and go straight to else
if (UserGroupInformation.isSecurityEnabled()) {
try {
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete(),
application.getUser());
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we haven't yet informed the
// scheduler about the existence of the application
assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
throw RPCUtil.getRemoteException(e);
}
} else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.
//Start the RMApp state machine: rmContext is the RM's RMContext (we are already inside the ResourceManager), and this posts RMAppEventType.START to the RM's central dispatcher
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
}
}
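At the beginning I described the event dispatchers: on the ResourceManager side, AsyncDispatcher dispatches all events, and here ApplicationEventDispatcher drives RMAppImpl's transitions. These are the events and transitions RMAppImpl defines when it is initialized: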
private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
...
//Too long to show in full. The transition above handles RMAppEventType.START: it moves RMApp from NEW to NEW_SAVING and calls RMAppNewlySavingTransition
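The StateMachineFactory declaration is essentially a lookup table from (current state, event type) to (next state, transition hook). A hypothetical miniature of the RMApp transitions covered in this post (NEW to NEW_SAVING to SUBMITTED to ACCEPTED) might look like this. It is not Hadoop's StateMachineFactory API; the hooks only print the name of the Hadoop transition class they stand for, and an event with no arc is rejected, roughly as the real state machine does.

```java
import java.util.EnumMap;
import java.util.Map;

class AppStateMachineDemo {
    public static void main(String[] args) {
        MiniRMApp app = new MiniRMApp();
        for (Ev e : new Ev[] {Ev.START, Ev.APP_NEW_SAVED, Ev.APP_ACCEPTED}) {
            app.handle(e);
        }
        System.out.println(app.state);
    }
}

enum St { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }
enum Ev { START, APP_NEW_SAVED, APP_ACCEPTED }

final class MiniRMApp {
    // One arc: the target state plus the name of the transition it stands for.
    static final class Arc {
        final St next;
        final String hook;
        Arc(St next, String hook) { this.next = next; this.hook = hook; }
    }

    private static final Map<St, Map<Ev, Arc>> TABLE = new EnumMap<>(St.class);

    private static void add(St from, St to, Ev ev, String hook) {
        TABLE.computeIfAbsent(from, k -> new EnumMap<>(Ev.class)).put(ev, new Arc(to, hook));
    }

    static {
        add(St.NEW, St.NEW_SAVING, Ev.START, "RMAppNewlySavingTransition");
        add(St.NEW_SAVING, St.SUBMITTED, Ev.APP_NEW_SAVED, "AddApplicationToSchedulerTransition");
        add(St.SUBMITTED, St.ACCEPTED, Ev.APP_ACCEPTED, "StartAppAttemptTransition");
    }

    St state = St.NEW;

    void handle(Ev ev) {
        Arc arc = TABLE.getOrDefault(state, Map.of()).get(ev);
        if (arc == null) throw new IllegalStateException("Invalid event " + ev + " at " + state);
        System.out.println(state + " --" + ev + "--> " + arc.next + " (" + arc.hook + ")");
        state = arc.next;
    }
}
```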
Let's see what RMAppNewlySavingTransition does:
private static final class RMAppNewlySavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
// communication
LOG.info("Storing application with id " + app.applicationId);
app.rmContext.getStateStore().storeNewApplication(app);
}
}
It only calls storeNewApplication(app):
public synchronized void storeNewApplication(RMApp app) {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationState appState =
new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
app.getUser());
//Fires the RMStateStore STORE_APP event
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}
The transitions defined by RMStateStore's state machine:
private static final StateMachineFactory<RMStateStore,
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>
stateMachineFactory = new StateMachineFactory<RMStateStore,
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>(
RMStateStoreState.DEFAULT)
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
RMStateStoreEventType.STORE_APP, new StoreAppTransition())
...
//So STORE_APP runs StoreAppTransition
Let's continue with StoreAppTransition:
private static class StoreAppTransition
implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
@Override
public void transition(RMStateStore store, RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
ApplicationStateData appStateData = ApplicationStateData
.newInstance(appState);
LOG.info("Storing info for app: " + appId);
try {
//1. store application state
store.storeApplicationStateInternal(appId, appStateData);
// 2. notify the RMApp with an RMAppEventType.APP_NEW_SAVED event
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
store.notifyStoreOperationFailed(e);
}
};
}
private void notifyApplication(RMAppEvent event) {
//Fire RMAppEventType.APP_NEW_SAVED through the RM dispatcher
rmDispatcher.getEventHandler().handle(event);
}
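The ordering that matters in StoreAppTransition is: persist first, and only then tell the RMApp that saving finished; a failure goes down a separate path and APP_NEW_SAVED is never sent. The sketch below models that ordering with pluggable callbacks. `MiniStateStore` and its three callbacks are hypothetical stand-ins for storeApplicationStateInternal, notifyApplication, and notifyStoreOperationFailed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class StateStoreDemo {
    public static void main(String[] args) {
        Map<String, String> persisted = new HashMap<>();
        List<String> events = new ArrayList<>();
        MiniStateStore store =
            new MiniStateStore(persisted::put, events::add, err -> events.add("FAILED: " + err));
        store.storeNewApplication("application_1", "submission-context");
        System.out.println(persisted + " " + events);
    }
}

final class MiniStateStore {
    // Hypothetical persistence hook (real stores write to ZooKeeper, a FileSystem, etc.).
    private final BiConsumer<String, String> backend;
    // Plays the role of notifyApplication(new RMAppEvent(appId, APP_NEW_SAVED)).
    private final Consumer<String> notifyApp;
    // Plays the role of notifyStoreOperationFailed(e).
    private final Consumer<String> notifyFailure;

    MiniStateStore(BiConsumer<String, String> backend,
                   Consumer<String> notifyApp,
                   Consumer<String> notifyFailure) {
        this.backend = backend;
        this.notifyApp = notifyApp;
        this.notifyFailure = notifyFailure;
    }

    void storeNewApplication(String appId, String appState) {
        try {
            backend.accept(appId, appState);             // 1. persist first
            notifyApp.accept("APP_NEW_SAVED " + appId);  // 2. only then tell the RMApp
        } catch (RuntimeException e) {
            notifyFailure.accept(e.getMessage());        // no APP_NEW_SAVED on failure
        }
    }
}
```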
In RMAppImpl we can see that RMAppEventType.APP_NEW_SAVED triggers AddApplicationToSchedulerTransition:
.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
Let's look at AddApplicationToSchedulerTransition:
private static final class AddApplicationToSchedulerTransition extends
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
//This fires the scheduler's APP_ADDED event. There are several scheduler implementations, such as FifoScheduler or FairScheduler
//We will follow FifoScheduler's handling of APP_ADDED
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user,
app.submissionContext.getReservationID()));
}
}
Here is how FifoScheduler handles APP_ADDED:
case APP_ADDED:
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
}
break;
It calls the scheduler's addApplication method:
public synchronized void addApplication(ApplicationId applicationId,
String queue, String user, boolean isAppRecovering) {
//Create a SchedulerApplication
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
//Put the application into the scheduler
applications.put(applicationId, application);
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
if (isAppRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
}
} else {
//Fire RMAppEventType.APP_ACCEPTED through the ResourceManager's dispatcher
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
}
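Stripped of metrics and logging, FifoScheduler.addApplication does two things: record the application in its map and, unless the application is being recovered, send APP_ACCEPTED back to the RMApp. A hypothetical sketch, with a `Consumer<String>` standing in for the RM dispatcher and a plain user string standing in for SchedulerApplication:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class FifoAddDemo {
    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        MiniFifoScheduler s = new MiniFifoScheduler(sent::add);
        s.addApplication("application_1", "alice", false);
        s.addApplication("application_2", "bob", true);
        System.out.println(s.size() + " apps, events sent: " + sent);
    }
}

final class MiniFifoScheduler {
    // appId -> user; a stand-in for the SchedulerApplication map.
    private final Map<String, String> applications = new HashMap<>();
    private final Consumer<String> dispatcher;

    MiniFifoScheduler(Consumer<String> dispatcher) { this.dispatcher = dispatcher; }

    synchronized void addApplication(String appId, String user, boolean isAppRecovering) {
        applications.put(appId, user);
        if (!isAppRecovering) {
            // Like the original, APP_ACCEPTED is skipped while the app is recovering.
            dispatcher.accept("APP_ACCEPTED " + appId);
        }
    }

    synchronized int size() { return applications.size(); }
}
```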
So we go back to RMAppImpl to see what RMAppEventType.APP_ACCEPTED does:
.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
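It runs StartAppAttemptTransition, which tries to launch one attempt of the application. If an attempt fails, another one is tried, until the maximum number of attempts is reached.
Let's look at StartAppAttemptTransition: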
private static final class StartAppAttemptTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {
//Create and start a new attempt
app.createAndStartNewAttempt(false);
};
}
//createAndStartNewAttempt creates an RMAppAttempt and then fires the RMAppAttemptEventType.START event
private void
createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
createNewAttempt();
handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
transferStateFromPreviousAttempt));
}
private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf,
// The newly created attempt maybe last attempt if (number of
// previously failed attempts(which should not include Preempted,
// hardware error and NM resync) + 1) equal to the max-attempt
// limit.
//This flag is true when this attempt is the last one allowed (failed attempts + 1 == max attempts)
maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
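createNewAttempt derives two values from the existing attempts: the attempt number (attempts.size() + 1) and whether this attempt is the last one allowed (maxAppAttempts == failed + 1). The sketch reproduces only that arithmetic with hypothetical method names, using maxAppAttempts = 2 purely for illustration. As the original comment notes, the real failure count excludes preempted attempts, hardware errors, and NM resyncs, which is not modeled here.

```java
class AttemptMathDemo {
    public static void main(String[] args) {
        int maxAppAttempts = 2;  // illustrative value only
        for (int failed = 0; failed < maxAppAttempts; failed++) {
            // Assume every earlier attempt failed, so attempts.size() == failed.
            System.out.println("attempt #" + AttemptMath.nextAttemptNumber(failed)
                + " last=" + AttemptMath.isLastAttempt(maxAppAttempts, failed));
        }
    }
}

final class AttemptMath {
    // Attempt ids are 1-based: the first attempt of an app gets 1.
    static int nextAttemptNumber(int existingAttempts) {
        return existingAttempts + 1;
    }

    // Same expression passed to RMAppAttemptImpl: maxAppAttempts == (getNumFailedAppAttempts() + 1).
    static boolean isLastAttempt(int maxAppAttempts, int failedAttempts) {
        return maxAppAttempts == failedAttempts + 1;
    }
}
```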
This fires RMAppAttemptEventType.START in RMAppAttemptImpl. Here is how that state machine defines its transitions:
private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
RMAppAttemptEventType,
RMAppAttemptEvent>
stateMachineFactory = new StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
RMAppAttemptEventType,
RMAppAttemptEvent>(RMAppAttemptState.NEW)
// Transitions from NEW State
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
RMAppAttemptEventType.START, new AttemptStartedTransition())
//which calls this class's AttemptStartedTransition
Let's look at AttemptStartedTransition in the same class:
private static final class AttemptStartedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
boolean transferStateFromPreviousAttempt = false;
if (event instanceof RMAppStartAttemptEvent) {
transferStateFromPreviousAttempt =
((RMAppStartAttemptEvent) event)
.getTransferStateFromPreviousAttempt();
}
appAttempt.startTime = System.currentTimeMillis();
// Register with the ApplicationMasterService
appAttempt.masterService
.registerAppAttempt(appAttempt.applicationAttemptId);
if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.clientTokenMasterKey =
appAttempt.rmContext.getClientToAMTokenSecretManager()
.createMasterKey(appAttempt.applicationAttemptId);
}
// Add the applicationAttempt to the scheduler and inform the scheduler
// whether to transfer the state from previous attempt.
//Send the applicationAttempt to the scheduler as an APP_ATTEMPT_ADDED event, i.e. FifoScheduler's APP_ATTEMPT_ADDED
appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
}
}
Back in FifoScheduler, let's see what APP_ATTEMPT_ADDED does:
case APP_ATTEMPT_ADDED:
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
//Call addApplicationAttempt
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {
//Look up the SchedulerApplication created earlier in addApplication
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
// TODO: Fix store
//Create a FiCaSchedulerApp for this attempt
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
activeUsersManager, this.rmContext);
if (transferStateFromPreviousAttempt) {
schedulerApp.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
}
//Make it the current attempt
application.setCurrentAppAttempt(schedulerApp);
//Submit
metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(appAttemptId
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
}
} else {
//Fire the RMAppAttemptEventType.ATTEMPT_ADDED event
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
}
Next we go back to RMAppAttemptImpl for the RMAppAttemptEventType.ATTEMPT_ADDED event:
.addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.ATTEMPT_ADDED,
new ScheduleTransition())
It runs ScheduleTransition. After this step the attempt is in the SCHEDULED state, waiting for the scheduler to assign a container on a NodeManager:
public static final class ScheduleTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
//This is the context created at the very beginning, which contains the container launch script
ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
if (!subCtx.getUnmanagedAM()) {
// Need reset #containers before create new attempt, because this request
// will be passed to scheduler, and scheduler will deduct the number after
// AM container allocated
// Currently, following fields are all hard code,
// TODO: change these fields when we want to support
// priority/resource-name/relax-locality specification for AM containers
// allocation.
//One container per App Master; set the attributes of the container request
appAttempt.amReq.setNumContainers(1);
appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);
// AM resource has been checked when submission
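//Call allocate on the scheduler. This registers the AM's ResourceRequest; the returned Allocation holds no containers yet (hence the assert below). The container is assigned later, when a NodeManager heartbeat shows the node has free resources, and only then is the AM launched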
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
Collections.singletonList(appAttempt.amReq),
EMPTY_CONTAINER_RELEASE_LIST, null, null);
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert (amContainerAllocation.getContainers().size() == 0);
}
return RMAppAttemptState.SCHEDULED;
} else {
// save state and then go to LAUNCHED state
appAttempt.storeAttempt();
return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
}
}
}
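ScheduleTransition is a multiple-arc transition: its return value picks the next state. For a managed AM it fills in a single resource request and returns SCHEDULED; for an unmanaged AM it requests nothing and returns LAUNCHED_UNMANAGED_SAVING. The sketch models only that branching and the request fields. `AmRequest` and `MiniScheduleTransition` are hypothetical stand-ins, `"*"` is the assumed value of ResourceRequest.ANY, and priority 0 is the assumed value of AM_CONTAINER_PRIORITY.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class ScheduleDemo {
    public static void main(String[] args) {
        List<AmRequest> asks = new ArrayList<>();
        System.out.println(MiniScheduleTransition.transition(false, asks::add) + " " + asks);
        System.out.println(MiniScheduleTransition.transition(true, asks::add));
    }
}

enum AttemptState { SCHEDULED, LAUNCHED_UNMANAGED_SAVING }

// Hypothetical stand-in for the AM's ResourceRequest.
final class AmRequest {
    int numContainers;
    int priority;
    String resourceName;
    boolean relaxLocality;

    @Override public String toString() {
        return numContainers + " container(s), priority " + priority
            + ", on " + resourceName + ", relax=" + relaxLocality;
    }
}

final class MiniScheduleTransition {
    static final String ANY = "*";               // assumed value of ResourceRequest.ANY
    static final int AM_CONTAINER_PRIORITY = 0;  // assumed AM container priority

    static AttemptState transition(boolean unmanagedAM, Consumer<AmRequest> scheduler) {
        if (unmanagedAM) {
            // The real code calls storeAttempt() here; no container is requested.
            return AttemptState.LAUNCHED_UNMANAGED_SAVING;
        }
        AmRequest req = new AmRequest();
        req.numContainers = 1;                 // exactly one container for the AM
        req.priority = AM_CONTAINER_PRIORITY;
        req.resourceName = ANY;                // any node will do
        req.relaxLocality = true;
        scheduler.accept(req);                 // stands in for scheduler.allocate(...); nothing assigned yet
        return AttemptState.SCHEDULED;
    }
}
```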
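At this point the AM's resource request has been handed to the scheduler. Next, the scheduler assigns a container on a NodeManager, and that container is started on the NM, running the launch script set up at the beginning.
That is covered in Part 2; this post ends here.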