您的位置:首页 > 其它

MapReduce中的二次排序

2013-12-19 09:53 190 查看
在MapReduce操作时,我们知道传递的<key,value>会按照key的大小进行排序,最后输出的结果是按照key排过序的。有的时候我们在key排序的基础上,对value也进行排序。这种需求就是二次排序。

我们先看一下Mapper任务的数据处理过程吧,见下图。





在图中,数据处理分为四个阶段:

(1)Mapper任务会接收输入分片,然后不断的调用map函数,对记录进行处理。处理完毕后,转换为新的<key,value>输出。

(2)对map函数输出的<key, value>调用分区函数,对数据进行分区。不同分区的数据会被送到不同的Reducer任务中。

(3)对于不同分区的数据,会按照key进行排序,这里的key必须实现WritableComparable接口。该接口实现了Comparable接口,因此可以进行比较排序。

(4)对于排序后的<key,value>,会按照key进行分组。如果key相同,那么相同key的<key,value>就被分到一个组中。最终,每个分组会调用一次reduce函数。

(5)排序、分组后的数据会被送到Reducer节点。

在MapReduce的体系结构中,我们没有看到对value的排序操作。怎么实现对value的排序哪?这就需要我们变通的去实现这个需求。

变通手段:我们可以把key和value联合起来作为新的key,记作newkey。这时,newkey含有两个字段,假设分别是k,v。这里的k和v是原来的key和value。原来的value还是不变。这样,value就同时在newkey和value的位置。我们再实现newkey的比较规则,先按照key排序,在key相同的基础上再按照value排序。在分组时,再按照原来的key进行分组,就不会影响原有的分组逻辑了。最后在输出的时候,只把原有的key、value输出,就可以变通的实现了二次排序的需求。

下面看个例子,结合着理解。

假设有以下输入数据,这是两列整数,要求先按照第一列整数大小排序,如果第一列相同,按照第二列整数大小排序。

20    21
50    51
50    52
50    53
50    54
60    51
60    53
60    52
60    56
60    57
70    58
60    61
70    54
70    55
70    56
70    57
70    58


分析一下, 这是一个典型的二次排序问题。

我们先对现在第一列和第二列整数创建一个新的类,作为newkey,代码如下

/**
* 把第一列整数和第二列作为类的属性,并且实现WritableComparable接口
*/
public static class IntPair implements WritableComparable<IntPair> {
private int first = 0;
private int second = 0;

public void set(int left, int right) {
first = left;
second = right;
}
public int getFirst() {
return first;
}
public int getSecond() {
return second;
}

@Override
public void readFields(DataInput in) throws IOException {
first = in.readInt();
second = in.readInt();
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(first);
out.writeInt(second);
}
@Override
public int hashCode() {
return first+"".hashCode() + second+"".hashCode();
}
@Override
public boolean equals(Object right) {
if (right instanceof IntPair) {
IntPair r = (IntPair) right;
return r.first == first && r.second == second;
} else {
return false;
}
}
//这里的代码是关键,因为对key排序时,调用的就是这个compareTo方法
@Override
public int compareTo(IntPair o) {
if (first != o.first) {
return first - o.first;
} else if (second != o.second) {
return second - o.second;
} else {
return 0;
}
}
}


一定要注意上面的compareTo方法,先按照first比较,再按照second比较。在以后调用的时候,key就是first,value就是second。

下面看一下分组比较函数,代码如下

/**
* 在分组比较的时候,只比较原来的key,而不是组合key。
*/
public static class GroupingComparator implements RawComparator<IntPair> {
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return WritableComparator.compareBytes(b1, s1, Integer.SIZE/8, b2, s2, Integer.SIZE/8);
}

@Override
public int compare(IntPair o1, IntPair o2) {
int first1 = o1.getFirst();
int first2 = o2.getFirst();
return first1 - first2;
}
}


一定要注意上面代码中,虽然泛型是IntPair,但是比较的始终是第一个字段,而不是所有的字段。因为要按照原有的key进行分组啊。

如果以上的代码明白,再看一下自定义的Mapper类和Reducer类吧

public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> {

private final IntPair key = new IntPair();
private final IntWritable value = new IntWritable();

@Override
public void map(LongWritable inKey, Text inValue,
Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(inValue.toString());
int left = 0;
int right = 0;
if (itr.hasMoreTokens()) {
left = Integer.parseInt(itr.nextToken());
if (itr.hasMoreTokens()) {
right = Integer.parseInt(itr.nextToken());
}
key.set(left, right);
value.set(right);
context.write(key, value);
}
}
}

public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
private static final Text SEPARATOR = new Text("------------------------------------------------");
private final Text first = new Text();

@Override
public void reduce(IntPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
context.write(SEPARATOR, null);
first.set(Integer.toString(key.getFirst()));
for(IntWritable value: values) {
context.write(first, value);
}
}
}


在map函数中,要注意k2是由哪几个字段组成的;在reduce函数中,要注意输出的k3是IntPair中的第一个字段,而不是所有字段。

好了,看一下驱动代码吧,如下

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

final FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop2:9000"), conf);
fileSystem.delete(new Path(OUTPUT_PATH), true);

Job job = new Job(conf, "secondary sort");
job.setJarByClass(SecondarySortApp.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);

job.setGroupingComparatorClass(GroupingComparator.class);

job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}


以上驱动代码中,重大变化是设置了分组比较函数。好了,看看执行结果吧

------------------------------------------------
20    21
------------------------------------------------
50    51
50    52
50    53
50    54
------------------------------------------------
60    51
60    52
60    53
60    56
60    57
60    61
------------------------------------------------
70    54
70    55
70    56
70    57
70    58
70    58


看看,是不是我们想要的结果啊!!

如果读者能够看明白,那么我出个思考题:在以上例子中,按照第一列升序,第二列倒序输出
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: