MapReduce案例学习(6) 列出工资比公司平均工资要高的员工姓名及其工资
2015-09-20 16:22
501 查看
设计思路:
map阶段:这里需要汇总所有员工的工资计算平均工资,所以用了一个统一的名称作为key以便把所有员工都汇总到起来,然后将员工姓名和工资用逗号分隔拼接为字符串作为value输出;
reduce阶段:所有员工都在汇总到一起,遍历传入的value,对其数据进行分拆获得员工姓名和工资,并将他以姓名作为key,工资作为value加入到一个HashMap对象中。遍历value的同时叠加所有人员的工资和人员数量求出公司的平均工资。最后遍历刚才定义的员工HashMap对象,取出员工工资和平均工资作比较。
map阶段:这里需要汇总所有员工的工资计算平均工资,所以用了一个统一的名称作为key以便把所有员工都汇总到起来,然后将员工姓名和工资用逗号分隔拼接为字符串作为value输出;
reduce阶段:所有员工都在汇总到一起,遍历传入的value,对其数据进行分拆获得员工姓名和工资,并将他以姓名作为key,工资作为value加入到一个HashMap对象中。遍历value的同时叠加所有人员的工资和人员数量求出公司的平均工资。最后遍历刚才定义的员工HashMap对象,取出员工工资和平均工资作比较。
package week06; import java.io.IOException; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; //6) 列出工资比公司平均工资要高的员工姓名及其工资 public class Emp_Test6 extends Configured implements Tool { /** * 计数器 用于计数各种异常数据 */ enum Counter { LINESKIP, } /** * MAP任务 */ public static class Map extends Mapper<LongWritable, Text, Text, Text> { @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString();// 每行文件 // 输入文件首行,不处理 if (line.contains("empno") == true) { return; } Employee emp = Employee.parser(line); if (emp.isValid()) { context.write(new Text("emp"), new Text(emp.getEname() + "," + emp.getSal())); } else { context.getCounter(Counter.LINESKIP).increment(1); // 出错令计数器+1 return; } } } /** * REDUCE */ public static class Reduce extends Reducer<Text, Text, Text, Text> { float total_salary = 0; int total_num = 0; @Override public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { HashMap<String, Float> m = new HashMap<String, Float>(); for (Text value : values) { String[] s = value.toString().split(","); total_num += 1; total_salary += new Float(s[1]); m.put(s[0], new Float(s[1])); } float pingjun = total_salary / total_num; // System.out.println("pingjun:" + pingjun); for (String k : m.keySet()) { if (m.get(k) > pingjun) { // System.out.println(k + "--salary:" + m.get(k)); context.write(new Text(k), new Text("--员工工资:" + m.get(k) + "--平均工资:" + pingjun)); } } } } public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapred.job.tracker", "192.168.1.201:9001"); String[] ioArgs = new String[] { "emp_in", "emp_out_test6" }; String[] otherArgs = new GenericOptionsParser(conf, ioArgs) .getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: Test < input path > < output path >"); System.exit(2); } Job job = new Job(conf, "week06_test_06"); // 任务名 job.setJarByClass(Emp_Test6.class); // 指定Class FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 输入路径 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 输出路径 job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码 job.setReducerClass(Reduce.class);// 调用上面Reduce类作为Reduce任务代码 job.setMapOutputKeyClass(Text.class); // 指定map输出的KEY的格式 job.setMapOutputValueClass(Text.class);// 指定map输出的VALUE的格式 job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式 job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式 job.waitForCompletion(true); // 输出任务完成情况 System.out.println("任务名称:" + job.getJobName()); System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否")); System.out.println("输入行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_INPUT_RECORDS").getValue()); System.out.println("输出行数:" + job.getCounters() .findCounter("org.apache.hadoop.mapred.Task$Counter", "MAP_OUTPUT_RECORDS").getValue()); System.out.println("跳过的行:" + job.getCounters().findCounter(Counter.LINESKIP).getValue()); return job.isSuccessful() ? 0 : 1; } /** * 设置系统说明 设置MapReduce任务 */ public static void main(String[] args) throws Exception { // 记录开始时间 DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date start = new Date(); // 运行任务 int res = ToolRunner.run(new Configuration(), new Emp_Test6(), args); // 输出任务耗时 Date end = new Date(); float time = (float) ((end.getTime() - start.getTime()) / 60000.0); System.out.println("任务开始:" + formatter.format(start)); System.out.println("任务结束:" + formatter.format(end)); System.out.println("任务耗时:" + String.valueOf(time) + " 分钟"); System.exit(res); } }
相关文章推荐
- Object-C 空@selector
- 09.20类类型random
- php Hash Table(二) Hash函数
- Linux上free命令的输出
- 1.6文件打包与解压缩(学习过程)
- nodeJs 笔记
- 最长不下降子序列
- cocos3.8屏幕截图
- 在链表中插入节点时候的一个trick
- ObjectAnimator详解(测试用,承接Android动画操作中的测试)
- MYSQL数据库学习----插入、更新、删除
- xcode更新之后插件失效的解决办法
- 友盟推送简单调用
- 1.5环境变量与文件查找(学习过程)
- uCOS-II移植
- 浅析Android-ViewPagerIndicator
- Crisis of HDU 2110 (母函数)
- 2015年9月20日工作日志-----------赵鑫
- 0909 初识编译原理
- [fiddler]目标计算机积极拒绝