您的位置:首页 > 运维架构

Hadoop2.6.0官方MapReduce文档翻译 之 一

2016-04-10 18:53 447 查看

一、前提条件:

    1、已经安装了Hadoop,并且正确配置了运行环境,Hadoop已经正常运行中;

二、概述:

        Hadoop MapReduce是一套软件框架,可以轻松编写程序处理大数据集(几千兆的数据集)的合计问题, 程序能并行在可靠的,可容错的大集群(成千个节点)商用硬件上。
        "MapReduce 工作"(MapReduce job)通过许多完全并行的“map任务”(map task),将输入的数据集处理成为许多独立的“数据块”(chunk)。
        "map任务"(Maps)处理完成后,将结果输出到“reduce任务”(reduce task)。通常输入和输出都存储在hadoop文件系统中。
        框架负责调度任务,监视任务运行情况和重启失败的任务。
        通常“计算节点”(compute node)和“存储节点”(storage node)是一样的,因为MapReduce框架和Hadoop分布式文件系统是运行在相同的节点配置上。相同配置的好处是,框架可以很方便的把调度作业,实施在有数据的节点上运行,避免过度消耗集群间的带宽。
        MapReduce框架由一个主节点的ResuorceManager,每个隶属节点的NodeManager和每个应用程序的 MRAppMaster组成的。
        最低限度,应用程序需要指定Input和Output的位置,同时还要通过接口或者虚类实现map和reduce方法。所有这些和其他的参数,都将包含在Job配置(Job Configuration)中。
        然后,Hadoop作业客户端(Hadoop job client),向ResourceManager提交作业(作业可以是jar包,也可以是可执行文件)及配置,ResourceManager负责将作业及配置分配到隶属节点(slaves),然后调整任务和监视任务的执行状态,并将任务运行状态、监听信息提供给Hadoop作业客户端。
        虽然,Hadoop框架是有java写的,但是MapReduce程序不一定要用Java来写。
        比如:
        Hadoop Streaming就是一个通用的方式,允许用户通过任何可运行的Mapper和Reducer来运行作业(如:Shell Utilities)。
        Hadoop Pipes 是一个SWIG工具,兼容C++应用程序接口(C++ API)实现MapReduce应用程序。

三、Input和Output介绍:    

        MapReduce框架唯一操作方式就是基于对“键-值”(<key,value>)对的操作。MapReduce框架视向job输入的数据集为“键-值”对,Job输出的结果集也为“键-值”对,“键-值”可以是不同的类型。
        key和value的类型必须是框架能够序列化的类型,因此,该类型必须实现Writable接口。此外,key的类型还必须实现WritableComparable,以方便框架进行分类。
        Input和Output的工作流程如下:
        (Input)<k1,v1> -> map<k2,v2> -> combine<k2,v2> -> reduce<k3,v3>(Output)

四、WordCount实例:   

        在进入细节之前,我们来通过一个MapReduce实例,让大家了解一下MapReduce是如何工作的。WordCount程序,实现单词出现次数的统计功能。
        1、设置好环境变量:~/.bashrc;
        2、编译java文件,生成class文件;
        3、打包class文件为jar文件;
        4、运行jar文件中的class;

五、WordCount实例解析过程:

        1、Mapper的实现,通过map方法,每次只处理一行输入数据,由指定的TextInputFormat提供行数据;
        2、通过“空格”分隔符,取得单词,输入<key,value>;
        3、(本地聚合)指定combiner ,每个map的输出,都会经过本地的combiner进行本地聚合(每个job配置中combiner和reducer是一样的),对所有的key进行分类;
        4、Reducer的实现,通过reduce方法,对所有values中出现的每个Key进行合计;
        5、在main方法中,为Job对象指定各方面的设置,诸如:input和output文件目录、key和value数据类型、input和output的format等等。然后通过Job对象的waitForCompletion 提交作业,监视作业的执行情况。

六、MapReduce的用户接口: 

        本章节将介绍MapReduce框架面向用户提供实现的大量细节。这样更有利于用户在更多细节上去实现、配置、调优MapReduce程序。但也请您注意,java帮忙文档中的每个class、interface同样都是最综合的,最有用的学习教程。
        首先,让我们花点时间了解Mapper和Reducer接口。MapReduce应用程序通常以实现Mapper和Reducer接口,来提供map和reduce方法。
        然后,我们还会讨论其他核心接口 ,包括:Job,Partitioner(分割器),InputFormat,OutputFormat,当然还有其他的接口。
        最后,我们将讨论MapReduce框架的特色实现,诸如:DistributedCache,IsolationRunner等等。

6.1 Mapper

        Mapper的maps将输入的key-value键值对,映射到一个中间的key-value结果集中。maps是一个独立的任务,作用是将输入的记录转变为中间记录。转换后的中间记录和输入的记录可以是不同的类型。一个输入的key-value可能会输出0个或者多个key-value。
        Hadoop MapReduce框架,MapReduce作业的InputFormat会产生InputSplit,框架为每个InputSplit生产一个map任务。
        总的来说,Mapper是通过Job.setMapperClass(class)的方式来实现的。然后,框架调用map(WritableComparable,Writable,Context) 去处理InputSplit中的每个key-value。output的key-value
键值对集合,是通过调用context.writ(WritableComparable,Writable) 方法来收集的。产生的所有中间记录的key-value键值对集合,通过框架进行分组处理后,传递给Reducer,由Reducer决定最终输出。 
        Mapper的所有输出,在完成分组后,被分割到每个Reducer中,被分割的数量与作业的Reducer数量是一致。用户也可以通过实现一个自定义的Partitioner(分割器)来控制指定的keys输出给特定的Reducer。一般情况下,用户会通过Job.setCombinerClass(class) 方法指定一个combiner (聚合器),实现对map的输出进行本地化聚合,这样的好处是减少Mapper与Reducer之间的数据传输量。map的输出(如果通过combiner,那就是combiner的输出),一般以(key-len,key,value-len,value) 的格式进行存储。通过 Configuration(配置)设置CompressionCodeC ,应用程序可以控制对输出数据进行压缩。
    Map的数量,通常由输入的文件大小来决定的(即文件被分成几个block (块))。map的并行水平一般是10-100个每节点,最好的map是执行时间少于1分钟的map。假如,要输入一个10TB的文件,每个块是128M,就需要82000个map,除非,通过Configuration.set(MRJobConfig.NUM_MAPS,int) 设置,提高map的大小。
       应用程序可以重写方法cleanup(Context) 方法,来执行想要清除的需求。 
  
        应用程序可以使用Counter 来报告统计数据。
        应用程序可以通过指定Comparator (比较器),来控制框架进行分组,一般是调用Job.setGroupingComparatorClass(class) 方法来实现的。

 6.2 Reducer

        作业的reduce数量可以通过Job.setNumReduceTasks(int) 进行自定义设置。总的来讲,Reducer是通过Job.setReducerClass(class) 方法来实现的,通过重写这个方法来完成初始化工作。然后,框架调用reduce(WritableComparable,Iterable<Writable>,Context)  方法,处理由Mapper传输来的key-value集合( <key,
(list of values)> )。
       应用程序可以重写方法cleanup(Context) 方法,来执行想要清除的需求。

        Reducer有三个主要的阶段:Shuffle(洗牌),Sort (分类)and reduce(归纳)。

6.2.1 Shuffle

        在这个阶段,框架从Http中取得所有Mapper输出的相关分块。

6.2.2 Sort

        在这个阶段,框架按照keys对所有的inputs进行分类(因为不同的Mapper可以输出相同的key)。
        Shuffle和Sort是同时出现的,因为当取出map-outputs的同时,也完成了合并。

6.2.3 Secondary Sort

        如果在reduction之前,要对中间数据集进行分类的key不相同,则可以通过Job.setSortComparatorClass(class) 来指定一个Comparator (比较器)进行处理。因此,Job.setSortComparatorClass(class) 可以用来控制如何对中间数据集进行分类,往往被用于结合模拟二次排序。

