Hadoop 1.x MapReduce Secondary Sort by Implementing the WritableComparable Interface
2017-08-02 17:23
1. Preface
The task is to implement a secondary sort with MapReduce: sort by the first column in ascending order and, when the first column is equal, by the second column in ascending order. Sample input (two tab-separated columns per line):

3   3
3   2
3   1
2   2
2   1
1   1

Expected result:

1   1
2   1
2   2
3   1
3   2
3   3
Main idea:

The <key, value> pairs emitted by the map phase are sorted by key only; the value cannot take part in the sort. We therefore define a custom key type that carries both columns and implements the WritableComparable interface. See the NewK2 class in the code below for the details.
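To make the ordering rule concrete before the Hadoop listing, here is a minimal, Hadoop-free sketch (the class and variable names are illustrative and not part of the original program) that sorts the sample pairs with the same two-level comparison that NewK2 implements in compareTo below:

import java.util.Arrays;
import java.util.Comparator;

public class PairSortSketch {
    public static void main(String[] args) {
        // The sample input from the preface, as (first, second) pairs
        long[][] rows = { {3, 3}, {3, 2}, {3, 1}, {2, 2}, {2, 1}, {1, 1} };
        Arrays.sort(rows, new Comparator<long[]>() {
            @Override
            public int compare(long[] a, long[] b) {
                int cmp = Long.compare(a[0], b[0]);               // first column ascending
                return cmp != 0 ? cmp : Long.compare(a[1], b[1]); // tie: second column ascending
            }
        });
        for (long[] row : rows) {
            // Prints the expected result: 1 1, 2 1, 2 2, 3 1, 3 2, 3 3
            System.out.println(row[0] + "\t" + row[1]);
        }
    }
}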
2. Code
package sort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class sort {

    static final String INPUT_PATH = "hdfs://hadoop1:9000/input";
    static final String OUT_PATH = "hdfs://hadoop1:9000/out";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        // Delete the output directory if it already exists
        if (fileSystem.exists(new Path(OUT_PATH))) {
            fileSystem.delete(new Path(OUT_PATH), true);
        }

        final Job job = new Job(conf, sort.class.getSimpleName());

        // Input directory
        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
        // Input format class
        job.setInputFormatClass(TextInputFormat.class);

        // Custom Mapper class
        job.setMapperClass(MyMapper.class);
        // Mapper output key/value types
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Partitioning
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // Custom Reducer class
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // Output directory
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        // Output format class
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the whole job to the JobTracker and wait for completion
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {
        @Override
        protected void map(LongWritable key, Text v1, Context context)
                throws IOException, InterruptedException {
            // Each input line holds two tab-separated numbers
            String[] splited = v1.toString().split("\t");
            final long k2Long = Long.parseLong(splited[0]);
            final long v2Long = Long.parseLong(splited[1]);
            NewK2 k2 = new NewK2(k2Long, v2Long);
            context.write(k2, new LongWritable(v2Long));
        }
    }

    static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            // The composite key already carries both columns in sorted order
            context.write(new LongWritable(k2.first), new LongWritable(k2.second));
        }
    }

    static class NewK2 implements WritableComparable<NewK2> {
        Long first;   // first column
        Long second;  // second column

        public NewK2() {
        }

        public NewK2(Long first, Long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        /**
         * Called when keys are sorted:
         * order by the first column ascending; when the first columns are
         * equal, order by the second column ascending.
         */
        @Override
        public int compareTo(NewK2 o) {
            // Use compareTo instead of subtraction to avoid overflow when casting to int
            int cmp = this.first.compareTo(o.first);
            if (cmp != 0) {
                return cmp;
            }
            return this.second.compareTo(o.second);
        }

        @Override
        public int hashCode() {
            return this.first.hashCode() + this.second.hashCode();
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 ok2 = (NewK2) obj;
            // Compare the boxed Long values with equals(), not ==
            return this.first.equals(ok2.first) && this.second.equals(ok2.second);
        }
    }
}
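One possible way to run the job (the original post does not show the commands, so the jar name is an assumption): upload a tab-separated input file to hdfs://hadoop1:9000/input, package the class above into a jar such as sort.jar, and submit it with hadoop jar sort.jar sort.sort, since the main class sort lives in the package sort. When the job finishes, the result can typically be read with hadoop fs -cat hdfs://hadoop1:9000/out/part-r-00000 and should match the expected output listed in the preface.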