您的位置:首页 > 大数据 > Hadoop

Hadoop MapReduce 一文详解MapReduce及工作机制

2021-06-01 11:25 1021 查看

@[TOC]

前言-MR概述

MapReduce是一个分布式计算框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。主要由两部分组成:编程模型和运行时环 境。其中,编程模型为用户提供了非常易用的编程接口,用户只需要像编写串行程序 一样实现几个简单的函数即可实现一个分布式程序,而其他比较复杂的工作,如节点 间的通信、节点失效、数据切分等,全部由MapReduce运行时环境完成,用户无须 关心这些细节。

1.Hadoop MapReduce设计思想及优缺点

设计思想

Hadoop MapReduce诞生于搜索领域,主要解决搜索引擎面临的海量数据处理扩展性差的问 题。它的实现很大程度上借鉴了谷歌MapReduce的设计思想,包括简化编程接口、 提高系统容错性等。

优点:

易于编程

​ 传统的分布式程序设计(如MPI)非常复杂,用户需要关注的细节 非常多,比如数据分片、数据传输、节点间通信等,因而设计分布式程序的门槛非常 高。Hadoop的一个重要设计目标便是简化分布式程序设计,将所有并行程序均需要关注的设计细节抽象成公共模块并交由系统实现,而用户只需专注于自己的应用程序逻辑实现,这样简化了分布式程序设计且提高了开发效率。

良好的扩展性

​ 随着公司业务的发展,积累的数据量(如搜索公司的网页量) 会越来越大,当数据量增加到一定程度后,现有的集群可能已经无法满足其计算能力和存储能力,这时候管理员可能期望通过添加机器以达到线性扩展集群能力的目的。

高容错性

​ 在分布式环境下,随着集群规模的增加,集群中的故障率(这里 的“故障”包括磁盘损坏、机器宕机、节点间通信失败等硬件故障和坏数据或者用户 程序bug产生的软件故障)会显著增加,进而导致任务失败和数据丢失的可能性增 加。为此,Hadoop通过计算迁移或者数据迁移等策略提高集群的可用性与容错性。

适合PB级以上海量数据的离线处理

​ 可以实现上千台服务器集群并发工作,提供数据处理能力。

缺点:

不擅长实时计算

​ MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

不擅长流式计算

​ 流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

不擅长DAG(有向图)计算

​ 多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

2. Hadoop MapReduce核心思想

从MapReduce自身的命名特点可以看出,MapReduce由两个阶段组成:Map阶段 和Reduce阶段。

(1)分布式的运算程序往往需要分成至少2个阶段。

(2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。

(3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。

(4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

每一个Map阶段和Reduce阶段都可以由多个Map Task和Reduce Task

实际应用中我们只需编写map()和reduce()两个函数,即可完成简单的分布式程序的 设计。

map()函数以key/value对作为输入,产生另外一系列key/value对作为中间输出 写入本地磁盘。

MapReduce框架会自动将这些中间数据按照key值进行聚集,且key 值相同(用户可设定聚集策略,默认情况下是对key值进行哈希取模)的数据被统一 交给reduce()函数处理。

reduce()函数以key及对应的value列表作为输入,经合并key相同的value值后, 产生另外一系列key/value对作为最终输出写入HDFS。

hadoop MapReduce对外提供了5个可编程组件,分别是 InputFormat、Mapper、Partitioner、Reducer和OutputFormat

3.MapReduce工作机制

剖析MapReduce运行机制

过程描述

  • 客户端:提交MapReduce作业
  • YARN资源管理器,负责协调集群上计算机资源的分配
  • YARN节点管理器,负责启动和监视集群中机器上的计算容器(container)
  • MapReduce的application master,负责协调运行MapReduce作业的任务。他和MapReduce任务在容器中运行,这些容器有资源管理器分配并由节点管理器进行管理
  • 分布式文件系统(一般为HDFS),用来与其他实体间共享作业文件

第一阶段:作业提交(图1-4步)

步骤:

Job的submit()方法创建一个内部的JobSummiter实例,并且调用其submitJobInternal()方法。提交作业后,waitForCompletion()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功,就显示作业计数器;如果失败,则导致作业失败的错误被记录到控制台。

1.客户端提交作业Job,并轮询监控作业进度和状态;

2.Job tracker向RM申请一个新应用ID,用作MR作业的ID;(图中整个流程的步骤2)

3.Job 计算作业的输入分片。如果无法进行分片计算,比如:输入路径不存在,作业就不提交,将错误返回给MR程序;

4.如果上一步OK,Job将运行作业所需要的资源(包括作业的JAR文件、配置文件和计算所得的输入分片)复制到一个以作业ID命名的目录下的共享文件系统中(一般都是HDFS)。作业JAR的复本较多(复本的数量有作业提交的时候MR的参数控制:mapreduce.client.submit.file.replication属性,默认为10),因此运行作业的任务时,集群中有很多个复本可供节点管理器(Node Manager )访问读取;

5.上传完毕后,通过调用资源管理器的submitApplication()方法提交作业。

第二阶段:作业初始化(图5-7步)

步骤:

1.资源管理器收到调用它的submitApplication()消息后,便将请求传递给YARN调度器。调度器分配一个容器(container),并让容器运行MRAppMaster程序,本质是RM(资源管理器)在NM(节点管理器)的管理下在容器中启动application master的进程;(5a、5b)

2.MR作业的application master是一个Java应用程序,其主类是MRAppMaster。由于MRAppMaster要能相应客户端对于应用程序运行状态的查询和状态,因此application master对作业的初始化是通过创建多个簿记对象(就像一个多重账单,形象的命名为簿记对象)以保持对作业进度的跟踪来完成的。接下来,它接受来自共享文件系统的、在客户端计算的输入分片。然后对每一个分片创建一个map任务对象以及有mapreduce.job.reduces属性(通过作业的setNumReducetasks()方法设置)确定的多个reduce任务对象。任务ID在此时分配。

application master必须决定如何运行构成MapReduce作业的各个任务。如果作业很小,就选择和自己在同一个JVM上运行任务。与在一个节点上顺序运行这些任务相比,当application master判断在新的容器中分配和运行任务的开销大于并行运行它们的开销时,就会发生这一情况。这样的作业称为uberized。或者作为Uber任务运行。

小作业是如何定义的?

默认情况下,小作业就是小于10个mapper且只有1个reducer且输入大小小于一个HDFS块的作业。其参数可以通过如下属性进行设置:

  • mapreduce.job.ubertask.maxmaps :最大map任务数量
  • mapreduce.job.ubertask.maxreduces:最大reduce任务数量
  • mapreduce.job.ubertask.maxbytes:处理文件的最大容量,byte为单位
  • mapreduce.job.ubertask.enable:true表示为启用Uber任务

最后,在任何任务运行之前,application master调用setupJob()方法设置OutputCommitter。FileOutputCommitter为默认值,表示将建立作业的最终输出目录及任务输出的临时工作空间。

第三阶段:任务的分配(图8)

当作业不适合作业Uber任务运行,那么application master就会为该作业中的所有map任务和reduce任务向资源管理器(RM)请求容器。

步骤:

  1. 首先Map任务发出请求,该请求优先级要高于reduce任务的请求,这是因为所有的map任务必须在reduce排序阶段能够启动前完成。直到有5% 的map任务已经完成时,为reduce任务申请容器的请求才会发出。
  2. reduce任务能够在集群中任意位置运行,但是map任务的请求有着数据本地化局限,这也是调度器所关注的。在理想情况下,任务时数据本地化的。也就是说任务在分片驻留的同一节点上运行。次选的情况是,任务是可以机架本地化(rack local),即和分片在同一机架上而非同一节点上运行。有一些任务既不是数据本地化也不是机架本地化,他们会从别的机架,而不是运行所在的机器上获取自己的数据。对于一个特定的作业运行,可以通过查看做的计数器来确定在每个本地化层次上运行的任务的数量。
  3. 请求为任务指定了内存需求和CPU数。默认情况下,每个map任务和reduce任务都分配到1G内存和一个虚拟内核。这些值也是可以在每个作业的基础上进行配置。配置属性如下:
      mapreduce.map.memory.mb:map任务内存大小
    • mapreduce.reduce.memory.mb:reduce任务内存大小
    • mapreduce.map.cpu.vcores:map任务cpu核数
    • mapreduce.reduce.cpu.vcores:reduce任务CPU核数

第四阶段:任务的执行(图9-11)

步骤:

  1. 一旦资源管理器的调度器为任务分配了一个特点节点上的容器,application master就通过与节点管理器通信来启动容器.
  2. 该任务有主类为YarnChild的一个Java应用程序执行。在它运行任务之前,首先将任务需要的资源本地化,包括任务的配置、JAR文件和所有来自分布式缓存的文件;
  3. 最后运行map任务或reduce任务。

YarnChild 在指定的JVM中运行,因此用户定义的map和reduce函数(甚至是YarnChild)中的任何缺陷不会影响到节点管理器,例如导致其崩溃或挂起。

每个任务都能够运行搭建(setup)和提交(commit)动作,他们和任务本身在同一个JVM中运行,并有作业的OutputCommitter确定。对于基于文件的作业,提交动作将任务输出由临时位置搬移到最终位置。提交协议确保当推测执行被启用时,只有一个任务副本被提交,其他都取消。

推测执行

MapReduce模型将作业分解成任务,然后并行地运行任务以使作业的整体执行时间少于各个任务顺序执行的时间。这就使得作业执行时间对运行缓慢的任务很敏感,因为只运行一个缓慢的任务会使整个作业所用的时间远远长于执行其他任务的时间,当一个作业由几百或几千个任务组成时,可能出现少数“拖后腿”的任务,这是很常见的。

任务执行缓慢的可能有很多种,但是检测具体原因是比较困哪的(比如,硬件方面当前节点的性能低于其他节点或者软件应用配置如内存,JVM,reduce个数等等的问题),尽管执行时间比预期长,但是任务最终是成功执行的。hadoop 不会尝试诊断或者修复执行慢的任务,相反,在下一个任务运行比预期慢的时候,它会尽量检测,并启动另一个相同的任务作为备份,这就是所谓的推测执行。

我们很容易想到的是,如果同时启动了两个重复的任务,对于资源的开销时比较大,且相同任务之间会产生互相竞争(比如资源,当同任务在同一节点启动时的内存、cpu等),这都不是我们想要的,在什么情况下对某个任务启动推测执行的时机是没有绝对的,我们可以相对于各自类型任务(map和reduce)的平均执行进度作为一个基准【比如,map阶段的任务平均耗时10s,当一个map任务执行了20S还没有执行完成,那我们有理由相信这个任务可能是出问题,我们应该启动一个副本任务】。在此基准上将执行速度明显低于平均水平的那一部分任务进行推测执行副本。当一个推测执行的任务完成之后,其他正在运行的重复任务都将被中止执行,这个很容易理解,我们已经得到了想要的结果,就不需要副本任务了。

但是推测执行开启就一定是好事么?小可爱们可以思考下,什么情况下推测执行会产生负面的影响。

我们都清楚推测执行是需要开启相同的任务,那就是一个任务需要的资源是更多的,那对于一个繁忙的集群,执行推测执行就会消耗更多的资源,减少了集群整体的吞吐量,因而此时推测执行对于整体而言是不利的,就是因为一个任务慢导致整个集群都慢。

对于reduce任务,我们知道Reduce任务是需要将Map任务的输出结果汇集之后,如果reduce任务有大量的任务推测执行,对于集群的网络IO是会产生较大的影响,对于集群整体也会产生影响。另外,reduce可能会产生数据倾斜,对于此,一个reduce任务因为数据散列问题本身执行就慢,开启推测执行反而不会有积极的影响。因此,对于reduce任务,关闭推测执行是相对好的选择。

第五阶段:作业完成

当application master收到作业最后一个任务已完成的通知后,便把作业的状态设置为成功。然后,在Job轮询状态时,便知道任务已成功完成。于是Job打印一条消息告知用户,然后从waitForCompletion()方法返回。Job的统计信息和计数值也在这个时候输出到控制台。

如果我们希望当任务执行完成之后,application master可以主动通知我们,而不是等待Job轮询才能获取到任务的完成状态。可以在application master进行相应的设置,这时application master会发送一个HTTP作业通知。客户端通过设置属性【mapreduce.job.end-notification.url】进行相应的设置。

最后,作业完成时,各个分配的container和application master会清理其工作状态,任务运行期间的中间输出将被删除,OutputCommitter的commitJob()方法会被调用。作业信息有作业历史服务器存档,以便日后需要时查看。

在这我们就已经走完了MR程序整个运行过程,对于其中的部分细节我们在下边在来介绍一下。

Tips 知识点:进度和状态更新

MapReduce作业是长时间运行的批量作业,运行时间范围从数秒到数小时。在这个很长的时间内,用户想要去获取关于作业的一些运行反馈是很重要的。一个作业和它的每个任务都有一个状态(status),包括:作业或任务的状态(比如:运行中,成功完成,失败)、map和reduce的进度、作业计数器的值、状态消息或描述(可以由用户代码设置)。之前我们也有提到用户是可以随时查看作业的状态信息的,那这些状态是怎么通过客户端进行通信得到的呢?

任务在运行时,对其进度(progress,即任务完成百分比)保持追踪。对Map任务,任务进度是已处理输入所占的比例。对reduce任务,情况会复杂一些,但系统仍然会估计已处理reduce输入的比例。整个过程分为三部分,与shuffle的三个阶段相对应。比如,如果任务已经执行reducer一半的输入,那么任务的进度便是5/6,这是因为已经完成复制和排序阶段(每个占1/3),并且完成reduce阶段的一半(1/6)。

任务也有一组计数器,负责对任务运行过程中各个时间进行计数,这些计数器要么内置框架中,比如写入的map输出记录数,要么是用户自定义的。

当map任务或reduce任务运行时,子进程和自己的父application master通过umbilical接口通信。默认每隔3秒,任务通过这个umbilical接口向自己的application master报告进度和状态(包括计数器),application master会形成一个作业的汇聚视图。注意:application master不会主动获取任务的进度,是被动接受task任务的上报。

在作业期间,客户端每秒钟轮训一次application master以接收最新状态(轮询间隔通过mapreduce.client,progressmonitor.ploointerval设置)。客户端也可以通过使用job的getStatus()方法得到一个JobStatus实例,其内包含作业的所有状态信息。

4.MR各组成部分工作机制原理

4.1概览:

4.2 MapTask工作机制

MapTask的整体计算如上图所示,共分为5个阶段,如下:

其中最重要的部分是输出结果在内存和磁盘中的组织方式,具体涉及Collect、Spill和Combine三个阶段,对于这三个阶段我们介绍时会深入介绍。

(1)Read阶段:MapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。

(2)Map阶段:该阶段主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。

(3)Collect收集阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。 输出深入讲解 当map函数处理完一对key/value产生新的key/value后,会调用collect()函数输出结果。在输出结果时,OutputCollecter对象会根据作业是否有Reduce Task进行不同的处理,如果没有Reduce Task阶段,则把结果直接输出到HDFS。如果后续有对应的Reduce Task,则开始组织封装结果:

  • 1.获取对应记录的分区号partition,然后写到环形缓冲区;
  • 2.环形缓冲区中,当数据写入到一定阈值后,会有专属的写出线程(SpillThread)将数据写到一个临时文件中此操作称之为落盘,当所有数据处理完毕后,对所有临时文件进行一次合并以生成一个最终文件。环形缓存区使得Collect阶段和Spill阶段可以并行进行。
  • 3.数据(新的key/value)写入是由两部分组成,索引和真实key/value。通过让索引和数据共享环形缓冲区,提升整个缓冲区的效率:存放如下

指针equator,该指针界定了索引和数据的共同起始存放位置,从该位置开始,索引和数据分别沿相反的方向增长内存使用空间。当内存使用达到80%(默认情况下) 的时候,落盘也是从该指针开始读取数据到指定的位置,数据和索引的落盘是分开进行的,索引落盘一般是当索引大小超过1MB才开始进行落盘。

(4)Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情

步骤1:利用快速排序算法对缓存区区间内的数据进行排序,排序方式是,先按照分区编号Partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。

环形缓冲区认知

(5)Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

​ 当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

​ 在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

​ 让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

4.3 ReduceTask工作机制

(1)Copy阶段/Shuffle阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

总体上看,Shuffle&Merge阶段可进一步划分为三个子阶段。 (1)准备运行完成的Map Task列表: GetMapEventsThread线程周期性通过RPC从TaskTracker获取已完成Map Task列表,并保存到映射表mapLocations(保存了TaskTracker Host与已完成任务列表的映射关系)中。为防止出现网络热点,Reduce Task通过对所有TaskTracker Host进行“混洗”操作以打乱数据拷贝顺序,并将调整后的Map Task输出数据位置保存到scheduledCopies列表中。 (2)远程拷贝数据 Reduce Task同时启动多个MapOutputCopier线程,这些线程从scheduledCopies列表中获取Map Task输出位置,并通过HTTP Get远程拷贝数据。对于获取的数据分片,如果大小超过一定阈值,则存放到磁盘上,否则直接放到内存中。 (3)合并内存文件和磁盘文件 为了防止内存或者磁盘上的文件数据过多,Reduce Task启动了LocalFSMerger和InMemFSMergeThread两个线程分别对内存和磁盘上的文件进行合并。

(3)Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。

(4)Reduce阶段:reduce()函数将计算结果写到HDFS上。

前面提到,各个Map Task已经事先对自己的输出分片进行了局部排序,因此,Reduce Task只需进行一次归并排序即可保证数据整体有序。为了提高效率,Hadoop将Sort阶段和Reduce阶段并行化。在Sort阶段,Reduce Task为内存和磁盘中的文件建立了小顶堆,保存了指向该小顶堆根节点的迭代器,且该迭代器保证了以下两个约束条件:

1.磁盘上文件数目小于io.sort.factor(默认是10)。

2.当Reduce阶段开始时,内存中数据量小于最大可用内存(JVM Max HeapSize)的mapred.job.reduce.input.buffer.percent(默认是0)。

在Reduce阶段,Reduce Task不断地移动迭代器,以将key相同的数据顺次交给reduce()函数处理,期间移动迭代器的过程实际上就是不断调整小顶堆的过程,这样,Sort和Reduce可并行进行。

4.4shuffle 阶段

4.4.1 定义:

我们将MR过程中,将Map输出作为输入传给reducer的过程成为shuffle,但一般情况下,我们把从map端产生输出到reduce消化输入的整个过程都称之为shuffle。

大致流程如下:

也就是说shuffle阶段包括了从Map端Collect阶段开始一直到Reduce端Sort阶段的整个过程,也可以看出这是整个MR的核心过程,在生产中优化MR更多的是在shuffle阶段的各个过程做文章,提高整个MR的处理效率。

小结:

通过这篇文章我们了解了什么是MapReduce。在Hadoop中MapReduce是计算处理的逻辑关键,而Shuffle阶段又是整个MR的核心关键,Shuffle是部分Map task阶段和Reduce Task阶段的处理过程诚挚为Shuffle阶段。对于我们进行MR调优的大部分操作都是在Shuffle阶段的机制中去优化各个节点的处理,进而提升MR的处理效率。针对于如何调优MR,我们会在下个篇章进行一些生产中常见的调优的策略,而对于MR的理解能帮助我们更好的去进行调优处理。

好了,今天的文章就到这里结束了。路漫漫其修远兮,吾将上下而求索。希望这篇文章对于大家有所帮助。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: