您的位置:首页 > 其它

2018-08-02 期 MapReduce实现多表查询自连接

2018-08-02 08:56 363 查看
1、员工对象EmployeeBean
package cn.sjq.bigdata.mr.self.join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
/**
 * One row of the emp table, used as the map output value of the self-join job.
 *
 * <p>The class implements {@link Writable} so that it can be serialized between
 * the map and reduce phases. The {@code flag} field records which side of the
 * join a given copy of the row represents (0 = employee, 1 = boss).
 *
 * @author songjq
 */
public class EmployeeBean implements Writable {
    // Member fields; strings default to "" and numbers to 0.
    private int deptNo = 0;
    private int empno = 0;
    private String ename = "";
    private String job = "";
    private int mgr = 0;
    private String hiredate = "";
    private float salary = 0f;
    private float comm = 0f;
    // Role marker: 0 = this record acts as an employee, 1 = this record acts as a boss.
    private int flag = 0;

    /** Creates a bean whose fields all keep their default values. */
    public EmployeeBean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param deptNo   department number
     * @param empno    employee number
     * @param ename    employee name
     * @param job      job title
     * @param mgr      employee number of this employee's manager
     * @param hiredate hire date, kept as text
     * @param salary   salary
     * @param comm     commission
     * @param flag     role marker: 0 = employee, 1 = boss
     */
    public EmployeeBean(int deptNo, int empno, String ename, String job, int mgr, String hiredate, float salary,
            float comm, int flag) {
        this.deptNo = deptNo;
        this.empno = empno;
        this.ename = ename;
        this.job = job;
        this.mgr = mgr;
        this.hiredate = hiredate;
        this.salary = salary;
        this.comm = comm;
        this.flag = flag;
    }

    public int getDeptNo() {
        return deptNo;
    }

