MapReduce编程实例(一)-求平均数
2014-07-01 10:26
393 查看
开始学习写一些MR编程实例,工作中即将使用(刚刚开始,如果有错误和建议,欢迎指出)
现在有一个文件,里面记录了全校所有学生各科成绩,求每个学生的平均成绩,格式如下
小明 语文 92
小明 数学 88
小明 英语 90
小强 语文 76
小强 数学 66
小强 英语 80
小木 语文 60
小木 数学 65
小木 英语 61
解决思路
Map阶段先将数据拆成key:姓名,value:课程_成绩的格式提供给reduce,默认的partitioner会将名字相同的学生发到同一个reduce上面
这样reduce可以根据总分/科目数计算平均成绩。
逻辑比较简单,
代码如下:
[java] view
plaincopyprint?
package com.test.mr2;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/*
* 计算学生课程平均成绩(某学生总分/课程数)
* 输入格式
*
* 小明 语文 92
* 小明 数学 88
* 小明 英语 90
* 小强 语文 76
* 小强 数学 66
* 小强 英语 80
* 小木 语文 60
* 小木 数学 65
* 小木 英语 61
*
* 输出
*
* 小明 90
* 小强 74
* 小木 62
*/
public class Average {
public static class AverMapper extends Mapper<Object, Text, Text, Text> {
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer stringTokenizer = new StringTokenizer(line, "\n");
String name = "";
StringBuffer out = new StringBuffer(32);
while (stringTokenizer.hasMoreElements()) {
String tmp = stringTokenizer.nextToken();
StringTokenizer st = new StringTokenizer(tmp);
while (st.hasMoreElements()) {
name = st.nextToken();
out.append(st.nextToken());
out.append("_");
out.append(st.nextToken());
// 使用默认的hash partitioner将名字相同的同学发到一个reduce上
context.write(new Text(name), new Text(out.toString()));
}
}
}
}
public static class AverReducer extends
Reducer<Text, Text, Text, FloatWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Iterator<Text> it = values.iterator();
//计算每个key对应的记录条数和总分数
int count = 0;
int sum = 0;
while (it.hasNext()) {
String value = it.next().toString();
String[] strs = value.split("\\_");
if (strs.length < 2) {
continue;
}
try {
sum += Integer.parseInt(strs[1]);
} catch (Exception e) {
System.err.println(e.getMessage());
}
count++;
}
FloatWritable average = new FloatWritable(sum / count);
context.write(key, average);
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
System.out.println("Begin.....");
Configuration conf =new Configuration();
String[] arguments=new GenericOptionsParser(conf, args).getRemainingArgs();
if(arguments.length<2){
System.out.println("Usage:com.test.mr2.Average in out");
System.exit(1);
}
Job job=new Job(conf,"Average");
job.setJarByClass(Average.class);
job.setMapperClass(AverMapper.class);
job.setReducerClass(AverReducer.class);
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(arguments[0]));
FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
System.exit(job.waitForCompletion(true)?0:1);
System.out.println("End.....");
}
}
现在有一个文件,里面记录了全校所有学生各科成绩,求每个学生的平均成绩,格式如下
小明 语文 92
小明 数学 88
小明 英语 90
小强 语文 76
小强 数学 66
小强 英语 80
小木 语文 60
小木 数学 65
小木 英语 61
解决思路
Map阶段先将数据拆成key:姓名,value:课程_成绩的格式提供给reduce,默认的partitioner会将名字相同的学生发到同一个reduce上面
这样reduce可以根据总分/科目数计算平均成绩。
逻辑比较简单,
代码如下:
[java] view
plaincopyprint?
package com.test.mr2;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/*
* 计算学生课程平均成绩(某学生总分/课程数)
* 输入格式
*
* 小明 语文 92
* 小明 数学 88
* 小明 英语 90
* 小强 语文 76
* 小强 数学 66
* 小强 英语 80
* 小木 语文 60
* 小木 数学 65
* 小木 英语 61
*
* 输出
*
* 小明 90
* 小强 74
* 小木 62
*/
public class Average {
public static class AverMapper extends Mapper<Object, Text, Text, Text> {
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer stringTokenizer = new StringTokenizer(line, "\n");
String name = "";
StringBuffer out = new StringBuffer(32);
while (stringTokenizer.hasMoreElements()) {
String tmp = stringTokenizer.nextToken();
StringTokenizer st = new StringTokenizer(tmp);
while (st.hasMoreElements()) {
name = st.nextToken();
out.append(st.nextToken());
out.append("_");
out.append(st.nextToken());
// 使用默认的hash partitioner将名字相同的同学发到一个reduce上
context.write(new Text(name), new Text(out.toString()));
}
}
}
}
public static class AverReducer extends
Reducer<Text, Text, Text, FloatWritable> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Iterator<Text> it = values.iterator();
//计算每个key对应的记录条数和总分数
int count = 0;
int sum = 0;
while (it.hasNext()) {
String value = it.next().toString();
String[] strs = value.split("\\_");
if (strs.length < 2) {
continue;
}
try {
sum += Integer.parseInt(strs[1]);
} catch (Exception e) {
System.err.println(e.getMessage());
}
count++;
}
FloatWritable average = new FloatWritable(sum / count);
context.write(key, average);
}
}
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
System.out.println("Begin.....");
Configuration conf =new Configuration();
String[] arguments=new GenericOptionsParser(conf, args).getRemainingArgs();
if(arguments.length<2){
System.out.println("Usage:com.test.mr2.Average in out");
System.exit(1);
}
Job job=new Job(conf,"Average");
job.setJarByClass(Average.class);
job.setMapperClass(AverMapper.class);
job.setReducerClass(AverReducer.class);
job.setMapOutputValueClass(Text.class);
job.setMapOutputKeyClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FloatWritable.class);
FileInputFormat.addInputPath(job, new Path(arguments[0]));
FileOutputFormat.setOutputPath(job, new Path(arguments[1]));
System.exit(job.waitForCompletion(true)?0:1);
System.out.println("End.....");
}
}
相关文章推荐
- MapReduce编程实例(一)-求平均数
- MapReduce编程实例(六)
- hadoop中使用MapReduce编程实例(转)
- hadoop中使用MapReduce编程实例(转)
- (三)MapReduce编程实例
- 以实例让你真正明白mapreduce---填空式、分布(分割)编程
- octopy的MapReduce编程实例
- Hadoop编程实例之MapReduce
- MapReduce编程实例(二)-MR2操作MySQL
- mapreduce编程实例(7)-求所有用户ID
- (五)MapReduce编程实例
- MapReduce编程实例
- mapreduce编程实例(1)-统计词频
- hadoop中使用MapReduce编程实例
- 通过实例让你真正明白mapreduce---填空式、分布(分割)编程
- mapreduce编程实例(2)-求最大值和最小值
- hadoop中使用MapReduce编程实例
- MongoDB中MapReduce编程模型使用实例
- mapreduce编程实例(4)-求中位数和标准差
- hadoop中使用MapReduce编程实例(转)