您的位置:首页 > 运维架构

Hadoop之yarn的工作流程

2015-07-22 18:11 477 查看
yarn通用资源管理框架主要由以下几个部分组成

ResourceManager(RM):由Scheduler调度器和ApplicationsManager(ASM:资源管理器)2个组件组成,ResourceManager和每个NodeManager
(NM)构成一个资源估算框架,管理协调分配集群中的资源,对在系统中所有应用的资源分配拥有最终最高级别的仲裁权。

ApplicationMaster(AM):用来协调应用程序下Task的运行。它和MapReduce Task都运行在 Container中,这个Container由RM(ResourcesManager)调度(启动/停止)并由NM(NodeManager)管理,并且监控所有Task的运行情况,在任务运行失败时,重新为任务申请资源以启动任务。

注:(MRAppMaster是mapreduce的ApplicationMaster实现)

Nodemanager(NM):用来启动和监控本地计算机资源单位Container的利用情况,是每个节点上的资源和任务管理器,定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态,并且接受并处理来自AM的Container启动/停止等请求。

Container:Container是yarn资源的抽象,它封装了某个节点上的多维度资源(内存,cpu,磁盘,网络等),当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。yarn会为每个任务分配一个Container,且该任务只能使用该Container描述的资源,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。(目前yarn只支持cpu和内存2种资源)



来源:http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/YARN.html

1:用户向YARN中提交JOB,当在配置文件中设置mapreduce.framework.name为yarn时候,MapReduce2.0继承接口ClientProtocol的模式就激活了。RM会生成新的Job ID(即Application ID),接着Client计算输入分片,拷贝资源(包括Job JAR文件、配置文件,分片信息)到HDFS,最后用submitApplication函数提交JOB给RM。

获取新的JobID源码(org.apache.hadoop.mapred.YARNRunner):

@Override
  public JobID getNewJobID() throws IOException, InterruptedException {
    return resMgrDelegate.getNewJobID();
  }


submitApplication提交应用程序源码(org.apache.hadoop.mapred.YARNRunner):

@Override
  public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    
    addHistoryToken(ts);
    
    // Construct necessary information to start the MR AM
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }
通过Eclipse的Hadoop插件,可以查看应用程序的相关信息:



2:RM接受submitApplication方法提交的JOB,则将其请求交给Scheduler(调度器)处理,Scheduler(调度器)分配Container,同时RM在NM上分配应用程序第一个Container来启动ApplicationMaster进程,MRAppMatser会初始化一定数量的记录对象(bookkeeping)来跟踪JOB的运行进度,
并收取每个TASK的进度和完成情况,接着MRAppMaster收集计算后的输入分片情况,如果应用程序很小,能在同一个JVM上运行,则用uber模式,下面会讲满足什么情况才采用uber模式。

3:如果不在uber模式下运行,则Application Master会为所有的Map和Reducer task向RM请求Container,所有的请求都通过heartbeat(心跳)传递,心跳也传递其他信息,例如关于map数据本地化的信息,分片所在的主机和机架地址信息,这些信息帮助调度器来做出调度的决策,调度器尽可能遵循数据本地化或者机架本地化的原则分配Container。

在Yarn中,例如,用yarn.scheduler.capacity.minimum- allocation-mb设置最小申请资源1G,用yarn.scheduler.capacity.maximum-allocation-mb设置 最大可申请资源10G 这样一个Task申请的资源内存可以灵活的在1G~10G范围内

4:获取到Container后,NM上的Application Master就联系NM启动Container,Task最后被一个叫org.apache.hadoop.mapred.YarnChild的main类执行,不过在此之前各个资源文件已经从分布式缓存拷贝下来,这样才能开始运行map Task或者reduce Task。PS:YarnChild是一个(dedicated)的JVM。

5:当Yarn运行同时,各个Container会报告它的进度和状态给Application Master,客户端会每秒轮询检测Application Master,这样就随时收到更新信息,这些信息可以通过Web UI来查看。

6:客户端每5秒轮询检查Job是否完成,期间需要调用函数Job类下waitForCompletion()方法,Job结束后该方法返回。轮询时间间隔可以用配置文件的属性mapreduce.client.completion.pollinterval来设置

7:应用程序运行完成后, MRAppMaster向ResourceManager 注销并关闭自己。

YARN能够调度CPU和内存,有些任务使用CPU比较多,有些任务就比较占内存,所以要根据任务的特点合理的利用计算机资源。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: