您的位置:首页 > 其它

MapReduce实现Reduce端Join操作实例

2015-06-15 22:56 513 查看

使用案例:

联接两张表
Table EMP:(新建文件EMP,文件中不要包含第一行的属性名)
Name        Sex      Age        DepNo
zhang       male     20           1     
li         female    25           2
wang       female    30           3
zhou        male     35           2

Table Dep:(新建文件DEP,文件中不要包含第一行的属性名)
DepNo        DepName
1            Sales
2            Dev
3            Mgt

Inner join:
select Name,Sex,Age,DepName from EMP inner join DEP on EMP.DepNo=DEP.DepNo

Result:
Name        Sex      Age        DepName
zhang       male     20          Sales     
li         female    25          Dev
wang       female    30          Mgt
zhou        male     35          Dev


接下来使用MapReduce实现Join操作。

Reduce端进行Join操作

Reduce端联接比Map端联接更普遍,因为输入的数据不需要特定的结构,而且编写简单;缺点是效率较低,因为所有数据都必须经过shuffle过程。

基本思路:

1、Map端读取所有文件,并在输出的内容里加上标识代表数据是从哪个文件里来的;

2、在reduce处理函数里,按照标识对数据进行保存;

3、然后根据Key对保存的数据进行Join,求出结果并直接输出。

