您的位置:首页 > 其它

MapReduce案例学习(8) 列出工资最高的头三名员工姓名及其工资

2015-09-20 16:37 489 查看
设计思路:MapReduce 在 shuffle 阶段会自动对 map 输出的 key 进行排序——key 为数值类型(如 IntWritable)时按数值大小排序,为文本类型(Text)时按字典顺序排序。因此处理这个案例时,可以巧妙地利用 MapReduce 对 key 的自动排序功能:直接将 Employee 对象作为 key,并重新定义当 key 为 Employee 对象时 MapReduce 所使用的排序规则。

map阶段:将 Employee 对象作为 key。value 本可以设为员工的工资,但由于工资可以直接从 key 中的 Employee 对象取得,所以 value 直接设置为 NullWritable 即可。

reduce阶段:reduce 方法会按照 key 排好序的顺序被逐个调用(每个不同的 key 调用一次),因此 reducer 依次收到的员工已经按排序规则(工资从高到低)排列,只需按顺序输出前三个即可。注意这要求作业只有一个 reduce 任务,否则每个 reducer 只能看到并排序自己分区内的数据。

这里关键的处理步骤是 Employee 类的实现:为了把它交给 MapReduce 自动排序,Employee 对象必须实现 WritableComparable 接口,并重写 write、readFields 和 compareTo 方法。

注意:

(1)write 和 readFields 方法分别完成序列化和反序列化,两个方法中属性的读写顺序和类型必须保持一致,否则将出现异常或读出错误的数据;

(2)compareTo 方法重新定义了对象的排序规则:这里直接按照工资的大小对员工进行降序排序,工资最高的员工排在最前面。

package week06;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Objects;

import org.apache.hadoop.io.WritableComparable;

/*
 * Employee Object
 */
public class Employee implements WritableComparable<Employee> {
	private String empno;
	private String ename;
	private String job;
	private String mgr;
	private String hiredate;
	private int sal;
	private int comm;
	private String deptno;
	private String dname;
	private String loc;
	private boolean valid = true;

	public void set(Employee emp) {
		this.valid = emp.isValid();
		this.empno = emp.getEmpno();
		this.ename = emp.getEname();
		this.job = emp.getJob();
		this.mgr = emp.getMgr();
		this.hiredate = emp.getHiredate();
		this.sal = emp.getSal();
		this.comm = emp.getComm();
		this.deptno = emp.getDeptno();
		this.dname = emp.getDname();
		this.loc = emp.getLoc();
	}

	//Emp_Test8使用这个compareTo方法
	 public int compareTo(Employee bean) {
		 if (this.sal >= bean.getSal()) {
			 return -1;
		 } else {
			 return 1;
		 }
	 }

	//Emp_Test9使用这个compareTo方法
//	public int compareTo(Employee bean) {
//		int total = this.sal + this.comm;
//		int bean_total = bean.getSal() + bean.getComm();
//		if (total >= bean_total) {
//			return -1;
//		} else {
//			return 1;
//		}
//	}

	public void write(DataOutput out) throws IOException {
		out.writeUTF(empno);
		out.writeUTF(ename);
		out.writeUTF(job);
		out.writeUTF(mgr);
		out.writeUTF(hiredate);
		out.writeInt(sal);
		out.writeInt(comm);
		out.writeUTF(deptno);
		out.writeUTF(dname);
		out.writeUTF(loc);
		out.writeBoolean(valid);
	}

	// 注意必须和write方法中的写入顺序和类型保持一致,否则会出错。
	public void readFields(DataInput in) throws IOException {
		this.empno = in.readUTF();
		this.ename = in.readUTF();
		this.job = in.readUTF();
		this.mgr = in.readUTF();
		this.hiredate = in.readUTF();
		this.sal = in.readInt();
		this.comm = in.readInt();
		this.deptno = in.readUTF();
		this.dname = in.readUTF();
		this.loc = in.readUTF();
		this.valid = in.readBoolean();
	}

