MapReduce实现Reduce端Join操作实例
2015-06-15 22:56
513 查看
使用案例:
联接两张表:

Table EMP:(新建文件EMP,文件中不要包含第一行的属性名)
Name Sex Age DepNo
zhang male 20 1
li female 25 2
wang female 30 3
zhou male 35 2

Table DEP:(新建文件DEP,文件中不要包含第一行的属性名)
DepNo DepName
1 Sales
2 Dev
3 Mgt

Inner join:
select Name,Sex,Age,DepName from EMP inner join DEP on EMP.DepNo=DEP.DepNo

Result:
Name Sex Age DepName
zhang male 20 Sales
li female 25 Dev
wang female 30 Mgt
zhou male 35 Dev
接下来使用MapReduce实现Join操作。
Reduce端进行Join操作
reduce端联接比map端联接更普遍,因为它不要求输入数据具有特定的结构,而且编写简单;缺点是效率较低,因为所有数据都必须经过shuffle过程。基本思路:
1、Map端读取所有文件,并在输出的内容里加上标识代表数据是从哪个文件里来的;
2、在reduce处理函数里,按照标识对数据进行保存;
3、然后对Key相同的记录进行Join,求出结果并直接输出。
package Join;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Record type used by the reduce-side join of the EMP and DEP tables.
 *
 * <p>A single instance can represent either an employee row (name, sex, age,
 * department number) or a department row (department number, department
 * name). The {@code table} field tags which source table the row came from,
 * so the reducer can tell the two kinds apart.
 *
 * <p>Serialization order in {@link #write(DataOutput)} and
 * {@link #readFields(DataInput)} must stay in sync: name, sex, age, depNo,
 * depName, table.
 */
public class EmpJoinDep implements WritableComparable<EmpJoinDep> {

    private String name = "";
    private String sex = "";
    private int age = 0;
    private int depNo = 0;
    private String depName = "";
    /** Tag naming the source table of this record (e.g. "EMP" or "DEP"). */
    private String table = "";

    /** Creates an empty record; required so the framework can instantiate it. */
    public EmpJoinDep() {
    }

    /**
     * Copy constructor.
     *
     * @param empJoinDep the record whose field values are copied
     */
    public EmpJoinDep(EmpJoinDep empJoinDep) {
        // Read the fields directly rather than through overridable getters.
        this.name = empJoinDep.name;
        this.sex = empJoinDep.sex;
        this.age = empJoinDep.age;
        this.depNo = empJoinDep.depNo;
        this.depName = empJoinDep.depName;
        this.table = empJoinDep.table;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getSex() {
        return sex;
    }

    public void setSex(String sex) {
        this.sex = sex;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public int getDepNo() {
        return depNo;
    }

    public void setDepNo(int depNo) {
        this.depNo = depNo;
    }

    public String getDepName() {
        return depName;
    }

    public void setDepName(String depName) {
        this.depName = depName;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    /**
     * Serializes all six fields in a fixed order.
     *
     * @param out the sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeUTF(sex);
        out.writeInt(age);
        out.writeInt(depNo);
        out.writeUTF(depName);
        out.writeUTF(table);
    }

    /**
     * Restores all six fields, in the same order {@link #write(DataOutput)} emits them.
     *
     * @param in the source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.sex = in.readUTF();
        this.age = in.readInt();
        this.depNo = in.readInt();
        this.depName = in.readUTF();
        this.table = in.readUTF();
    }

    /**
     * Imposes no ordering: every record compares as equal to every other.
     *
     * @param o the record to compare against (ignored)
     * @return always {@code 0}
     */
    @Override
    public int compareTo(EmpJoinDep o) {
        return 0;
    }

    /** Renders the joined view of the record; depNo and table are intentionally omitted. */
    @Override
    public String toString() {
        return "EmpJoinDep [Name=" + name + ", Sex=" + sex + ", Age=" + age
                + ", DepName=" + depName + "]";
    }
}
package Join;

import java.io.IOException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Reduce-side inner join of the EMP and DEP tables on the department number.
 *
 * <p>The mapper tags every input row with its source table and keys it by
 * department number; the reducer then attaches the department name to each
 * employee row that shares the key.
 */
public class ReduceJoin {

    private final static String INPUT_PATH = "hdfs://liguodong:8020/inputjoin";
    private final static String OUTPUT_PATH = "hdfs://liguodong:8020/outputmapjoin";

    /** Tag written by the mapper for rows that come from the employee table. */
    private static final String TABLE_EMP = "EMP";
    /** Tag written by the mapper for rows that come from the department table. */
    private static final String TABLE_DEP = "DEP";

    /**
     * Parses whitespace-separated rows and emits them keyed by department number.
     *
     * <p>Rows are told apart by field count: four fields is an employee row
     * (Name Sex Age DepNo), two fields is a department row (DepNo DepName).
     * Rows with any other field count are ignored.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, EmpJoinDep> {

        // Reused across map() calls; every field is reassigned per record below
        // so that no value from a previous row leaks into the next one.
        private final EmpJoinDep empJoinDep = new EmpJoinDep();
        private final IntWritable outKey = new IntWritable();

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the (DepNo, record) pairs
         * @throws NumberFormatException if an Age or DepNo field is not an integer
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Trim first: a leading blank would otherwise yield an empty first
            // token and shift the field count.
            String[] values = value.toString().trim().split("\\s+");

            if (values.length == 4) {
                int age = Integer.parseInt(values[2]);
                int depNo = Integer.parseInt(values[3]);
                empJoinDep.setName(values[0]);
                empJoinDep.setSex(values[1]);
                empJoinDep.setAge(age);
                empJoinDep.setDepNo(depNo);
                empJoinDep.setDepName("");
                empJoinDep.setTable(TABLE_EMP);
                outKey.set(depNo);
                context.write(outKey, empJoinDep);
            } else if (values.length == 2) {
                int depNo = Integer.parseInt(values[0]);
                empJoinDep.setName("");
                empJoinDep.setSex("");
                empJoinDep.setAge(0);
                empJoinDep.setDepNo(depNo);
                empJoinDep.setDepName(values[1]);
                empJoinDep.setTable(TABLE_DEP);
                outKey.set(depNo);
                context.write(outKey, empJoinDep);
            }
        }
    }

    /**
     * Joins, for one department number, the department row with all employee rows.
     */
    public static class MyReducer extends Reducer<IntWritable, EmpJoinDep, NullWritable, EmpJoinDep> {

        /**
         * @param key     the department number shared by all {@code values}
         * @param values  employee and department rows tagged by the mapper
         * @param context sink for the joined employee rows
         */
        @Override
        protected void reduce(IntWritable key, Iterable<EmpJoinDep> values, Context context)
                throws IOException, InterruptedException {
            String depName = null;
            List<EmpJoinDep> employees = new LinkedList<EmpJoinDep>();

            for (EmpJoinDep val : values) {
                if (TABLE_DEP.equals(val.getTable())) {
                    // If several department rows share the key, the last one wins.
                    depName = val.getDepName();
                } else if (TABLE_EMP.equals(val.getTable())) {
                    // Copy before buffering: the framework may hand back the same
                    // value instance on every iteration.
                    employees.add(new EmpJoinDep(val));
                }
            }

            // Inner join: employees whose department number has no matching
            // department row produce no output.
            if (depName == null) {
                return;
            }

            for (EmpJoinDep employee : employees) {
                employee.setDepName(depName);
                context.write(NullWritable.get(), employee);
            }
        }
    }

    /**
     * Configures and runs the join job, deleting any previous output directory first.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the file system or the job fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);
        Path outputPath = new Path(OUTPUT_PATH);
        // The job refuses to start when the output directory already exists.
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(conf, "Reduce Join");
        job.setJarByClass(ReduceJoin.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(EmpJoinDep.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(EmpJoinDep.class);

        FileOutputFormat.setOutputPath(job, outputPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
运行结果:
**上传数据:**
[root@liguodong file]# vi EMP
[root@liguodong file]# vi DEP
[root@liguodong file]# hdfs dfs -mkdir /inputjoin
[root@liguodong file]# hdfs dfs -put EMP /inputjoin/
[root@liguodong file]# hdfs dfs -put DEP /inputjoin/
[root@liguodong file]# hdfs dfs -cat /inputjoin/DEP
1 Sales
2 Dev
3 Mgt
[root@liguodong file]# hdfs dfs -cat /inputjoin/EMP
zhang male 20 1
li female 25 2
wang female 30 3
zhou male 35 2
[root@liguodong file]# hdfs dfs -cat /outputmapjoin/p*
EmpJoinDep [Name=zhang, Sex=male, Age=20, DepName=Sales]
EmpJoinDep [Name=zhou, Sex=male, Age=35, DepName=Dev]
EmpJoinDep [Name=li, Sex=female, Age=25, DepName=Dev]
EmpJoinDep [Name=wang, Sex=female, Age=30, DepName=Mgt]
相关文章推荐
- Java 转义字符
- 233_尚学堂_高淇_java300集最全视频教程_【GOF23设计模式】_原型模式_prototype_浅复制_深复制_反序列化实现深复制
- 小故事,大道理
- 【Oracle篇】Oracle初试水
- 光标
- 为什么重写equals方法的同时也要重写hashcode方法?
- 15年湘潭邀请赛E题 Magic Triangle
- 生活随笔
- UIImageView
- 要么实现,要么遗憾
- LeetCode---(118)Pascal's Triangle
- 观察者模式-Observer
- Java并发编程-08-使用锁实现同步
- Map结构总结
- Theano2.1.4-基础知识之图结构
- Theano2.1.4-基础知识之图结构
- 43Exchange 2010升级到Exchange 2013-CAS切换邮件流测试
- 黑马程序员————————IO流 FileWriter类 和 FileReader类的一些基本用法
- 改变字体颜色的API函数
- java集合类的相关总结