package Join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Value object carried from the mapper to the reducer in the reduce-side join
 * of the EMP and DEP tables.
 *
 * <p>A single instance can hold either an employee row (name, sex, age,
 * department number) or a department row (department number, department
 * name). The {@code table} tag records which input the row came from, and the
 * fields belonging to the other table keep their defaults ({@code ""} or
 * {@code 0}).
 *
 * <p>String fields must be non-null when {@link #write(DataOutput)} is called,
 * because {@link DataOutput#writeUTF(String)} rejects {@code null}.
 */
public class EmpJoinDep implements WritableComparable<EmpJoinDep> {

    private String name = "";
    private String sex = "";
    private int age = 0;
    private int depNo = 0;
    private String depName = "";
    /** Tag naming the source table of this row. */
    private String table = "";

    /** Creates an empty record; the no-arg constructor is needed so the record can be filled by {@link #readFields(DataInput)}. */
    public EmpJoinDep() {}

    /**
     * Copy constructor: creates an independent record with the same field values.
     *
     * @param empJoinDep the record to copy; must not be {@code null}
     */
    public EmpJoinDep(EmpJoinDep empJoinDep) {
        this.name = empJoinDep.getName();
        this.sex = empJoinDep.getSex();
        this.age = empJoinDep.getAge();
        this.depNo = empJoinDep.getDepNo();
        this.depName = empJoinDep.getDepName();
        this.table = empJoinDep.getTable();
    }

    /** @return the employee name */
    public String getName() {
        return name;
    }

    /** @param name the employee name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the employee sex */
    public String getSex() {
        return sex;
    }

    /** @param sex the employee sex */
    public void setSex(String sex) {
        this.sex = sex;
    }

    /** @return the employee age */
    public int getAge() {
        return age;
    }

    /** @param age the employee age */
    public void setAge(int age) {
        this.age = age;
    }

    /** @return the department number (the join key) */
    public int getDepNo() {
        return depNo;
    }

    /** @param depNo the department number (the join key) */
    public void setDepNo(int depNo) {
        this.depNo = depNo;
    }

    /** @return the department name */
    public String getDepName() {
        return depName;
    }

    /** @param depName the department name */
    public void setDepName(String depName) {
        this.depName = depName;
    }

    /** @return the tag naming the source table of this row */
    public String getTable() {
        return table;
    }

    /** @param table the tag naming the source table of this row */
    public void setTable(String table) {
        this.table = table;
    }

    /**
     * Serializes all six fields. The field order here must stay in step with
     * {@link #readFields(DataInput)}.
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(sex);
        out.writeInt(age);
        out.writeInt(depNo);
        out.writeUTF(depName);
        out.writeUTF(table);
    }

    /**
     * Restores all six fields, in the same order {@link #write(DataOutput)}
     * emitted them, overwriting whatever this instance held before.
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.sex = in.readUTF();
        this.age = in.readInt();
        this.depNo = in.readInt();
        this.depName = in.readUTF();
        this.table = in.readUTF();
    }

    /**
     * Deliberately imposes no ordering: every pair of records compares as equal.
     *
     * @return always {@code 0}
     */
    @Override
    public int compareTo(EmpJoinDep o) {
        return 0;
    }

    /** Renders the joined columns only; the department number and table tag are omitted. */
    @Override
    public String toString() {
        return "EmpJoinDep [Name=" + name + ", Sex=" + sex + ", Age=" + age
                + ", DepName=" + depName + "]";
    }

}


package Join;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reduce-side inner join of the EMP and DEP tables on the department number.
 *
 * <p>The mapper tags every input row with the table it came from and keys it
 * by department number. The reducer then sees all rows of one department
 * together and attaches the department name to each employee row.
 */
public class ReduceJoin {
    private final static String INPUT_PATH = "hdfs://liguodong:8020/inputjoin";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/outputmapjoin";

    /** Tag written by the mapper for rows parsed from the employee table. */
    private static final String TABLE_EMP = "EMP";
    /** Tag written by the mapper for rows parsed from the department table. */
    private static final String TABLE_DEP = "DEP";

    /**
     * Parses one whitespace-separated input line and emits it keyed by
     * department number. The source table is recognized by the column count:
     * four columns is an EMP row, two columns is a DEP row. Lines with any
     * other column count are skipped.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, EmpJoinDep>{

        /**
         * @throws NumberFormatException if a numeric column (age or department
         *         number) of a recognized row is not a valid integer
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Trim first: leading whitespace would otherwise yield an empty
            // first token and shift the column count.
            String[] fields = value.toString().trim().split("\\s+");

            if (fields.length == 4) {
                int depNo = Integer.parseInt(fields[3]);
                // A fresh record per line, so no field values from a previous
                // line are carried over into this one.
                EmpJoinDep employee = new EmpJoinDep();
                employee.setName(fields[0]);
                employee.setSex(fields[1]);
                employee.setAge(Integer.parseInt(fields[2]));
                employee.setDepNo(depNo);
                employee.setTable(TABLE_EMP);
                context.write(new IntWritable(depNo), employee);
            } else if (fields.length == 2) {
                int depNo = Integer.parseInt(fields[0]);
                EmpJoinDep department = new EmpJoinDep();
                department.setDepNo(depNo);
                department.setDepName(fields[1]);
                department.setTable(TABLE_DEP);
                context.write(new IntWritable(depNo), department);
            }
        }
    }

    /**
     * Joins the rows of one department number: every EMP row is emitted with
     * the department name taken from the DEP row of the same key.
     */
    public static class MyReducer extends Reducer<IntWritable, EmpJoinDep, NullWritable, EmpJoinDep>{

        @Override
        protected void reduce(IntWritable key, Iterable<EmpJoinDep> values,
                Context context)
                throws IOException, InterruptedException {
            // null means "no DEP row seen for this key".
            String depName = null;
            List<EmpJoinDep> employees = new LinkedList<EmpJoinDep>();

            for (EmpJoinDep row : values) {
                if (TABLE_DEP.equals(row.getTable())) {
                    // If several DEP rows share the key, the last one wins.
                    depName = row.getDepName();
                } else if (TABLE_EMP.equals(row.getTable())) {
                    // The DEP row may arrive after the EMP rows, so employees
                    // are buffered. They are copied because the framework
                    // reuses the value object across iterations.
                    employees.add(new EmpJoinDep(row));
                }
            }

            // Inner join: employees whose department number has no DEP row
            // are dropped rather than emitted with an empty department name.
            if (depName == null) {
                return;
            }

            for (EmpJoinDep employee : employees) {
                employee.setDepName(depName);
                context.write(NullWritable.get(), employee);
            }
        }

    }

    /**
     * Configures and runs the join job. An existing output directory is
     * deleted first, and the process exits with status 0 on success, 1 on failure.
     *
     * @param args unused
     * @throws Exception if the file system cannot be reached or the job fails to run
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Remove a previous run's output so the job can write to OUTPUT_PATH again.
        final Path outputPath = new Path(OUTPUT_PATH);
        final FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "Reduce Join");
        job.setJarByClass(ReduceJoin.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(EmpJoinDep.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(EmpJoinDep.class);

        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


运行结果:

**上传数据:**
[root@liguodong file]# vi EMP
[root@liguodong file]# vi DEP
[root@liguodong file]# hdfs dfs -mkdir /inputjoin
[root@liguodong file]# hdfs dfs -put EMP /inputjoin/
[root@liguodong file]# hdfs dfs -put DEP /inputjoin/
[root@liguodong file]# hdfs dfs -cat /inputjoin/DEP
1            Sales
2            Dev
3            Mgt
[root@liguodong file]# hdfs dfs -cat /inputjoin/EMP
zhang       male     20           1
li         female    25           2
wang       female    30           3
zhou        male     35           2

[root@liguodong file]# hdfs dfs -cat /outputmapjoin/p*
EmpJoinDep [Name=zhang, Sex=male, Age=20, DepName=Sales]
EmpJoinDep [Name=zhou, Sex=male, Age=35, DepName=Dev]
EmpJoinDep [Name=li, Sex=female, Age=25, DepName=Dev]
EmpJoinDep [Name=wang, Sex=female, Age=30, DepName=Mgt]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: