
Hadoop Learning Path (25): Using the MapReduce API (2)

2018-03-24 16:00

Student Scores: Enhanced Version

Input data
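The sample input file is not reproduced in this post, but its format can be inferred from MRMapper3_1 below: each line is comma-separated, holding a course name, a student name, and that student's scores. A hypothetical line (names and numbers are illustrative only):

computer,huangxiaoming,85,86,41,75,93,42,85

Job 1 emits each student's highest score as the key, together with the course, name, and average score; job 2 re-reads that output and, with its single reducer, writes the students ordered by highest score.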

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MRScore3 {

    public static void main(String[] args) throws Exception {

        Configuration conf1 = new Configuration();
        Configuration conf2 = new Configuration();

        Job job1 = Job.getInstance(conf1);
        Job job2 = Job.getInstance(conf2);

        job1.setJarByClass(MRScore3.class);
        job1.setMapperClass(MRMapper3_1.class);
        // Job 1 sets no reducer class, so the default identity reducer
        // writes the mapper's (maxScore, StudentBean) pairs out unchanged.

        job1.setMapOutputKeyClass(IntWritable.class);
        job1.setMapOutputValueClass(StudentBean.class);
        job1.setOutputKeyClass(IntWritable.class);
        job1.setOutputValueClass(StudentBean.class);

        // Partition by course so each course lands in its own output file.
        job1.setPartitionerClass(CoursePartitioner2.class);

        job1.setNumReduceTasks(4);

        // Job 1 I/O paths (local paths; adjust for your environment).
        Path inputPath = new Path("D:\\MR\\hw\\work3\\input");
        Path outputPath = new Path("D:\\MR\\hw\\work3\\output_hw3_1");

        FileInputFormat.setInputPaths(job1, inputPath);
        FileOutputFormat.setOutputPath(job1, outputPath);

        job2.setJarByClass(MRScore3.class);
        job2.setMapperClass(MRMapper3_2.class);
        job2.setReducerClass(MRReducer3_2.class);

        job2.setMapOutputKeyClass(IntWritable.class);
        job2.setMapOutputValueClass(StudentBean.class);
        job2.setOutputKeyClass(StudentBean.class);
        job2.setOutputValueClass(NullWritable.class);

        // Job 2 reads job 1's output directory.
        Path inputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_1");
        Path outputPath2 = new Path("D:\\MR\\hw\\work3\\output_hw3_end");

        FileInputFormat.setInputPaths(job2, inputPath2);
        FileOutputFormat.setOutputPath(job2, outputPath2);

        // Chain the two jobs: job2 must not start until job1 succeeds.
        JobControl control = new JobControl("Score3");

        ControlledJob aJob = new ControlledJob(job1.getConfiguration());
        ControlledJob bJob = new ControlledJob(job2.getConfiguration());

        bJob.addDependingJob(aJob);

        control.addJob(aJob);
        control.addJob(bJob);

        // JobControl implements Runnable: run it on its own thread and
        // poll until both jobs have finished.
        Thread thread = new Thread(control);
        thread.start();

        while (!control.allFinished()) {
            Thread.sleep(1000);
        }
        control.stop();
        System.exit(0);

    }

    public static class MRMapper3_1 extends Mapper<LongWritable, Text, IntWritable, StudentBean> {

        StudentBean outValue = new StudentBean();
        IntWritable outKey = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Input line format: course,name,score1,score2,...
            String[] splits = value.toString().split(",");

            // Find the highest score numerically. (Sorting the scores as
            // strings would mis-order values such as "100" and "85".)
            long sum = 0;
            int max = Integer.MIN_VALUE;
            for (int i = 2; i < splits.length; i++) {
                int score = Integer.parseInt(splits[i]);
                max = Math.max(max, score);
                sum += score;
            }
            outKey.set(max);

            double avg = sum * 1.0 / (splits.length - 2);
            outValue.setCourse(splits[0]);
            outValue.setName(splits[1]);
            outValue.setavgScore(avg);

            // The highest score is the key so that job 2 can sort by it.
            context.write(outKey, outValue);
        }
    }

    public static class MRMapper3_2 extends Mapper<LongWritable, Text, IntWritable, StudentBean> {

        StudentBean outValue = new StudentBean();
        IntWritable outKey = new IntWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Job 1's output lines look like: maxScore \t course \t name \t avgScore
            String[] splits = value.toString().split("\t");
            outKey.set(Integer.parseInt(splits[0]));

            outValue.setCourse(splits[1]);
            outValue.setName(splits[2]);
            outValue.setavgScore(Double.parseDouble(splits[3]));

            // Keyed by max score, so the shuffle sorts students by it.
            context.write(outKey, outValue);
        }
    }

    public static class MRReducer3_2 extends Reducer<IntWritable, StudentBean, StudentBean, NullWritable> {

        @Override
        protected void reduce(IntWritable key, Iterable<StudentBean> values, Context context)
                throws IOException, InterruptedException {

            // Hadoop reuses the value object while iterating, so write each
            // record inside the loop instead of keeping a reference to it;
            // this also preserves students who share the same highest score.
            for (StudentBean value : values) {
                context.write(value, NullWritable.get());
            }
        }
    }

}
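The listing relies on two classes the post does not show: StudentBean and CoursePartitioner2. Below is a minimal sketch of StudentBean, assuming a plain Writable with course, name, and average-score fields; the field names and the tab-separated toString() format are inferred from how MRMapper3_2 parses job 1's output.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Sketch of the StudentBean value type assumed by MRScore3.
public class StudentBean implements Writable {

    private String course;
    private String name;
    private double avgScore;

    public String getCourse() { return course; }
    public void setCourse(String course) { this.course = course; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public double getavgScore() { return avgScore; }
    public void setavgScore(double avgScore) { this.avgScore = avgScore; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(course);
        out.writeUTF(name);
        out.writeDouble(avgScore);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        course = in.readUTF();
        name = in.readUTF();
        avgScore = in.readDouble();
    }

    @Override
    public String toString() {
        // Must stay tab-separated: MRMapper3_2 splits job 1's output on "\t".
        return course + "\t" + name + "\t" + avgScore;
    }
}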

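And a minimal sketch of CoursePartitioner2, assuming it routes each record to one of the four reduce tasks by the course stored in the value; the course names here are hypothetical placeholders.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

// Sketch of the custom partitioner: one reduce task (and hence one
// output file) per course.
public class CoursePartitioner2 extends Partitioner<IntWritable, StudentBean> {

    @Override
    public int getPartition(IntWritable key, StudentBean value, int numPartitions) {
        String course = value.getCourse();   // hypothetical course names below
        if ("math".equals(course)) {
            return 0;
        } else if ("english".equals(course)) {
            return 1;
        } else if ("computer".equals(course)) {
            return 2;
        } else {
            return 3;
        }
    }
}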
 

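JobControl is useful here because it states the dependency between the two jobs declaratively and can manage larger DAGs of jobs. For a linear chain like this one, the same effect can be had by running the jobs back to back with waitForCompletion; a sketch of the tail of main() under that approach:

        // Run the jobs sequentially without JobControl: start job 2 only
        // after job 1 has completed successfully.
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }
        System.exit(job2.waitForCompletion(true) ? 0 : 1);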