    public void setDeptNo(int deptNo) {
        this.deptNo = deptNo;
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public int getMgr() {
        return mgr;
    }

    public void setMgr(int mgr) {
        this.mgr = mgr;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public float getSalary() {
        return salary;
    }

    public void setSalary(float salary) {
        this.salary = salary;
    }

    public float getComm() {
        return comm;
    }

    public void setComm(float comm) {
        this.comm = comm;
    }

    public int getFlag() {
        return flag;
    }

    public void setFlag(int flag) {
        this.flag = flag;
    }

    /**
     * Deserializes this bean. Fields are read in exactly the order in which
     * {@link #write(DataOutput)} writes them.
     *
     * @param in source of the serialized fields
     * @throws IOException if reading from {@code in} fails
     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.deptNo = in.readInt();
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.mgr = in.readInt();
        this.hiredate = in.readUTF();
        this.salary = in.readFloat();
        this.comm = in.readFloat();
        this.flag = in.readInt();
    }

    /**
     * Serializes this bean. String fields that were set to {@code null} are
     * written as the empty string, because {@code writeUTF} rejects {@code null}.
     *
     * @param out sink for the serialized fields
     * @throws IOException if writing to {@code out} fails
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.deptNo);
        out.writeInt(this.empno);
        out.writeUTF(nullToEmpty(this.ename));
        out.writeUTF(nullToEmpty(this.job));
        out.writeInt(this.mgr);
        out.writeUTF(nullToEmpty(this.hiredate));
        out.writeFloat(this.salary);
        out.writeFloat(this.comm);
        out.writeInt(this.flag);
    }

    /** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }
}
2、EmployeeBossBean最终输出对象
package cn.sjq.bigdata.mr.self.join;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/**
 * Joined employee/boss row that the reducer of the emp self-join job writes out.
 *
 * <p>{@link #toString()} renders one output line in this column order:
 * <pre>
 * BOSSNO  BOSSNAME  DEPTNO  EMPNO  ENAME  JOB  HIREDATE  SAL  COMM
 * 7698    BLAKE     30      7499   ALLEN  SALESMAN  1981/2/20  1600.0  300.0
 * </pre>
 * A bean whose {@code isTableHeader} flag is set renders the column labels
 * instead of data.
 *
 * <p>The class implements {@code WritableComparable} so that it can be
 * serialized by the job.
 *
 * @author songjq
 */
public class EmployeeBossBean implements WritableComparable<EmployeeBossBean> {
    // Member fields; "-" is the placeholder shown for text that was never set.
    private int bossno = 0;
    private String bossname = "-";
    private int deptNo = 0;
    private int empno = 0;
    private String ename = "-";
    private String job = "";
    private String hiredate = "-";
    private float salary = 0f;
    private float comm = 0f;
    // When true, this bean stands for the header line of the output file
    // rather than for a data row.
    private boolean isTableHeader = false;

    /** Creates a bean whose fields all keep their default values. */
    public EmployeeBossBean() {
    }

    /**
     * Creates a fully populated bean.
     *
     * @param bossno        employee number of the boss
     * @param bossname      name of the boss
     * @param deptNo        department number of the employee
     * @param empno         employee number
     * @param ename         employee name
     * @param job           job title of the employee
     * @param hiredate      hire date of the employee, kept as text
     * @param salary        salary of the employee
     * @param comm          commission of the employee
     * @param isTableHeader whether this bean renders the header line
     */
    public EmployeeBossBean(int bossno, String bossname, int deptNo, int empno, String ename, String job,
            String hiredate, float salary, float comm, boolean isTableHeader) {
        this.bossno = bossno;
        this.bossname = bossname;
        this.deptNo = deptNo;
        this.empno = empno;
        this.ename = ename;
        this.job = job;
        this.hiredate = hiredate;
        this.salary = salary;
        this.comm = comm;
        this.isTableHeader = isTableHeader;
    }

    public int getBossno() {
        return bossno;
    }

    public void setBossno(int bossno) {
        this.bossno = bossno;
    }

    public String getBossname() {
        return bossname;
    }

    public void setBossname(String bossname) {
        this.bossname = bossname;
    }

    public int getDeptNo() {
        return deptNo;
    }

    public void setDeptNo(int deptNo) {
        this.deptNo = deptNo;
    }

    public int getEmpno() {
        return empno;
    }

    public void setEmpno(int empno) {
        this.empno = empno;
    }

    public String getEname() {
        return ename;
    }

    public void setEname(String ename) {
        this.ename = ename;
    }

    public String getJob() {
        return job;
    }

    public void setJob(String job) {
        this.job = job;
    }

    public String getHiredate() {
        return hiredate;
    }

    public void setHiredate(String hiredate) {
        this.hiredate = hiredate;
    }

    public float getSalary() {
        return salary;
    }

    public void setSalary(float salary) {
        this.salary = salary;
    }

    public float getComm() {
        return comm;
    }

    public void setComm(float comm) {
        this.comm = comm;
    }

    public boolean isTableHeader() {
        return isTableHeader;
    }

    public void setTableHeader(boolean isTableHeader) {
        this.isTableHeader = isTableHeader;
    }

    /**
     * Renders this bean as one line of the output file.
     *
     * <ul>
     * <li>Header bean: the column labels, laid out with the same widths and
     *     tab separators as a data row so that labels sit above their columns.</li>
     * <li>Data bean with {@code bossno == 0}: a single tab character only.</li>
     * <li>Any other data bean: the joined row. Numeric values equal to zero
     *     are shown as {@code "-"}.</li>
     * </ul>
     *
     * @return the formatted line, without a line terminator
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        if (this.isTableHeader) {
            // Mirrors the data row below: BOSSNO<tab>, BOSSNAME(12), DEPTNO(8), EMPNO<tab>, ...
            return "BOSSNO\t" + formatStr("BOSSNAME", 12) + formatStr("DEPTNO", 8) + "EMPNO\t"
                    + formatStr("ENAME", 12) + formatStr("JOB", 12) + formatStr("HIREDATE", 12)
                    + formatStr("SAL", 6) + "\t" + formatStr("COMM", 6) + "\t";
        }
        if (this.bossno == 0) {
            // A row without a boss number is rendered as a lone tab.
            return "\t";
        }
        String deptText = this.deptNo == 0 ? "-" : String.valueOf(this.deptNo);
        String empnoText = this.empno == 0 ? "-" : String.valueOf(this.empno);
        String salaryText = this.salary == 0f ? "-" : String.valueOf(this.salary);
        String commText = this.comm == 0f ? "-" : String.valueOf(this.comm);
        return this.bossno + "\t" + formatStr(this.bossname, 12)
                + formatStr(deptText, 8)
                + empnoText + "\t" + formatStr(this.ename, 12)
                + formatStr(this.job, 12) + formatStr(this.hiredate, 12)
                + formatStr(salaryText, 6) + "\t"
                + formatStr(commText, 6) + "\t";
    }

    /**
     * Pads {@code str} with trailing spaces, or truncates it, so that it
     * occupies {@code length} bytes when encoded with the platform default
     * charset.
     *
     * <p>Truncation removes whole characters from the end until the encoded
     * form fits, then pads any remaining gap with spaces. A {@code length} of
     * zero or less yields the empty string.
     *
     * @param str    text to format; {@code null} is treated as the empty string
     * @param length target width in bytes
     * @return the padded or truncated text
     */
    public static String formatStr(String str, int length)
    {
        String text = (str == null) ? "" : str;
        int width = text.getBytes().length;
        if (width == length)
        {
            return text;
        }
        if (width > length)
        {
            // Width is measured in bytes but substring works on chars, so the
            // cut index must never exceed the number of chars available.
            int end = Math.min(text.length(), Math.max(length, 0));
            text = text.substring(0, end);
            // Multi-byte characters can still overflow the byte budget.
            while (end > 0 && text.getBytes().length > length)
            {
                end--;
                text = text.substring(0, end);
            }
            width = text.getBytes().length;
        }
        StringBuilder padded = new StringBuilder(text);
        for (int i = width; i < length; i++)
        {
            padded.append(' ');
        }
        return padded.toString();
    }

    /**
     * Deserializes this bean. Fields are read in exactly the order in which
     * {@link #write(DataOutput)} writes them.
     *
     * @param in source of the serialized fields
     * @throws IOException if reading from {@code in} fails
     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.bossno = in.readInt();
        this.bossname = in.readUTF();
        this.deptNo = in.readInt();
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.job = in.readUTF();
        this.hiredate = in.readUTF();
        this.salary = in.readFloat();
        this.comm = in.readFloat();
        this.isTableHeader = in.readBoolean();
    }

    /**
     * Serializes this bean. String fields that were set to {@code null} are
     * written as the empty string, because {@code writeUTF} rejects {@code null}.
     *
     * @param out sink for the serialized fields
     * @throws IOException if writing to {@code out} fails
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.bossno);
        out.writeUTF(nullToEmpty(this.bossname));
        out.writeInt(this.deptNo);
        out.writeInt(this.empno);
        out.writeUTF(nullToEmpty(this.ename));
        out.writeUTF(nullToEmpty(this.job));
        out.writeUTF(nullToEmpty(this.hiredate));
        out.writeFloat(this.salary);
        out.writeFloat(this.comm);
        out.writeBoolean(this.isTableHeader);
    }

    /**
     * Treats every pair of beans as equal in ordering; no sort order is
     * defined for this type.
     *
     * @param o the bean to compare with (ignored)
     * @return always 0
     * @see java.lang.Comparable#compareTo(java.lang.Object)
     */
    @Override
    public int compareTo(EmployeeBossBean o) {
        return 0;
    }

    /** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }
}
3、Mapper、Reducer、job实现
package cn.sjq.bigdata.mr.self.join;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.junit.Test;
/**
 * Self-join of the emp table implemented as a MapReduce job.
 *
 * <p>Input is a CSV file with the columns
 * {@code EMPNO,ENAME,JOB,MGR,HIREDATE,SAL,COMM,DEPTNO}, for example:
 * <pre>
 * 7369,SMITH,CLERK,7902,1980/12/17,800,,20
 * 7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30
 * 7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30
 * 7839,KING,PRESIDENT,,1981/11/17,5000,,10
 * </pre>
 *
 * <p>Output is one line per employee, prefixed with the number and name of
 * that employee's boss, for example:
 * <pre>
 * BOSSNO  BOSSNAME  DEPTNO  EMPNO  ENAME  JOB       HIREDATE   SAL     COMM
 * 7698    BLAKE     30      7499   ALLEN  SALESMAN  1981/2/20  1600.0  300.0
 * </pre>
 *
 * <p>How the join works:
 * <ol>
 * <li>The MGR column holds the employee number of the employee's boss, and
 *     every boss is also a row of the same table.</li>
 * <li>In SQL the table would be aliased twice, {@code emp e} for employees
 *     and {@code emp b} for bosses, and joined on {@code e.mgr = b.empno}.</li>
 * <li>The mapper emits every input row twice: once keyed by MGR with flag 0
 *     (the row in its employee role) and once keyed by EMPNO with flag 1
 *     (the row in its boss role).</li>
 * <li>All values with the same key reach the same {@code reduce} call, so a
 *     boss record meets the records of the employees who report to it.</li>
 * </ol>
 *
 * <p>A row whose MGR column is empty is emitted in its employee role under
 * key 0.
 *
 * @author songjq
 */
public class EmpSelfJoin {

    /**
     * Mapper.
     * <ul>
     * <li>k1: input offset, {@code LongWritable}</li>
     * <li>v1: one line of input, {@code Text}</li>
     * <li>k2: join key, {@code IntWritable}</li>
     * <li>v2: the parsed row, {@code EmployeeBean}</li>
     * </ul>
     *
     * @author songjq
     */
    static class EmpSelfJoinMapper extends Mapper<LongWritable, Text, IntWritable, EmployeeBean> {

        // Number of comma-separated columns a valid input line must have.
        private static final int FIELD_COUNT = 8;

        /**
         * Parses one CSV line and emits it twice, once per join role.
         * Lines with fewer than eight columns (blank lines included) are
         * skipped and counted in the {@code EmpSelfJoin/MALFORMED_LINES}
         * counter.
         *
         * @throws NumberFormatException if EMPNO or DEPTNO is not an integer
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {

            String line = v1.toString();
            // Limit -1 keeps trailing empty columns, so the column count is
            // stable even when the last columns of a line are blank.
            String[] fields = line.split(",", -1);
            if (fields.length < FIELD_COUNT) {
                context.getCounter("EmpSelfJoin", "MALFORMED_LINES").increment(1);
                return;
            }

            // Example line: 7369,SMITH,CLERK,7902,1980/12/17,800,,20
            EmployeeBean emp = new EmployeeBean();
            emp.setDeptNo(Integer.parseInt(fields[7]));
            emp.setEmpno(Integer.parseInt(fields[0]));
            emp.setEname(fields[1]);
            emp.setJob(fields[2]);
            // MGR, SAL and COMM may be empty in the input; they fall back to 0.
            emp.setMgr(parseIntOrZero(fields[3]));
            emp.setHiredate(fields[4]);
            emp.setSalary(parseFloatOrZero(fields[5]));
            emp.setComm(parseFloatOrZero(fields[6]));

            // Employee role (flag 0): keyed by the boss's employee number.
            emp.setFlag(0);
            context.write(new IntWritable(emp.getMgr()), emp);

            // Boss role (flag 1): keyed by the row's own employee number.
            emp.setFlag(1);
            context.write(new IntWritable(emp.getEmpno()), emp);
        }

        /** Parses {@code text} as an int, returning 0 when it is not a valid integer. */
        private static int parseIntOrZero(String text) {
            try {
                return Integer.parseInt(text);
            } catch (NumberFormatException ignored) {
                // Empty or non-numeric column: treated as "no value".
                return 0;
            }
        }

        /** Parses {@code text} as a float, returning 0 when it is not a valid number. */
        private static float parseFloatOrZero(String text) {
            try {
                return Float.parseFloat(text);
            } catch (NumberFormatException ignored) {
                // Empty or non-numeric column: treated as "no value".
                return 0f;
            }
        }
    }

    /**
     * Reducer.
     * <ul>
     * <li>k3: join key, {@code IntWritable}</li>
     * <li>v3: all {@code EmployeeBean} values emitted under that key</li>
     * <li>k4: {@code NullWritable}</li>
     * <li>v4: the joined row, {@code EmployeeBossBean}</li>
     * </ul>
     * Relies on all values with the same key being handed to the same
     * {@code reduce} call.
     *
     * @author songjq
     */
    static class EmpSelfJoinReducer extends Reducer<IntWritable, EmployeeBean, NullWritable, EmployeeBossBean> {

        /**
         * Writes the header line before any data row is written by this reducer.
         *
         * @see org.apache.hadoop.mapreduce.Reducer#setup(org.apache.hadoop.mapreduce.Reducer.Context)
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            EmployeeBossBean header = new EmployeeBossBean();
            header.setTableHeader(true);
            context.write(NullWritable.get(), header);
        }

        /**
         * Joins the boss record of one key with the employee records of the
         * same key and writes one output row per employee.
         */
        @Override
        protected void reduce(IntWritable k3, Iterable<EmployeeBean> v3, Context ctx)
                throws IOException, InterruptedException {
            // Employee rows are buffered because the boss record can appear
            // at any position among the values.
            ArrayList<EmployeeBossBean> rows = new ArrayList<>();
            String bossname = "";

            for (EmployeeBean emp : v3) {
                if (emp.getFlag() == 1) {
                    // Boss role: only the name is needed.
                    bossname = emp.getEname();
                } else if (emp.getFlag() == 0) {
                    // Employee role: copy the row into an output bean.
                    EmployeeBossBean row = new EmployeeBossBean();
                    row.setDeptNo(emp.getDeptNo());
                    row.setEmpno(emp.getEmpno());
                    row.setEname(emp.getEname());
                    row.setJob(emp.getJob());
                    row.setHiredate(emp.getHiredate());
                    row.setSalary(emp.getSalary());
                    row.setComm(emp.getComm());
                    row.setBossno(emp.getMgr());
                    rows.add(row);
                }
            }

            // Fill in the boss name now that all values have been seen.
            for (EmployeeBossBean row : rows) {
                row.setBossname(bossname);
                ctx.write(NullWritable.get(), row);
            }
        }
    }

    /**
     * Configures and runs the job, waiting for it to finish.
     *
     * @throws IllegalStateException if the job does not complete successfully
     * @throws Exception             if the job cannot be configured or submitted
     */
    @Test
    public void EmpSelfJoinJob() throws Exception {

        Job job = Job.getInstance(new Configuration());

        job.setJarByClass(EmpSelfJoin.class);

        job.setMapperClass(EmpSelfJoinMapper.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(EmployeeBean.class);

        job.setReducerClass(EmpSelfJoinReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(EmployeeBossBean.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\test\\tmp\\join\\self\\emp.csv"));
        FileOutputFormat.setOutputPath(job, new Path("D:\\test\\tmp\\join\\self\\output4"));

        // Surface a failed job instead of letting the test pass silently.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            throw new IllegalStateException("EmpSelfJoin job did not complete successfully");
        }
    }
}
4、执行输出结果
BOSSNO BOSSNAME DEPTNO DEPTNO ENAME JOB HIREDATE SAL COMM
7566 JONES 20 7902 FORD ANALYST 1981/12/3 3000.0 -
7566 JONES 20 7788 SCOTT ANALYST 1987/4/19 3000.0 -
7698 BLAKE 30 7499 ALLEN SALESMAN 1981/2/20 1600.0 300.0
7698 BLAKE 30 7900 JAMES CLERK 1981/12/3 950.0 -
7698 BLAKE 30 7844 TURNER SALESMAN 1981/9/8 1500.0 -
7698 BLAKE 30 7654 MARTIN SALESMAN 1981/9/28 1250.0 1400.0
7698 BLAKE 30 7521 WARD SALESMAN 1981/2/22 1250.0 500.0
7782 CLARK 10 7934 MILLER CLERK 1982/1/23 1300.0 -
7788 SCOTT 20 7876 ADAMS CLERK 1987/5/23 1100.0 -
7839 KING 10 7782 CLARK MANAGER 1981/6/9 2450.0 -
7839 KING 30 7698 BLAKE MANAGER 1981/5/1 2850.0 -
7839 KING 20 7566 JONES MANAGER 1981/4/2 2975.0 -
7902 FORD 20 7369 SMITH CLERK 1980/12/17 800.0 -
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息