	/**
	 * 格式化对象
	 */
	public static Employee parser(String line) {
		// System.out.println(line);
		// "7369	SMITH	CLERK	7902	1980-12-17	800		20"
		Employee emp = new Employee();
		String[] arr = line.split("\t");
		if (arr.length >= 8) {
			emp.setEmpno(arr[0]);
			emp.setEname(arr[1]);
			emp.setJob(arr[2]);
			emp.setMgr(arr[3]);
			emp.setHiredate(arr[4]);

			if ("".equals(arr[5]) || arr[5] == null) {
				emp.setSal(0);
			} else {
				try {
					emp.setSal(new Integer(arr[5]));
				} catch (Exception e) {
					emp.setSal(0);
				}
			}

			if ("".equals(arr[6]) || arr[6] == null) {
				emp.setComm(0);
			} else {
				try {
					emp.setComm(new Integer(arr[6]));
				} catch (Exception e) {
					emp.setComm(0);
				}
			}

			emp.setDeptno(arr[7]);
			if ("10".equals(emp.getDeptno())) {
				emp.setDname("ACCOUNTING");
				emp.setLoc("NEW YORK");
			} else if ("20".equals(emp.getDeptno())) {
				emp.setDname("RESEARCH");
				emp.setLoc("DALLAS");
			} else if ("30".equals(emp.getDeptno())) {
				emp.setDname("SALES");
				emp.setLoc("CHICAGO");
			} else if ("40".equals(emp.getDeptno())) {
				emp.setDname("OPERATIONS");
				emp.setLoc("BOSTON");
			} else {
				emp.setDname("other");
				emp.setLoc("other");
			}
			emp.setValid(true);
		} else {
			emp.setValid(false);
		}
		return emp;
	}

	public String getEmpno() {
		return empno;
	}

	public void setEmpno(String empno) {
		this.empno = empno;
	}

	public String getEname() {
		return ename;
	}

	public void setEname(String ename) {
		this.ename = ename;
	}

	public String getJob() {
		return job;
	}

	public void setJob(String job) {
		this.job = job;
	}

	public String getMgr() {
		return mgr;
	}

	public void setMgr(String mgr) {
		this.mgr = mgr;
	}

	public String getHiredate() {
		return hiredate;
	}

	public void setHiredate(String hiredate) {
		this.hiredate = hiredate;
	}

	public int getSal() {
		return sal;
	}

	public void setSal(int sal) {
		this.sal = sal;
	}

	public int getComm() {
		return comm;
	}

	public void setComm(int comm) {
		this.comm = comm;
	}

	public String getDeptno() {
		return deptno;
	}

	public void setDeptno(String deptno) {
		this.deptno = deptno;
	}

	public String getDname() {
		return dname;
	}

	public void setDname(String dname) {
		this.dname = dname;
	}

	public String getLoc() {
		return loc;
	}

	public void setLoc(String loc) {
		this.loc = loc;
	}

	public boolean isValid() {
		return valid;
	}

	public void setValid(boolean valid) {
		this.valid = valid;
	}

	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		sb.append("valid:" + this.valid);
		sb.append("\nempno:" + this.empno);
		sb.append("\nename:" + this.ename);
		sb.append("\njob:" + this.job);
		sb.append("\nmgr:" + this.mgr);
		sb.append("\nhiredate:" + this.hiredate);
		sb.append("\nsal:" + this.sal);
		sb.append("\ncomm:" + this.comm);
		sb.append("\ndeptno:" + this.deptno);
		sb.append("\ndname:" + this.dname);
		sb.append("\nloc:" + this.loc);
		return sb.toString();
	}

	public static void main(String args[]) {
		String line = "7698	BLAKE	MANAGER	7839	1981-05-01	2850		30";
		System.out.println(line);
		Employee emp = Employee.parser(line);
		System.out.println(emp);
	}
}


package week06;

import java.io.IOException;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Case 8: lists the names and salaries of the three highest-paid employees.
 *
 * <p>The mapper emits each parsed {@link Employee} as the map output key. The shuffle sorts keys
 * with {@code Employee.compareTo}, and a single reducer writes the first N keys it receives.
 * N defaults to 3 and can be overridden with {@code -D emp.top.n=<N>}.
 */
public class Emp_Test8 extends Configured implements Tool {

	/** Configuration key holding how many employees to output. */
	public static final String TOP_N_KEY = "emp.top.n";

	/** Number of employees output when {@link #TOP_N_KEY} is not set. */
	public static final int DEFAULT_TOP_N = 3;

	/** Input/output paths used when none are given on the command line. */
	private static final String[] DEFAULT_IO_PATHS = { "emp_in", "emp_out_test8" };

	/**
	 * Counters for malformed input data.
	 */
	enum Counter {
		/** Lines that could not be parsed into a valid Employee. */
		LINESKIP,
	}

	/**
	 * Map task: parses each input line into an Employee and emits it as the key (no value).
	 */
	public static class Map extends
			Mapper<LongWritable, Text, Employee, NullWritable> {
		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			String line = value.toString();
			// The header line of the input file contains "empno"; skip it.
			if (line.contains("empno")) {
				return;
			}

			Employee emp = Employee.parser(line);
			if (emp.isValid()) {
				context.write(emp, NullWritable.get());
			} else {
				context.getCounter(Counter.LINESKIP).increment(1);
			}
		}
	}

	/**
	 * Reduce task: receives employees in key sort order and writes only the first N of them.
	 *
	 * <p>The count is kept per reducer instance, so the output is a global top N only when the
	 * job runs a single reduce task (run() configures that).
	 */
	public static class Reduce extends
			Reducer<Employee, NullWritable, Text, Text> {

		private int limit;
		private int emitted;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			limit = context.getConfiguration().getInt(TOP_N_KEY, DEFAULT_TOP_N);
			emitted = 0;
		}

		@Override
		public void reduce(Employee key, Iterable<NullWritable> values,
				Context context) throws IOException, InterruptedException {
			if (emitted >= limit) {
				// The top N have been written; remaining keys rank lower and are skipped.
				return;
			}
			context.write(new Text(key.getEname()),
					new Text("--工资:" + key.getSal()));
			emitted++;
		}
	}

	/**
	 * Configures and runs the job.
	 *
	 * @param args optional {@code <input path> <output path>}. When empty, the defaults
	 *             "emp_in" and "emp_out_test8" are used.
	 * @return 0 on success, 1 if the job failed, 2 on a usage error
	 */
	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		// NOTE(review): hard-coded JobTracker address from the original lab environment; it
		// overrides whatever the cluster configuration specifies.
		conf.set("mapred.job.tracker", "192.168.1.201:9001");

		// Honour paths given on the command line; fall back to the defaults otherwise.
		String[] ioArgs = (args == null || args.length == 0) ? DEFAULT_IO_PATHS.clone() : args;
		String[] otherArgs = new GenericOptionsParser(conf, ioArgs)
				.getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: Test < input path > < output path >");
			return 2;
		}

		Job job = new Job(conf, "week06_test_08"); // job name
		job.setJarByClass(Emp_Test8.class);

		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		// A global top N requires every key to reach the same reducer.
		job.setNumReduceTasks(1);

		job.setMapOutputKeyClass(Employee.class);
		job.setMapOutputValueClass(NullWritable.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.waitForCompletion(true);

		// Report how the job went.
		System.out.println("任务名称:" + job.getJobName());
		System.out.println("任务成功:" + (job.isSuccessful() ? "是" : "否"));
		System.out.println("输入行数:"
				+ job.getCounters()
						.findCounter("org.apache.hadoop.mapred.Task$Counter",
								"MAP_INPUT_RECORDS").getValue());
		System.out.println("输出行数:"
				+ job.getCounters()
						.findCounter("org.apache.hadoop.mapred.Task$Counter",
								"MAP_OUTPUT_RECORDS").getValue());
		System.out.println("跳过的行:"
				+ job.getCounters().findCounter(Counter.LINESKIP).getValue());
		return job.isSuccessful() ? 0 : 1;
	}

	/**
	 * Entry point: runs the job through ToolRunner, prints its wall-clock duration, and exits
	 * with the job's status code.
	 */
	public static void main(String[] args) throws Exception {
		// Record the start time.
		DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		Date start = new Date();

		int res = ToolRunner.run(new Configuration(), new Emp_Test8(), args);

		// Report the elapsed time in minutes.
		Date end = new Date();
		float time = (float) ((end.getTime() - start.getTime()) / 60000.0);
		System.out.println("任务开始:" + formatter.format(start));
		System.out.println("任务结束:" + formatter.format(end));
		System.out.println("任务耗时:" + String.valueOf(time) + " 分钟");

		System.exit(res);
	}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: