您的位置:首页 > 其它

map和reduce 个数的设定 (Hive优化)经典

2013-04-01 16:32 246 查看
一、 控制hive任务中的map数:

1. 通常情况下,作业会通过input的目录产生一个或者多个map任务。

主要的决定因素有: input的文件总个数,input的文件大小,集群设置的文件块大小(目前为128M, 可在hive中通过setdfs.block.size;命令查看到,该参数不能自定义修改);

2. 举例:

a) 假设input目录下有1个文件a,大小为780M,那么hadoop会将该文件a分隔成7个块(6个128m的块和1个12m的块),从而产生7个map数

b) 假设input目录下有3个文件a,b,c,大小分别为10m,20m,130m,那么hadoop会分隔成4个块(10m,20m,128m,2m),从而产生4个map数

即,如果文件大于块大小(128m),那么会拆分,如果小于块大小,则把该文件当成一个块。

3. 是不是map数越多越好?

答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,

而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。

而且,同时可执行的map数是受限的。

4. 是不是保证每个map处理接近128m的文件块,就高枕无忧了?

答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,

如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。

针对上面的问题3和4,我们需要采取两种方式来解决:即减少map数和增加map数;

如何合并小文件,减少map数?

假设一个SQL任务:

Select count(1) from popt_tbaccountcopy_mes where pt =‘2012-07-04’;

该任务的inputdir /group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04

共有194个文件,其中很多是远远小于128m的小文件,总大小9G,正常执行会用194个map任务。

Map总共消耗的计算资源: SLOTS_MILLIS_MAPS= 623,020

我通过以下方法来在map执行前合并小文件,减少map数:

set mapred.max.split.size=100000000;

setmapred.min.split.size.per.node=100000000;

setmapred.min.split.size.per.rack=100000000;

sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

再执行上面的语句,用了74个map任务,map消耗的计算资源:SLOTS_MILLIS_MAPS=333,500

对于这个简单SQL任务,执行时间上可能差不多,但节省了一半的计算资源。

大概解释一下,100000000表示100M, sethive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;这个参数表示执行前进行小文件合并,

前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),

进行合并,最终生成了74个块。

ps:Map的个数取决于对输入切割(split)的份数 •压缩文件不可切割 –Map数等于输入的文件数 •非压缩文件及Sequence文件可切割

CombineFileInputFormat是一个被推荐使用的InputFormat. 它有三个配置:

mapred.min.split.size.per.node, 一个节点上split的至少的大小

mapred.min.split.size.per.rack 一个交换机下split至少的大小

mapred.max.split.size 一个split最大的大小

它的主要思路是把输入目录下的大文件分成多个map的输入, 并合并小文件, 做为一个map的输入. 具体的原理是下述三步:

1.根据输入目录下的每个文件,如果其长度超过mapred.max.split.size,以block为单位分成多个split(一个split是一个map的输入),每个split的长度都大于mapred.max.split.size, 因为以block为单位, 因此也会大于blockSize, 此文件剩下的长度如果大于mapred.min.split.size.per.node, 则生成一个split,
否则先暂时保留.

2. 现在剩下的都是一些长度效短的碎片,把每个rack下碎片合并, 只要长度超过mapred.max.split.size就合并成一个split, 最后如果剩下的碎片比mapred.min.split.size.per.rack大, 就合并成一个split, 否则暂时保留.

3. 把不同rack下的碎片合并, 只要长度超过mapred.max.split.size就合并成一个split, 剩下的碎片无论长度, 合并成一个split.

举例: mapred.max.split.size=1000

mapred.min.split.size.per.node=300

mapred.min.split.size.per.rack=100

输入目录下五个文件,rack1下三个文件,长度为2050,1499,10, rack2下两个文件,长度为1010,80. 另外blockSize为500.

经过第一步, 生成五个split: 1000,1000,1000,499,1000. 剩下的碎片为rack1下:50,10; rack2下10:80

由于两个rack下的碎片和都不超过100, 所以经过第二步, split和碎片都没有变化.

第三步,合并四个碎片成一个split, 长度为150.

如果要减少map数量, 可以调大mapred.max.split.size, 否则调小即可.

其特点是: 一个块至多作为一个map的输入,一个文件可能有多个块,一个文件可能因为块多分给做为不同map的输入, 一个map可能处理多个块,可能处理多个文件。

如何适当的增加map数?

当input的文件都很大,任务逻辑复杂,map执行非常慢的时候,可以考虑增加Map数,来使得每个map处理的数据量减少,从而提高任务的执行效率。

假设有这样一个任务:

Select data_desc,

count(1),

count(distinct id),

sum(casewhen …),

sum(casewhen ...),

sum(…)

from a group by data_desc

如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,

这样就可以用多个map任务去完成。

set mapred.reduce.tasks=10;

create table a_1 as

select * from a

distribute by rand(123);

这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。

每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。

看上去,貌似这两种有些矛盾,一个是要合并小文件,一个是要把大文件拆成小文件,这点正是重点需要关注的地方,

根据实际情况,控制map数量需要遵循两个原则:使大数据量利用合适的map数;使单个map任务处理合适的数据量;

二、 控制hive任务的reduce数:

1. Hive自己如何确定reduce数:

reduce个数的设定极大影响任务执行效率,不指定reduce个数的情况下,Hive会猜测确定一个reduce个数,基于以下两个设定:

hive.exec.reducers.bytes.per.reducer(每个reduce任务处理的数据量,默认为1000^3=1G)

hive.exec.reducers.max(每个任务最大的reduce数,默认为999)

计算reducer数的公式很简单N=min(参数2,总输入数据量/参数1)

即,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务;

如:select pt,count(1) from popt_tbaccountcopy_mes where pt ='2012-07-04' group by pt;

/group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04总大小为9G多,因此这句有10个reduce

zyy ps:作者有误,如果reduce的输入(map的输出)总大小不超过1G,那么只会有一个reduce任务-这句不对,MR估算时不是map的输出(他怎么估算?!)而是map的输入数据量大小!所以才有
/group/p_sdo_data/p_sdo_data_etl/pt/popt_tbaccountcopy_mes/pt=2012-07-04总大小为9G多,因此这句有10个reduce

2. 调整reduce个数方法一:

调整hive.exec.reducers.bytes.per.reducer参数的值;

set hive.exec.reducers.bytes.per.reducer=500000000; (500M)

select pt,count(1) from popt_tbaccountcopy_mes where pt ='2012-07-04' group by pt; 这次有20个reduce

3. 调整reduce个数方法二;

set mapred.reduce.tasks = 15;

select pt,count(1) from popt_tbaccountcopy_mes where pt ='2012-07-04' group by pt;这次有15个reduce

4. reduce个数并不是越多越好;

同map一样,启动和初始化reduce也会消耗时间和资源;

另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

5. 什么情况下只有一个reduce;

很多时候你会发现任务中不管数据量多大,不管你有没有设置调整reduce个数的参数,任务中一直都只有一个reduce任务;

其实只有一个reduce任务的情况,除了数据量小于hive.exec.reducers.bytes.per.reducer参数值的情况外,还有以下原因:

a) 没有groupby的汇总,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt ='2012-07-04' group by pt; 写成 select count(1) frompopt_tbaccountcopy_mes where pt = '2012-07-04';

这点非常常见,希望大家尽量改写。

b) 用了Orderby

c) 有笛卡尔积

通常这些情况下,除了找办法来变通和避免,我暂时没有什么好的办法,因为这些操作都是全局的,所以hadoop不得不用一个reduce去完成;

同样的,在设置reduce个数的时候也需要考虑这两个原则:使大数据量利用合适的reduce数;使单个reduce任务处理合适的数据量;

ps:


一些概念

maptask的个数由输入文件大小决定,所以选择合适的reducer的个数对充分利用Hadoop集群的性能有重要的意义。

Hadoop中每个task均对应于tasktracker中的一个slot,系统中mapperslots总数与reducerslots总数的计算公式如下:

mapperslots总数=集群节点数×mapred.tasktracker.map.tasks.maximum (our default is 18)

reducerslots总数=集群节点数×mapred.tasktracker.reduce.tasks.maximum

设置reducer的个数比集群中全部的reducerslot略少可以使得全部的reducetask可以同时进行,而且可以容忍一些reducetask失败。

•old-mr-api 的切割大小的影响参数

–mapred.map.tasks 期望的map个数 默认值:1

–mapred.min.split.size 切割出的split最小size 默认:1

•old-mr-api的切割大小算法

–splitSize = max[minSize, min(goalSize, blockSize)]

–minSize = ${mapred.min.split.size}

–goalSize = totalSize / ${mapred.map.tasks}

•mapred.map.tasks可以增大map数 •mapred.min.split.size可以减少map数
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: