MapReduce Feature Implementation 2 --- Sorting
2017-07-25 17:56
The MapReduce Feature Implementation series:
MapReduce Feature Implementation 1 --- Converting Data Between HBase and HDFS
MapReduce Feature Implementation 2 --- Sorting
MapReduce Feature Implementation 3 --- Top N
MapReduce Feature Implementation 4 --- A Small Exercise (read data from HBase, aggregate it, and write the Top 3 to HDFS in descending order)
MapReduce Feature Implementation 5 --- Distinct and Count
MapReduce Feature Implementation 6 --- Max, Sum, and Avg
MapReduce Feature Implementation 7 --- A Small Exercise (computing an average with multiple jobs chained in series)
MapReduce Feature Implementation 8 --- Partition
MapReduce Feature Implementation 9 --- PV and UV
MapReduce Feature Implementation 10 --- Inverted Index
MapReduce Feature Implementation 11 --- Join
Case 1:
[hadoop@h71 q1]$ vi ip.txt
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.2.2 ccc
192.168.3.3 ddd
192.168.3.3 ddd
192.168.3.3 ddd
192.168.5.5 fff
[hadoop@h71 q1]$ hadoop fs -put ip.txt /input
Java code (count the occurrences of each IP and output them in ascending order of count):
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class IpTopK {
    // Job 1 mapper: extract the IP from each line and emit (ip, "1")
    public static class IpTopKMapper1 extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
        @Override
        public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
            String ip = text.toString().split(" ", 5)[0];
            outputCollector.collect(new Text(ip), new Text("1"));
        }
    }

    // Job 1 reducer: sum the counts per IP, producing lines of the form "ip<TAB>count"
    public static class IpTopKReducer1 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterator<Text> iterator, OutputCollector<Text, Text> outputCollector, Reporter reporter) throws IOException {
            long sum = 0;
            while (iterator.hasNext()) {
                sum = sum + Long.parseLong(iterator.next().toString());
            }
            outputCollector.collect(new Text(key), new Text(String.valueOf(sum)));
        }
    }

    // Job 2 mapper: swap the columns so the count becomes the key;
    // the shuffle then sorts the counts for us
    public static class IpTopKMapper2 extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {
        @Override
        public void map(LongWritable longWritable, Text text, OutputCollector<LongWritable, Text> outputCollector, Reporter reporter) throws IOException {
            // ks[0] is the ip, ks[1] is the count
            String[] ks = text.toString().split("\t");
            outputCollector.collect(new LongWritable(Long.parseLong(ks[1])), new Text(ks[0]));
        }
    }

    // Job 2 reducer: pass the sorted (count, ip) pairs straight through
    public static class IpTopKReducer2 extends MapReduceBase implements Reducer<LongWritable, Text, LongWritable, Text> {
        @Override
        public void reduce(LongWritable key, Iterator<Text> iterator, OutputCollector<LongWritable, Text> outputCollector, Reporter reporter) throws IOException {
            while (iterator.hasNext()) {
                outputCollector.collect(key, iterator.next());
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(args.length);
        if (args.length < 2) {
            System.out.println("args not right!");
            return;
        }
        // Job 1: count occurrences per IP
        JobConf conf = new JobConf(IpTopK.class);
        conf.set("mapred.jar", "tt.jar");
        // set output key/value classes
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);
        // set mapper, combiner & reducer classes
        conf.setMapperClass(IpTopKMapper1.class);
        conf.setCombinerClass(IpTopKReducer1.class);
        conf.setReducerClass(IpTopKReducer1.class);
        // set input/output formats
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);
        String inputDir = args[0];
        String outputDir = args[1];
        FileInputFormat.setInputPaths(conf, inputDir);
        FileOutputFormat.setOutputPath(conf, new Path(outputDir));
        boolean flag = JobClient.runJob(conf).isSuccessful();
        if (flag) {
            System.out.println("run job-1 successful");
            // Job 2: read job 1's output and sort it by count
            JobConf conf1 = new JobConf(IpTopK.class);
            conf1.set("mapred.jar", "tt.jar");
            conf1.setOutputKeyClass(LongWritable.class);
            conf1.setOutputValueClass(Text.class);
            conf1.setMapperClass(IpTopKMapper2.class);
            conf1.setReducerClass(IpTopKReducer2.class);
            conf1.setInputFormat(TextInputFormat.class);
            conf1.setOutputFormat(TextOutputFormat.class);
            conf1.setNumReduceTasks(1);
            FileInputFormat.setInputPaths(conf1, outputDir);
            FileOutputFormat.setOutputPath(conf1, new Path(outputDir + "-2"));
            boolean flag1 = JobClient.runJob(conf1).isSuccessful();
            if (flag1) {
                System.out.println("run job-2 successful !!");
            }
        }
    }
}
Note: this code uses the Hadoop 1 (mapred) API and runs two MapReduce jobs in sequence. The sorted output of the second job comes for free: during the shuffle, map output keys (here the LongWritable counts) are sorted in ascending order before they reach the reducer. A sketch of the same two-job chain on the newer API follows at the end of this case.
Compile and run the code on Linux:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac IpTopK.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf tt.jar IpTopK*class  (the jar name tt.jar must match the name set in the code via mapred.jar)
[hadoop@h71 q1]$ hadoop jar tt.jar IpTopK /input/ip.txt /output
[hadoop@h71 q1]$ hadoop fs -lsr /output
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 16:07 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 56 2017-03-18 16:07 /output/part-00000
[hadoop@h71 q1]$ hadoop fs -lsr /output-2
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 16:07 /output-2/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 56 2017-03-18 16:07 /output-2/part-00000
[hadoop@h71 q1]$ hadoop fs -cat /output/part-00000
192.168.1.1 7
192.168.2.2 1
192.168.3.3 3
192.168.5.5 1
[hadoop@h71 q1]$ hadoop fs -cat /output-2/part-00000
1 192.168.5.5
1 192.168.2.2
3 192.168.3.3
7 192.168.1.1
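For reference, the same two-job chain can also be written against the newer org.apache.hadoop.mapreduce API. The driver below is only a sketch: CountMapper/CountReducer and SwapMapper/SwapReducer are placeholder names for ports of the four mapred classes above and are not shown here.

// A minimal driver sketch of the two-job chain on the Hadoop 2 (mapreduce) API.
// The four mapper/reducer class names are hypothetical ports of the classes above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IpTopKDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: count occurrences per IP.
        Job job1 = Job.getInstance(conf, "ip-count");
        job1.setJarByClass(IpTopKDriver.class);
        job1.setMapperClass(CountMapper.class);     // hypothetical port of IpTopKMapper1
        job1.setReducerClass(CountReducer.class);   // hypothetical port of IpTopKReducer1
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: swap columns so the count is the key; the shuffle sorts it ascending.
        Job job2 = Job.getInstance(conf, "ip-sort");
        job2.setJarByClass(IpTopKDriver.class);
        job2.setMapperClass(SwapMapper.class);      // hypothetical port of IpTopKMapper2
        job2.setReducerClass(SwapReducer.class);    // hypothetical port of IpTopKReducer2
        job2.setOutputKeyClass(LongWritable.class);
        job2.setOutputValueClass(Text.class);
        job2.setNumReduceTasks(1);
        FileInputFormat.setInputPaths(job2, new Path(args[1]));
        FileOutputFormat.setOutputPath(job2, new Path(args[1] + "-2"));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}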
Case 2: Descending order
[hadoop@h71 q1]$ vi test.txt
a 5
b 4
c 74
d 78
e 1
r 64
f 4
Note: either a tab (\t) or spaces will work as the field separator, since the mapper's StringTokenizer splits on any whitespace.
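A quick way to convince yourself of that (plain Java, separate from the job; the class name DelimiterCheck is just for illustration):

// StringTokenizer treats tabs and spaces alike, which is why either
// delimiter works in the mapper below.
import java.util.StringTokenizer;

public class DelimiterCheck {
    public static void main(String[] args) {
        StringTokenizer t1 = new StringTokenizer("a\t5");
        StringTokenizer t2 = new StringTokenizer("a 5");
        System.out.println(t1.countTokens() + " " + t2.countTokens());  // prints "2 2"
    }
}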
[hadoop@h71 q1]$ hadoop fs -put test.txt /input
Java code:
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class JiangXu {
    // Emit (count, word) so that the count becomes the sort key
    public static class SortIntValueMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        private final static IntWritable wordCount = new IntWritable(1);
        private Text word = new Text();

        public SortIntValueMapper() {
            super();
        }

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken().trim());
                wordCount.set(Integer.valueOf(tokenizer.nextToken().trim()));
                context.write(wordCount, word);
            }
        }
    }

    /**
     * Partition by the size of the key (the key is an int value).
     */
    public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            // The hashCode of an IntWritable is the int value itself.
            // Keys of maxValue or more go to the first partition.
            int maxValue = 50;
            int keySection = 0;
            // Partitioning is only needed when there is more than one reducer
            // and the key is below maxValue; otherwise return partition 0.
            if (numReduceTasks > 1 && key.hashCode() < maxValue) {
                int sectionValue = maxValue / (numReduceTasks - 1);
                int count = 0;
                while ((key.hashCode() - sectionValue * count) > sectionValue) {
                    count++;
                }
                keySection = numReduceTasks - 1 - count;
            }
            return keySection;
        }
    }

    /**
     * Sort the int keys in descending order.
     */
    public static class IntKeyDescComparator extends WritableComparator {
        protected IntKeyDescComparator() {
            super(IntWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            // Negate the natural (ascending) order to get a descending sort
            return -super.compare(a, b);
        }
    }

    /**
     * Swap key and value back so the output reads "word count".
     */
    public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {
        private Text result = new Text();

        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text val : values) {
                result.set(val.toString());
                context.write(result, key);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "word count");
        job.setJarByClass(JiangXu.class);
        job.setMapperClass(SortIntValueMapper.class);
        job.setSortComparatorClass(IntKeyDescComparator.class);
        job.setPartitionerClass(KeySectionPartitioner.class);
        job.setReducerClass(SortIntValueReduce.class);
        // These are the map output key and value types
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        // setInputPaths accepts an array of input directories, so the full
        // output of a previous job could be fed in here as well
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac JiangXu.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar JiangXu*class
[hadoop@h71 q1]$ hadoop jar xx.jar JiangXu /input/test.txt /output
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
d 78
c 74
r 64
a 5
f 4
b 4
e 1
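This run uses the default single reducer, so everything lands in one partition and one output file. KeySectionPartitioner only kicks in with several reducers; the standalone check below (plain Java, reusing the partitioner class above; the class name PartitionCheck is just for illustration) shows where the sample counts would land with three reducers:

// With 3 reducers and maxValue = 50, larger counts map to lower-numbered
// partitions, so concatenating part-r-00000..00002 in order would still
// yield one globally descending list.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class PartitionCheck {
    public static void main(String[] args) {
        JiangXu.KeySectionPartitioner<IntWritable, Text> p =
                new JiangXu.KeySectionPartitioner<IntWritable, Text>();
        int[] keys = {78, 74, 64, 5, 4, 1};
        for (int k : keys) {
            // IntWritable.hashCode() is the int value itself
            System.out.println(k + " -> partition " + p.getPartition(new IntWritable(k), null, 3));
        }
        // 78, 74, 64 -> partition 0; 5, 4, 1 -> partition 2
        // (counts from 26 to 49 would go to partition 1)
    }
}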
Case 3: An improved WordCount (sorted by descending word frequency). The official WordCount example only counts how often each word occurs and does not sort by frequency; the code below adds that.
Source: http://www.cnblogs.com/yjmyzz/p/hadoop-mapreduce-2-sample.html
How it works: it again relies on cleanup(), and to get the ordering it uses a TreeMap, a data structure that keeps its keys sorted.
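To see the ordering trick in isolation, here is a tiny standalone demo (plain Java, not part of the job; TreeMapDemo is just an illustrative name) using values from the output further below:

// A TreeMap with a reversed comparator keeps its keys in descending order,
// so iterating over it yields counts from high to low.
import java.util.Comparator;
import java.util.TreeMap;

public class TreeMapDemo {
    public static void main(String[] args) {
        TreeMap<Integer, String> map = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);  // reverse the natural (ascending) order
            }
        });
        map.put(1, "e");
        map.put(4, "I,the");
        map.put(3, "be,we");
        for (Integer count : map.keySet()) {
            System.out.println(map.get(count) + "\t" + count);  // prints 4, then 3, then 1
        }
    }
}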
To make the result easier to read, the first verse of the theme song from the movie Big Hero 6 is used as input:
[hadoop@h71 q1]$ vi test.txt
They say we are what we are
But we do not have to be
I am bad behavior but I do it in the best way
I will be the watcher
Of the eternal flame
I will be the guard dog
of all your fever dreams
[hadoop@h71 q1]$ hadoop fs -put test.txt /input
Java code:
import java.io.IOException;
import java.util.Comparator;
import java.util.StringTokenizer;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount2 {
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        // A TreeMap holds the results. TreeMap sorts its keys in ascending order,
        // so a Comparator is supplied to reverse that and get a descending order.
        private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {
            @Override
            public int compare(Integer x, Integer y) {
                return y.compareTo(x);
            }
        });

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // Put the reduced result into the TreeMap instead of writing it to the context
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            if (treeMap.containsKey(sum)) {
                // Words with the same count are joined with commas
                String value = treeMap.get(sum) + "," + key.toString();
                treeMap.put(sum, value);
            } else {
                treeMap.put(sum, key.toString());
            }
        }

        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Write the contents of the TreeMap to the context in value-key order
            for (Integer key : treeMap.keySet()) {
                context.write(new Text(treeMap.get(key)), new IntWritable(key));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount2 <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count2");
        job.setJarByClass(WordCount2.class);
        job.setMapperClass(TokenizerMapper.class);
        // No combiner here: unlike the stock WordCount, this reducer buffers
        // everything in a TreeMap and rewrites the keys in cleanup(), so it is
        // not safe to reuse as a combiner.
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac WordCount2.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar WordCount2*class
[hadoop@h71 q1]$ hadoop jar xx.jar WordCount2 /input/test.txt /output
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
I,the 4
be,we 3
are,do,will 2
But,Of,They,all,am,bad,behavior,best,but,dog,dreams,eternal,fever,flame,guard,have,in,it,not,of,say,to,watcher,way,what,your 1
Case 4: Custom sort
Given two columns of data, sort first by the first column in ascending order; when the first column is equal, sort by the second column in ascending order.
MapReduce's automatic sorting alone would only sort by the first column. To sort by both, define a class that implements the WritableComparable interface and use it as the key; the framework's automatic sort then applies to the whole pair.
Data format:
[hadoop@h71 q1]$ vi haha.txt
7 3
7 5
7 1
5 9
5 6
1 7
Java code:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class ZiDingYi {
    private static final String INPUT_PATH = "hdfs://h71:9000/in";
    private static final String OUT_PATH = "hdfs://h71:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        // Delete the output directory if it already exists
        if (fileSystem.exists(new Path(OUT_PATH))) {
            fileSystem.delete(new Path(OUT_PATH), true);
        }
        Job job = new Job(conf, ZiDingYi.class.getSimpleName());
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        // The next line is required; without it the job fails with:
        // Caused by: java.lang.ClassNotFoundException: Class ZiDingYi$MyMapper not found
        job.setJarByClass(ZiDingYi.class);
        // Class used to parse the input file
        job.setInputFormatClass(TextInputFormat.class);
        // Custom Mapper class
        job.setMapperClass(MyMapper.class);
        // Types of <k2, v2>
        job.setMapOutputKeyClass(newK2.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Partitioner class
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
        // Custom Reducer class
        job.setReducerClass(MyReducer.class);
        // Types of <k3, v3>
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        // Class used to format the output
        job.setOutputFormatClass(TextOutputFormat.class);
        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, newK2, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splied = value.toString().split(" ");
            newK2 k2 = new newK2(Long.parseLong(splied[0]), Long.parseLong(splied[1]));
            final LongWritable v2 = new LongWritable(Long.parseLong(splied[1]));
            context.write(k2, v2);
        }
    }

    static class MyReducer extends Reducer<newK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(ZiDingYi.newK2 key, Iterable<LongWritable> value, Context context) throws IOException, InterruptedException {
            context.write(new LongWritable(key.first), new LongWritable(key.second));
        }
    }

    static class newK2 implements WritableComparable<newK2> {
        Long first;
        Long second;

        public newK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        // The no-arg constructor must not be removed; otherwise the job fails with:
        // Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: ZiDingYi$newK2.<init>()
        public newK2() {
        }

        @Override
        public void readFields(DataInput input) throws IOException {
            this.first = input.readLong();
            this.second = input.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        // Ascending by the first column; when the first column is equal,
        // ascending by the second column (compareTo avoids the overflow
        // that long subtraction could cause)
        @Override
        public int compareTo(newK2 o) {
            int cmp = this.first.compareTo(o.first);
            if (cmp != 0) {
                return cmp;
            }
            return this.second.compareTo(o.second);
        }

        @Override
        public int hashCode() {
            return this.first.hashCode() + this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof newK2)) {
                return false;
            }
            newK2 k2 = (newK2) obj;
            // use equals(): == on boxed Longs compares references, not values
            return this.first.equals(k2.first) && this.second.equals(k2.second);
        }
    }
}
Note: the first and second fields of the key class must be declared as Long rather than long; otherwise this.first.hashCode() does not compile, since primitives have no methods. Any class that implements WritableComparable can be sorted this way, which extends to more complex data: just wrap the fields in a class implementing WritableComparable and use it as the key.
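If you would rather keep primitive long fields, the hash just has to be computed by hand instead of being called as a method on the field. Below is a sketch under that assumption (the class name PrimitiveK2 is made up for illustration; it would replace newK2 and assumes the same imports as above):

// The same key class with primitive long fields. Primitives have no
// hashCode() method, so the hash is computed manually (this mirrors what
// Long.hashCode() does internally).
static class PrimitiveK2 implements WritableComparable<PrimitiveK2> {
    long first;
    long second;

    public PrimitiveK2() {
    }

    @Override
    public void readFields(DataInput input) throws IOException {
        this.first = input.readLong();
        this.second = input.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(first);
        out.writeLong(second);
    }

    @Override
    public int compareTo(PrimitiveK2 o) {
        if (this.first != o.first) {
            return this.first < o.first ? -1 : 1;   // first column ascending
        }
        if (this.second != o.second) {
            return this.second < o.second ? -1 : 1; // then second column ascending
        }
        return 0;
    }

    @Override
    public int hashCode() {
        // equivalent of Long.hashCode(first) + Long.hashCode(second)
        return (int) (first ^ (first >>> 32)) + (int) (second ^ (second >>> 32));
    }

    @Override
    public boolean equals(Object obj) {
        if (!(obj instanceof PrimitiveK2)) {
            return false;
        }
        PrimitiveK2 k2 = (PrimitiveK2) obj;
        return this.first == k2.first && this.second == k2.second;  // == is safe on primitives
    }
}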
Run the program and check the result:
[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
1 7
5 6
5 9
7 1
7 3
7 5