hadoop之定制自己的sort过程
2015-07-31 20:52
441 查看
Key排序
1. 继承WritableComparator在hadoop之Shuffle和Sort中,可以看到mapper的输出文件spill文件需要在内存中排序,并且在输入reducer之前,不同的mapper的数据也会排序,排序是根据数据的key进行的.
如果key是用户自定义的类型,并没有默认的比较函数时,就需要自己定义key的比较函数,也就是继承WritableComparator.事例代码如下:
public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数 // 这里是先比较年份,再比较温度,按温度降序排序 int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse } }
例子中对IntPair定义了新的compare函数,并在main函数中通过下面的方式实现替换:
job.setSortComparatorClass(KeyComparator.class);
2.实现 WritableComparable接口
看下面的例子代码:
static class NewK2 implements WritableComparable<NewK2>{ Long first; Long second; public NewK2(){} public NewK2(long first, long second){ this.first = first; this.second = second; } @Override public void readFields(DataInput in) throws IOException { this.first = in.readLong(); this.second = in.readLong(); } @Override public void write(DataOutput out) throws IOException { out.writeLong(first); out.writeLong(second); } /** * 当k2进行排序时,会调用该方法. * 当第一列不同时,升序;当第一列相同时,第二列升序 */ @Override public int compareTo(NewK2 o) { final long minus = this.first - o.first; if(minus !=0){ return (int)minus; } return (int)(this.second - o.second); } @Override public int hashCode() { return this.first.hashCode()+this.second.hashCode(); } @Override public boolean equals(Object obj) { if(!(obj instanceof NewK2)){ return false; } NewK2 oK2 = (NewK2)obj; return (this.first==oK2.first)&&(this.second==oK2.second); } }
如果是按照上述的例子实现的,不需要在main函数中设置其他的代码.
Group排序
一般来说,如果用户自定义了key的排序过程,那么在reducer之前的对数据进行分组的过程就要重新编写,而且一般来说,partitioner也需要重新定义,请参考hadoop之定制自己的Partitioner .shuffle阶段,虽然使用的是hash的方法,我们并不能保证映射到同一个reducer的key的hash值都是一样的,对于不同的hash值要进行分群,然后再执行reduce.下面是自定义groupcomparator的例子:
public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里是按key的第一个参数来聚合,就是年份 return IntPair.compare(ip1.getFirst(), ip2.getFirst()); } }
例子中实现了对于IntPair类型的分群比较函数的重新定义.在main函数中通过下面的方式进行调用:
job.setGroupingComparatorClass(GroupComparator.class);
二次排序
下面是对地区温度进行的统计,要求输出各个年份的最大温度,例子中定制了自己的partitioner:FirstPartitioner来对组合后的类型进行分组,实际上还是按照年份进行的分组;定制了自己的keycomparator:KeyComparator,先比较年份,然后再比较温度;定制了自己的分群比较类:GroupComparator,也是按照年份进行分群,然后扔给reducer进行处理.值得一提的是,为什么不用传统的mapreduce,按照年份进行进行map,然后在reduce中,遍历每年不同的温度,找到最大值呢?原因之一就是效率的问题,sort操作本身就要在MP框架中执行,而且已经做了很多优化,通过设置比较的不同手段,很容易实现比较,然而在reducer处理中进行遍历,显然比上面的sort过程要慢.下面是例子的完整代码,摘自Hadoop- The Definitive Guide, 4th Edition.
public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {
// Map任务
static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> {
private NcdcRecordParser parser = new NcdcRecordParser();
public void map(LongWritable key, Text value,
OutputCollector<IntPair, NullWritable> output, Reporter reporter)
throws IOException {
parser.parse(value); // 解析输入的文本
if (parser.isValidTemperature()) {
// 这里把年份与温度组合成一个key,value为空
output.collect(new IntPair(parser.getYearInt(),+ parser.getAirTemperature()), NullWritable.get());
}
}
}
// Reduce任务
static class MaxTemperatureReducer extends MapReduceBase
implements Reducer<IntPair, NullWritable, IntPair, NullWritable> {
public void reduce(IntPair key, Iterator<NullWritable> values,
OutputCollector<IntPair, NullWritable> output, Reporter reporter)
throws IOException {
// 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列
// 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值
output.collect(key, NullWritable.get());
}
}
// 切分器,这里是按年份* 127 % reduceNum来进行切分的
public static class FirstPartitioner
implements Partitioner<IntPair, NullWritable> {
@Override
public void configure(JobConf job) {}
@Override
public int getPartition(IntPair key, NullWritable value, int numPartitions) {
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
// 聚合key的一个比较器
public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数 // 这里是先比较年份,再比较温度,按温度降序排序 int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse } }
// 设置聚合比较器
public static class GroupComparator extends WritableComparator {
protected GroupComparator() {
super(IntPair.class, true);
}
@Override
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
// 这里是按key的第一个参数来聚合,就是年份
return IntPair.compare(ip1.getFirst(), ip2.getFirst());
}
}
@Override
public int run(String[] args) throws IOException {
Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
if (job == null) {
return -1;
}
job.setMapperClass(MaxTemperatureMapper.class);
job.setPartitionerClass(FirstPartitioner.class);
job.setSortComparatorClass(KeyComparator.class);
job.setGroupingComparatorClass(GroupComparator.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(IntPair.class); // 设置key的一个组合类型,如里这个类型实现了WritableComparable<T>的话,那就不要设置setOutputKeyComparatorClass了.
job.setOutputValueClass(NullWritable.class); // 输出的value为NULL,因为这里的实际value已经组合到了key中
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
System.exit(exitCode);
}
}
相关文章推荐
- hadoop之定制自己的Partitioner
- hadoop之Shuffle和Sort
- Linux自动共享USB设备:udev+Samba
- Tomcat免安装版的环境变量配置以及Eclipse下的Tomcat配置和测试
- hadoop学习笔记---20150731
- opencv Remap 图像的映射
- arm-linux-androideabi-gcc 预定义宏(编译器版本4.8)
- opencv 霍夫圆检测
- Nginx访问日志管理
- Concurrency model and Event Loop
- opencv 霍夫线变换
- opencv copyto函数
- linux如何设置静态ip
- 转)Linux学习路线
- OpenCV函数 Canny 检测边缘
- OpenCV函数 Laplacian 算子实现
- opencv sobel导数
- opencv图像边界的填充
- opencv 线性滤波器
- linux服务器可以ping通,但是访问不了--Ip地址:端口--