
Hadoop local debug mode: testing MapReduce

2016-04-05 15:05
This time we develop the MR code on Windows, where it can be run and debugged locally in debug mode. Once it works, adjust the source paths and file names, then upload it to Linux to run on the cluster.
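Local mode needs nothing cluster-side: with the default configuration the job runs in-process through the LocalJobRunner against the local file system. Below is a minimal sketch of that setup (the LocalModeSetup class name is mine; the hadoop.home.dir path matches the one used in the jobs' main() methods, and on Windows it must contain bin\winutils.exe):

import org.apache.hadoop.conf.Configuration;

public class LocalModeSetup {
    public static Configuration localConf() {
        // On Windows, Hadoop looks for winutils.exe under <hadoop.home.dir>\bin
        System.setProperty("hadoop.home.dir",
                "E:\\hadjar\\hadoop-2.4.1-x64\\hadoop-2.4.1");

        Configuration conf = new Configuration();
        // These are already the defaults, set explicitly here for clarity:
        // run in-process with the LocalJobRunner on the local file system
        conf.set("mapreduce.framework.name", "local");
        conf.set("fs.defaultFS", "file:///");
        return conf;
    }
}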

Sample data

Contents of the dept file:
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON

Contents of the emp file:
7369,SMITH,CLERK,7902,17-12月-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30
7566,JONES,MANAGER,7839,02-4月-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30
7782,CLARK,MANAGER,7839,09-6月-81,2450,,10
7839,KING,PRESIDENT,,17-11月-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30
7900,JAMES,CLERK,7698,03-12月-81,950,,30
7902,FORD,ANALYST,7566,03-12月-81,3000,,20
7934,MILLER,CLERK,7782,23-1月-82,1300,,10
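For reference, these files follow the classic Oracle scott/emp layout: dept is deptno,dname,loc and emp is empno,ename,job,mgr,hiredate,sal,comm,deptno. The mgr and comm fields can be empty (the consecutive commas), which the code below has to handle, and the hire dates use Chinese month markers such as 17-12月-80.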

Java code

Find the name of the earliest-hired employee in each department. The mapper side-loads the small dept file in setup() as an in-memory lookup (a map-side join) and tags each employee with a department name; the reducer then keeps the earliest hire per department.
package cn.itcast.hadoop.mr.test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Find the name of the earliest-hired employee in each department
public class Test3 extends Configured implements Tool {

    // Mapper: tag each employee record with its department name
    public static class MapClass3 extends Mapper<LongWritable, Text, Text, Text> {
        // deptno -> dname lookup table, loaded once per mapper in setup()
        private Map<String, String> bumenmap = new HashMap<String, String>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // side-load the small dept file from the local disk
            BufferedReader in = null;
            File f = new File("E:\\hadjar\\in\\dept.txt");
            String line;
            if (f.exists()) {
                in = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
                while (null != (line = in.readLine())) {
                    String[] lins = line.split(",");
                    // cache the department name for lookup in map()
                    bumenmap.put(lins[0], lins[1]);
                }
            }
            if (null != in) {
                in.close();
            }
        }
        // dept line: 10,ACCOUNTING,NEW YORK
        // emp line:  7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arrs = value.toString().split(",");
            // employee lines have 8 fields; the 3-field dept lines fail these checks
            if (arrs.length > 3 && bumenmap.containsKey(arrs[7])) {
                // emit: key = department name, value = "hiredate,ename"
                context.write(new Text(bumenmap.get(arrs[7])),
                        new Text(arrs[4] + "," + arrs[1]));
                System.out.println(arrs[7] + "-----" + arrs[4] + "," + arrs[1]);
            }
        }
    }

    // Reducer: keep, for each department, the employee with the earliest hire date
    public static class Reducer3 extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // name and hire date of the earliest hire seen so far
            String empName = null;
            String showTime = null;

            // matches hire dates like "20-2月-81" in the emp file
            DateFormat df = new SimpleDateFormat("dd-MM月-yy");

            // start from "now" so the first parsed date always wins
            Date earliestDate = new Date();

            // each value looks like "20-2月-81,ALLEN"
            for (Text val : values) {
                String[] reducerArrs = val.toString().split(",");
                try {
                    Date hireDate = df.parse(reducerArrs[0]);
                    // keep this employee only if hired before the current minimum
                    if (hireDate.before(earliestDate)) {
                        earliestDate = hireDate;
                        showTime = reducerArrs[0];
                        empName = reducerArrs[1];
                    }
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
            context.write(new Text("Department " + key + ":"),
                    new Text("earliest employee is " + empName + ", hired " + showTime));
            System.out.println("Department " + key + ": earliest employee is " + empName + ", hired " + showTime);
        }
    }

    @Override
    public int run(String[] arg0) throws Exception {
        // instantiate the job
        Job job = Job.getInstance(getConf());
        job.setJobName("Test3");
        job.setJarByClass(Test3.class);
        job.setMapperClass(MapClass3.class);
        job.setReducerClass(Reducer3.class);

        // input format
        job.setInputFormatClass(TextInputFormat.class);

        // output format and key/value types
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // input and output locations; the output directory must not exist,
        // so clear any leftovers from a previous local run
        File file = new File("E:\\hadjar\\test3out");
        delete(file);
        FileInputFormat.addInputPath(job, new Path("E:\\hadjar\\in"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadjar\\test3out"));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }
    // recursively remove a previous run's output directory
    private static void delete(File file) {
        if (file.isDirectory()) {
            for (File f : file.listFiles()) {
                delete(f);
            }
        }
        file.delete();
    }
    public static void main(String[] args) throws Exception {
        // on Windows, hadoop.home.dir must point at a distribution containing bin\winutils.exe
        System.setProperty("hadoop.home.dir",
                "E:\\hadjar\\hadoop-2.4.1-x64\\hadoop-2.4.1");
        Configuration conf = new Configuration();
        int res = ToolRunner.run(conf, new Test3(), args);
        System.exit(res);
    }
}
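The date handling is easy to sanity-check in isolation before running the whole job. Here is a minimal sketch (the DateCheck class name is mine) exercising the same pattern the reducer uses:

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class DateCheck {
    public static void main(String[] args) throws ParseException {
        // same pattern the reducer uses for values like "17-12月-80"
        DateFormat df = new SimpleDateFormat("dd-MM月-yy");
        Date smith = df.parse("17-12月-80");
        Date jones = df.parse("02-4月-81");
        // two-digit years resolve to 19xx here, so SMITH (1980) is earlier
        System.out.println(smith.before(jones)); // prints: true
    }
}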


Sort all employees by total income (salary + commission) in descending order. Every map output shares the single key "0", so one reducer call receives the full data set and can produce a global ordering.

package cn.itcast.hadoop.mr.test;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// Sort all employees by total income (salary + commission), descending
public class Test9 extends Configured implements Tool {

    public static class Map9 extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // emp line: 7934,MILLER,CLERK,7782,23-1月-82,1300,,10
            String[] arrsMap = value.toString().split(",");
            // employee lines have more than 5 fields; dept lines are skipped
            if (arrsMap.length > 5) {
                long allSala = Long.parseLong(arrsMap[5]);
                // the commission field may be empty
                if (!arrsMap[6].isEmpty()) {
                    allSala += Long.parseLong(arrsMap[6]);
                }
                // a single constant key routes every record to one reduce()
                // call, which can then order the whole data set
                context.write(new Text("0"), new Text(arrsMap[1] + "," + allSala));
            }
        }
    }

    public static class Reducer9 extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // every mapper wrote under the single key "0", so this one call
            // receives all employees and can sort them globally
            List<String[]> employees = new ArrayList<String[]>();
            for (Text val : values) {
                // each value looks like "ALLEN,1900"
                employees.add(val.toString().split(","));
            }

            // simple in-place exchange sort, descending by total income;
            // sorting the (name, income) pairs directly avoids the clashes a
            // salary-keyed HashMap would have when two incomes are equal
            for (int i = 0; i < employees.size(); i++) {
                for (int j = i + 1; j < employees.size(); j++) {
                    if (Integer.parseInt(employees.get(i)[1]) < Integer.parseInt(employees.get(j)[1])) {
                        String[] temp = employees.get(i);
                        employees.set(i, employees.get(j));
                        employees.set(j, temp);
                    }
                }
            }

            // emit in sorted order
            for (String[] emp : employees) {
                context.write(new Text(emp[0]), new Text(emp[1]));
                System.out.println("Name: " + emp[0] + " ---- total income: " + emp[1]);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // on Windows, hadoop.home.dir must point at a distribution containing bin\winutils.exe
        System.setProperty("hadoop.home.dir",
                "E:\\hadjar\\hadoop-2.4.1-x64\\hadoop-2.4.1");
        Configuration conf = new Configuration();
        int res = ToolRunner.run(conf, new Test9(), args);
        System.exit(res);
    }

    @Override
    public int run(String[] arg0) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJobName("Test9");
        job.setJarByClass(Test9.class);
        job.setMapperClass(Map9.class);
        job.setReducerClass(Reducer9.class);

        // input format
        job.setInputFormatClass(TextInputFormat.class);

        // output format and key/value types
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // input and output locations; clear any previous local output first
        File file = new File("E:\\hadjar\\test9out");
        delete(file);
        FileInputFormat.addInputPath(job, new Path("E:\\hadjar\\in"));
        FileOutputFormat.setOutputPath(job, new Path("E:\\hadjar\\test9out"));

        job.waitForCompletion(true);
        return job.isSuccessful() ? 0 : 1;
    }

    // recursively remove a previous run's output directory
    private static void delete(File file) {
        if (file.isDirectory()) {
            for (File f : file.listFiles()) {
                delete(f);
            }
        }
        file.delete();
    }
}
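Since the whole ordering happens inside one reduce() call, the hand-rolled exchange sort could equally be replaced by Collections.sort with a Comparator. A minimal sketch using the same "name,totalIncome" value format (the class name is mine; the sample totals are taken from the emp file above):

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class SalarySortSketch {
    public static void main(String[] args) {
        // values in the same "name,totalIncome" shape the mapper emits
        List<String[]> employees = new ArrayList<String[]>();
        for (String v : new String[] { "ALLEN,1900", "SMITH,800", "FORD,3000" }) {
            employees.add(v.split(","));
        }
        // descending by total income
        Collections.sort(employees, new Comparator<String[]>() {
            public int compare(String[] a, String[] b) {
                return Integer.parseInt(b[1]) - Integer.parseInt(a[1]);
            }
        });
        for (String[] e : employees) {
            System.out.println(e[0] + "\t" + e[1]);
        }
    }
}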