YARN源码分析之ApplicationMaster启动流程之RM端
2017-07-12 00:02
681 查看
任何一个计算框架或者说一个服务要运行在yarn上,都需要一个master来对job进行管理,这个master就是ApplicationMaster。
ApplicationMaster是一个job的大脑,下面就以MapReduce为例,介绍下ApplicationMaster的启动流程。
首先client向RM提交一个application请求,RM创建一个application,然后再创建一个appattempt,后期的调度和任务的拆解都是对这个appattempt进行的,当appattempt的状态从
AMLauncherEvent对应的handle是ApplicationMasterLauncher,事件类型是LAUNCH,在
只从这个方法来分析,首先创建了一个launcher线程,然后将其放入一个队列中,等待另一个线程从队列中取出进行操作,这是典型的生产者消费者模型。那么我们就来看下
先看下
这里只是new了一个AMLauncher,AMLauncher实现了Runnable接口,是执行AM操作的线程,只执行
launcher线程创建之后add到阻塞队列masterEvents中,那么必然会有另一个线程来队列中take launcher,这个线程是
放入线程池之后,launcher线程就开始执行,调用的是
之前放入阻塞队列masterEvents的事件类型是LAUNCH,则此处调用
AMLaunch.launch先在
以上是RM端MRAppMaster的启动流程,下一篇将继续介绍nodemanager端端启动流程。
ApplicationMaster是一个job的大脑,下面就以MapReduce为例,介绍下ApplicationMaster的启动流程。
首先client向RM提交一个application请求,RM创建一个application,然后再创建一个appattempt,后期的调度和任务的拆解都是对这个appattempt进行的,当appattempt的状态从
ALLOCATED_SAVING变成
ALLOCATED时,由
AttemptStoredTransition.transition调用
appAttempt.launchAttempt()进行启动,下面来看下具体代码:
// RMAppAttemptImpl.java private void launchAttempt(){ // Send event to launch the AM Container // 通过异步调度器得到该事件注册的handle (在ResourceManager中注册) // AMLauncherEvent 对应的handle是ApplicationMasterLauncher eventHandler.handle(new AMLauncherEvent(AMLauncherEventType.LAUNCH, this)); }
AMLauncherEvent对应的handle是ApplicationMasterLauncher,事件类型是LAUNCH,在
ApplicationMasterLauncher.handle中会调用
launch(application),代码如下:
private void launch(RMAppAttempt application) { // 创建一个线程 Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.LAUNCH); // 将线程放入阻塞队列中 masterEvents.add(launcher); }
只从这个方法来分析,首先创建了一个launcher线程,然后将其放入一个队列中,等待另一个线程从队列中取出进行操作,这是典型的生产者消费者模型。那么我们就来看下
ApplicationMasterLauncher(ApplicationMasterLauncher是一个事件也是一个服务)关于这块代码的具体实现。
先看下
createRunnableLauncher,
protected Runnable createRunnableLauncher(RMAppAttempt application, AMLauncherEventType event) { Runnable launcher = new AMLauncher(context, application, event, getConfig()); return launcher; }
这里只是new了一个AMLauncher,AMLauncher实现了Runnable接口,是执行AM操作的线程,只执行
launch和
cleanup。
launcher线程创建之后add到阻塞队列masterEvents中,那么必然会有另一个线程来队列中take launcher,这个线程是
LauncherThread类型的
launcherHandlingThread,launcherHandlingThread将launcher取出丢给线程池去执行,代码如下:
private class LauncherThread extends Thread { public LauncherThread() { super("ApplicationMaster Launcher"); } @Override public void run() { while (!this.isInterrupted()) { Runnable toLaunch; try { // 从阻塞队列中取出 toLaunch = masterEvents.take(); // 交给线程池执行 // this.launcherPool = new ThreadPoolExecutor(10, 10, 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>()); launcherPool.execute(toLaunch); } catch (InterruptedException e) { LOG.warn(this.getClass().getName() + " interrupted. Returning."); return; } } } }
放入线程池之后,launcher线程就开始执行,调用的是
AMLauncher.run
public void run() { switch (eventType) { case LAUNCH: try { LOG.info("Launching master" + application.getAppAttemptId()); launch(); handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(), RMAppAttemptEventType.LAUNCHED)); } catch(Exception ie) { String message = "Error launching " + application.getAppAttemptId() + ". Got exception: " + StringUtils.stringifyException(ie); LOG.info(message); handler.handle(new RMAppAttemptLaunchFailedEvent(application .getAppAttemptId(), message)); } break; case CLEANUP: ... break; default: LOG.warn("Received unknown event-type " + eventType + ". Ignoring."); break; } }
之前放入阻塞队列masterEvents的事件类型是LAUNCH,则此处调用
launch()方法:
private void launch() throws IOException, YarnException { // 得到对应node的rpc客户端 connect(); ContainerId masterContainerID = masterContainer.getId(); ApplicationSubmissionContext applicationContext = application.getSubmissionContext(); LOG.info("Setting up container " + masterContainer + " for AM " + application.getAppAttemptId()); ContainerLaunchContext launchContext = createAMContainerLaunchContext(applicationContext, masterContainerID); // 构建request StartContainerRequest scRequest = StartContainerRequest.newInstance(launchContext, masterContainer.getContainerToken()); List<StartContainerRequest> list = new ArrayList<StartContainerRequest>(); list.add(scRequest); StartContainersRequest allRequests = StartContainersRequest.newInstance(list); // 远程调用startContainers StartContainersResponse response = containerMgrProxy.startContainers(allRequests); if (response.getFailedRequests() != null && response.getFailedRequests().containsKey(masterContainerID)) { Throwable t = response.getFailedRequests().get(masterContainerID).deSerialize(); parseAndThrowException(t); } else { LOG.info("Done launching container " + masterContainer + " for AM " + application.getAppAttemptId()); } }
AMLaunch.launch先在
connect()中拿到对应node的rpc客户端
containerMgrProxy,然后构造request,最后调用rpc函数
startContainers()并返回response。
以上是RM端MRAppMaster的启动流程,下一篇将继续介绍nodemanager端端启动流程。
相关文章推荐
- spark 1.6.0 core源码分析2 master启动流程
- Spark On YARN启动流程源码分析
- Spark1.3从创建到提交:1)master和worker启动流程源码分析
- spark源码分析Master与Worker启动流程篇
- HBase 0.1.0版本源码分析--Master启动流程
- spark core源码分析2 master启动流程
- Spark集群启动之Master、Worker启动流程源码分析
- YARN源码分析之ApplicationMaster启动流程之NM端
- YARN源码分析之ApplicationMaster分配策略
- Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)
- 源码分析之application启动流程
- Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(一)
- spark core源码分析2 master启动流程
- Yarn源码分析之MRAppMaster上MapReduce作业处理总流程(二)
- Hadoop源码解析之ApplicationMaster启动流程
- HBase 0.94 master启动过程源码分析
- MySQL源码分析及核心内幕之4 -- 源码服务端main函数开始及启动流程
- ngx源码分析--启动流程
- [android源码分析]bluetoothd service的启动的总体流程分析
- Android中ICS4.0源码Launcher启动流程分析【android源码Launcher系列一】