您的位置:首页 > 运维架构

hadoop二次排序实现join

2017-04-03 12:58 253 查看
package join;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

import org.apache.hadoop.io.WritableUtils;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class j {

    public static class TextPair implements WritableComparable<TextPair>{

        public Text first;

        public Text second;

        public TextPair(){

            this.first=new Text();

            this.second=new Text();

        }

        public TextPair(String f,String s) {

            this.first=new Text(f);

            this.second=new Text(s);        

        }

        @Override

        public void write(DataOutput out) throws IOException {

            first.write(out);

            second.write(out);

        }

        @Override

        public void readFields(DataInput in) throws IOException {

            first.readFields(in);

            second.readFields(in);

        }

        @Override

        public int compareTo(TextPair t) {

            int i=first.compareTo(t.first);

            if(i==0){

                return second.compareTo(t.second);

            }

            return i;

        }

        

    }    

    public static class map1 extends Mapper<LongWritable, Text,TextPair, Text > {

        public void map (LongWritable key,Text value,Context context) throws IOException, InterruptedException{

            String[] s=value.toString().split(",");

            context.write(new TextPair(s[0],"0"),new Text(s[1]));

        }

    }

    public static class map2 extends Mapper<LongWritable, Text,TextPair, Text> {

        public void map (LongWritable key,Text value,Context context) throws IOException, InterruptedException{

            String[] s=value.toString().split(",");

            context.write(new TextPair(s[0],"1"),new Text(s[1]));

        }

    }

    public static class reduce extends Reducer<TextPair,Text,Text,Text>{

        public void reduce(TextPair key,Iterable<Text> value,Context context) throws IOException, InterruptedException{

            Iterator<Text> i=value.iterator();

            Text t1=new Text(i.next());//注意此处必须用new复制一个对象,否则在下面的迭代中t1会改变,不再是第一个值

            Text t2;

            while(i.hasNext()){

                t2=i.next();

                context.write(key.first,new Text(t2.toString()+"\t"+t1.toString()));

                System.out.println(t1.toString());

            }

        }

    }
    public static class UDF_partition extends Partitioner<TextPair,Text>{

//partiton发生在map阶段最后,确定每个kv对的分组编号,发往不同的reduce

        @Override

        public int getPartition(TextPair key, Text value, int numpartition) {

            return (key.first.hashCode()&Integer.MAX_VALUE)%numpartition;

        }    

    }
    public static class UDF_group extends WritableComparator{

//group发生在reduce之前,将((1,2),a),((1,3),b),((1,2),a),((2,2),d),((2,2),a)分组,变为k-list(v)的形式,k取同组的第一个k

        public UDF_group(){

            super(TextPair.class,true);//注意此处构造方法中的参数true是父类的构造函数中不可缺少的

        }

        public int compare(WritableComparable a,WritableComparable b){

            if(a instanceof TextPair && b instanceof TextPair){

                return ((TextPair)a).first.compareTo(((TextPair)b).first);    

            }

            retu
b946
rn super.compare(a, b);

        }

    }

    public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException{

        Configuration con=new Configuration();

        Job job=new Job(con);

        job.setJarByClass(j.class);

        

        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class,map1.class);

        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class,map2.class);

        FileOutputFormat.setOutputPath(job,new Path(args[2]));

        job.setReducerClass(reduce.class);

        

        job.setPartitionerClass(UDF_partition.class);

        job.setGroupingComparatorClass(UDF_group.class);

        

        job.setMapOutputKeyClass(TextPair.class);

        job.setMapOutputValueClass(Text.class);

        

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(Text.class);

        

        System.exit(job.waitForCompletion(true)?0:1);

    }

    

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: