Custom sorting and custom data types in Hadoop (setSortComparatorClass and setGroupingComparatorClass)
2014-03-31 20:33
1 Mapper
2 Partitioner
3 Reducer
4 Custom data type TextInt
5 Custom map-side sort comparator
6 Custom reduce-side grouping comparator
7 Driver program
8 Running and results
Note: you must call setMapOutputKeyClass and setMapOutputValueClass in the driver. If they are not set, the map output key/value classes default to the job's final output types, which are Text and Text here, and the job would fail with a type mismatch against TextInt/IntWritable.
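Because the driver (section 7 below) uses KeyValueTextInputFormat, each input line is split at the first tab into a Text key (a string) and a Text value (a number that the mapper parses). A hypothetical tab-separated input file, used for the examples in this post's margins, might look like:

hadoop	3
hadoop	1
hive	2
hadoop	2
hive	1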
1 Mapper

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SortMapper extends Mapper<Object, Text, TextInt, IntWritable> {

    // Reused across map() calls instead of allocating per record
    private final TextInt textInt = new TextInt();
    private final IntWritable intp = new IntWritable(0);

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // With KeyValueTextInputFormat, key is the string before the tab
        // and value is the number after it
        int i = Integer.parseInt(value.toString());
        textInt.setStr(key.toString());
        textInt.setValue(i);
        intp.set(i);
        context.write(textInt, intp);
    }
}
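For the hypothetical input above, each map() call emits a composite key carrying both fields, e.g. (TextInt("hadoop", 3), 3), (TextInt("hadoop", 1), 1), (TextInt("hive", 2), 2), and so on. The value is duplicated inside the key so the shuffle can sort on it.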
2 Partitioner
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class SortPartitioner extends Partitioner<TextInt, IntWritable> {

    @Override
    public int getPartition(TextInt textInt, IntWritable value, int numReducers) {
        // Partition on the string part only, so all records sharing a str
        // go to the same reducer. Mask the sign bit before taking the
        // modulus; note the parentheses -- % binds tighter than &.
        return (textInt.getStr().hashCode() & Integer.MAX_VALUE) % numReducers;
    }
}
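The original return statement had an operator-precedence bug: % binds tighter than &, so the expression was evaluated as hashCode() & (Integer.MAX_VALUE % numReducers). A minimal standalone sketch (my addition; strings and reducer count are hypothetical) showing how the two forms diverge when numReducers is not a power of two:

public class PrecedenceDemo {
    public static void main(String[] args) {
        int numReducers = 3; // Integer.MAX_VALUE % 3 == 1, so the buggy form is hash & 1
        for (String s : new String[] {"hadoop", "hive", "pig"}) {
            int hash = s.hashCode();
            int buggy = hash & Integer.MAX_VALUE % numReducers;   // only ever 0 or 1
            int fixed = (hash & Integer.MAX_VALUE) % numReducers; // spread over 0..2
            System.out.println(s + ": buggy=" + buggy + " fixed=" + fixed);
        }
    }
}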
3 Reducer
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SortReducer extends Reducer<TextInt, IntWritable, Text, Text> {

    @Override
    protected void reduce(TextInt textInt, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Thanks to the grouping comparator, one reduce() call sees every
        // value for a given str, already in ascending order
        StringBuilder stringCombine = new StringBuilder();
        Iterator<IntWritable> itr = values.iterator();
        while (itr.hasNext()) {
            stringCombine.append(itr.next().get()).append(",");
        }
        int length = stringCombine.length();
        if (length > 0) {
            stringCombine.deleteCharAt(length - 1); // drop trailing comma
        }
        context.write(new Text(textInt.getStr()), new Text(stringCombine.toString()));
    }
}
4 Custom data type TextInt
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TextInt implements WritableComparable<TextInt> {

    private String str;
    private int value;

    public String getStr() { return str; }
    public void setStr(String str) { this.str = str; }
    public int getValue() { return value; }
    public void setValue(int value) { this.value = value; }

    @Override
    public void readFields(DataInput in) throws IOException {
        str = in.readUTF();
        value = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(str);
        out.writeInt(value);
    }

    @Override
    public int compareTo(TextInt o) {
        // Note: this orders str descending. It is not actually used for the
        // shuffle here, because the driver installs explicit sort and
        // grouping comparators.
        return o.getStr().compareTo(this.getStr());
    }
}
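As a quick sanity check of the Writable contract, the write()/readFields() pair should round-trip a record through bytes. A standalone sketch (my addition, assuming the TextInt class above is on the classpath):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TextIntRoundTrip {
    public static void main(String[] args) throws IOException {
        TextInt original = new TextInt();
        original.setStr("hadoop");
        original.setValue(3);

        // Serialize with write(DataOutput)
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance with readFields(DataInput)
        TextInt copy = new TextInt();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy.getStr() + "=" + copy.getValue()); // hadoop=3
    }
}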
5 Custom map-side sort comparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TextIntComparator extends WritableComparator {

    public TextIntComparator() {
        // true: create TextInt instances so compare() receives deserialized keys
        super(TextInt.class, true);
    }

    @Override
    @SuppressWarnings("all")
    public int compare(WritableComparable a, WritableComparable b) {
        TextInt o1 = (TextInt) a;
        TextInt o2 = (TextInt) b;
        // Primary order: str ascending; secondary order: value ascending
        if (!o1.getStr().equals(o2.getStr())) {
            return o1.getStr().compareTo(o2.getStr());
        }
        // Integer.compare avoids the overflow risk of o1.getValue() - o2.getValue()
        return Integer.compare(o1.getValue(), o2.getValue());
    }
}
6 Custom reduce-side grouping comparator
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TextComparator extends WritableComparator {

    public TextComparator() {
        super(TextInt.class, true);
    }

    @Override
    @SuppressWarnings("all")
    public int compare(WritableComparable a, WritableComparable b) {
        // Compare only the string part: keys sharing a str are treated as
        // equal, so all their values are fed to a single reduce() call
        TextInt o1 = (TextInt) a;
        TextInt o2 = (TextInt) b;
        return o1.getStr().compareTo(o2.getStr());
    }
}
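To see how the two comparators divide the work, here is a standalone sketch (my addition, assuming the TextInt, TextIntComparator, and TextComparator classes above) that mimics the shuffle sort followed by reduce-side grouping on a handful of in-memory records:

import java.util.Arrays;

public class SecondarySortDemo {
    public static void main(String[] args) {
        TextIntComparator sortCmp = new TextIntComparator();
        TextComparator groupCmp = new TextComparator();

        TextInt[] records = {
            make("hive", 2), make("hadoop", 3), make("hadoop", 1),
            make("hive", 1), make("hadoop", 2)
        };

        // Shuffle phase: the sort comparator orders by str, then value ascending
        Arrays.sort(records, (a, b) -> sortCmp.compare(a, b));

        // Reduce phase: a new reduce() call starts whenever the grouping
        // comparator reports a different key
        StringBuilder line = null;
        for (int i = 0; i < records.length; i++) {
            if (i == 0 || groupCmp.compare(records[i - 1], records[i]) != 0) {
                if (line != null) System.out.println(line);
                line = new StringBuilder(records[i].getStr() + " ->");
            }
            line.append(' ').append(records[i].getValue());
        }
        if (line != null) System.out.println(line);
        // Prints: hadoop -> 1 2 3
        //         hive -> 1 2
    }

    private static TextInt make(String str, int value) {
        TextInt t = new TextInt();
        t.setStr(str);
        t.setValue(value);
        return t;
    }
}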
7 Driver program
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SortMain {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: sort <in> <out>");
            System.exit(2);
        }
        Job job = new Job(conf, "sort");
        job.setJarByClass(SortMain.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);

        job.setMapperClass(SortMapper.class);
        job.setPartitionerClass(SortPartitioner.class);
        // Must be set explicitly: the map output types differ from the final output types
        job.setMapOutputKeyClass(TextInt.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(SortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // The sort comparator orders keys during the shuffle; the grouping
        // comparator decides which keys share one reduce() call
        job.setSortComparatorClass(TextIntComparator.class);
        job.setGroupingComparatorClass(TextComparator.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        try {
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
8 Running and results
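The original post stopped short of showing output, so the following is a hypothetical run (the jar name and HDFS paths are illustrative) against the tab-separated sample input shown at the top:

$ hadoop jar sort.jar SortMain /user/demo/sort-in /user/demo/sort-out
$ hadoop fs -cat /user/demo/sort-out/part-r-00000
hadoop	1,2,3
hive	1,2

Each str appears exactly once because the grouping comparator merges all TextInt keys sharing a str into one reduce() call, and the values come out in ascending order because the sort comparator ordered them during the shuffle.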