您的位置:首页 > 编程语言

MapReduce编程基础(二)——数值概要(计算最大值、最小值、平均值)

2017-07-22 21:42 507 查看

数值概要

数值概要模式是计算数据集聚合统计的一般性模式

适用场景:

要处理的数据数值或者计数

数据可以按某些特定的字段分组

数值概要的应用:

单词计数

记录计数

最大/最小值计数

平均值/中位数/标准差

话不多说,现在直接开始我们的第一个示例,最大值、最小值计数示例

最大值、最小值计数示例

1.数据集:

本示例使用Movielens数据集中的u1.base文件,MovieLens数据集是一个用户对电影的评分数据集,在后续的示例中我们将一直使用这个数据集,我会将这个数据集上传到CSDN方便大家下载,文件的格式如下所示:

第1列到第4列分别代表用户ID,项目ID,用户对项目的评分时间戳

1   1   5   874965758
1   2   3   876893171
1   3   4   878542960
1   4   3   876893119
1   5   3   889751712
1   7   4   875071561
1   8   1   875072484
...      ....    ....
943 1067    2   875501756
943 1074    4   888640250
943 1188    3   888640250
943 1228    3   888640275
943 1330    3   888692465


2.程序示例

问题:对于给定的用户项目评分数据,确定每个用户评分的最大值、最小值、该用户的评分次数及该用户的平均评分。

1)自定义Writable类型存储输出数据

注意:实现Writable接口必须实现readFields(DataInput in)和write(DataOutput out)方法。定义toString()方法,才能正确解析输出。

package mapreduce.design.parrerns;

import java.io.*;

import org.apache.hadoop.io.Writable;

/*
* 一个定制的Writable对象类,便于定制特定个数的输入和输出
* Writable接口有两个方法,Write和readFile
* */
public class MinMaxCountTuple implements Writable{
private int min=0;//用户评过的最低分
private int max=0;//用户评过的最高分
private double average=0;//用户的评分
private long count=0;//用户的评分数

public int getMin(){
return min;
}

public void setMin(int min){
this.min=min;
}

public int getMax(){
return max;
}

public void setMax(int max){
this.max=max;
}

public double getAverage(){
return average;
}

public void setAverage(double average){
this.average=average;
}

public long getCount(){
return count;
}

public void setCount(long count){
this.count=count;
}

@Override
public void readFields(DataInput in) throws IOException{
//返回writable实例的反序列化流,注意:字段的顺序和write()方法相同
min=in.readInt();
max=in.readInt();
count=in.readLong();
average=in.readDouble();
}
@Override
public void write(DataOutput out) throws IOException{
//返回writable实例的反序列化流,注意:字段的顺序和write()方法相同
out.writeInt(this.min);
out.writeInt(this.max);
out.writeLong(this.count);
out.writeDouble(this.average);
}

//定义toString在输出时才能正确解析
public String toString(){
return "min:"+min+" "+"max:"+max+"  "+"count:"+count+"  "+"average:"+average;
}
}


2)mapper代码

输出键是用户ID,输出值是最大评分、最小评分及用户评分数。这三个字段存贮在MinMaxCountTuple类型的Writable对象中。在map端,我们将最大评分、最小评分都设置成了当前读入数据中的评分值。计数值为1,表示该用户评分了一次,最后所有的计数将在reduce端进行汇总。

public static class MinMaxCountMapper extends Mapper<Object, Text, Text, MinMaxCountTuple>{
private Text outUserId=new Text();//输出健
private MinMaxCountTuple outTuple=new MinMaxCountTuple();//输出值,为自定义类型

public void map(Object key, Text value, Context context) throws IOException,InterruptedException{
String[] data=value.toString().split("  ");
if(data.length==4){
//输出的键
outUserId=new Text(data[0]);
//输出的值,自定义Writable类型
outTuple.setMin(Integer.valueOf(data[2]));//最小评分
outTuple.setMax(Integer.valueOf(data[2]));//最大评分
outTuple.setAverage(Double.valueOf(data[2]));//平均评分
outTuple.setCount(1);//评分数目计数
}
context.write(outUserId, outTuple);
}
}


3)reducer代码

reducer遍历(一个键对应的)所有值得到其最小评分、最大评分并计算次数的总和。

public static class MinMaxCountReducer extends Reducer<Text, MinMaxCountTuple, Text,MinMaxCountTuple>{
private MinMaxCountTuple result=new MinMaxCountTuple();

public void reduce(Text key, Iterable<MinMaxCountTuple> values, Context context) throws IOException,InterruptedException{
//初始化
result.setMin(100);
result.setMax(0);
result.setAverage(0.0);
result.setCount(0);

double average=0;
double sumNew=0;
int countNew=0;

//迭代循环这个键值的所有输入
for(MinMaxCountTuple val:values){

if(result.getMax()<val.getMax()){
result.setMax(val.getMax());
}
if(result.getMin()>val.getMin()){
result.setMin(val.getMin());
}
//注意sumNew+=平均评分*评分次数
sumNew+=val.getAverage()*val.getCount();//评分和
countNew+=val.getCount();//评分次数
}
result.setCount(countNew);

average=sumNew/countNew;
result.setAverage(average);

context.write(key, result);
}

}


4)combiner优化

本示例为了直接使用reducer作为combiner进行了特殊处理,本来计算最大评分,最小评分,和统计用户评分数是满足交换律和结合律的,意思就是可以任意的改变值的顺序,并且随意的将计算进行分组但是不会该别最终的计算结果,那么就可以使用combiner。

但是计算平均值是不满足交换率和结合律的,因为如果每个combiner对本地的map输出计算平均值,再将各自计算的到的平均值发送到reduce进行平均值计算,得到的结果会是错误的。即,平均值的平均值与真实值的平均值不同。

因此,为了避免以上错误,同时利用combiner来优化计算,在计算总评分的时候使得:

评分总和+=平均评分*评分次数

这样就可满足交换律和结合律,可以使用combiner对本地map的输出进行计算。

总结

combiner的使用:只有在map输出数据的处理满足交换律和结合律的情况下才能使用combiner。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  mapreduce 编程
相关文章推荐