您的位置:首页 > 运维架构

【hadoop学习笔记】How MapReduce Works

2013-03-18 23:09 387 查看
Anatomy of a MapReduce Job Run

mapreduce job的整个工作过程如下:

1. Job Submission

用户通过JobClient.runJob(conf)开始执行一个job(注:最新版本的hadoop,应该是用job.waitForCompeletion(true))。JobClient创建一个JobClient实例并且调用submitJob()来提交job:首先会向jobtracker请求jobID;然后检查该job的output路径,如果output路径已经存在,job不会被提交而且程序抛出异常;接下来对job的input进行分割,分割出错也会导致job失败;前述步骤都成功之后,job的JAR和一些配置文件会被拷贝到jobtracker的文件系统中(job
JAR默认会拷贝10份,这样集群中的tasktracker执行task的时候就可以访问);最后JobClient通过在jobtracker上调用submitJob()提交job。

2. Job Initialization

jobtracker接收到job提交之后,把该job放入一个内部队列中,job调度会从队列里挑选出要执行的job并初始化。初始化包括创建一个代表该job的对象,它封装了job的所有task,还包括一个记录簿用于跟踪task的状态和进度。job scheduler首先从文件系统中获取job input的split,然后为每个split创建一个map task;而reduce的数量取决于用户的配置,默认是1个。

3. Task Assignment

tasktracker的工作是定期地向jobtracker发送心跳。Heartbeats证明该tasktracker正常运行,并且能够作为信息传输的通道,比如告诉jobtracker它现在可以执行task,jobtracker就分配一个task给它。一个tasktracker能够同时运行两个map或者两个reduce。

4. Task Execution

首先,共享的文件系统中的job JAR和相关文件被拷贝到tasktracker的文件系统;然后在本地创建一个工作目录,un-jars job JAR到该目录;最后创建一个TaskRunner实例运行该task。TaskRunner会新建一个JVM执行每个task,所以mapreduce代码里的bug不会影响到tasktracker。

5. Progress and Status Updates

每个job和task都有状态,包括job和task的running, successfully completed, failed描述,map reduce的进度,task counter的值等。task可以通过设置flag来说明它的状态改变会发送给tasktracker,tasktracker会定期地向jobtracker发送心跳,这样所有的task状态改变都会被发送给jobtacker。JobClient通过每秒钟查询jobtacker了解整个job的最新状态。

6. Job Completion

jobtracker接收到所有task成功完成后,将job的状态更新为successfully。

Failures

Task Failure

在这几种情况下task会失败:运行时错误、Streaming process exits with a nonzero code、JVM异常退出。

tasktracker在一段时间内没有接收到进度更新就会标记这个task failed,然后该child JVM被kill掉。task失败后,有10分钟的时间作为缓冲,如果timeout,该slot就被释放用于执行其他task。可以在mapred.task.timeout修改时间,如果设为0就是disable
timeout那么slot一直都不会被释放掉,长久以来会影响集群的效率

task失败后,jobtracker就会重新安排该task执行,但是会避免在之前失败的那个tasktracker上执行。一个task失败4次之后就不会再尝试了,而且整个job都也会失败。这个值可以在mapred.map.max.attempts或mared.reduce.max.attempts修改

Shuffle and Sort

MapReduce需要确保每个reduce的输入是按经过key排序的,这个过程是sort;把Map的输出作为reduce的输入,这个过程是shuffle。

1. The Map Side

每个map task有一个circular memory作为输出的写入,这个buffer默认是100M。当buffer里的内容大小达到了某个临界值,这些内容会溢出到硬盘。map的输出只会不停的往buffer里写,如果buffer满了,map会阻塞直到buffer溢出有足够的空间写入。在写到硬盘之前,首先会把数据按照reducers进行分割,而且对每个partition进行key值排序。每次buffer达到spill
threshold,会创建一个新的spill file,所以当map输出完之后会有多个spill files,在task完成之前这些spill files会合并成一个完成分割和排序的输出文件。

可以使用Combiner函数让map的输出更紧凑,或者是压缩map输出,这样可以减少map输出到reduce输入的数据传输。

2. The Reduce Side

copy phase:reduce在map完成后开始拷贝它的输出作为自己的输入。默认有5个线程能够同时进行reduce input的拷贝(mapred.reduce.parallel.copies)。如果map的输出比较小会直接拷贝到reduce tasktracker的内存里,这样会提高reduce的执行速度,否则会拷贝到硬盘里。

sort(merge) phase:合并map的输出并维护它们的顺序。也就是进行合并和排序。

reduce phase:对没对key-value调用reduce函数,并将输出写入到HDFS。

3. Configuration Tuning

对shuffle进行参数配置以提高MapReduce运行效率。

(1)给shuffle尽量多的内存,但是这样就需要让map和reduce函数尽量少占用内存,避免去做一些消耗内存的计算(mapred.child.java.opts);(2)对于map,减少写入硬盘的spill files的数量,增加io.sort.mb值;(3)对于reduce,如果reduce的intermediate数据都在内存里会是最好。

Task Execution

1. Speculative Execution

在hadoop集群里运行的task失败是很正常的事。为了不给整个集群带来影响,当某个task一直不能完成(该task运行至少一分钟但没有任何进展),Hadoop会另外运行一个等同的task作为原task的备份,这是task的speculative execution。这两个task其中一个提前完成,另外一个就会被kill掉。speculative execution只是作为一种优化,不是为了让job运行更加稳定,因为如果某个task出错,它的一个备份也会出错。

2. Task JVM Reuse

Hadoop的每个task都在单独的JVM上运行。当一个job有大量短时间存活的task时,JVM的重用能够带来效率的提升。mapred.job.reuse.jvm.num.task用来设置一个JVM能运行task的最大数量,默认是1个。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: