
A Hadoop Grouped Statistics Example

2017-09-15 12:11

Suppose we have a log of users' traffic usage and need to compute each user's upstream traffic, downstream traffic, and total traffic. In addition, the results must be written to separate outputs according to the first three digits of the phone number.

The log records look like the following (field 2 is the phone number, field 8 the upstream traffic, field 9 the downstream traffic; fields are separated by tabs).
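For illustration only (every value below is invented), a record with this shape might look like this, with fields separated by tabs; only fields 2, 8, and 9 are read by the job:

1000000000001	13512340000	00-11-22-33-44-55:CMCC	10.0.0.1	example.com	20	7	2481	24681	200

Here 13512340000 is the phone number, 2481 the upstream traffic, and 24681 the downstream traffic.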



Approach:

1. Design an object that holds the phone number, upstream traffic, downstream traffic, and total traffic.
2. Design the partitioning rule.
3. Iterate over all lines, obtaining each line as a string.
4. Split the string, take the second, eighth, and ninth fields, and wrap them in an object.
5. Iterate over all objects and aggregate.

Design an object to store the data:

package cn.guyouda.hadoop.mapreduce.flowcount.partition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // Deserialization instantiates the bean through reflection,
    // so an explicit no-arg constructor is required
    public FlowBean() {}

    // Convenience constructor so the bean can be initialized in one step
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }

    // Serialize the bean's fields into the output stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialize the bean's fields from the input stream;
    // fields must be read in exactly the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return "Up:" + up_flow + "\t Down:" + d_flow + "\t Total:" + s_flow;
    }

    // Sort in descending order of total flow
    @Override
    public int compareTo(FlowBean o) {
        return s_flow > o.getS_flow() ? -1 : 1;
    }

}
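To make the serialization contract concrete, here is a minimal round-trip sketch. The FlowBeanRoundTrip class, the phone number, and the flow values are made up for illustration; the same write/readFields pair is what Hadoop invokes when it moves the bean between map and reduce tasks.

package cn.guyouda.hadoop.mapreduce.flowcount.partition;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {

    public static void main(String[] args) throws IOException {
        // Serialize a bean into an in-memory buffer (hypothetical example values)
        FlowBean original = new FlowBean("13512340000", 100, 200);
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        // Deserialize into a fresh instance created via the no-arg constructor
        FlowBean restored = new FlowBean();
        restored.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Prints: Up:100	 Down:200	 Total:300
        System.out.println(restored);
    }
}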

Design the partitioning rule:

package cn.guyouda.hadoop.mapreduce.flowcount.partition;

import java.util.HashMap;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Custom partitioning logic:
 * group phone numbers by area (number prefix) so that each area's statistics
 * are written to their own output
 * @author Youda
 *
 * @param <KEY>
 * @param <VALUE>
 */
public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<>();

    static {
        loadPartition(areaMap);
    }

    /*
     * Mock partitioning rule: partition by the first three digits of the phone number.
     * In real development the rules would be prepared ahead of time and loaded into
     * memory once, then simply looked up here.
     */
    public static void loadPartition(HashMap<String, Integer> map) {
        map.put("135", 0);
        map.put("136", 1);
        map.put("137", 2);
        map.put("138", 3);
        map.put("139", 4);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone number from the key and look up its prefix in the area map;
        // each area gets its own partition number, and unknown prefixes fall into
        // the catch-all partition 5
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3));
        return areaCoder == null ? 5 : areaCoder;
    }

}
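As a quick sanity check of the rule, a small sketch like the following could be used; the AreaPartitionerCheck class and the phone numbers are hypothetical:

package cn.guyouda.hadoop.mapreduce.flowcount.partition;

import org.apache.hadoop.io.Text;

public class AreaPartitionerCheck {

    public static void main(String[] args) {
        AreaPartitioner<Text, FlowBean> partitioner = new AreaPartitioner<>();

        // Prefixes present in the map: 135 -> partition 0, 139 -> partition 4
        System.out.println(partitioner.getPartition(new Text("13512340000"), null, 6));   // 0
        System.out.println(partitioner.getPartition(new Text("13998760000"), null, 6));   // 4

        // Any other prefix falls into the catch-all partition 5
        System.out.println(partitioner.getPartition(new Text("15000000000"), null, 6));   // 5
    }
}

Prefixes 135 through 139 map to partitions 0 through 4 and everything else to partition 5, which is why the job below asks for six reducer tasks.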

The MapReduce job:

package cn.guyouda.hadoop.mapreduce.flowcount.partition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Aggregate the raw traffic log and write the statistics of users from
 * different provinces (areas) to different output files.
 * Two things have to be customized:
 * 1. the partitioning logic, via a custom Partitioner
 * 2. the number of reducer tasks
 *
 * @author Youda
 *
 */
public class FlowSumByArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            // Take one line of data
            String line = value.toString();
            // Split it into fields
            String[] fields = StringUtils.split(line, "\t");

            // Get the fields we need: phone number, up flow, down flow
            String phoneNB = fields[1];
            long u_flow = Long.parseLong(fields[7]);
            long d_flow = Long.parseLong(fields[8]);

            // Wrap the data in a FlowBean and emit it
            context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
        }

    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {

            long up_flow_counter = 0;
            long d_flow_counter = 0;

            // Sum the up and down flow of all records for this phone number
            for (FlowBean bean : values) {
                up_flow_counter += bean.getUp_flow();
                d_flow_counter += bean.getD_flow();
            }

            context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowSumByArea.class);

        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Plug in our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        /*
         * The number of reducer tasks should match the number of partitions.
         * If it is smaller than the number of partitions (but not exactly 1), the job fails;
         * if it is larger, the extra reducers are useless and only produce empty files.
         */
        job.setNumReduceTasks(6);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

Preparation:

1. Package the project as a jar file and upload it to the cluster
2. Prepare the data (upload the log file to HDFS, e.g. with hadoop fs -put)
3. Start HDFS and YARN (e.g. start-dfs.sh and start-yarn.sh)

Run the job (for example: hadoop jar <jar-file> cn.guyouda.hadoop.mapreduce.flowcount.partition.FlowSumByArea <input-path> <output-path>):



Check the results:

The output directory contains six statistics files, one per partition (part-r-00000 through part-r-00005).



Inspecting each statistics file confirms that the records are split exactly according to the partitioning rule we defined.
