您的位置:首页 > 其它

二次排序说明

2016-04-23 09:47 387 查看
Hadoop二次排序的思路是:把value中需要参与排序的部分移到key中,组成一个新的复合key类,作为MapReduce框架的新key参与排序和分组。下面根据Hadoop 2.7.1源代码中的样例程序SecondarySort.java,总结二次排序的相关方法。

package org.apache.hadoop.examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

/*
本程序实现对输入的数据表,分别按照第1列、第2列排序后输出。
输入数据表如下:
3 9
3 7
1 4
输出结果为(Reduce在每个分组之前还会先输出一行由"-"组成的分隔线,此处省略):
1 4
3 7
3 9
*/

public class SecondarySort {

//数据表类。有两个属性first、second,分别表示第1、2个列数字。
public static class IntPair implements WritableComparable<IntPair> {
private int first = 0;
private int second = 0;

public void set(int left, int right) {
first = left;
second = right;
}
public int getFirst() {
return first;
}
public int getSecond() {
return second;
}

@Override
public void readFields(DataInput in) throws IOException {
first = in.readInt() + Integer.MIN_VALUE;//序列化时,加上Inter.Min_VALUE避免数据过小而溢出
second = in.readInt() + Integer.MIN_VALUE;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(first - Integer.MIN_VALUE);
out.writeInt(second - Integer.MIN_VALUE);
}
@Override
public int hashCode() {
return first * 157 + second;
}
@Override
public boolean equals(Object right) {
if (right instanceof IntPair) {
IntPair r = (IntPair) right;
return r.first == first && r.second == second;
} else {
return false;
}
}

public static class Comparator extends WritableComparator {
public Comparator() {
super(IntPair.class);
}

/*
二次排序实现关键,需要在新class中重载compareTo(),返回int。优先比较first,如果两个对象的first不相同,则按照大小分别返回1或-1;如果first相同,则比较second。
mapreduce框架根据compareTo()返回值正负,判断比较的两个对象大小(小于零,小于;等于零,等于;大于零,大于),用于在shuffle中key/value排序使用。
*/
public int compareTo(IntPair o) {
if (first != o.first) {
return first < o.first ? -1 : 1;
} else if (second != o.second) {
return second < o.second ? -1 : 1;
} else {
return 0;
}
}
}

/**
 * Partitions map output by the first column only, so that every key sharing a
 * first value is fetched by the same reducer.
 */
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>{
  /**
   * @param key           composite key; only {@code getFirst()} is used
   * @param value         ignored
   * @param numPartitions number of partitions
   * @return a partition index in {@code [0, numPartitions)}
   */
  @Override
  public int getPartition(IntPair key, IntWritable value,int numPartitions) {
    // The int product may wrap to Integer.MIN_VALUE (for example when first is
    // Integer.MIN_VALUE), and Math.abs(int) leaves that value negative. Taking
    // the absolute value as a long keeps the result non-negative and leaves
    // every other input in the partition it had before.
    long hash = Math.abs((long) (key.getFirst() * 127));
    return (int) (hash % numPartitions);
  }
}

/**
 * Grouping comparator for the reduce phase: keys whose first column is equal
 * compare as equal, so their values are delivered to a single reduce() call.
 */
public static class FirstGroupingComparator implements RawComparator<IntPair> {

  /**
   * Compares two serialized keys by their first column only. IntPair.write
   * emits the first column as the leading 4 bytes, biased so that unsigned
   * byte order matches signed int order; the remaining bytes are ignored.
   */
  @Override
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8,
                                           b2, s2, Integer.SIZE / 8);
  }

  /**
   * Compares two deserialized keys by their first column only.
   *
   * @return -1, 0 or 1 according to the order of the first columns
   */
  @Override
  public int compare(IntPair o1, IntPair o2) {
    int l = o1.getFirst();
    int r = o2.getFirst();
    return l == r ? 0 : (l < r ? -1 : 1);
  }
}

/**
 * Parses each input line into an (IntPair, IntWritable) record: the key holds
 * both columns and the value repeats the second column.
 */
public static class MapClass
    extends Mapper<LongWritable, Text, IntPair, IntWritable> {

  private final IntPair key = new IntPair();
  private final IntWritable value = new IntWritable();

  /**
   * Emits one record per non-blank line. A line with a single token uses 0
   * for the second column; tokens after the second are ignored.
   *
   * @param inKey   input key (unused)
   * @param inValue one line of whitespace-separated integers
   * @param context sink for the emitted record
   */
  @Override
  public void map(LongWritable inKey, Text inValue,
                  Context context) throws IOException, InterruptedException {
    StringTokenizer tokens = new StringTokenizer(inValue.toString());
    if (!tokens.hasMoreTokens()) {
      return; // blank line: nothing to emit
    }
    int left = Integer.parseInt(tokens.nextToken());
    int right = tokens.hasMoreTokens() ? Integer.parseInt(tokens.nextToken()) : 0;
    key.set(left, right);
    value.set(right);
    context.write(key, value);
  }
}

/**
 * Writes a separator line for each call, followed by one
 * (first column, value) record per value.
 */
public static class Reduce
    extends Reducer<IntPair, IntWritable, Text, IntWritable> {
  private static final Text SEPARATOR =
      new Text("------------------------------------------------");
  private final Text first = new Text();

  /**
   * @param key     key passed in for this call; only its first column is read
   * @param values  values passed in for this call
   * @param context sink for the output records
   */
  @Override
  public void reduce(IntPair key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Separator record with a null value.
    context.write(SEPARATOR, null);

    // The label is taken from the key once, before iterating the values.
    String label = String.valueOf(key.getFirst());
    first.set(label);

    for (IntWritable second : values) {
      context.write(first, second);
    }
  }
}

/**
 * Configures and runs the secondary-sort job.
 *
 * <p>After generic Hadoop options are consumed, exactly two arguments must
 * remain: the input path and the output path. Exits with status 2 on a usage
 * error, 0 when the job succeeds and 1 when it fails.
 *
 * @param args command-line arguments
 * @throws Exception if job setup or execution fails
 */
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
  String[] paths = optionsParser.getRemainingArgs();
  if (paths.length != 2) {
    System.err.println("Usage: secondarysort <in> <out>");
    System.exit(2);
  }

  Job job = Job.getInstance(conf, "secondary sort");
  job.setJarByClass(SecondarySort.class);

  // Map and reduce implementations.
  job.setMapperClass(MapClass.class);
  job.setReducerClass(Reduce.class);

  // Partition and group on the first column only.
  job.setPartitionerClass(FirstPartitioner.class);
  job.setGroupingComparatorClass(FirstGroupingComparator.class);

  // Intermediate (map output) types.
  job.setMapOutputKeyClass(IntPair.class);
  job.setMapOutputValueClass(IntWritable.class);

  // Final (reduce output) types.
  job.setOutputKeyClass(Text.class);
  job.setOutputValueClass(IntWritable.class);

  Path inputPath = new Path(paths[0]);
  Path outputPath = new Path(paths[1]);
  FileInputFormat.addInputPath(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);

  boolean succeeded = job.waitForCompletion(true);
  System.exit(succeeded ? 0 : 1);
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: