您的位置:首页 > 其它

MapReduce案例学习(6) 列出工资比公司平均工资要高的员工姓名及其工资

2015-09-20 16:22 501 查看
设计思路:

map阶段:这里需要汇总所有员工的工资计算平均工资,所以用了一个统一的名称作为key以便把所有员工都汇总到起来,然后将员工姓名和工资用逗号分隔拼接为字符串作为value输出;

reduce阶段:所有员工都在汇总到一起,遍历传入的value,对其数据进行分拆获得员工姓名和工资,并将他以姓名作为key,工资作为value加入到一个HashMap对象中。遍历value的同时叠加所有人员的工资和人员数量求出公司的平均工资。最后遍历刚才定义的员工HashMap对象,取出员工工资和平均工资作比较。

package week06;

import java.io.IOException;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

//6) 列出工资比公司平均工资要高的员工姓名及其工资
public class Emp_Test6 extends Configured implements Tool {

	/**
	 * 计数器 用于计数各种异常数据
	 */
	enum Counter {
		LINESKIP,
	}

	/**
	 * MAP任务
	 */
	public static class Map extends Mapper<LongWritable, Text, Text, Text> {

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString();// 每行文件
			// 输入文件首行,不处理
			if (line.contains("empno") == true) {
				return;
			}

			Employee emp = Employee.parser(line);
			if (emp.isValid()) {
				context.write(new Text("emp"), new Text(emp.getEname() + ","
						+ emp.getSal()));
			} else {
				context.getCounter(Counter.LINESKIP).increment(1); // 出错令计数器+1
				return;
			}
		}
	}

	/**
	 * REDUCE
	 */
	public static class Reduce extends Reducer<Text, Text, Text, Text> {
		float total_salary = 0;
		int total_num = 0;

		@Override
		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {

			HashMap<String, Float> m = new HashMap<String, Float>();

			for (Text value : values) {
				String[] s = value.toString().split(",");
				total_num += 1;
				total_salary += new Float(s[1]);
				m.put(s[0], new Float(s[1]));
			}

			float pingjun = total_salary / total_num;

			// System.out.println("pingjun:" + pingjun);

			for (String k : m.keySet()) {
				if (m.get(k) > pingjun) {
					// System.out.println(k + "--salary:" + m.get(k));
					context.write(new Text(k), new Text("--员工工资:" + m.get(k)
							+ "--平均工资:" + pingjun));
				}
			}
		}
	}

	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		conf.set("mapred.job.tracker", "192.168.1.201:9001");
		String[] ioArgs = new String[] { "emp_in", "emp_out_test6" };
		String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
				.getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: Test < input path > < output path >");
			System.exit(2);
		}

		Job job = new Job(conf, "week06_test_06"); // 任务名
		job.setJarByClass(Emp_Test6.class); // 指定Class

		FileInputFormat.addInputPath(job, new Path(otherArgs[0])); // 输入路径
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); // 输出路径

		job.setMapperClass(Map.class); // 调用上面Map类作为Map任务代码
		job.setReducerClass(Reduce.class);// 调用上面Reduce类作为Reduce任务代码

		job.setMapOutputKeyClass(Text.class); // 指定map输出的KEY的格式
		job.setMapOutputValueClass(Text.class);// 指定map输出的VALUE的格式

		job.setOutputKeyClass(Text.class); // 指定输出的KEY的格式
		job.setOutputValueClass(Text.class); // 指定输出的VALUE的格式

		job.waitForCompletion(true);

		// 输出任务完成情况
		System.out.println("任务名称:" + job.getJobName());
		System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否"));
		System.out.println("输入行数:"
				+ job.getCounters()
						.findCounter("org.apache.hadoop.mapred.Task$Counter",
								"MAP_INPUT_RECORDS").getValue());
		System.out.println("输出行数:"
				+ job.getCounters()
						.findCounter("org.apache.hadoop.mapred.Task$Counter",
								"MAP_OUTPUT_RECORDS").getValue());
		System.out.println("跳过的行:"
				+ job.getCounters().findCounter(Counter.LINESKIP).getValue());
		return job.isSuccessful() ? 0 : 1;
	}

	/**
	 * 设置系统说明 设置MapReduce任务
	 */
	public static void main(String[] args) throws Exception {
		// 记录开始时间
		DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		Date start = new Date();

		// 运行任务
		int res = ToolRunner.run(new Configuration(), new Emp_Test6(), args);

		// 输出任务耗时
		Date end = new Date();
		float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
		System.out.println("任务开始:" + formatter.format(start));
		System.out.println("任务结束:" + formatter.format(end));
		System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");

		System.exit(res);
	}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: