MapReduce Default Sorting Algorithm
2015-05-15 10:43
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class SortApp {
    private static final String INPUT_PATH = "hdfs://localhost:9000/input";
    private static final String OUT_PATH = "hdfs://localhost:9000/output";

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        final Path outPath = new Path(OUT_PATH);
        if (fileSystem.exists(outPath)) {
            fileSystem.delete(outPath, true);
        }

        final Job job = new Job(conf, SortApp.class.getSimpleName());

        // 1.1 specify the input path and input format
        FileInputFormat.setInputPaths(job, INPUT_PATH);
        job.setInputFormatClass(TextInputFormat.class);

        // 1.2 specify the custom Mapper class and the <k2,v2> output types
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(NewK2.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 1.3 specify the partitioner class and the number of reduce tasks
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);

        // 2.2 specify the custom Reducer class and the <k3,v3> output types
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        // 2.3 specify the output path and output format
        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
        job.setOutputFormatClass(TextOutputFormat.class);

        // submit the job to the JobTracker and wait for it to finish
        job.waitForCompletion(true);
    }

    static class MyMapper extends Mapper<LongWritable, Text, NewK2, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            final String[] splited = value.toString().split("\t");
            final NewK2 k2 = new NewK2(Long.parseLong(splited[0]), Long.parseLong(splited[1]));
            final LongWritable v2 = new LongWritable(Long.parseLong(splited[1]));
            context.write(k2, v2);
        }
    }

    static class MyReducer extends Reducer<NewK2, LongWritable, LongWritable, LongWritable> {
        @Override
        protected void reduce(NewK2 k2, Iterable<LongWritable> v2s, Context context)
                throws IOException, InterruptedException {
            context.write(new LongWritable(k2.first), new LongWritable(k2.second));
        }
    }

    /**
     * Q: Why define this class?
     * A: Because the original v2 cannot take part in sorting, the original k2 and v2
     *    are wrapped into one class, which serves as the new k2.
     */
    static class NewK2 implements WritableComparable<NewK2> {
        long first;
        long second;

        public NewK2() {}

        public NewK2(long first, long second) {
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(first);
            out.writeLong(second);
        }

        /**
         * Called when k2 is sorted: ascending by the first column,
         * and ascending by the second column when the first columns are equal.
         * Long.compare avoids the overflow that casting a long difference
         * to int can cause.
         */
        @Override
        public int compareTo(NewK2 o) {
            final int cmp = Long.compare(this.first, o.first);
            if (cmp != 0) {
                return cmp;
            }
            return Long.compare(this.second, o.second);
        }

        @Override
        public int hashCode() {
            return Long.hashCode(first) + Long.hashCode(second);
        }

        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof NewK2)) {
                return false;
            }
            NewK2 oK2 = (NewK2) obj;
            return (this.first == oK2.first) && (this.second == oK2.second);
        }
    }
}
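The composite-key ordering that NewK2.compareTo implements can be tried outside Hadoop. The sketch below (the class names Pair and NewK2SortDemo are illustrative, not part of the job above) sorts a few sample pairs with the same rule: ascending by the first column, and by the second column when the first columns tie.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// A plain Comparable with the same ordering as NewK2.compareTo:
// first column ascending, second column as the tie-breaker.
class Pair implements Comparable<Pair> {
    final long first;
    final long second;

    Pair(long first, long second) {
        this.first = first;
        this.second = second;
    }

    @Override
    public int compareTo(Pair o) {
        int cmp = Long.compare(this.first, o.first); // compare first column
        return cmp != 0 ? cmp : Long.compare(this.second, o.second); // tie-break on second
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }
}

public class NewK2SortDemo {
    public static void main(String[] args) {
        List<Pair> pairs = new ArrayList<>();
        pairs.add(new Pair(3, 3));
        pairs.add(new Pair(1, 2));
        pairs.add(new Pair(3, 1));
        pairs.add(new Pair(2, 2));
        pairs.add(new Pair(1, 1));
        Collections.sort(pairs); // same ordering the shuffle applies to NewK2 keys
        for (Pair p : pairs) {
            System.out.println(p); // prints 1 1, 1 2, 2 2, 3 1, 3 3 (tab-separated)
        }
    }
}
```

This is the same sorted output the reducer emits for that input, since MyReducer simply writes out both fields of each composite key.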