Hadoop in Action: Group-By Statistics with MapReduce (Part 7)
2017-05-07 23:34
1. Data preparation
Use MapReduce to compute, for each name in age.txt, the minimum age, the maximum age, and the total record count. Each line of the input is one record in the format below:
name,min,max,count
Mike,35,20,1
Mike,5,15,2
Mike,20,13,1
Steven,40,20,10
Ken,28,68,1
Ken,14,198,10
Cindy,32,31,100
2. Expected results
For each name, the output is the smallest min, the largest max, and the sum of the counts across all of that name's records, tab-separated (the format produced by MinMaxCountTuple.toString()):
Mike	5	20	4
Steven	40	20	10
Ken	14	198	11
Cindy	32	31	100
3. Define a custom output value type, MinMaxCountTuple
```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

// Custom value type carrying the per-name min, max, and record count
// between the mapper, combiner, and reducer.
public class MinMaxCountTuple implements Writable {
    private int min;
    private int max;
    private int count;

    public int getMin() { return min; }
    public void setMin(int min) { this.min = min; }
    public int getMax() { return max; }
    public void setMax(int max) { this.max = max; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    // Deserialize the fields in exactly the order write() emits them.
    public void readFields(DataInput in) throws IOException {
        min = in.readInt();
        max = in.readInt();
        count = in.readInt();
    }

    public void write(DataOutput out) throws IOException {
        out.writeInt(min);
        out.writeInt(max);
        out.writeInt(count);
    }

    @Override
    public String toString() {
        return min + "\t" + max + "\t" + count;
    }
}
```
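Because `Writable` serialization delegates to plain `DataOutput`/`DataInput`, the tuple's wire format can be checked without any Hadoop dependency. The sketch below (class `TupleWireFormat` and its helper methods are hypothetical, introduced only for illustration) mirrors `write()`/`readFields()` using JDK streams:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class TupleWireFormat {
    // Write min/max/count the same way MinMaxCountTuple.write() does:
    // three 4-byte big-endian ints, 12 bytes total.
    public static byte[] serialize(int min, int max, int count) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(min);
        out.writeInt(max);
        out.writeInt(count);
        return buf.toByteArray();
    }

    // Read the fields back in the same order, as readFields() does.
    public static int[] deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        return new int[] { in.readInt(), in.readInt(), in.readInt() };
    }

    public static void main(String[] args) throws IOException {
        byte[] bytes = serialize(5, 20, 4);  // Mike's final tuple
        int[] tuple = deserialize(bytes);
        System.out.println(bytes.length + " bytes -> "
                + tuple[0] + "\t" + tuple[1] + "\t" + tuple[2]);
    }
}
```

Field order matters: `readFields` must consume bytes in exactly the sequence `write` produced them, which is why both methods in `MinMaxCountTuple` list min, max, count in the same order.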
4. The MapReduce program
```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Age {

    // Emits (name, MinMaxCountTuple) for every "name,min,max,count" record.
    public static class AgeMap extends Mapper<Object, Text, Text, MinMaxCountTuple> {
        private Text userName = new Text();
        private MinMaxCountTuple outTuple = new MinMaxCountTuple();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String[] splits = itr.nextToken().split(",");
                outTuple.setMin(Integer.valueOf(splits[1]));
                outTuple.setMax(Integer.valueOf(splits[2]));
                outTuple.setCount(Integer.valueOf(splits[3]));
                userName.set(splits[0]);
                context.write(userName, outTuple);
            }
        }
    }

    // Merges all tuples for one name: min of mins, max of maxes, sum of counts.
    // The merge is associative, so the same class also serves as the combiner.
    public static class AgeReduce
            extends Reducer<Text, MinMaxCountTuple, Text, MinMaxCountTuple> {
        private MinMaxCountTuple result = new MinMaxCountTuple();

        public void reduce(Text key, Iterable<MinMaxCountTuple> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            result.setMax(Integer.MIN_VALUE);
            result.setMin(Integer.MAX_VALUE);
            for (MinMaxCountTuple tmp : values) {
                if (tmp.getMin() < result.getMin()) {
                    result.setMin(tmp.getMin());
                }
                if (tmp.getMax() > result.getMax()) {
                    result.setMax(tmp.getMax());
                }
                sum += tmp.getCount();
            }
            result.setCount(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: Age <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "Age Min Max Count");
        job.setJarByClass(Age.class);
        job.setMapperClass(AgeMap.class);
        job.setCombinerClass(AgeReduce.class);  // safe: the merge is associative
        job.setReducerClass(AgeReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(MinMaxCountTuple.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
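Before submitting the job to a cluster, the reduce logic can be sanity-checked locally against the sample data from section 1. The sketch below (`AgeLocalSim` is a hypothetical helper, not part of the job) applies the same merge rule as `AgeReduce` and reproduces the expected results from section 2:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class AgeLocalSim {
    // Group records by name and fold each (min,max,count) into the running
    // tuple, mirroring AgeReduce: min of mins, max of maxes, sum of counts.
    public static Map<String, int[]> aggregate(String[] lines) {
        Map<String, int[]> groups = new LinkedHashMap<>();
        for (String line : lines) {
            String[] f = line.split(",");
            int[] rec = { Integer.parseInt(f[1]), Integer.parseInt(f[2]),
                          Integer.parseInt(f[3]) };
            groups.merge(f[0], rec, (a, b) -> new int[] {
                Math.min(a[0], b[0]), Math.max(a[1], b[1]), a[2] + b[2] });
        }
        return groups;
    }

    public static void main(String[] args) {
        String[] data = {
            "Mike,35,20,1", "Mike,5,15,2", "Mike,20,13,1",
            "Steven,40,20,10", "Ken,28,68,1", "Ken,14,198,10", "Cindy,32,31,100"
        };
        for (Map.Entry<String, int[]> e : aggregate(data).entrySet()) {
            int[] t = e.getValue();
            System.out.println(e.getKey() + "\t" + t[0] + "\t" + t[1] + "\t" + t[2]);
        }
    }
}
```

One difference from the real job: the actual MapReduce output is sorted by key during the shuffle, so names arrive at the reducer in ascending order rather than input order.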