
Understanding Secondary Sort in Hadoop

2012-02-24 18:58
MapReduce sorts keys by default, but sometimes we also need the values sorted. One way to get that is to collect all of a key's values in the reduce phase and sort them there; however, when a key has a very large number of values this can cause out-of-memory errors. That is exactly the scenario secondary sort addresses: it folds the sorting of values into the MapReduce computation itself, instead of doing it as a separate step.
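To make the contrast concrete, here is a minimal sketch of the naive reduce-side approach (the class name and the key/value types are mine for illustration, not part of the experiment below): it buffers every value of a key on the heap before sorting, which is precisely what fails on a huge group.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/** Naive reduce-side sort: buffers all values for a key, then sorts. */
public class NaiveSortReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        List<Integer> buffer = new ArrayList<Integer>();
        for (IntWritable v : values) {
            buffer.add(v.get()); // the entire group is held in memory
        }
        Collections.sort(buffer); // the sort happens here, in one JVM
        for (int v : buffer) {
            context.write(key, new IntWritable(v));
        }
    }
}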

First we need to understand how sorting works in Java. In Java everything is an object, and each comparable type carries its own comparison policy; every sort operation depends on that policy. So the problem reduces to supplying a comparison policy of our own, which is essentially the strategy pattern.
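As a plain-Java illustration of swapping strategies (a toy example of mine, not from the original post), the same array can be sorted under its natural order or under a custom Comparator:

import java.util.Arrays;
import java.util.Comparator;

public class StrategyDemo {
    public static void main(String[] args) {
        Integer[] nums = { 3, 1, 2 };
        Arrays.sort(nums); // natural order via Comparable: [1, 2, 3]
        Arrays.sort(nums, new Comparator<Integer>() { // swapped-in strategy
            public int compare(Integer a, Integer b) {
                return b.compareTo(a); // descending
            }
        });
        System.out.println(Arrays.toString(nums)); // prints [3, 2, 1]
    }
}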

Hadoop's IO system defines many Writable types, such as IntWritable and Text. Each of these supplies its comparison policy through a WritableComparator, a class that implements the RawComparator interface, the most primitive comparator interface. RawComparator exposes a byte-level compare method for us to implement (or override), and that is a hook for efficiency: if, say, only the first 4 bytes of a record matter for ordering, we can compare just those 4 bytes instead of deserializing the whole object.
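For example, a raw comparator along these lines (a sketch of mine, using the IntPair type defined later in this post) orders serialized records by their first 4 bytes only, never touching the rest:

import org.apache.hadoop.io.WritableComparator;

/** Sketch of the byte-level hook: compare only the first 4 bytes of each record. */
public class FirstFourBytesComparator extends WritableComparator {
    protected FirstFourBytesComparator() {
        super(IntPair.class);
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Only the serialized first integer of each record is examined;
        // the FirstGroupingComparator in the job below does the same thing.
        return compareBytes(b1, s1, 4, b2, s2, 4);
    }
}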

The Definitive Guide has an example of finding the maximum temperature; I changed it a little and ran some experiments. By supplying a partitioning (Partitioner) policy, keys sharing some characteristic can be sent to the same reducer. By supplying a grouping policy, the values of keys sharing some characteristic are collected together (once grouped, they become the iterable passed to a single reduce call). Finally, a sort policy is the real crux: it lets us control how the composite keys are ordered.
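To make the division of labor concrete, take a hypothetical four-line input (my own example, not from the book):

1 5
2 3
1 7
1 2

Partitioning and grouping on the first integer, and sorting with first ascending and second descending (the policy implemented below), the reducer that owns first = 1 receives one group whose keys iterate as (1, 7), (1, 5), (1, 2); the record (2, 3) may be routed to a different reducer.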

At work I once wrote a MapReduce job that estimated users' online time. I did not think about efficiency at the time and pushed all of the sorting into the reduce phase. Looking back, that is a real hazard; if I find the time, it would be worth rewriting it to use secondary sort.

Here is the code from the experiment.

First, the newly defined composite key type:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class IntPair implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    /** Set the left and right values. */
    public void set(int left, int right) {
        first = left;
        second = right;
    }

    public int getFirst() {
        return first;
    }

    public int getSecond() {
        return second;
    }

    /**
     * Read the two integers.
     * Encoded as: MIN_VALUE -> 0, 0 -> -MIN_VALUE, MAX_VALUE -> -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        first = in.readInt() + Integer.MIN_VALUE;
        second = in.readInt() + Integer.MIN_VALUE;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Shift the signed range so the serialized bytes sort in the
        // same order as the numeric values (see the note below).
        out.writeInt(first - Integer.MIN_VALUE);
        out.writeInt(second - Integer.MIN_VALUE);
    }

    @Override
    public int hashCode() {
        return first * 157 + second;
    }

    @Override
    public boolean equals(Object right) {
        if (right instanceof IntPair) {
            IntPair r = (IntPair) right;
            return r.first == first && r.second == second;
        } else {
            return false;
        }
    }

    /** A Comparator that compares serialized IntPairs byte by byte. */
    public static class Comparator extends WritableComparator {
        public Comparator() {
            super(IntPair.class);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1,
                           byte[] b2, int s2, int l2) {
            return compareBytes(b1, s1, l1, b2, s2, l2);
        }
    }

    static { // register this comparator as the default for IntPair
        WritableComparator.define(IntPair.class, new Comparator());
    }

    @Override
    public int compareTo(IntPair o) {
        if (first != o.first) {
            return first < o.first ? -1 : 1;
        } else if (second != o.second) {
            return second < o.second ? -1 : 1;
        } else {
            return 0;
        }
    }

    public static int compare(int left, int right) {
        return left > right ? 1 : (left == right ? 0 : -1);
    }
}
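One detail worth spelling out: write() subtracts Integer.MIN_VALUE before serializing. DataOutput.writeInt emits big-endian two's-complement bytes, so a plain unsigned byte comparison would put negative numbers after positive ones; shifting by MIN_VALUE maps the signed range onto an order whose bytes agree with numeric order, which is what lets the byte-level Comparator above work. A tiny standalone check (my own, not part of the original post):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;

public class EncodingCheck {
    /** Serialize one int the way IntPair.write() does. */
    static byte[] encode(int x) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new DataOutputStream(bytes).writeInt(x - Integer.MIN_VALUE);
        return bytes.toByteArray();
    }

    /** Unsigned byte-by-byte comparison, as compareBytes performs. */
    static int compareUnsigned(byte[] a, byte[] b) {
        for (int i = 0; i < 4; i++) {
            int ua = a[i] & 0xff, ub = b[i] & 0xff;
            if (ua != ub) return ua < ub ? -1 : 1;
        }
        return 0;
    }

    public static void main(String[] args) throws Exception {
        // Byte order now agrees with numeric order: both lines print -1.
        System.out.println(compareUnsigned(encode(-5), encode(3)));
        System.out.println(compareUnsigned(encode(Integer.MIN_VALUE), encode(0)));
    }
}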

Next, the MapReduce job:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class UseSecondarySort extends Configured implements Tool {

    /** Parses two integers per input line and emits them as a composite key. */
    public static class Map extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        private final IntPair key = new IntPair();

        public void map(LongWritable inkey, Text invalue, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(invalue.toString());
            int left = 0;
            int right = 0;
            if (itr.hasMoreTokens()) {
                left = Integer.parseInt(itr.nextToken());
                if (itr.hasMoreTokens()) {
                    right = Integer.parseInt(itr.nextToken());
                }
            }
            key.set(left, right);
            context.write(key, NullWritable.get());
        }
    }

    /**
     * Writes an 888888 marker row at the start of each group, with the
     * reducer instance's identity as the value, so the grouping and the
     * reducer assignment are visible in the output.
     */
    public static class Reduce extends Reducer<IntPair, NullWritable, IntPair, Text> {
        private static final IntPair sign = new IntPair();

        public void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            sign.set(888888, 888888);
            context.write(sign, new Text(this.toString()));
            for (NullWritable it : values) {
                context.write(key, new Text(this.toString()));
            }
        }
    }

    /** Partition on the first integer only, so equal firsts reach the same reducer. */
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return Math.abs(key.getFirst() * 127) % numPartitions;
        }
    }

    /** Group on the first integer only: one reduce() call per distinct first value. */
    public static class FirstGroupingComparator implements RawComparator<IntPair> {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // Compare only the first 4 bytes, i.e. the serialized first integer.
            return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
                                                   b2, s2, Integer.SIZE / 8);
        }

        @Override
        public int compare(IntPair o1, IntPair o2) {
            int l = o1.getFirst();
            int r = o2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }

    /** Sort by first ascending, then by second descending (note the negation). */
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable o1, WritableComparable o2) {
            IntPair ip1 = (IntPair) o1;
            IntPair ip2 = (IntPair) o2;
            int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
            if (cmp != 0) {
                return cmp;
            }
            return -IntPair.compare(ip1.getSecond(), ip2.getSecond());
        }
        // A byte-level compare(byte[], ...) override could be added here as an
        // optimization, comparing the serialized form without deserializing.
    }

    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new UseSecondarySort(), args);
        System.exit(ret);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf());
        job.setJobName("UseSecondarySort");
        job.setJarByClass(UseSecondarySort.class);
        job.setMapOutputKeyClass(IntPair.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(2);
        // The three plug-in points of secondary sort:
        job.setPartitionerClass(FirstPartitioner.class);
        job.setGroupingComparatorClass(FirstGroupingComparator.class);
        job.setSortComparatorClass(KeyComparator.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean success = job.waitForCompletion(true);
        return success ? 0 : 1;
    }
}
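Assuming the two classes are packaged into a jar named secondarysort.jar (the jar name and paths here are mine, for illustration), the job can be launched with: hadoop jar secondarysort.jar UseSecondarySort input output. With two reduce tasks, the output lands in two files (part-r-00000 and part-r-00001); each group in a file should start with the 888888 marker row, followed by that group's keys with the second integer in descending order, which makes both the grouping and the secondary sort easy to verify at a glance.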
