Hadoop in Action: Grouped (group-by) Statistics with MapReduce (Part 7)

2017-05-07 23:34
1. Data Preparation

Use MapReduce to compute, for each name in age.txt, the overall minimum, the overall maximum, and the total record count. Each data line already holds a partial (min, max, count) triple for a name; the first line of the file is a header naming the columns:

name,min,max,count
Mike,35,20,1
Mike,5,15,2
Mike,20,13,1
Steven,40,20,10
Ken,28,68,1
Ken,14,198,10
Cindy,32,31,100

2. Expected Results

The job emits one tab-separated line per name: the overall minimum, the overall maximum, and the summed count. For Mike, for example, the three input records give min(35, 5, 20) = 5, max(20, 15, 13) = 20, and 1 + 2 + 1 = 4:

Mike 5 20 4
Steven 40 20 10
Ken 14 198 11
Cindy 32 31 100
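
To make the aggregation concrete, here is a minimal plain-Java sketch of the same logic applied to Mike's three records (the class name MikeCheck is invented for illustration; in the actual job this work happens in the reducer of section 4):

public class MikeCheck {
    public static void main(String[] args) {
        // Mike's three (min, max, count) records from age.txt.
        int[][] records = { { 35, 20, 1 }, { 5, 15, 2 }, { 20, 13, 1 } };
        int min = Integer.MAX_VALUE, max = 0, count = 0;
        for (int[] r : records) {
            min = Math.min(min, r[0]); // smallest per-record minimum
            max = Math.max(max, r[1]); // largest per-record maximum
            count += r[2];             // counts simply add up
        }
        System.out.println("Mike\t" + min + "\t" + max + "\t" + count);
        // Prints: Mike	5	20	4
    }
}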

3. Adding a Custom Output Type: MinMaxCountTuple

The mapper must emit three numbers (min, max, count) as a single value, so we define a custom type implementing Hadoop's Writable interface, which lets the framework serialize it between the map and reduce phases.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class MinMaxCountTuple implements Writable {
    private int min;
    private int max;
    private int count;

    public int getMin() {
        return min;
    }

    public void setMin(int min) {
        this.min = min;
    }

    public int getMax() {
        return max;
    }

    public void setMax(int max) {
        this.max = max;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }

    // Deserialize the three fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        min = in.readInt();
        max = in.readInt();
        count = in.readInt();
    }

    // Serialize the three fields; Hadoop calls this during the shuffle.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(min);
        out.writeInt(max);
        out.writeInt(count);
    }

    // Used by TextOutputFormat to render the reducer's output value.
    @Override
    public String toString() {
        return min + "\t" + max + "\t" + count;
    }
}
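
As a quick sanity check outside Hadoop, the Writable contract can be exercised with plain Java streams. This round-trip sketch (the class name MinMaxCountTupleDemo is invented for illustration) writes a tuple with write and reads it back with readFields:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class MinMaxCountTupleDemo {
    public static void main(String[] args) throws Exception {
        MinMaxCountTuple original = new MinMaxCountTuple();
        original.setMin(5);
        original.setMax(20);
        original.setCount(4);

        // Serialize the tuple, as Hadoop does between map and reduce.
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance and check the fields survive.
        MinMaxCountTuple copy = new MinMaxCountTuple();
        copy.readFields(new DataInputStream(
                new ByteArrayInputStream(buffer.toByteArray())));

        System.out.println(copy); // prints the tab-separated "5 20 4"
    }
}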


4. The MapReduce Program

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Age {
    public static class AgeMap extends
            Mapper<Object, Text, Text, MinMaxCountTuple> {

        private Text userName = new Text();
        private MinMaxCountTuple outTuple = new MinMaxCountTuple();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String content = itr.nextToken();
                String[] splits = content.split(",");
                // Skip the header line and any malformed records.
                if (splits.length != 4 || "name".equals(splits[0])) {
                    continue;
                }
                String name = splits[0];
                int min = Integer.parseInt(splits[1]);
                int max = Integer.parseInt(splits[2]);
                int count = Integer.parseInt(splits[3]);
                outTuple.setMin(min);
                outTuple.setMax(max);
                outTuple.setCount(count);
                userName.set(name);
                context.write(userName, outTuple);
            }
        }
    }

    public static class AgeReduce extends
            Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
        private MinMaxCountTuple result = new MinMaxCountTuple();

        @Override
        public void reduce(Text key, Iterable<MinMaxCountTuple> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            // Reset the reused tuple for each key before aggregating.
            result.setMax(0);
            result.setMin(Integer.MAX_VALUE);
            for (MinMaxCountTuple tmp : values) {
                if (tmp.getMin() < result.getMin()) {
                    result.setMin(tmp.getMin());
                }
                if (tmp.getMax() > result.getMax()) {
                    result.setMax(tmp.getMax());
                }
                sum += tmp.getCount();
            }
            result.setCount(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args)
                .getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Age <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Age Min Max Count");
        job.setJarByClass(Age.class);
        job.setMapperClass(AgeMap.class);
        // The reducer doubles as a combiner: min, max, and summed count are
        // associative, and its input and output value types are identical.
        job.setCombinerClass(AgeReduce.class);
        job.setReducerClass(AgeReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MinMaxCountTuple.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
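
To run the job, package the two classes into a jar (the jar name age.jar and the HDFS paths below are placeholders, not from the original post), upload age.txt to HDFS, and submit the jar with the input and output directories as arguments:

$ hadoop fs -mkdir -p /user/hadoop/age/input
$ hadoop fs -put age.txt /user/hadoop/age/input
$ hadoop jar age.jar Age /user/hadoop/age/input /user/hadoop/age/output
$ hadoop fs -cat /user/hadoop/age/output/part-r-00000

The output directory must not exist beforehand. Each line of part-r-00000 is the key followed by MinMaxCountTuple.toString(), so the output should match the expected results in section 2.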