
Hadoop source code analysis: submitJob in YARNRunner, AM creation, through to job processing (Part 1)

2016-09-24 16:46
I only understood the whole YARNRunner flow after reading this article:
http://blog.csdn.net/caodaoxi/article/details/12970993

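Many articles online don't explain this clearly, and some already go wrong at the point where the AM starts. Based on the YARNRunner source code I've been reading over the last two weeks, I'm writing down my understanding, in three parts:
Part 1: from submitJob to the scheduler requesting a container for the ApplicationMaster
Part 2: from the AM to the NodeManager launching the container, up to the NM running the ApplicationMaster launch script
Part 3: launching containers to execute the job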

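I may have misunderstood some things along the way, which would make parts of this article wrong; corrections are welcome. After all, the whole point of this blog is to record how I'm learning the various big-data tools.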


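First, I'll borrow part of that article as background you need; without it, it's hard to tell what the code is doing:

1. State machines

(1) RMApp: each application corresponds to one RMApp object, which stores the application's information.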

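(2) RMAppAttempt: each RMApp may correspond to several RMAppAttempt objects, depending on whether the previous RMAppAttempt succeeded: if it failed, another one is started, until one succeeds. An RMAppAttempt is an "application execution attempt"; the relationship between RMApp and RMAppAttempt is similar to that between a task and a taskAttempt in MapReduce.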

(3) RMNode: stores the information of each node.

(4) RMContainer: stores the information of each container.

2. Event dispatchers

(1)AsyncDispatcher

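The central event dispatcher. The event handlers of the individual state machines register with it as <event type, event handler> pairs. It maintains an event queue: it keeps taking events off the queue, checks each event's type, and hands the event to the corresponding handler.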

(2) The individual sub-dispatchers
Event type          State machine   Event handler
RMAppEvent          RMApp           ApplicationEventDispatcher
RMAppAttemptEvent   RMAppAttempt    ApplicationAttemptEventDispatcher
RMNodeEvent         RMNode          NodeEventDispatcher
SchedulerEvent      (none)          SchedulerEventDispatcher
AMLauncherEvent     (none)          ApplicationMasterLauncher
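To make the central-dispatcher idea concrete, here is a minimal single-threaded sketch in Java. It is not Hadoop's AsyncDispatcher: the real one takes events off its queue on a dedicated thread and looks handlers up by the event type's enum class, and the names used here (MiniDispatcher, Event, the string event types) are hypothetical. What it keeps is the routing pattern described above: producers only enqueue, one loop takes events off the queue, and each event goes to the handler registered for its type.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Simplified, single-threaded model of a central dispatcher (not Hadoop's AsyncDispatcher).
class MiniDispatcher {
    // Hypothetical event: a type name used for routing, plus a payload.
    static final class Event {
        private final String type;
        private final String payload;
        Event(String type, String payload) { this.type = type; this.payload = payload; }
        String type() { return type; }
        String payload() { return payload; }
    }

    // One handler per event type: the <event type, handler> registration.
    private final Map<String, Consumer<Event>> handlers = new HashMap<>();
    private final Queue<Event> queue = new ArrayDeque<>();

    void register(String eventType, Consumer<Event> handler) {
        handlers.put(eventType, handler);
    }

    // Producers only enqueue; they never call a handler directly.
    void dispatch(Event event) {
        queue.add(event);
    }

    // Take events off the queue in order and route each one by its type.
    // Handlers may enqueue follow-up events, which this same loop processes.
    void drain() {
        Event event;
        while ((event = queue.poll()) != null) {
            Consumer<Event> handler = handlers.get(event.type());
            if (handler == null) {
                throw new IllegalStateException("No handler for " + event.type());
            }
            handler.accept(event);
        }
    }

    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher();
        List<String> log = new ArrayList<>();
        dispatcher.register("RMAppEvent", e -> {
            log.add("RMApp handled " + e.payload());
            dispatcher.dispatch(new Event("SchedulerEvent", e.payload()));
        });
        dispatcher.register("SchedulerEvent", e -> log.add("Scheduler handled " + e.payload()));
        dispatcher.dispatch(new Event("RMAppEvent", "application_1"));
        dispatcher.drain();
        System.out.println(log);
    }
}
```

Running main prints [RMApp handled application_1, Scheduler handled application_1]: the follow-up event is queued and handled afterwards rather than called inline, which is the kind of chaining between state machines that the rest of this article keeps following.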


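OK, let's start with job submission. The entry point is job.waitForCompletion; following it down, the job is finally submitted in the JobSubmitter class by submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()). submitClient is declared as the ClientProtocol interface, which has two implementations:
LocalJobRunner
YARNRunner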

In an earlier post I started from LocalJobRunner and traced how the job is eventually executed.
On YARN, submission goes through YARNRunner instead.
Let's look at YARNRunner's submitJob method:

It mainly does two things: it creates the ApplicationSubmissionContext object and submits it through ResourceMgrDelegate.


public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
throws IOException, InterruptedException {

addHistoryToken(ts);

// Construct necessary information to start the MR AM
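//This appContext is important: it assembles the environment variables and the command that launches the App Master. The object is passed from class to class until the AM starts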
ApplicationSubmissionContext appContext =
createApplicationSubmissionContext(conf, jobSubmitDir, ts);

// Submit to ResourceManager
try {

//Submit this appContext through ResourceMgrDelegate, the class that communicates with the ResourceManager
ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext);

//Despite its name, appMaster is an ApplicationReport, not the ApplicationMaster we usually talk about; this naming confused me at first...
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics =
(appMaster == null ?
"application report is null" : appMaster.getDiagnostics());
if (appMaster == null
|| appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
throw new IOException("Failed to run job : " +
diagnostics);
}
return clientCache.getClient(jobId).getJobStatus(jobId);
} catch (YarnException e) {
throw new IOException(e);
}
}
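The failure check at the end of submitJob can be looked at on its own. A hedged sketch with hypothetical stand-ins (SubmitCheck, AppState, Report) for YarnApplicationState and ApplicationReport: only a missing report or a FAILED/KILLED state counts as a failed submission; any other state lets submitJob go on to fetch the job status.

```java
import java.io.IOException;

// Hypothetical model of the post-submission check at the end of YARNRunner.submitJob.
class SubmitCheck {
    // Stand-in for YarnApplicationState (only a few values).
    enum AppState { SUBMITTED, ACCEPTED, RUNNING, FAILED, KILLED }

    // Stand-in for ApplicationReport: just the state and the diagnostics text.
    static final class Report {
        final AppState state;
        final String diagnostics;
        Report(AppState state, String diagnostics) {
            this.state = state;
            this.diagnostics = diagnostics;
        }
    }

    // Throws if there is no report, or if the application has already failed or been killed.
    static void checkSubmitted(Report report) throws IOException {
        String diagnostics =
            (report == null) ? "application report is null" : report.diagnostics;
        if (report == null
            || report.state == AppState.FAILED
            || report.state == AppState.KILLED) {
            throw new IOException("Failed to run job : " + diagnostics);
        }
    }
}
```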



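As mentioned above, ApplicationSubmissionContext holds the command that launches the Application Master class, so let's take a quick look.
The method mostly concatenates variables and command arguments, so I've omitted most of it and kept only the key lines: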

public ApplicationSubmissionContext createApplicationSubmissionContext(
Configuration jobConf,
String jobSubmitDir, Credentials ts) throws IOException {

...

//This is where the AM class is set: MRJobConfig.APPLICATION_MASTER_CLASS = org.apache.hadoop.mapreduce.v2.app.MRAppMaster
//So what the command eventually runs on the NodeManager is the main method of MRAppMaster
vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDOUT);
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
Path.SEPARATOR + ApplicationConstants.STDERR);

Vector<String> vargsFinal = new Vector<String>(8);
// Final command
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
}
vargsFinal.add(mergedCommand.toString());

...

// Setup ContainerLaunchContext for AM container
//Build the AM container's launch context from the command assembled above; later this object is used to launch the container, and thus MRAppMaster
ContainerLaunchContext amContainer =
ContainerLaunchContext.newInstance(localResources, environment,
vargsFinal, null, securityTokens, acls);


...
//Set the AM container spec
appContext.setAMContainerSpec(amContainer);

...

return appContext;
}
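The command assembly above boils down to joining vargs with spaces into one string. A minimal sketch, assuming ApplicationConstants.LOG_DIR_EXPANSION_VAR is the literal placeholder <LOG_DIR> (expanded later by the NodeManager to the container's log directory) and that STDOUT/STDERR are "stdout"/"stderr"; the class name and the JVM arguments in main are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of how the AM launch command is merged into a single string.
class AmCommandSketch {
    // Assumed values of ApplicationConstants.LOG_DIR_EXPANSION_VAR, STDOUT and STDERR.
    static final String LOG_DIR = "<LOG_DIR>";
    static final String STDOUT = "stdout";
    static final String STDERR = "stderr";

    static String buildCommand(List<String> javaArgs, String amClass) {
        List<String> vargs = new ArrayList<>(javaArgs);
        vargs.add(amClass);                          // e.g. MRAppMaster
        vargs.add("1>" + LOG_DIR + "/" + STDOUT);    // redirect stdout into the log dir
        vargs.add("2>" + LOG_DIR + "/" + STDERR);    // redirect stderr into the log dir
        // Same loop as the original: every argument followed by one space.
        StringBuilder mergedCommand = new StringBuilder();
        for (CharSequence str : vargs) {
            mergedCommand.append(str).append(" ");
        }
        return mergedCommand.toString();
    }

    public static void main(String[] args) {
        System.out.println(buildCommand(
            List.of("$JAVA_HOME/bin/java", "-Xmx1024m"),
            "org.apache.hadoop.mapreduce.v2.app.MRAppMaster"));
    }
}
```

With the arguments in main it prints $JAVA_HOME/bin/java -Xmx1024m org.apache.hadoop.mapreduce.v2.app.MRAppMaster 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr followed by a trailing space. This is why vargsFinal ends up with a single element: the whole shell command line.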


For ApplicationSubmissionContext, just remember that it contains the amContainer launch command; we'll need it later. Next, let's look at ResourceMgrDelegate's submitApplication:

public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
return client.submitApplication(appContext);
}

It submits through client, which is created in the ResourceMgrDelegate constructor; YarnClient.createYarnClient() returns a YarnClientImpl object:

public ResourceMgrDelegate(YarnConfiguration conf) {
super(ResourceMgrDelegate.class.getName());
this.conf = conf;
this.client = YarnClient.createYarnClient();
init(conf);
start();
}


So far, everything still runs on the client machine that submitted the job; nothing has reached the ResourceManager yet.

Let's look at YarnClientImpl's submitApplication:

public ApplicationId
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
if (applicationId == null) {
throw new ApplicationIdNotProvidedException(
"ApplicationId is not provided in ApplicationSubmissionContext");
}

//Wrap appContext in a request
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);

// Automatically add the timeline DT into the CLC
// Only when the security and the timeline service are both enabled
if (isSecurityEnabled() && timelineServiceEnabled) {
addTimelineDelegationToken(appContext.getAMContainerSpec());
}

//TODO: YARN-1763:Handle RM failovers during the submitApplication call.


//Submit the request through rmClient. rmClient is an ApplicationClientProtocol RPC proxy; on the ResourceManager side the call is served by ClientRMService, so this is where we actually talk to the RM
rmClient.submitApplication(request);

int pollCount = 0;
long startTime = System.currentTimeMillis();


//Keep polling until the state is no longer NEW or NEW_SAVING; if the timeout is enforced and the state doesn't change in time, give up
while (true) {
try {
YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState();
if (!state.equals(YarnApplicationState.NEW) &&
!state.equals(YarnApplicationState.NEW_SAVING)) {
LOG.info("Submitted application " + applicationId);
break;
}

long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) {
throw new YarnException("Timed out while waiting for application " +
applicationId + " to be submitted successfully");
}

// Notify the client through the log every 10 poll, in case the client
// is blocked here too long.
if (++pollCount % 10 == 0) {
LOG.info("Application submission is not finished, " +
"submitted application " + applicationId +
" is still in " + state);
}
try {
Thread.sleep(submitPollIntervalMillis);
} catch (InterruptedException ie) {
LOG.error("Interrupted while waiting for application "
+ applicationId
+ " to be successfully submitted.");
}
} catch (ApplicationNotFoundException ex) {
// FailOver or RM restart happens before RMStateStore saves
// ApplicationState
LOG.info("Re-submit application " + applicationId + "with the " +
"same ApplicationSubmissionContext");
rmClient.submitApplication(request);
}
}

return applicationId;
}
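The wait loop in YarnClientImpl is an ordinary poll-with-timeout. A simplified sketch (SubmitPoller and its AppState enum are hypothetical stand-ins): it returns as soon as the state leaves NEW/NEW_SAVING and throws once the timeout has passed. Deliberate simplifications compared with the real loop: the timeout always applies (YarnClientImpl only enforces it when enforceAsyncAPITimeout() is true), an interrupt propagates instead of being logged, and there is no re-submit on ApplicationNotFoundException.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

// Simplified model of the wait loop in YarnClientImpl.submitApplication.
class SubmitPoller {
    // Hypothetical stand-in for the YarnApplicationState values involved.
    enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }

    // Polls until the state leaves NEW/NEW_SAVING; throws after timeoutMillis.
    static AppState waitUntilSubmitted(Supplier<AppState> currentState,
                                       long pollIntervalMillis,
                                       long timeoutMillis)
            throws InterruptedException, TimeoutException {
        long startTime = System.currentTimeMillis();
        while (true) {
            AppState state = currentState.get();
            if (state != AppState.NEW && state != AppState.NEW_SAVING) {
                return state;
            }
            long elapsedMillis = System.currentTimeMillis() - startTime;
            if (elapsedMillis >= timeoutMillis) {
                throw new TimeoutException("Still in " + state + " after " + elapsedMillis + " ms");
            }
            Thread.sleep(pollIntervalMillis);
        }
    }

    public static void main(String[] args) throws Exception {
        // Fake RM: reports NEW, then NEW_SAVING, then SUBMITTED.
        Iterator<AppState> reports =
            List.of(AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED).iterator();
        System.out.println(waitUntilSubmitted(reports::next, 10, 5_000));
    }
}
```

main prints SUBMITTED after two short sleeps.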


The rmClient call is an RPC; on the RM side it is handled by ClientRMService's submitApplication:

public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnException {
ApplicationSubmissionContext submissionContext = request
.getApplicationSubmissionContext();
ApplicationId applicationId = submissionContext.getApplicationId();

// ApplicationSubmissionContext needs to be validated for safety - only
// those fields that are independent of the RM's configuration will be
// checked here, those that are dependent on RM configuration are validated
// in RMAppManager.


//Start the checks; if any of them fails, the submission is rejected
String user = null;
try {
// Safety
user = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException ie) {
LOG.warn("Unable to get the current user.", ie);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
ie.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw RPCUtil.getRemoteException(ie);
}

//More checks
// Check whether app has already been put into rmContext,
// If it is, simply return the response
if (rmContext.getRMApps().get(applicationId) != null) {
LOG.info("This is an earlier submitted application: " + applicationId);
return SubmitApplicationResponse.newInstance();
}

//More checks: fill in defaults for queue, name and type, and truncate an over-long type
if (submissionContext.getQueue() == null) {
submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
if (submissionContext.getApplicationName() == null) {
submissionContext.setApplicationName(
YarnConfiguration.DEFAULT_APPLICATION_NAME);
}
if (submissionContext.getApplicationType() == null) {
submissionContext
.setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);
} else {
if (submissionContext.getApplicationType().length() > YarnConfiguration.APPLICATION_TYPE_LENGTH) {
submissionContext.setApplicationType(submissionContext
.getApplicationType().substring(0,
YarnConfiguration.APPLICATION_TYPE_LENGTH));
}
}

try {
// call RMAppManager to submit application directly
//Do the actual work: submit through rmAppManager
rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), user);

LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
"ClientRMService", applicationId);
} catch (YarnException e) {
LOG.info("Exception in submitting application with id " +
applicationId.getId(), e);
RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
e.getMessage(), "ClientRMService",
"Exception in submitting application", applicationId);
throw e;
}

SubmitApplicationResponse response = recordFactory
.newRecordInstance(SubmitApplicationResponse.class);
return response;
}
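The defaulting step in ClientRMService is easy to isolate. A sketch, assuming the YarnConfiguration constants have these values: DEFAULT_QUEUE_NAME = "default", DEFAULT_APPLICATION_NAME = "N/A", DEFAULT_APPLICATION_TYPE = "YARN", APPLICATION_TYPE_LENGTH = 20 (worth double-checking against your Hadoop version). Ctx is a hypothetical mutable stand-in for ApplicationSubmissionContext:

```java
// Hypothetical model of the default-filling done in ClientRMService.submitApplication.
class SubmissionDefaults {
    // Assumed values of the YarnConfiguration constants.
    static final String DEFAULT_QUEUE_NAME = "default";
    static final String DEFAULT_APPLICATION_NAME = "N/A";
    static final String DEFAULT_APPLICATION_TYPE = "YARN";
    static final int APPLICATION_TYPE_LENGTH = 20;

    // Stand-in for ApplicationSubmissionContext: only the fields being defaulted.
    static final class Ctx {
        String queue;
        String name;
        String type;
    }

    static void applyDefaults(Ctx ctx) {
        if (ctx.queue == null) {
            ctx.queue = DEFAULT_QUEUE_NAME;
        }
        if (ctx.name == null) {
            ctx.name = DEFAULT_APPLICATION_NAME;
        }
        if (ctx.type == null) {
            ctx.type = DEFAULT_APPLICATION_TYPE;
        } else if (ctx.type.length() > APPLICATION_TYPE_LENGTH) {
            // Over-long types are silently truncated, not rejected.
            ctx.type = ctx.type.substring(0, APPLICATION_TYPE_LENGTH);
        }
    }
}
```

Note that none of these checks rejects the submission: missing values are filled in and an over-long type is cut, so the only hard failures in this method are the user lookup and whatever rmAppManager throws.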


Now let's see what rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), user) does:

protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
String user) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();

//Create the RMAppImpl object, i.e. the RMApp state machine (starting in NEW) that will handle RMAppEvents
RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
ApplicationId appId = submissionContext.getApplicationId();

//With security enabled (e.g. Kerberos) we take this branch. To keep things simple and focus on understanding the flow, I'll skip straight to the else branch
if (UserGroupInformation.isSecurityEnabled()) {
try {
this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
parseCredentials(submissionContext),
submissionContext.getCancelTokensWhenComplete(),
application.getUser());
} catch (Exception e) {
LOG.warn("Unable to parse credentials.", e);
// Sending APP_REJECTED is fine, since we assume that the
// RMApp is in NEW state and thus we haven't yet informed the
// scheduler about the existence of the application
assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, e.getMessage()));
throw RPCUtil.getRemoteException(e);
}
} else {
// Dispatcher is not yet started at this time, so these START events
// enqueued should be guaranteed to be first processed when dispatcher
// gets started.

//Kick off the RMApp state machine: rmContext is the RM's shared RMContext, and this hands an RMAppEventType.START event to the RM's dispatcher
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.START));
}
}


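At the start of the article I listed the event dispatchers. On the ResourceManager side, AsyncDispatcher dispatches all events; an RMAppEvent is routed through ApplicationEventDispatcher to RMAppImpl, whose state machine runs the matching transition. Here are the events and transitions RMAppImpl registers when the class is initialized: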

private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)


// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())

...

//The full listing is far too long, so it's cut here. Note that RMAppEventType.START is handled above: it moves RMApp from NEW to NEW_SAVING and calls RMAppNewlySavingTransition



Let's see what RMAppNewlySavingTransition does:

private static final class RMAppNewlySavingTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {

// If recovery is enabled then store the application information in a
// non-blocking call so make sure that RM has stored the information
// needed to restart the AM after RM restart without further client
// communication
LOG.info("Storing application with id " + app.applicationId);
app.rmContext.getStateStore().storeNewApplication(app);
}
}


It only calls storeNewApplication(app):

public synchronized void storeNewApplication(RMApp app) {
ApplicationSubmissionContext context = app
.getApplicationSubmissionContext();
assert context instanceof ApplicationSubmissionContextPBImpl;
ApplicationState appState =
new ApplicationState(app.getSubmitTime(), app.getStartTime(), context,
app.getUser());

//Fire the RMStateStore STORE_APP event
dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));
}


The transition definitions of the RMStateStore state machine:

private static final StateMachineFactory<RMStateStore,
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>
stateMachineFactory = new StateMachineFactory<RMStateStore,
RMStateStoreState,
RMStateStoreEventType,
RMStateStoreEvent>(
RMStateStoreState.DEFAULT)
.addTransition(RMStateStoreState.DEFAULT, RMStateStoreState.DEFAULT,
RMStateStoreEventType.STORE_APP, new StoreAppTransition())

...

//STORE_APP runs StoreAppTransition


Next, StoreAppTransition:

private static class StoreAppTransition
implements SingleArcTransition<RMStateStore, RMStateStoreEvent> {
@Override
public void transition(RMStateStore store, RMStateStoreEvent event) {
if (!(event instanceof RMStateStoreAppEvent)) {
// should never happen
LOG.error("Illegal event type: " + event.getClass());
return;
}
ApplicationState appState = ((RMStateStoreAppEvent) event).getAppState();
ApplicationId appId = appState.getAppId();
ApplicationStateData appStateData = ApplicationStateData
.newInstance(appState);
LOG.info("Storing info for app: " + appId);
try {

//1. store application state
store.storeApplicationStateInternal(appId, appStateData);

// 2. Notify RMApp with an RMAppEventType.APP_NEW_SAVED event
store.notifyApplication(new RMAppEvent(appId,
RMAppEventType.APP_NEW_SAVED));
} catch (Exception e) {
LOG.error("Error storing app: " + appId, e);
store.notifyStoreOperationFailed(e);
}
};
}

private void notifyApplication(RMAppEvent event) {

//Fire RMAppEventType.APP_NEW_SAVED through the dispatcher
rmDispatcher.getEventHandler().handle(event);
}
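StoreAppTransition relies on ordering: the application is persisted first, and only then is RMApp notified with APP_NEW_SAVED, so RMApp cannot move on to SUBMITTED for an application that was never stored. A hedged sketch of just that ordering; StateStore and storeApp are hypothetical names, and the two callbacks stand in for notifyApplication and notifyStoreOperationFailed:

```java
import java.util.function.Consumer;

// Hypothetical model of StoreAppTransition's ordering: persist first, notify second.
class StoreThenNotify {
    // Stand-in for RMStateStore.storeApplicationStateInternal; may throw.
    interface StateStore {
        void storeApplication(String appId, String appState) throws Exception;
    }

    static void storeApp(StateStore store, String appId, String appState,
                         Consumer<String> notifyApplication,
                         Consumer<Exception> notifyStoreOperationFailed) {
        try {
            // 1. Persist the application state first...
            store.storeApplication(appId, appState);
            // 2. ...and only then tell RMApp it was saved (APP_NEW_SAVED).
            notifyApplication.accept(appId + " APP_NEW_SAVED");
        } catch (Exception e) {
            // On failure RMApp is never told the application was saved.
            notifyStoreOperationFailed.accept(e);
        }
    }
}
```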



In RMAppImpl we can see that RMAppEventType.APP_NEW_SAVED triggers AddApplicationToSchedulerTransition:


.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())


Let's look at AddApplicationToSchedulerTransition:

private static final class AddApplicationToSchedulerTransition extends
RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {

//This fires the scheduler's APP_ADDED event. There are several scheduler implementations, such as FifoScheduler and FairScheduler
//Let's follow how FifoScheduler handles APP_ADDED
app.handler.handle(new AppAddedSchedulerEvent(app.applicationId,
app.submissionContext.getQueue(), app.user,
app.submissionContext.getReservationID()));
}
}


Here is how FifoScheduler handles the APP_ADDED event:

case APP_ADDED:
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
appAddedEvent.getQueue(), appAddedEvent.getUser(),
appAddedEvent.getIsAppRecovering());
}
break;


It calls the scheduler's addApplication method:

public synchronized void addApplication(ApplicationId applicationId,
String queue, String user, boolean isAppRecovering) {

//Create a SchedulerApplication
SchedulerApplication<FiCaSchedulerApp> application =
new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);

//Put the application into the scheduler's map
applications.put(applicationId, application);
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
if (isAppRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
}
} else {

//Fire RMAppEventType.APP_ACCEPTED through the ResourceManager's dispatcher
rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
}
}


So we go back to RMAppImpl to see what RMAppEventType.APP_ACCEPTED does:

.addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,
RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())
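Putting the RMApp transitions seen so far together gives the path NEW --START--> NEW_SAVING --APP_NEW_SAVED--> SUBMITTED --APP_ACCEPTED--> ACCEPTED. The sketch below models only that path as a transition table, in the spirit of StateMachineFactory.addTransition; the class and enum names are hypothetical, and the hooks the real transitions run (RMAppNewlySavingTransition, AddApplicationToSchedulerTransition, StartAppAttemptTransition) are left out:

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical transition table for the RMApp path traced so far
// (a simplified stand-in for StateMachineFactory; transition hooks omitted).
class RMAppPathSketch {
    enum State { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }
    enum EventType { START, APP_NEW_SAVED, APP_ACCEPTED }

    // from-state -> (event type -> to-state)
    private static final Map<State, Map<EventType, State>> TABLE = new EnumMap<>(State.class);

    private static void addTransition(State from, State to, EventType on) {
        TABLE.computeIfAbsent(from, k -> new EnumMap<>(EventType.class)).put(on, to);
    }

    static {
        addTransition(State.NEW, State.NEW_SAVING, EventType.START);
        addTransition(State.NEW_SAVING, State.SUBMITTED, EventType.APP_NEW_SAVED);
        addTransition(State.SUBMITTED, State.ACCEPTED, EventType.APP_ACCEPTED);
    }

    private State current = State.NEW;

    // Look up the arc for (current state, event); an undefined arc is rejected.
    State handle(EventType event) {
        Map<EventType, State> arcs = TABLE.get(current);
        State next = (arcs == null) ? null : arcs.get(event);
        if (next == null) {
            throw new IllegalStateException("Invalid event " + event + " in state " + current);
        }
        current = next;
        return current;
    }
}
```

Because each step only fires the next event through the dispatcher, the same table is walked one hop per event: START from the client submission, APP_NEW_SAVED from the state store, APP_ACCEPTED from the scheduler.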


It runs StartAppAttemptTransition, which makes one attempt to run the application. If that attempt fails, another one is made, until the number of attempts reaches the maximum.

Let's look at StartAppAttemptTransition:

private static final class StartAppAttemptTransition extends RMAppTransition {
@Override
public void transition(RMAppImpl app, RMAppEvent event) {

//Create and start an attempt
app.createAndStartNewAttempt(false);
};
}

//createAndStartNewAttempt creates the RMAppAttempt and then fires the RMAppAttemptEventType.START event
private void
createAndStartNewAttempt(boolean transferStateFromPreviousAttempt) {
createNewAttempt();
handler.handle(new RMAppStartAttemptEvent(currentAttempt.getAppAttemptId(),
transferStateFromPreviousAttempt));
}


private void createNewAttempt() {
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(applicationId, attempts.size() + 1);
RMAppAttempt attempt =
new RMAppAttemptImpl(appAttemptId, rmContext, scheduler, masterService,
submissionContext, conf,
// The newly created attempt maybe last attempt if (number of
// previously failed attempts(which should not include Preempted,
// hardware error and NM resync) + 1) equal to the max-attempt
// limit.

//Attempt ids go up by 1 each time; this flag is true when this attempt is the last one allowed
maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
attempts.put(appAttemptId, attempt);
currentAttempt = attempt;
}
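The attempt bookkeeping in createNewAttempt is two small pieces of arithmetic: the new attempt id is attempts.size() + 1, and the attempt is flagged as the last one when maxAppAttempts == getNumFailedAppAttempts() + 1. A sketch with hypothetical helper names; what it shows is that the two counters can diverge, because (per the source comment) preempted, hardware-error and NM-resync attempts are not counted as failures:

```java
// Hypothetical helpers mirroring the arithmetic in RMAppImpl.createNewAttempt.
class AttemptCounter {
    // Attempt ids are numbered from 1: attempts.size() + 1.
    static int nextAttemptNumber(int attemptsSoFar) {
        return attemptsSoFar + 1;
    }

    // Mirrors maxAppAttempts == (getNumFailedAppAttempts() + 1).
    // Only counted failures are passed in; preempted, hardware-error and
    // NM-resync attempts are assumed to be excluded already.
    static boolean isLastAttempt(int maxAppAttempts, int countedFailures) {
        return maxAppAttempts == countedFailures + 1;
    }

    public static void main(String[] args) {
        int max = 2;
        // First attempt, no failures yet.
        System.out.println(nextAttemptNumber(0) + " " + isLastAttempt(max, 0));
        // One counted failure: the second attempt is the last allowed.
        System.out.println(nextAttemptNumber(1) + " " + isLastAttempt(max, 1));
        // Two earlier attempts, but one was preempted and not counted.
        System.out.println(nextAttemptNumber(2) + " " + isLastAttempt(max, 1));
    }
}
```

With maxAppAttempts = 2, main prints 1 false, 2 true and 3 true: the third attempt is still the last allowed one, because only one of the two earlier attempts counted as a failure.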


This leads into RMAppAttemptImpl, where the RMAppAttemptEventType.START event is handled. Let's see the transitions of this state machine:

private static final StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
RMAppAttemptEventType,
RMAppAttemptEvent>
stateMachineFactory = new StateMachineFactory<RMAppAttemptImpl,
RMAppAttemptState,
RMAppAttemptEventType,
RMAppAttemptEvent>(RMAppAttemptState.NEW)

// Transitions from NEW State
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,
RMAppAttemptEventType.START, new AttemptStartedTransition())

//This calls AttemptStartedTransition, defined in the same class


Let's look at AttemptStartedTransition:

private static final class AttemptStartedTransition extends BaseTransition {
@Override
public void transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {

boolean transferStateFromPreviousAttempt = false;
if (event instanceof RMAppStartAttemptEvent) {
transferStateFromPreviousAttempt =
((RMAppStartAttemptEvent) event)
.getTransferStateFromPreviousAttempt();
}
appAttempt.startTime = System.currentTimeMillis();

// Register with the ApplicationMasterService
appAttempt.masterService
.registerAppAttempt(appAttempt.applicationAttemptId);

if (UserGroupInformation.isSecurityEnabled()) {
appAttempt.clientTokenMasterKey =
appAttempt.rmContext.getClientToAMTokenSecretManager()
.createMasterKey(appAttempt.applicationAttemptId);
}

// Add the applicationAttempt to the scheduler and inform the scheduler
// whether to transfer the state from previous attempt.

//Fire the scheduler's APP_ATTEMPT_ADDED event for this applicationAttempt, here handled by FifoScheduler

appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(
appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));
}
}


Back in FifoScheduler, let's see what APP_ATTEMPT_ADDED does:

case APP_ATTEMPT_ADDED:
{
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;

//Call addApplicationAttempt
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
appAttemptAddedEvent.getIsAttemptRecovering());
}
break;


public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
boolean transferStateFromPreviousAttempt,
boolean isAttemptRecovering) {

//Look up the SchedulerApplication registered by addApplication
SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
// TODO: Fix store

//Create the FiCaSchedulerApp for this attempt
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE,
activeUsersManager, this.rmContext);

if (transferStateFromPreviousAttempt) {
schedulerApp.transferStateFromPreviousAttempt(application
.getCurrentAppAttempt());
}

//Set it as the current attempt
application.setCurrentAppAttempt(schedulerApp);

//Update the queue metrics for the submitted attempt
metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
if (isAttemptRecovering) {
if (LOG.isDebugEnabled()) {
LOG.debug(appAttemptId
+ " is recovering. Skipping notifying ATTEMPT_ADDED");
}
} else {

//Fire the RMAppAttemptEventType.ATTEMPT_ADDED event back to RMAppAttempt
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
}
}
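A compact model of the two FifoScheduler methods above (MiniFifoScheduler and App are hypothetical stand-ins for FifoScheduler and SchedulerApplication). It shows the split: APP_ADDED registers the application and answers RMApp with APP_ACCEPTED; APP_ATTEMPT_ADDED looks the application up again, records the current attempt and answers RMAppAttempt with ATTEMPT_ADDED; neither reply is sent while recovering. Unlike the real code, an unknown application fails with an explicit exception rather than a NullPointerException.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of FifoScheduler.addApplication / addApplicationAttempt.
class MiniFifoScheduler {
    // Stand-in for SchedulerApplication: the user plus the current attempt id.
    static final class App {
        final String user;
        String currentAttempt;
        App(String user) { this.user = user; }
    }

    private final Map<String, App> applications = new HashMap<>();
    // Events the scheduler would send back through the RM dispatcher.
    final List<String> firedEvents = new ArrayList<>();

    // APP_ADDED: register the application, then tell RMApp it was accepted.
    synchronized void addApplication(String appId, String user, boolean isAppRecovering) {
        applications.put(appId, new App(user));
        if (!isAppRecovering) {
            firedEvents.add("RMApp " + appId + " APP_ACCEPTED");
        }
    }

    // APP_ATTEMPT_ADDED: look up the existing application (not create it),
    // record the current attempt, then tell RMAppAttempt it was added.
    synchronized void addApplicationAttempt(String appId, String attemptId,
                                            boolean isAttemptRecovering) {
        App app = applications.get(appId);
        if (app == null) {
            throw new IllegalStateException("Unknown application " + appId);
        }
        app.currentAttempt = attemptId;
        if (!isAttemptRecovering) {
            firedEvents.add("RMAppAttempt " + attemptId + " ATTEMPT_ADDED");
        }
    }

    synchronized String currentAttempt(String appId) {
        return applications.get(appId).currentAttempt;
    }
}
```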


Next we go back to RMAppAttemptImpl again to look at the RMAppAttemptEventType.ATTEMPT_ADDED event:

.addTransition(RMAppAttemptState.SUBMITTED,
EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,
RMAppAttemptState.SCHEDULED),
RMAppAttemptEventType.ATTEMPT_ADDED,
new ScheduleTransition())


It runs ScheduleTransition. After this step a normal (managed) attempt is in the SCHEDULED state, waiting for the scheduler to assign it a container on a NodeManager:

public static final class ScheduleTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {

//The submission context we created at the very beginning, which holds the container launch command
ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
if (!subCtx.getUnmanagedAM()) {
// Need reset #containers before create new attempt, because this request
// will be passed to scheduler, and scheduler will deduct the number after
// AM container allocated

// Currently, following fields are all hard code,
// TODO: change these fields when we want to support
// priority/resource-name/relax-locality specification for AM containers
// allocation.

//One App Master, one container: fill in the properties of the AM container request
appAttempt.amReq.setNumContainers(1);
appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);

// AM resource has been checked when submission

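//Call allocate on the scheduler. This records the AM's resource request and returns an Allocation that does not contain the AM container yet (hence the assert below)
//The container is actually assigned later: when a NodeManager heartbeats and the RM sees it has free resources, the scheduler assigns the container on that NM, and then the AM is launched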
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
Collections.singletonList(appAttempt.amReq),
EMPTY_CONTAINER_RELEASE_LIST, null, null);
if (amContainerAllocation != null
&& amContainerAllocation.getContainers() != null) {
assert (amContainerAllocation.getContainers().size() == 0);
}
return RMAppAttemptState.SCHEDULED;
} else {
// save state and then go to LAUNCHED state
appAttempt.storeAttempt();
return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;
}
}
}
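ScheduleTransition is a multiple-arc transition: the same ATTEMPT_ADDED event can end in different states, and the transition method returns which one. A sketch of that decision with hypothetical names; it assumes AM_CONTAINER_PRIORITY has priority 0 and uses "*", the value of ResourceRequest.ANY. The scheduler.allocate call is only marked by a comment:

```java
// Hypothetical model of the decision made in RMAppAttemptImpl.ScheduleTransition.
class ScheduleTransitionSketch {
    enum AttemptState { SUBMITTED, SCHEDULED, LAUNCHED_UNMANAGED_SAVING }

    // Stand-in for the AM ResourceRequest fields that ScheduleTransition fills in.
    static final class AmRequest {
        int numContainers;
        int priority;
        String resourceName;
        boolean relaxLocality;
    }

    static AttemptState onAttemptAdded(boolean unmanagedAM, AmRequest amReq) {
        if (!unmanagedAM) {
            amReq.numContainers = 1;     // exactly one container for the AM
            amReq.priority = 0;          // assumed value of AM_CONTAINER_PRIORITY
            amReq.resourceName = "*";    // ResourceRequest.ANY: any host or rack
            amReq.relaxLocality = true;
            // The real code calls scheduler.allocate(...) with this request here.
            return AttemptState.SCHEDULED;
        }
        // Unmanaged AM: the submitter runs the AM itself, so no container is
        // requested; the attempt is stored and heads towards LAUNCHED.
        return AttemptState.LAUNCHED_UNMANAGED_SAVING;
    }
}
```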


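At this point the AM's container request has been handed to the scheduler. Next, the container is allocated on a NodeManager and started there, which runs the launch command set up at the very beginning.

I'll cover that part in the second article; that's it for this one.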