Hadoop基础教程-第7章 MapReduce进阶(7.5 MapReduce 连接)
2017-06-21 21:27
441 查看
第7章 MapReduce进阶
7.4 MapReduce 连接
连接操作,也就是常说的join操作,是数据分析时经常用到的操作。比如有两份数据data1和data2,进行关键词连接是一个很通用的问题,如果数据量比较小,可以在内存中完成连接。如果数据量比较大,在内存进行连接操会发生内存溢出。MapReduce join就是用来解决大数据的连接问题。
7.4.1 准备数据
这里准备了Oracle数据库中的经典数据。dept.txt文件存放部门数据。
[root@node1 data]# cat dept.txt 10,ACCOUNTING,NEW YORK 20,RESEARCH,DALLAS 30,SALES,CHICAGO 40,OPERATIONS,BOSTON [root@node1 data]#
emp.txt文件存放雇员数据。
[root@node1 data]# cat emp.txt 7369,SMITH,CLERK,7902,17-12-80,800,,20 7499,ALLEN,SALESMAN,7698,20-2-81,1600,300,30 7521,WARD,SALESMAN,7698,22-2-81,1250,500,30 7566,JONES,MANAGER,7839,02-4-81,2975,,20 7654,MARTIN,SALESMAN,7698,28-9-81,1250,1400,30 7698,BLAKE,MANAGER,7839,01-5-81,2850,,30 7782,CLARK,MANAGER,7839,09-6-81,2450,,10 7839,KING,PRESIDENT,,17-11-81,5000,,10 7844,TURNER,SALESMAN,7698,08-9-81,1500,0,30 7900,JAMES,CLERK,7698,03-12-81,950,,30 7902,FORD,ANALYST,7566,03-12-81,3000,,20 7934,MILLER,CLERK,7782,23-1-82,1300,,10
上传到HDFS
hdfs dfs -mkdir -p input
hdfs dfs -put emp.txt input
hdfs dfs -put dept.txt input
[root@node1 data]# hdfs dfs -mkdir -p input [root@node1 data]# hdfs dfs -put emp.txt input [root@node1 data]# hdfs dfs -put dept.txt input [root@node1 data]# hdfs dfs -ls input Found 2 items -rw-r--r-- 3 root hbase 82 2017-06-23 11:04 input/dept.txt -rw-r--r-- 3 root hbase 513 2017-06-23 11:04 input/emp.txt [root@node1 data]#
7.4.2 问题描述
求解每个雇员所在部门,输出格式:雇员名,部门名比如
RESEARCH,SMITH SALES,ALLEN
7.4.3 编程
这个问题与SQL中的连接操作类似,将问题转换未1:N问题。一个部门有多个雇员,一个雇员在唯一的部门。转换为1:N问题,部门是1端,雇员是多段
具体思路是,在map阶段读入emp.txt和dept.txt文件,将join的字段作为map输出key,再将每条记录标记上文件名作为map输出value;在reduce阶段做笛卡尔积。
(1)定义Mapper类
package cn.hadron.mr.join; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class JoinMapper extends Mapper<LongWritable, Text, Text, Text>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //当前读取文件的路径 String filePath=((FileSplit)context.getInputSplit()).getPath().toString(); String joinKey=""; String joinValue=""; String fileFlag=""; String[] array=value.toString().split(","); //判定当前行数据来自哪个文件 if(filePath.contains("dept.txt")){ fileFlag="l";//left joinKey=array[0];//部门编号 joinValue=array[1];//部门名 }else if(filePath.contains("emp.txt")){ fileFlag="r";//right joinKey=array[array.length-1];//部门编号 joinValue=array[1];//雇员名 } //输出键值对,并标记来源文件 context.write(new Text(joinKey),new Text(joinValue+","+fileFlag)); } }
(2)定义Reducer类
package cn.hadron.mr.join; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class JoinReducer extends Reducer<Text, Text, Text, Text>{ //相同部门的数据,发送到同一个reduce @Override protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException { Iterator<Text> it=values.iterator(); String deptName=""; List<String> empNames=new ArrayList<>(); while(it.hasNext()){ //取一行记录 String[] array=it.next().toString().split(","); //判定当前记录来源于哪个文件,并根据文件格式解析记录获取相应的信息 if("l".equals(array[1])){//只有1条记录的flag=l deptName=array[0]; }else if("r".equals(array[1])){ empNames.add(array[0]); } } //求解笛卡尔积,对每个dept的1条记录与emp中多条记录作一次迭代 for(String en:empNames){ context.write(new Text(deptName), new Text(en)); } } }
(3)主方法
package cn.hadron.mr.join; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class RunJob { public static void main(String[] args) { // 设置环境变量HADOOP_USER_NAME,其值是root System.setProperty("HADOOP_USER_NAME", "root"); // Configuration类包含了Hadoop的配置 Configuration config = new Configuration(); // 设置fs.defaultFS config.set("fs.defaultFS", "hdfs://192.168.80.131:8020"); // 设置yarn.resourcemanager节点 config.set("yarn.resourcemanager.hostname", "node1"); try { FileSystem fs = FileSystem.get(config); Job job = Job.getInstance(config); job.setJarByClass(RunJob.class); job.setJobName("JoinDemo"); // 设置Mapper和Reducer类 job.setMapperClass(JoinMapper.class); job.setReducerClass(JoinReducer.class); // 设置reduce方法输出key和value的类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); // 指定输入输出路径 FileInputFormat.addInputPath(job, new Path("/user/root/input/")); Path outpath = new Path("/user/root/output/"); if (fs.exists(outpath)) { fs.delete(outpath, true); } FileOutputFormat.setOutputPath(job, outpath); // 提交任务,等待执行完成 boolean f = job.waitForCompletion(true); if (f) { System.out.println("job任务执行成功"); } } catch (Exception e) { e.printStackTrace(); } } }
7.4.4 运行
run as –> Java ApplicationEclipse控制台输出信息:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.util.Shell). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. job任务执行成功
查看结果
hdfs dfs -ls output
[root@node1 ~]# hdfs dfs -ls output Found 2 items -rw-r--r-- 3 root hbase 0 2017-06-23 14:05 output/_SUCCESS -rw-r--r-- 3 root hbase 168 2017-06-23 14:05 output/part-r-00000 [root@hds117 ~]# hdfs dfs -cat output/part-r-00000 ACCOUNTING MILLER ACCOUNTING KING ACCOUNTING CLARK RESEARCH FORD RESEARCH JONES RESEARCH SMITH SALES JAMES SALES TURNER SALES BLAKE SALES MARTIN SALES WARD SALES ALLEN
相关文章推荐
- Hadoop基础教程-第7章 MapReduce进阶(7.3 MapReduce API)(草稿)
- Hadoop基础教程-第7章 MapReduce进阶(7.2 MapReduce工作机制)(草稿)
- Hadoop基础教程-第7章 MapReduce进阶(7.4 自定义Key类型)
- Hadoop基础教程-第7章 MapReduce进阶(7.1 MapReduce过程)(草稿)
- Hadoop基础教程-第7章 MapReduce进阶(7.7 MapReduce 全排序)
- Hadoop基础教程-第7章 MapReduce进阶(7.6 MapReduce 二次排序)
- Hadoop教程(四):理解MapReduce、MapReduce计数器和连接、MapReduce Hadoop程序连接数据
- Hadoop基础教程-第12章 Hive:进阶(12.4 Hive Metastore)(草稿)
- 使用Eclipse插件连接配置Mapreduce说明与教程(hadoop-eclipse-plugin 2.6)
- Hadoop基础教程-第6章 MapReduce入门(6.1 MapReduce介绍)(草稿)
- *****MapReduce连接:重分区连接【里面分析了org.apache.hadoop.contrib.utils.join包中的基础数据join原理和优化后的抽象类】
- Hadoop基础教程-第12章 Hive:进阶(12.3 HiveServer2)(草稿)
- Hadoop基础教程-第6章 MapReduce入门(6.5 温度统计)(草稿)
- Hadoop基础教程-第6章 MapReduce入门(6.2 解读WordCount)(草稿)
- Hadoop基础教程-第6章 MapReduce入门(6.3 加速WordCount)(草稿)
- Hadoop基础教程-第6章 MapReduce入门(6.4 MapReduce程序框架)(草稿)
- Hadoop基础教程-第12章 Hive:进阶(12.1 内置函数)(草稿)
- hadoop基础教程(二) MapReduce 单词统计
- Hadoop基础教程-第12章 Hive:进阶(12.5 Hive外表)(草稿)
- Hadoop基础教程-第12章 Hive:进阶(12.2 自定义函数)(草稿)