mapreduce的二次排序 SecondarySort
2015-09-08 15:27
302 查看
在看Hadoop The definitive guide 时,关于二次排序,在设置好setGroupingComparatorClass 后一直不明白为什么reduce的入参就是要查询的年最高温度,代码里没有看到是怎么实现的:
代码:
查询后发现二次排序的园里是:
1:在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。
2:在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。
代码:
// cc MaxTemperatureUsingSecondarySort Application to find the maximum temperature by sorting temperatures in the key import java.io.IOException; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Partitioner; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; // vv MaxTemperatureUsingSecondarySort public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value); if (parser.isValidTemperature()) { /*[*/context.write(new IntPair(parser.getYearInt(), parser.getAirTemperature()), NullWritable.get());/*]*/ } } } static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> { <span style="color:#ff0000;"> @Override protected void reduce(IntPair key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { /*[*/context.write(key, NullWritable.get());/*]*/ }</span> } public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> { @Override public int getPartition(IntPair key, NullWritable value, int numPartitions) { // multiply by 127 to perform some mixing return Math.abs(key.getFirst() * 127) % numPartitions; } } public static class KeyComparator extends WritableComparator { protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse } }
<span style="color:#ff0000;">//此处只是将相同的年份归并为一个组</span> public static class GroupComparator extends WritableComparator { protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; return IntPair.compare(ip1.getFirst(), ip2.getFirst()); } } @Override public int run(String[] args) throws Exception { Job job = JobBuilder.parseInputAndOutput(this, getConf(), args); if (job == null) { return -1; } job.setMapperClass(MaxTemperatureMapper.class); /*[*/job.setPartitionerClass(FirstPartitioner.class);/*]*/ /*[*/job.setSortComparatorClass(KeyComparator.class);/*]*/ /*[*/job.setGroupingComparatorClass(GroupComparator.class);/*]*/ job.setReducerClass(MaxTemperatureReducer.class); job.setOutputKeyClass(IntPair.class); job.setOutputValueClass(NullWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } } // ^^ MaxTemperatureUsingSecondarySort
查询后发现二次排序的园里是:
1:在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。
2:在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。
相关文章推荐
- CentOS6.5升级为CentOS7.0
- iOS测试-GHUnit配置
- SIP 中的Dialog,session,transaction和call
- How to use Hibernate annotations @ManyToOne and @OneToMany for associations
- weblogic的集群与配置
- 3DsMax 创建阴影贴图的技术指南
- Maven快速安装配置
- 二分图最大匹配 Hopcroft-Karp算法模板
- FFMPEG学习1利用FFMPEG和SDL简单实现播放器
- fackbook的Fresco (FaceBook推出的Android图片加载库-Fresco)
- fackbook的Fresco (FaceBook推出的Android图片加载库-Fresco)
- fackbook的Fresco (FaceBook推出的Android图片加载库-Fresco)
- fackbook的Fresco (FaceBook推出的Android图片加载库-Fresco)
- fackbook的Fresco (FaceBook推出的Android图片加载库-Fresco)
- javascript 发送http请求
- UILabel根据内容自动调整高度
- Android 5.0 内置第三方apk
- mathtype批量修改公式的字体大小
- 服务端与管理端配合的有关说明
- android studio 常遇错误,界面,Gradle详细讲解