6.2.4 Reduce

        在这个阶段,reduce(WritableComparable,Iterable<Writable>,Context) 被框架调用,对输入的分组进行归纳处理。ruduce通过Context.writ(WritableComparator,Writable) 
将output写到FileSystem 文件系统。
        应用程序可以使用Counter 来报告统计数据。
        Reducer的output没有进行排序。
        reduce适合的数量大概是0.95或1.75*(<节点数>*<每个节点最大的容器数(容器数一般指节点的CPU core数量)>)。对于0.95,当map结束时,所有的reduce能够立即启动;对于1.75,较快的节点结束完第一轮reduce后,可以开始第二轮的reduce任务,从而提高负载均衡。增加reduce的数量,会加大框架的开销,但是相应的增强了负载均衡,降低了故障成本。
    如果不需要reduce,没有reduce也是合法的。在这种情况下,map-tasks的output将直接写入FileSystem,output的路径通过FileOutputFormat.setOutputPath(Job,Path) 进行设置。写入FileSystem的output并没有实现分组,而是直接写入。

七、Partitioner:  

        Partitioner控制对中间数据集中的key进行分块。Partitioner通常由一个 hash function 实现。 partition的数量和Job的reduce数量一样。因此,它控制着哪些keys会被发送到哪个reduce。默认的Partitioner为HashPartitioner。

八、Counter:  

        Counter是MapReduce应用程序报告统计的设备。Mapper和Reducer实现可以使用Counter报告统计。Hadoop MapReduce 自带一个普遍的,有用的库:mappers,reducers和partitioner。

九、Job Configuration: 

        Job是一个MapReduce的作业配置。Job是用户描述MapReduce作业在Hadoop 框架中执行的主要接口。框架尝试如实的按照业务描述完成作业的执行,然而:

有一些配置参数,被系统管理员定义为final (详见:Final
Parameters 帮助文档),这些参数不能被修改。
而有一些配置参数,则可以直接设置(如:Job.setNumReduceTasks(int) ),其他的参数之间的相互影响非常微秒,剩余的框架配置和作业配置则更为复杂(如: Configuration.set(JobContext.NUM_MAPS,
int)  )。

    Job通常要指定Mapper,Combine(如果有需要),Partitioner,Reducer,InputFormat,OutputFormat的实现。
    FileInputFormat 表示输入文件集,通过 FileInputFormat.setInputPaths(Job,
Path...) /  FileInputFormat.addInputPath(Job, Path),
 FileInputFormat.setInputPaths(Job, String...) /  FileInputFormat.addInputPaths(Job, String) 
进行设置。
    FileOutputFormat 表示作业输出的存放文件,通过FileOutputFormat.setOutputPath(Path) 进行设置。
        可选的,作业还可以指定一些更先进的设置,诸如:

是否启用Comparator ;
是否启用DistributedCache 存放文件;
是否启用对中间数据集、作业输出进行压缩;
作业的任务是否允许在 Speculative manner(推测模式)下运行,通过 setMapSpeculativeExecution(boolean))/
setReduceSpeculativeExecution(boolean) 进行设置;
每个作业的最大尝试次数,通过  setMaxMapAttempts(int)/ setMaxReduceAttempts(int) 进行设置,等等。

        当然的,用户可以用  Configuration.set(String, String)/ Configuration.get(String) 设置或获取任意的参数。然而,获取大量只读数据,则要用DistributedCache 。

9.1 推展阅读:Speculative  推测模式

        在Hadoop分布式集群环境下,由于负载不均衡或者资源分布不均,亦或者程序Bug,导致同一个job的多个task的运行速度不一致,甚至差别很大(如:一个job的某个task进度只有10%,而其他的task都已经运行完毕)。这种情况就比较糟糕,最慢的task将拖慢整个作业的执行进度。为了避免这种现象,Hadoop会为该task启用Speculative
task ,让 Speculative task 与原来慢的task一起运行,谁快就取谁的结果。Speculative 是通过资源换时间的思路,通过占用更多的资源来提高作业的执行效率。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: