Hadoop分组统计计算案例
2017-09-15 12:11
387 查看
Hadoop分组统计计算案例
假如现在有一个用户流量使用情况的日志表,需要对用户的上行流量,下行流量和总流量进行统计;同时还要按照号码的前3位不同进行分别输出。日志记录如下:(【2】号码,【8】上行流量,【9】下行流量,中间Tab隔开)
思路:
1、设计一个对象,记录手机号,上行流量,下行流量,总流量。
2、设计分组规则
3、遍历所有行得到每一行的字符串。
4、分割字符串,取出第一个,第八个,第九个数据,封装到一个对象中。
5、对所有对象遍历计算。
设计一个对象存储数据:
package cn.guyouda.hadoop.mapreduce.flowcount.partition; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; public class FlowBean implements WritableComparable<FlowBean>{ private String phoneNB; private long up_flow; private long d_flow; private long s_flow; //在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数 public FlowBean(){} //为了对象数据的初始化方便,加入一个带参的构造函数 public FlowBean(String phoneNB, long up_flow, long d_flow) { this.phoneNB = phoneNB; this.up_flow = up_flow; this.d_flow = d_flow; this.s_flow = up_flow + d_flow; } public String getPhoneNB() { return phoneNB; } public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; } public long getUp_flow() { return up_flow; } public void setUp_flow(long up_flow) { this.up_flow = up_flow; } public long getD_flow() { return d_flow; } public void setD_flow(long d_flow) { this.d_flow = d_flow; } public long getS_flow() { return s_flow; } public void setS_flow(long s_flow) { this.s_flow = s_flow; } //将对象数据序列化到流中 @Override public void write(DataOutput out) throws IOException { out.writeUTF(phoneNB); out.writeLong(up_flow); out.writeLong(d_flow); out.writeLong(s_flow); } //从数据流中反序列出对象的数据 //从数据流中读出对象字段时,必须跟序列化时的顺序保持一致 @Override public void readFields(DataInput in) throws IOException { phoneNB = in.readUTF(); up_flow = in.readLong(); d_flow = in.readLong(); s_flow = in.readLong(); } @Override public String toString() { return "Up:" + up_flow + "\t Down:" +d_flow + "\t Total:" + s_flow; } @Override public int compareTo(FlowBean o) { return s_flow>o.getS_flow()?-1:1; } }
设计分组规则:
package cn.guyouda.hadoop.mapreduce.flowcount.partition; import java.util.HashMap; import org.apache.hadoop.mapreduce.Partitioner; /** * 自定义分组机制 * 将电话号码按区域分组统计输出 * @author Youda * * @param <KEY> * @param <VALUE> */ public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{ private static HashMap<String,Integer> areaMap = new HashMap<>(); static{ loadPartition(areaMap); } /* * 模拟分组规则 * 假设取前三位进行分组 * 在正式开发中应该提前准备数据规则并加载入内存中,以后直接去内存里面查询 */ public static void loadPartition(HashMap<String,Integer> map){ map.put("135", 0); map.put("136", 1); map.put("137", 2); map.put("138", 3); map.put("139", 4); } @Override public int getPartition(KEY key, VALUE value, int numPartitions) { //从key中拿到手机号,查询手机归属地字典,不同的省份返回不同的组号 int areaCoder = areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3)); return areaCoder; } }
计算:
package cn.guyouda.hadoop.mapreduce.flowcount.partition; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件 * 需要自定义改造两个机制: * 1、改造分组的逻辑,自定义一个partitioner * 2、自定义reduer task的并发任务数 * * @author Youda * */ public class FlowSumByArea { public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { //拿一行数据 String line = value.toString(); //切分成各个字段 String[] fields = StringUtils.split(line, "\t"); //获取需要的字段 String phoneNB = fields[1]; long u_flow = Long.parseLong(fields[7]); long d_flow = Long.parseLong(fields[8]); //封装数据并输出 context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow)); } } public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ @Override protected void reduce(Text key, Iterable<FlowBean> values,Context context) throws IOException, InterruptedException { long up_flow_counter = 0; long d_flow_counter = 0; for(FlowBean bean: values){ up_flow_counter += bean.getUp_flow(); d_flow_counter += bean.getD_flow(); } context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumByArea.class); job.setMapperClass(FlowSumAreaMapper.class); job.setReducerClass(FlowSumAreaReducer.class); //设置我们自定义的分组逻辑定义 job.setPartitionerClass(AreaPartitioner.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); /* * 设置reduce的任务并发数,应该跟分组的数量保持一致 * 若小于但不等于一会报错 * 若大于,多余的分组没意义 */ job.setNumReduceTasks(6); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); } }
准备工作:
1、将工程打成一个jar文件上传2、准备数据
3、开启hdfs,yarn
运行程序:
查看结果:
在输出文件夹下将按照分组输出6个统计文件查看每个统计文件,恰好和我们设计的规则一致
相关文章推荐
- Hadoop学习笔记—20.网站日志分析项目案例(三)统计分析
- redmine中使用python按人员分组统计问题入excel的自编写案例
- 流量汇总(自定义jar包,在hadoop集群上 统计,排序,分组)之统计
- Hadoop学习笔记—20.网站日志分析项目案例(三)统计分析
- 【案例分享】电力设备生产数据的多层分组统计报表实现
- 一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序
- 【案例分享】电力设备生产数据的多层分组统计报表实现
- Hadoop MapReduce统计手机流量案例学习(结合Partitioner)
- Linq不分组统计、子查询、计算列及索引器的应用
- hadoop入门之利用hadoop来对文档数据归类统计案例wordcount
- Hadoop(4-3)-MapReduce程序案例-统计每一年最高温度
- 【案例分享】电力设备生产数据的多层分组统计报表实现
- Hadoop的计算上下行流量的案例
- 分组统计(平均值计算)
- 【案例分享】电力设备生产数据的多层分组统计报表实现
- MongoDB集成Hadoop进行统计计算
- R之分组计算描述性统计统计量
- spark分组统计及二次排序案例一枚
- Hadoop(4-1)-MapReduce程序案例-统计销售商品数量
- [python]按key1分组后,计算data1,data2的统计信息并附加到原始表格中