您的位置:首页 > 运维架构

MaxCompute OpenMR的7个性能优化策略

2017-03-14 18:42 447 查看

原文链接:http://click.aliyun.com/m/14005/
1. 输入表的列裁剪

对于列数特别多的输入表,Map阶段处理只需要其中的某几列,可以通过在添加输入表时明确指定输入的列,减少输入量;
例如只需要c1,c2俩列,可以这样设置:
InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);
设置之后,你在map里的读取到的Record也就只有c1,c2俩列,如果之前是使用列名获取Record数据的,不会有影响,而用下标获取的需要注意这个变化。

2. 减少中间环节

如果有多个MR作业,之间有关联关系,前一个作业的输出是后一个作业的输入,可以考虑采用Pipeline的模式,将多个串行的MR作业合并为一个,这样可以用更少的作业数量完成同样的任务,一方面减少中间落表造成的的多余磁盘IO,提升性能;另一方面减少作业数量使调度更加简单,增强流程的可维护性。具体使用方法参见Pipeline示例

3. 避免资源重复读取

资源的读取尽量放置到setup阶段读取,避免资源的多次读取的性能损失,另外系统也有64次读取的限制,资源的读取参见使用资源示例

4. 减少对象构造开销

对于Map/Reduce阶段每次都会用到的一些java对象,避免在map/reduce函数里构造,可以放到setup阶段,避免多次构造产生的开销;
{
...
Record word;
Record one;

public void setup(TaskContext context) throws IOException {

// 创建一次就可以,避免在map中每次重复创建
word = context.createMapOutputKeyRecord();

one = context.createMapOutputValueRecord();

one.set(new Object[]{1L});

}
...
}

5. 合理选择partition column或自定义partitioner

合理选择partition columns,可以使用JobConf#setPartitionColumns这个方法进行设置(默认是key schema定义的column),设置后数据将按照指定的列计算hash值分发到reduce中去, 避免数据倾斜导致作业长尾现象,如有必要也可以选择自定义partitioner,自定义partitioner的使用方法如下:
import com.aliyun.odps.mapred.Partitioner;

public static class MyPartitioner extends Partitioner {@Overridepublic int getPartition(Record key, Record value, int numPartitions) {  // numPartitions即对应reducer的个数
// 通过该函数决定map输出的key value去往哪个reducer
String k = key.get(0).toString();  return k.length() % numPartitions;
}
}
在jobconf里进行设置:
jobconf.setPartitionerClass(MyPartitioner.class)
另外需要在jobconf里明确指定reducer的个数:
jobconf.setNumReduceTasks(num)

6. 合理使用combiner

如果map的输出结果中有很多重复的key,可以合并后输出,combine后可以减少网络带宽传输和一定shuffle的开销,如果map输出本来就没有多少重复的,就不要用combiner,用了反而可能会有一些额外的开销。combiner实现的是和reducer相同的接口,例如一个WordCount程序的combiner可以定义如下:
/**
* A combiner class that combines map output by sum them.
*/
public static class SumCombiner extends ReducerBase {

private Record count;    @Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}    @Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
long c = 0;      while (values.hasNext()) {
Record val = values.next();
c += (Long) val.get(0);
}
count.set(0, c);
context.write(key, count);
}
}

7. 设置合理的split size

map默认的split size是256MB,split size的大小决定了map的个数多少,如果用户的代码逻辑比较耗时,map需要较长时间结束,可以通过JobConf#setSplitSize方法适当调小split size的大小。然而split size也不宜设置太小,否则会占用过多的计算资源。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  文章 资源 影响