MapReduce案例学习(8) 列出工资最高的头三名员工姓名及其工资
2015-09-20 16:37
489 查看
设计思路:因为mapreduce对key能实现自动排序,当key为数字时就按自然顺序排序,是字母时按字典顺序排序。所以处理这个案例时巧妙的使用mapreduce对key值的自动排序功能,将employee对象直接作为key,并重新定义当key为employee对象时,mapreduce的排序规则。
map阶段:将employee对象作为key,可以将员工的工资作为value,或者因为员工工资可以从key中的employee对象中直接取得,所以value可以直接设置为NullWritable
reduce阶段:在对reduce的输入参数value进行遍历时,里面的对象都是根据key自动排好序的,所以直接输出即可。
这里关键的处理步骤是对employee对象方法进行处理,为了其交付给mapreduce自动排序,employee对象必须实现WritableComparable接口,并且重新write、readFields、compareTo方法。
注意:
(1)write、readFields方法是序列化和反序列化的过程,它要求两方法中属性的顺序和类型必须一直,否者将有异常;
(2)compareTo方法重新定义对象的排序规则,我们直接将对象按照工资的大小来进行排序。
map阶段:将employee对象作为key,可以将员工的工资作为value,或者因为员工工资可以从key中的employee对象中直接取得,所以value可以直接设置为NullWritable
reduce阶段:在对reduce的输入参数value进行遍历时,里面的对象都是根据key自动排好序的,所以直接输出即可。
这里关键的处理步骤是对employee对象方法进行处理,为了其交付给mapreduce自动排序,employee对象必须实现WritableComparable接口,并且重新write、readFields、compareTo方法。
注意:
(1)write、readFields方法是序列化和反序列化的过程,它要求两方法中属性的顺序和类型必须一直,否者将有异常;
(2)compareTo方法重新定义对象的排序规则,我们直接将对象按照工资的大小来进行排序。
package week06; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /* * Employee Object */ public class Employee implements WritableComparable<Employee> { private String empno; private String ename; private String job; private String mgr; private String hiredate; private int sal; private int comm; private String deptno; private String dname; private String loc; private boolean valid = true; public void set(Employee emp) { this.valid = emp.isValid(); this.empno = emp.getEmpno(); this.ename = emp.getEname(); this.job = emp.getJob(); this.mgr = emp.getMgr(); this.hiredate = emp.getHiredate(); this.sal = emp.getSal(); this.comm = emp.getComm(); this.deptno = emp.getDeptno(); this.dname = emp.getDname(); this.loc = emp.getLoc(); } //Emp_Test8使用这个compareTo方法 public int compareTo(Employee bean) { if (this.sal >= bean.getSal()) { return -1; } else { return 1; } } //Emp_Test9使用这个compareTo方法 // public int compareTo(Employee bean) { // int total = this.sal + this.comm; // int bean_total = bean.getSal() + bean.getComm(); // if (total >= bean_total) { // return -1; // } else { // return 1; // } // } public void write(DataOutput out) throws IOException { out.writeUTF(empno); out.writeUTF(ename); out.writeUTF(job); out.writeUTF(mgr); out.writeUTF(hiredate); out.writeInt(sal); out.writeInt(comm); out.writeUTF(deptno); out.writeUTF(dname); out.writeUTF(loc); out.writeBoolean(valid); } // 注意必须和write方法中的写入顺序和类型保持一致,否则会出错。 public void readFields(DataInput in) throws IOException { this.empno = in.readUTF(); this.ename = in.readUTF(); this.job = in.readUTF(); this.mgr = in.readUTF(); this.hiredate = in.readUTF(); this.sal = in.readInt(); this.comm = in.readInt(); this.deptno = in.readUTF(); this.dname = in.readUTF(); this.loc = in.readUTF(); this.valid = in.readBoolean(); } /** * 格式化对象 */ public static Employee parser(String line) { // System.out.println(line); // "7369 SMITH CLERK 7902 1980-12-17 800 20" Employee emp = new Employee(); String[] arr = line.split("\t"); if (arr.length >= 8) { emp.setEmpno(arr[0]); emp.setEname(arr[1]); emp.setJob(arr[2]); emp.setMgr(arr[3]); emp.setHiredate(arr[4]); if ("".equals(arr[5]) || arr[5] == null) { emp.setSal(0); } else { try { emp.setSal(new Integer(arr[5])); } catch (Exception e) { emp.setSal(0); } } if ("".equals(arr[6]) || arr[6] == null) { emp.setComm(0); } else { try { emp.setComm(new Integer(arr[6])); } catch (Exception e) { emp.setComm(0); } } emp.setDeptno(arr[7]); if ("10".equals(emp.getDeptno())) { emp.setDname("ACCOUNTING"); emp.setLoc("NEW YORK"); } else if ("20".equals(emp.getDeptno())) { emp.setDname("RESEARCH"); emp.setLoc("DALLAS"); } else if ("30".equals(emp.getDeptno())) { emp.setDname("SALES"); emp.setLoc("CHICAGO"); } else if ("40".equals(emp.getDeptno())) { emp.setDname("OPERATIONS"); emp.setLoc("BOSTON"); } else { emp.setDname("other"); emp.setLoc("other"); } emp.setValid(true); } else { emp.setValid(false); } return emp; } public String getEmpno() { return empno; } public void setEmpno(String empno) { this.empno = empno; } public String getEname() { return ename; } public void setEname(String ename) { this.ename = ename; } public String getJob() { return job; } public void setJob(String job) { this.job = job; } public String getMgr() { return mgr; } public void setMgr(String mgr) { this.mgr = mgr; } public String getHiredate() { return hiredate; } public void setHiredate(String hiredate) { this.hiredate = hiredate; } public int getSal() { return sal; } public void setSal(int sal) { this.sal = sal; } public int getComm() { return comm; } public void setComm(int comm) { this.comm = comm; } public String getDeptno() { return deptno; } public void setDeptno(String deptno) { this.deptno = deptno; } public String getDname() { return dname; } public void setDname(String dname) { this.dname = dname; } public String getLoc() { return loc; } public void setLoc(String loc) { this.loc = loc; } public boolean isValid() { return valid; } public void setValid(boolean valid) { this.valid = valid; } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("valid:" + this.valid); sb.append("\nempno:" + this.empno); sb.append("\nename:" + this.ename); sb.append("\njob:" + this.job); sb.append("\nmgr:" + this.mgr); sb.append("\nhiredate:" + this.hiredate); sb.append("\nsal:" + this.sal); sb.append("\ncomm:" + this.comm); sb.append("\ndeptno:" + this.deptno); sb.append("\ndname:" + this.dname); sb.append("\nloc:" + this.loc); return sb.toString(); } public static void main(String args[]) { String line = "7698 BLAKE MANAGER 7839 1981-05-01 2850 30"; System.out.println(line); Employee emp = Employee.parser(line); System.out.println(emp); } }
package week06; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //8) 列出工资最高的头三名员工姓名及其工资 public class Emp_Test8 extends Configured implements Tool { /** * 计数器 用于计数各种异常数据 */ enum Counter { LINESKIP, } /** * MAP任务 */ public static class Map extends Mapper<LongWritable, Text, Employee, NullWritable> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString();// 每行文件 // 输入文件首行,不处理 if (line.contains("empno") == true) { return; } Employee emp = Employee.parser(line); if (emp.isValid()) { context.write(emp, NullWritable.get()); } else { context.getCounter(Counter.LINESKIP).increment(1); // 出错令计数器+1 return; } } } /** * REDUCE */ public static class Reduce extends Reducer<Employee, NullWritable, Text, Text> { @Override public void reduce(Employee key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException { context.write(new Text(key.getEname()), new Text("--工资:" + key.getSal())); } } public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapred.job.tracker", "192.168.1.201:9001"); String[] ioArgs = new String[] { "emp_in", "emp_out_test8" }; String[] otherArgs = new GenericOptionsParser(conf, ioArgs) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Test < input path > < output path >"); System.exit(2); } Job job = new Job(conf, "week06_test_08"); // 任务名 job.setJarByClass(Emp_Test8.class); // 指定Class FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 输出路径 job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码 job.setReducerClass(Reduce.class);// 调用上面Reduce类作为Reduce任务代码 job.setMapOutputKeyClass(Employee.class); // 指定map输出的KEY的格式 job.setMapOutputValueClass(NullWritable.class);// 指定map输出的VALUE的格式 job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式 job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式 job.waitForCompletion(true); // 输出任务完成情况 System.out.println("任务名称:" + job.getJobName()); System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否")); System.out.println("输入行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); System.out.println("输出行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue()); System.out.println("跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue()); return job.isSuccessful() ? 0 : 1; } /** * 设置系统说明 设置MapReduce任务 */ public static void main(String[] args) throws Exception { // 记录开始时间 DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date start = new Date(); // 运行任务 int res = ToolRunner.run(new Configuration(), new Emp_Test8(), args); // 输出任务耗时 Date end = new Date(); float time = (float) ((end.getTime() - start.getTime()) / 60000.0); System.out.println("任务开始:" + formatter.format(start)); System.out.println("任务结束:" + formatter.format(end)); System.out.println("任务耗时:" + String.valueOf(time) + " 分钟"); System.exit(res); } }
相关文章推荐
- jAVA 得到Map价值
- 实现杨辉三角的思路及代码
- KVC 键值编码与KVO键值监听的应用(一)
- (转载)Ubuntu下备份系统的方法
- 学习计算器的源代码
- SQL Server使用
- 3Sum, 3Sum Closest, 4 Sum
- Connection reset原因分析和解决方案
- String.Format(C#)
- Java程序设计基础(三):方法
- 2016蘑菇街笔试——看清问题的本质
- String.Format(C#)
- 文字与图像的绘制
- linux常用命令(41):route 命令
- 数据结构--堆的实现(下)
- hadoop2.0安装和配置
- 1.11正则表达式基础(学习过程)
- Linux中的sleep函数
- Android Layout 优化
- 设计模式的认识