
YARN Source Code Analysis: ApplicationMaster Launch Flow (RM Side)

2017-07-12 00:02
Any computing framework, or any service, that runs on YARN needs a master to manage its job. That master is the ApplicationMaster.

The ApplicationMaster is the brain of a job. Using MapReduce as the example, this post walks through how the ApplicationMaster is launched.

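First, the client submits an application request to the RM. The RM creates an application and then an appattempt; all subsequent scheduling and task breakdown operate on this appattempt. When the appattempt's state changes from ALLOCATED_SAVING to ALLOCATED, AttemptStoredTransition.transition calls appAttempt.launchAttempt() to start it. Here is the code: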

// RMAppAttemptImpl.java
private void launchAttempt(){
  // Send event to launch the AM Container
  // The async dispatcher delivers this event to the handler registered for it
  // (the registration happens in ResourceManager).
  // The handler registered for AMLauncherEvent is ApplicationMasterLauncher.
  eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this));
}
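The comments in launchAttempt() say the event is routed to whichever handler ResourceManager registered for its type. A minimal sketch of that routing idea follows, assuming a single dispatcher thread and handlers keyed by the event type's enum class. SketchDispatcher, SketchEvent, SketchHandler and LauncherEventTypeSketch are hypothetical names; this is an illustration, not Hadoop's AsyncDispatcher API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical enum standing in for AMLauncherEventType.
enum LauncherEventTypeSketch { LAUNCH, CLEANUP }

// An event only needs to expose its type; routing uses the type's enum class.
interface SketchEvent { Enum<?> getType(); }

interface SketchHandler { void handle(SketchEvent event); }

// Toy dispatcher: NOT Hadoop's AsyncDispatcher, only the routing idea.
class SketchDispatcher {
    private final Map<Class<?>, SketchHandler> handlers = new HashMap<>();
    private final BlockingQueue<SketchEvent> queue = new LinkedBlockingQueue<>();
    private final Thread worker = new Thread(this::loop, "sketch-dispatcher");

    // Register one handler per event-type enum; call this before start().
    void register(Class<? extends Enum<?>> typeClass, SketchHandler handler) {
        handlers.put(typeClass, handler);
    }

    void start() {
        worker.setDaemon(true);
        worker.start();
    }

    void stop() throws InterruptedException {
        worker.interrupt();
        worker.join();
    }

    // The caller only enqueues and returns; the handler runs later on the worker thread.
    void dispatch(SketchEvent event) {
        queue.add(event);
    }

    private void loop() {
        try {
            while (true) {
                SketchEvent event = queue.take();
                SketchHandler handler = handlers.get(event.getType().getDeclaringClass());
                if (handler != null) {
                    handler.handle(event);
                }
            }
        } catch (InterruptedException e) {
            // stop() interrupts the worker; leave the loop quietly.
        }
    }
}
```

In this sketch the code that raises the event returns as soon as the event is queued; the registered handler runs afterwards on the dispatcher thread.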


The handler for AMLauncherEvent is ApplicationMasterLauncher and the event type is LAUNCH, so ApplicationMasterLauncher.handle calls launch(application). The code is:

private void launch(RMAppAttempt application) {
  // Create a launcher task (a Runnable, not yet a running thread)
  Runnable launcher = createRunnableLauncher(application,
      AMLauncherEventType.LAUNCH);
  // Put the task into the blocking queue
  masterEvents.add(launcher);
}


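Looking at this method alone: it first creates a launcher Runnable and then puts it into a queue, where another thread will take it out and process it. This is the classic producer-consumer pattern. Let's look at how ApplicationMasterLauncher (which is both an event handler and a service) implements this part.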
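The producer-consumer split described above can be sketched with a plain BlockingQueue. This is a simplified illustration: LaunchQueueSketch, submit() and drain() are hypothetical names, and the consumer here runs each task inline, whereas the real consumer hands tasks to a thread pool. Only the queue name masterEvents and the add()/take() calls mirror the excerpts.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal producer/consumer sketch of the masterEvents hand-off.
class LaunchQueueSketch {
    // Unbounded queue, so add() never blocks the producer.
    private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();

    // Producer side: what launch() does, enqueue and return immediately.
    void submit(Runnable launcher) {
        masterEvents.add(launcher);
    }

    // Consumer side: take n tasks in FIFO order, blocking while the queue
    // is empty, and run each one on the calling thread.
    void drain(int n) throws InterruptedException {
        for (int i = 0; i < n; i++) {
            masterEvents.take().run();
        }
    }
}
```

The event-handling thread never waits for a launch to finish; it only pays the cost of one enqueue.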

First, createRunnableLauncher:


protected Runnable createRunnableLauncher(RMAppAttempt application,
    AMLauncherEventType event) {
  Runnable launcher =
      new AMLauncher(context, application, event, getConfig());
  return launcher;
}


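This just instantiates an AMLauncher. AMLauncher implements the Runnable interface and is the task that carries out AM operations; it handles only two event types: LAUNCH and CLEANUP.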
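createRunnableLauncher is a protected factory method, which leaves room for a subclass to override it and substitute its own Runnable while the queueing logic stays unchanged. A minimal sketch of that design choice follows; LauncherServiceSketch, RecordingLauncherService and LaunchKindSketch are hypothetical, and a String stands in for RMAppAttempt.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for AMLauncherEventType.
enum LaunchKindSketch { LAUNCH, CLEANUP }

// Simplified service: launch() depends only on the factory method, not on a concrete Runnable.
class LauncherServiceSketch {
    final List<Runnable> queued = new ArrayList<>();

    // Stand-in for createRunnableLauncher(application, event).
    protected Runnable createRunnableLauncher(String attemptId, LaunchKindSketch event) {
        return () -> System.out.println("would contact the NodeManager: " + event + " " + attemptId);
    }

    void launch(String attemptId) {
        queued.add(createRunnableLauncher(attemptId, LaunchKindSketch.LAUNCH));
    }
}

// A subclass (for instance in a unit test) swaps in a recording Runnable; no RPC is made.
class RecordingLauncherService extends LauncherServiceSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    protected Runnable createRunnableLauncher(String attemptId, LaunchKindSketch event) {
        return () -> calls.add(event + ":" + attemptId);
    }
}
```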


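Once the launcher is created it is added to the blocking queue masterEvents, so some other thread must take launchers out of that queue. That thread is launcherHandlingThread, an instance of LauncherThread; it takes each launcher out of the queue and hands it to a thread pool for execution. The code is: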

private class LauncherThread extends Thread {
  public LauncherThread() {
    super("ApplicationMaster Launcher");
  }
  @Override
  public void run() {
    while (!this.isInterrupted()) {
      Runnable toLaunch;
      try {
        // Take the next launcher from the blocking queue (blocks while empty)
        toLaunch = masterEvents.take();
        // Hand it to the thread pool for execution
        // this.launcherPool = new ThreadPoolExecutor(10, 10, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
        launcherPool.execute(toLaunch);
      } catch (InterruptedException e) {
        LOG.warn(this.getClass().getName() + " interrupted. Returning.");
        return;
      }
    }
  }
}
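The pool configuration in the commented-out line, new ThreadPoolExecutor(10, 10, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>()), has a core size equal to its maximum size and an unbounded work queue. ThreadPoolExecutor only adds threads beyond the core size when its queue refuses a task, which an unbounded LinkedBlockingQueue never does, so at most 10 launches run at once and the rest wait in the pool's queue. The sketch below measures that peak; LauncherPoolSketch is a hypothetical class and the sleep is an assumed stand-in for a slow startContainers RPC.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LauncherPoolSketch {
    // Runs `tasks` short tasks on a pool configured like launcherPool
    // and returns the highest number observed running at the same time.
    static int maxObservedConcurrency(int tasks) throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            10, 10, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(20); // stand-in for the RPC to the NodeManager
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    done.countDown();
                }
            });
        }
        done.await();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return peak.get();
    }
}
```

Note also how LauncherThread itself stops: interrupting it makes take() throw InterruptedException, and the catch block returns, ending the loop.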


Once a launcher is in the thread pool it starts executing; the method that runs is AMLauncher.run:


public void run() {
  switch (eventType) {
  case LAUNCH:
    try {
      LOG.info("Launching master" + application.getAppAttemptId());
      launch();
      handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),
          RMAppAttemptEventType.LAUNCHED));
    } catch(Exception ie) {
      String message = "Error launching " + application.getAppAttemptId()
          + ". Got exception: " + StringUtils.stringifyException(ie);
      LOG.info(message);
      handler.handle(new RMAppAttemptLaunchFailedEvent(application
          .getAppAttemptId(), message));
    }
    break;
  case CLEANUP:
    ...
    break;
  default:
    LOG.warn("Received unknown event-type " + eventType + ". Ignoring.");
    break;
  }
}
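AMLauncher.run() never lets an exception escape: success becomes a LAUNCHED event, and any exception becomes a launch-failed event sent to the same handler, so the attempt learns the outcome either way. A minimal sketch of that "report, don't throw" pattern, with hypothetical names (ReportingLauncherSketch, LaunchAction) and plain strings standing in for RMAppAttemptEvent and RMAppAttemptLaunchFailedEvent:

```java
import java.util.function.Consumer;

class ReportingLauncherSketch implements Runnable {
    // Like launch() in the excerpt, the action may throw checked exceptions.
    interface LaunchAction {
        void launch() throws Exception;
    }

    private final String attemptId;
    private final LaunchAction action;
    private final Consumer<String> handler;

    ReportingLauncherSketch(String attemptId, LaunchAction action, Consumer<String> handler) {
        this.attemptId = attemptId;
        this.action = action;
        this.handler = handler;
    }

    @Override
    public void run() {
        try {
            action.launch();
            // Success: tell the attempt it has been launched.
            handler.accept("LAUNCHED " + attemptId);
        } catch (Exception e) {
            // Failure: turn the exception into an event instead of letting it
            // escape into the pool thread, where nothing would report it back.
            handler.accept("LAUNCH_FAILED " + attemptId + ": " + e.getMessage());
        }
    }
}
```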


The event put into masterEvents earlier had type LAUNCH, so here the launch() method is called:

private void launch() throws IOException, YarnException {
  // Obtain the RPC client for the target node
  connect();
  ContainerId masterContainerID = masterContainer.getId();
  ApplicationSubmissionContext applicationContext =
      application.getSubmissionContext();
  LOG.info("Setting up container " + masterContainer
      + " for AM " + application.getAppAttemptId());
  ContainerLaunchContext launchContext =
      createAMContainerLaunchContext(applicationContext, masterContainerID);
  // Build the request
  StartContainerRequest scRequest =
      StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
  List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
  list.add(scRequest);
  StartContainersRequest allRequests =
      StartContainersRequest.newInstance(list);
  // Remote call to startContainers
  StartContainersResponse response =
      containerMgrProxy.startContainers(allRequests);
  if (response.getFailedRequests() != null
      && response.getFailedRequests().containsKey(masterContainerID)) {
    Throwable t =
        response.getFailedRequests().get(masterContainerID).deSerialize();
    parseAndThrowException(t);
  } else {
    LOG.info("Done launching container " + masterContainer + " for AM "
        + application.getAppAttemptId());
  }
}


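AMLauncher.launch first obtains the RPC client containerMgrProxy for the target node in connect(), then builds the request, and finally makes the RPC call startContainers() and receives the response. If the response's failed-request map contains the AM container's ID, the deserialized exception is rethrown through parseAndThrowException; otherwise the launch is logged as done.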
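startContainers is a batch call: the request wraps a list of per-container requests and the response reports failures per container ID, which is why launch() looks up masterContainerID in getFailedRequests() instead of relying on the call itself throwing. A minimal sketch of that shape follows. All types are hypothetical stand-ins, and throwing IllegalStateException is a simplification of the deserialize-and-parseAndThrowException step.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for StartContainerRequest / StartContainersResponse.
final class StartRequestSketch {
    final String containerId;
    StartRequestSketch(String containerId) { this.containerId = containerId; }
}

final class StartResponseSketch {
    final List<String> succeeded = new ArrayList<>();
    final Map<String, String> failed = new HashMap<>(); // containerId -> error message
}

// Stands in for the containerMgrProxy RPC client obtained in connect().
interface ContainerManagerSketch {
    StartResponseSketch startContainers(List<StartRequestSketch> requests);
}

final class AmStartSketch {
    // Wraps the single AM container request in a batch, then checks whether
    // this particular container appears in the per-container failure map.
    static void startAm(ContainerManagerSketch proxy, String amContainerId) {
        List<StartRequestSketch> batch = new ArrayList<>();
        batch.add(new StartRequestSketch(amContainerId));
        StartResponseSketch response = proxy.startContainers(batch);
        if (response.failed != null && response.failed.containsKey(amContainerId)) {
            // Simplification of deserializing the remote Throwable and rethrowing it.
            throw new IllegalStateException(response.failed.get(amContainerId));
        }
    }
}
```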

That completes the RM-side launch flow of MRAppMaster; the next post will cover the launch flow on the NodeManager side.