您的位置:首页 > 运维架构

自定义-Hadoop自定义分组Group

2016-08-21 10:13 519 查看
自定义分组MyGroup:

主要是继承WritableComparator类,重写compare函数

我这里重写的是该源码函数:

[java]
view plain
copy
print?





/** Compare two WritableComparables. 
   * 
   * <p> The default implementation uses the natural ordering, calling {@link 
   * Comparable#compareTo(Object)}. */  
  @SuppressWarnings("unchecked")  
  public int compare(WritableComparable a, WritableComparable b) {  
    return a.compareTo(b);  
  }  

原始文件数据:

[java]
view plain
copy
print?





hadoop  a  
spark   a  
hive    a  
hbase   a  
tachyon a  
storm   a  
redis   a  

代码:

[java]
view plain
copy
print?





import java.io.IOException;  
  
  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.LongWritable;  
import org.apache.hadoop.io.Text;  
import org.apache.hadoop.io.WritableComparable;  
import org.apache.hadoop.io.WritableComparator;  
import org.apache.hadoop.mapreduce.Job;  
import org.apache.hadoop.mapreduce.Mapper;  
import org.apache.hadoop.mapreduce.Reducer;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import org.apache.hadoop.util.GenericOptionsParser;  
  
  
public class MyGroup {  
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
        Configuration conf = new Configuration();  
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
        if(otherArgs.length!=2){  
            System.err.println("Usage databaseV1 <inputpath> <outputpath>");  
        }  
          
        Job job = Job.getInstance(conf, MyGroup.class.getSimpleName() + "1");  
        job.setJarByClass(MyGroup.class);  
        job.setMapOutputKeyClass(Text.class);  
        job.setMapOutputValueClass(Text.class);  
        job.setOutputKeyClass(Text.class);  
        job.setOutputValueClass(Text.class);  
        job.setMapperClass(MyMapper1.class);  
        job.setGroupingComparatorClass(MyGroupComparator.class);  
        job.setReducerClass(MyReducer1.class);  
        job.setInputFormatClass(TextInputFormat.class);  
        job.setOutputFormatClass(TextOutputFormat.class);  
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  
        job.waitForCompletion(true);  
    }  
    public static class MyMapper1 extends Mapper<LongWritable, Text, Text, Text>{  
        @Override  
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)  
                throws IOException, InterruptedException {  
            String[] spl=value.toString().split("\t");  
            context.write(new Text(spl[0].trim()), new Text(spl[1].trim()));  
        }  
    }  
    public static class MyReducer1 extends Reducer<Text, Text, Text, Text>{  
        @Override  
        protected void reduce(Text k2, Iterable<Text> v2s, Reducer<Text, Text, Text, Text>.Context context)  
                throws IOException, InterruptedException {  
            Long count=0L;  
            for (@SuppressWarnings("unused") Text v2 : v2s) {  
                count++;  
                context.write(new Text("in--"+k2), new Text(count.toString()));  
            }  
            context.write(new Text("out--"+k2), new Text(count.toString()));  
        }  
    }  
    public static class MyGroupComparator extends WritableComparator{  
        public MyGroupComparator(){  
            super(Text.class,true);  
        }  
        @SuppressWarnings("rawtypes")  
        public int compare(WritableComparable a, WritableComparable b) {  
            Text p1 = (Text) a;  
            Text p2 = (Text) b;  
            p1.compareTo(p2);  
            return  0;  
          }  
    }  
}  

默认分组结果是这样的:

[java]
view plain
copy
print?





public static class MyGroupComparator extends WritableComparator{  
        public MyGroupComparator(){  
            super(Text.class,true);  
        }  
        @SuppressWarnings("rawtypes")  
        public int compare(WritableComparable a, WritableComparable b) {  
            Text p1 = (Text) a;  
            Text p2 = (Text) b;  
            return p1.compareTo(p2);  
          }  
    }  

相同Key为一组:

[java]
view plain
copy
print?





in--hadoop      1  
out--hadoop     1  
in--hbase       1  
out--hbase      1  
in--hive        1  
out--hive       1  
in--redis       1  
out--redis      1  
in--spark       1  
out--spark      1  
in--storm       1  
out--storm      1  
in--tachyon     1  
out--tachyon    1  

自定义后:

[java]
view plain
copy
print?





public static class MyGroupComparator extends WritableComparator{  
        public MyGroupComparator(){  
            super(Text.class,true);  
        }  
        @SuppressWarnings("rawtypes")  
        public int compare(WritableComparable a, WritableComparable b) {  
            Text p1 = (Text) a;  
            Text p2 = (Text) b;  
            p1.compareTo(p2);  
            return 0;  
          }  
    }  

所有key均为一组:

[java]
view plain
copy
print?





in--hadoop      1  
in--hbase       2  
in--hive        3  
in--redis       4  
in--spark       5  
in--storm       6  
in--tachyon     7  
out--tachyon    7 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: