您的位置:首页 > 运维架构

Hadoop基础教程-第7章 MapReduce进阶(7.5 MapReduce 连接)

2017-06-21 21:27 441 查看

第7章 MapReduce进阶

7.4 MapReduce 连接

连接操作,也就是常说的join操作,是数据分析时经常用到的操作。

比如有两份数据data1和data2,进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。如果数据量比较大,在内存进行连接操会发生内存溢出。MapReduce join就是用来解决大数据的连接问题。

7.4.1 准备数据

这里准备了Oracle数据库中的经典数据。

dept.txt文件存放部门数据。

[root@node1 data]# cat dept.txt
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
[root@node1 data]#


emp.txt文件存放雇员数据。

[root@node1 data]# cat emp.txt
7369,SMITH,CLERK,7902,17-12-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2-81,1250,500,30
7566,JONES,MANAGER,7839,02-4-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5-81,2850,,30
7782,CLARK,MANAGER,7839,09-6-81,2450,,10
7839,KING,PRESIDENT,,17-11-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9-81,1500,0,30
7900,JAMES,CLERK,7698,03-12-81,950,,30
7902,FORD,ANALYST,7566,03-12-81,3000,,20
7934,MILLER,CLERK,7782,23-1-82,1300,,10


上传到HDFS

hdfs dfs -mkdir -p input

hdfs dfs -put emp.txt input

hdfs dfs -put dept.txt input

[root@node1 data]# hdfs dfs -mkdir -p input
[root@node1 data]# hdfs dfs -put emp.txt input
[root@node1 data]# hdfs dfs -put dept.txt input
[root@node1 data]# hdfs dfs -ls input
Found 2 items
-rw-r--r--   3 root hbase         82 2017-06-23 11:04 input/dept.txt
-rw-r--r--   3 root hbase        513 2017-06-23 11:04 input/emp.txt
[root@node1 data]#


7.4.2 问题描述

求解每个雇员所在部门,输出格式:雇员名,部门名

比如

RESEARCH,SMITH
SALES,ALLEN


7.4.3 编程

这个问题与SQL中的连接操作类似,将问题转换未1:N问题。

一个部门有多个雇员,一个雇员在唯一的部门。转换为1:N问题,部门是1端,雇员是多段

具体思路是,在map阶段读入emp.txt和dept.txt文件,将join的字段作为map输出key,再将每条记录标记上文件名作为map输出value;在reduce阶段做笛卡尔积。

(1)定义Mapper类

package cn.hadron.mr.join;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
//当前读取文件的路径
String filePath=((FileSplit)context.getInputSplit()).getPath().toString();
String joinKey="";
String joinValue="";
String fileFlag="";
String[] array=value.toString().split(",");
//判定当前行数据来自哪个文件
if(filePath.contains("dept.txt")){
fileFlag="l";//left
joinKey=array[0];//部门编号
joinValue=array[1];//部门名
}else if(filePath.contains("emp.txt")){
fileFlag="r";//right
joinKey=array[array.length-1];//部门编号
joinValue=array[1];//雇员名
}
//输出键值对,并标记来源文件
context.write(new Text(joinKey),new Text(joinValue+","+fileFlag));
}
}


(2)定义Reducer类

package cn.hadron.mr.join;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class JoinReducer extends Reducer<Text, Text, Text, Text>{
//相同部门的数据,发送到同一个reduce
@Override
protected void reduce(Text key, Iterable<Text> values,Context context)
throws IOException, InterruptedException {
Iterator<Text> it=values.iterator();
String deptName="";
List<String> empNames=new ArrayList<>();
while(it.hasNext()){
//取一行记录
String[] array=it.next().toString().split(",");
//判定当前记录来源于哪个文件,并根据文件格式解析记录获取相应的信息
if("l".equals(array[1])){//只有1条记录的flag=l
deptName=array[0];
}else if("r".equals(array[1])){
empNames.add(array[0]);
}
}
//求解笛卡尔积,对每个dept的1条记录与emp中多条记录作一次迭代
for(String en:empNames){
context.write(new Text(deptName), new Text(en));
}

}
}


(3)主方法

package cn.hadron.mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
public static void main(String[] args) {
// 设置环境变量HADOOP_USER_NAME,其值是root
System.setProperty("HADOOP_USER_NAME", "root");
// Configuration类包含了Hadoop的配置
Configuration config = new Configuration();
// 设置fs.defaultFS
config.set("fs.defaultFS", "hdfs://192.168.80.131:8020");
// 设置yarn.resourcemanager节点
config.set("yarn.resourcemanager.hostname", "node1");
try {
FileSystem fs = FileSystem.get(config);
Job job = Job.getInstance(config);
job.setJarByClass(RunJob.class);
job.setJobName("JoinDemo");
// 设置Mapper和Reducer类
job.setMapperClass(JoinMapper.class);
job.setReducerClass(JoinReducer.class);
// 设置reduce方法输出key和value的类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 指定输入输出路径
FileInputFormat.addInputPath(job, new Path("/user/root/input/"));
Path outpath = new Path("/user/root/output/");
if (fs.exists(outpath)) {
fs.delete(outpath, true);
}
FileOutputFormat.setOutputPath(job, outpath);
// 提交任务,等待执行完成
boolean f = job.waitForCompletion(true);
if (f) {
System.out.println("job任务执行成功");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}


7.4.4 运行

run as –> Java Application

Eclipse控制台输出信息:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
job任务执行成功


查看结果

hdfs dfs -ls output

[root@node1 ~]# hdfs dfs -ls output
Found 2 items
-rw-r--r--   3 root hbase          0 2017-06-23 14:05 output/_SUCCESS
-rw-r--r--   3 root hbase        168 2017-06-23 14:05 output/part-r-00000
[root@hds117 ~]# hdfs dfs -cat output/part-r-00000
ACCOUNTING  MILLER
ACCOUNTING  KING
ACCOUNTING  CLARK
RESEARCH    FORD
RESEARCH    JONES
RESEARCH    SMITH
SALES   JAMES
SALES   TURNER
SALES   BLAKE
SALES   MARTIN
SALES   WARD
SALES   ALLEN
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