
MapReduce Secondary Sort (SecondarySort)

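2015-09-08 15:27
While reading Hadoop: The Definitive Guide, I got stuck on the secondary sort example. After setGroupingComparatorClass is set, the key passed to reduce is already the maximum temperature for the year, and I could not see where in the code that was implemented: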

Code:

// cc MaxTemperatureUsingSecondarySort Application to find the maximum temperature by sorting temperatures in the key
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// vv MaxTemperatureUsingSecondarySort
public class MaxTemperatureUsingSecondarySort
    extends Configured implements Tool {

  // Emits (year, temperature) as a composite key; the value carries no data.
  static class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, IntPair, NullWritable> {

    private NcdcRecordParser parser = new NcdcRecordParser();

    @Override
    protected void map(LongWritable key, Text value,
        Context context) throws IOException, InterruptedException {

      parser.parse(value);
      if (parser.isValidTemperature()) {
        context.write(new IntPair(parser.getYearInt(),
            parser.getAirTemperature()), NullWritable.get());
      }
    }
  }

  static class MaxTemperatureReducer
      extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {

    // The key passed in is the first key of its group. Because KeyComparator
    // sorts temperatures in descending order, it holds the year's maximum.
    @Override
    protected void reduce(IntPair key, Iterable<NullWritable> values,
        Context context) throws IOException, InterruptedException {

      context.write(key, NullWritable.get());
    }
  }

  // Partitions on the year only, so all records of a year reach one reducer.
  public static class FirstPartitioner
      extends Partitioner<IntPair, NullWritable> {

    @Override
    public int getPartition(IntPair key, NullWritable value, int numPartitions) {
      // multiply by 127 to perform some mixing
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  // Sort order: year ascending, then temperature descending.
  public static class KeyComparator extends WritableComparator {
    protected KeyComparator() {
      super(IntPair.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      IntPair ip1 = (IntPair) w1;
      IntPair ip2 = (IntPair) w2;
      int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
      if (cmp != 0) {
        return cmp;
      }
      return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); // reverse
    }
  }

  // This only groups records with the same year into one group.
  public static class GroupComparator extends WritableComparator {
    protected GroupComparator() {
      super(IntPair.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      IntPair ip1 = (IntPair) w1;
      IntPair ip2 = (IntPair) w2;
      return IntPair.compare(ip1.getFirst(), ip2.getFirst());
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setMapperClass(MaxTemperatureMapper.class);
    job.setPartitionerClass(FirstPartitioner.class);
    job.setSortComparatorClass(KeyComparator.class);
    job.setGroupingComparatorClass(GroupComparator.class);
    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(IntPair.class);
    job.setOutputValueClass(NullWritable.class);

    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
    System.exit(exitCode);
  }
}
// ^^ MaxTemperatureUsingSecondarySort
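
The listing uses an IntPair class that is not shown in this post. A minimal sketch of the parts the listing relies on (getFirst, getSecond and a static compare) might look like the following. This is an assumption for illustration only: the name IntPairSketch is hypothetical, and the Hadoop serialization methods (write, readFields) that a real WritableComparable key needs are deliberately left out so that the snippet runs without Hadoop.

```java
// Hypothetical, Hadoop-free stand-in for the IntPair key used in the listing.
// A real MapReduce key would also implement WritableComparable and provide
// write(DataOutput) and readFields(DataInput); those are omitted here.
class IntPairSketch implements Comparable<IntPairSketch> {
    private final int first;   // the year in the example
    private final int second;  // the temperature in the example

    IntPairSketch(int first, int second) {
        this.first = first;
        this.second = second;
    }

    int getFirst() {
        return first;
    }

    int getSecond() {
        return second;
    }

    // Helper used by the comparators: negative, zero or positive result.
    static int compare(int a, int b) {
        return Integer.compare(a, b);
    }

    // Natural order: first field ascending, then second field ascending.
    @Override
    public int compareTo(IntPairSketch other) {
        int cmp = compare(first, other.first);
        if (cmp != 0) {
            return cmp;
        }
        return compare(second, other.second);
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}
```

Note that the natural order in this sketch is ascending on both fields; the descending temperature order in the job comes from KeyComparator, not from the key class itself.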


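After looking it up, I found that secondary sort works as follows:
1: At the end of the map phase, the partitioner class set with job.setPartitionerClass is used to partition the list of map output records, and each partition maps to one reducer. Within each partition, the records are then sorted with the key comparator class set via job.setSortComparatorClass. As you can see, this is in itself a two-level sort. If no key comparator class has been set with job.setSortComparatorClass, the compareTo method implemented by the key is used instead.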

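2: In the reduce phase, once the reducer has received all of the map output assigned to it, it again uses the key comparator class set via job.setSortComparatorClass to sort all of the key/value pairs. It then builds a value iterator for each key. This is where grouping comes in, using the grouping comparator class set with job.setGroupingComparatorClass. Whenever this comparator reports two keys as equal, they belong to the same group: their values are placed in one value iterator, and the key paired with that iterator is the first key among all the keys in the group. Finally the Reducer's reduce method is called, and its input is every such (key, value iterator) pair. Because KeyComparator sorts the temperatures within a year in descending order, the first key of each group carries that year's maximum temperature, which is why the reducer only has to write out its key.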
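
The two steps above can be sketched in plain Java without any Hadoop classes. This is only a simulation under stated assumptions, not how the framework is implemented: each key is modelled as an int[]{year, temperature} standing in for IntPair, the class and method names (SecondarySortSketch, partitionAndSort, firstKeyPerGroup) are hypothetical, and the sample readings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of partition -> sort -> group; no Hadoop involved.
// Each key is an int[]{year, temperature}, a stand-in for IntPair.
class SecondarySortSketch {

    // Mirrors FirstPartitioner: only the year decides the partition.
    static int partition(int[] key, int numPartitions) {
        return Math.abs(key[0] * 127) % numPartitions;
    }

    // Mirrors KeyComparator: year ascending, then temperature descending.
    static int compareKeys(int[] a, int[] b) {
        int cmp = Integer.compare(a[0], b[0]);
        if (cmp != 0) {
            return cmp;
        }
        return -Integer.compare(a[1], b[1]);
    }

    // Mirrors GroupComparator: keys with the same year form one group.
    static int compareGroups(int[] a, int[] b) {
        return Integer.compare(a[0], b[0]);
    }

    // Step 1: split the map output into partitions and sort each partition.
    static Map<Integer, List<int[]>> partitionAndSort(List<int[]> mapOutput,
                                                      int numPartitions) {
        Map<Integer, List<int[]>> partitions = new TreeMap<>();
        for (int[] key : mapOutput) {
            partitions.computeIfAbsent(partition(key, numPartitions),
                    p -> new ArrayList<>()).add(key);
        }
        for (List<int[]> bucket : partitions.values()) {
            bucket.sort(SecondarySortSketch::compareKeys);
        }
        return partitions;
    }

    // Step 2: walk one sorted partition and keep the first key of each group,
    // which is the key a simulated reduce() call would receive.
    static List<int[]> firstKeyPerGroup(List<int[]> sortedKeys) {
        List<int[]> result = new ArrayList<>();
        int[] previous = null;
        for (int[] key : sortedKeys) {
            if (previous == null || compareGroups(previous, key) != 0) {
                result.add(key); // a new group starts here
            }
            previous = key;
        }
        return result;
    }

    public static void main(String[] args) {
        // Made-up sample readings: (year, temperature).
        List<int[]> mapOutput = Arrays.asList(
                new int[] {1950, 0}, new int[] {1949, 111}, new int[] {1950, 22},
                new int[] {1949, 78}, new int[] {1950, -11});
        for (List<int[]> bucket : partitionAndSort(mapOutput, 2).values()) {
            for (int[] key : firstKeyPerGroup(bucket)) {
                System.out.println(key[0] + "\t" + key[1]);
            }
        }
    }
}
```

With the sample data, the simulated reducers emit one key per year: 1950 with 22 and 1949 with 111. No maximum is ever computed explicitly; the sort order puts the highest temperature first, and grouping by year makes that first key the one reduce sees.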