大数据学习09:MapReduce基础
2019-03-31 00:03
344 查看
第六章:MapReduce
问题:
1、清空HDFS的回收站
-expunge Permanently delete files in checkpoints older than the retention threshold from trash directory, and create new checkpoint.
2、课程回看
=====================================
一、课程概述
依赖jar包 $HADOOP_HOME/share/hadoop/common $HADOOP_HOME/share/hadoop/common/lib $HADOOP_HOME/share/hadoop/mapreduce $HADOOP_HOME/share/hadoop/mapreducel/lib
二、MapReduce编程基础
案例一 1、分析WordCount的数据处理的过程(流程 P28) ----> 重要 2、开发自己的WordCount程序 案例二:员工表 3、分析:求每个部门的工资总额数据的处理过程 SQL: select deptno,sum(sal) from emp group by deptno; 4、开发实现自己的MapReduce
SalaryTotalMapper
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable,IntWritable> { @Override protected void map(LongWritable key1, Text value1, Context context) throws IOException, InterruptedException { // String data = value1.toString(); //分词 String[] words = data.split(","); //输出k2,v2部门号 v2员工的工资 context.write(new IntWritable(Integer.parseInt(words[7])),new IntWritable(Integer.parseInt(words[5]))); } }
SalaryTotalReduce
public class SalaryTotalReduce extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable> { @Override protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int total = 0; for (IntWritable v: values) { total = total+v.get(); } context.write(key,new IntWritable(total)); } }
驱动类 SalaryTotalMain
public class SalaryTotalMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //1.创建job任务 Configuration conf = new Configuration() ; conf.set("mapreduce.app-submission.cross-platform", "true"); // 跨平台,保证在 Windows 下可以提交 mr job Job job = Job.getInstance(conf); //2.指定jar包位置 job.setJarByClass(SalaryTotalMain.class); //3.关联使用的mapper类 job.setMapperClass(SalaryTotalMapper.class); //4.关联使用reduce类 job.setReducerClass(SalaryTotalReduce.class); //5.设置mapper阶段输出waitForCompletion job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(IntWritable.class); //6设置reduce阶段输出 job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //7设置输入的路径 FileInputFormat.setInputPaths(job, new Path(args[0])); //8.设置输出的路径 FileOutputFormat.setOutputPath(job,new Path(args[1])); //开始执行 //9.提交任务 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
执行命令
[root@linux02 tmp]# hdfs dfs -put *.csv /scott [root@linux02 tmp]# hdfs dfs jar comitstart.jar /input/emp.csv /output/0910/s1
补充一下
idea打包java可执行jar包
三、MapReduce的特性
1、序列化:接口Writable
一个类实现了这个接口,该类的对象就可以作为key value(Map和Reduce的输入和输出)
注意:序列化的顺序,一定要跟反序列的顺序一样
EmployeeBean
//数据: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30 public class Employee implements Writable{ private int empno; private String ename; private String job; private int mgr; private String hiredate; private int sal; private int comm; private int deptno; @Override public void readFields(DataInput input) throws IOException { // 反序列化 this.empno = input.readInt(); this.ename = input.readUTF(); this.job = input.readUTF(); this.mgr = input.readInt(); this.hiredate = input.readUTF(); this.sal = input.readInt(); this.comm = input.readInt(); this.deptno = input.readInt(); } @Override public void write(DataOutput output) throws IOException { // 序列化 output.writeInt(this.empno); output.writeUTF(this.ename); output.writeUTF(this.job); output.writeInt(this.mgr); output.writeUTF(this.hiredate); output.writeInt(this.sal); output.writeInt(this.comm); output.writeInt(this.deptno); } public int getEmpno() { return empno; } public void setEmpno(int empno) { this.empno = empno; } public String getEname() { return ename; } public void setEname(String ename) { this.ename = ename; } public String getJob() { return job; } public void setJob(String job) { this.job = job; } public int getMgr() { return mgr; } public void setMgr(int mgr) { this.mgr = mgr; } public String getHiredate() { return hiredate; } public void setHiredate(String hiredate) { this.hiredate = hiredate; } public int getSal() { return sal; } public void setSal(int sal) { this.sal = sal; } public int getComm() { return comm; } public void setComm(int comm) { this.comm = comm; } public int getDeptno() { return deptno; } public void setDeptno(int deptno) { this.deptno = deptno; } }
SalaryTotalMapper
public class SalaryTotalMapper extends Mapper<LongWritable, Text, IntWritable, Employee> { @Override protected void map(LongWritable key1, Text value1,Context context) throws IOException, InterruptedException { //数据:7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30 String data = value1.toString(); //分词 String[] words = data.split(","); //创建员工对象 Employee e = new Employee(); //设置员工的属性 //员工号 e.setEmpno(Integer.parseInt(words[0])); //姓名 e.setEname(words[1]); //职位 e.setJob(words[2]); //老板号: 注意 可能没有老板号 try{ e.setMgr(Integer.parseInt(words[3])); }catch(Exception ex){ //没有老板号 e.setMgr(-1); } //入职日期 e.setHiredate(words[4]); //月薪 e.setSal(Integer.parseInt(words[5])); //奖金:注意:奖金也可能没有 try{ e.setComm(Integer.parseInt(words[6])); }catch(Exception ex){ //没有奖金 e.setComm(0); } //部门号 e.setDeptno(Integer.parseInt(words[7])); //输出:k2 部门号 v2 员工对象 context.write(new IntWritable(e.getDeptno()), //员工的部门号 e); //员工对象 } }
SalaryTotalReducer
// k3 v3 k4 v4 public class SalaryTotalReducer extends Reducer<IntWritable, Employee, IntWritable, IntWritable> { @Override protected void reduce(IntWritable k3, Iterable<Employee> v3,Context context) throws IOException, InterruptedException { //取出v3中的每个员工 进行工资求和 int total = 0; for(Employee e:v3){ total = total + e.getSal(); } //输出 context.write(k3, new IntWritable(total)); } }
SalaryTotalMain
public class SalaryTotalMain { public static void main(String[] args) throws Exception { // 创建一个job Job job = Job.getInstance(new Configuration()); job.setJarByClass(SalaryTotalMain.class); //指定job的mapper和输出的类型 k2 v2 job.setMapperClass(SalaryTotalMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Employee.class); //指定job的reducer和输出的类型 k4 v4 job.setReducerClass(SalaryTotalReducer.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(IntWritable.class); //指定job的输入和输出的路径 FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //执行任务 job.waitForCompletion(true); } }
复习:Java序列化:接口
如果一个类实现了Serializable接口,该类的对象可以作为InputStream(反序列化)和OutputStream对象(序列化)
2、排序:
(1)基本数据类型:数字 : 升序 字符串 1cca7 : 字典顺序 (2)对象的排序: 按照员工的部门号、薪水排序 select * from emp order by deptno,sal;
3、分区:
(1)什么分区 (2)Demo:按照员工的部门号进行分区
4、合并(Combiner):
在Mapper端,先执行一次Reducer 减少Mapper输出到Reduce的数据量
四、MapReduce的核心: shuffle 洗牌
五、MapReduce编程案例
1、数据去重:distinct
2、多表查询:部门表、员工表
(1)复习:笛卡尔积
(2)等值连接
(3)自连接
3、使用MapReduce实现倒排索引 4、使用MRUnit进行单元测试
六、第一个阶段小结
相关文章推荐
- 函数09 - 零基础入门学习C语言40
- .Net学习笔记----2015-07-22(C#基础复习09,虚方法、抽象方法、接口复习)
- Hadoop学习基础之三:MapReduce
- .Net学习笔记----2015-07-10(基础复习和练习09)
- 大数据学习记录(day6)-图说Mapreduce工作机制
- 黑马程序员_java基础学习笔记09_IO流
- 零基础转行大数据怎么学习?大数据学习路线
- 2014-11-12--Hadoop的基础学习(三)--Hadoop中MapReduce框架入门
- 2018年全新升级大数据学习路线 +资料领取 让你从基础到精通
- 学习Hadoop大数据基础框架
- 大数据学习—基础知识
- 十条0基础入门学习大数据分析的路径,总有一条适合你!
- 黑马程序员——零基础学习iOS开发——09 预处理指令
- [转]Ultra Fractal教程系列09——学习基础技巧01——创建另一个分形
- MapReduce编程基础—学习笔记[2]
- 大数据学习之路:Linux基础
- 学习hadoop大数据基础框架需要什么基础
- 嵌入式开发之C基础学习笔记09--位段,枚举和文件操作
- 【大数据学习】数学基础及应用
- 【零基础学习iOS开发】【02-C语言】09-流程控制