使用MapReduce计算框架统计CDN日志IP数、流量等数据
2017-05-26 15:31
876 查看
写在前面
前面两篇文章中,我们使用Spark RDD及Spark Streaming 从CDN日志中计算出了独立IP数、每个视频独立IP数、每时CDN流量,这里我们将使用MapReduce计算框架,从同样的日志中完成相同的计算,前提是要搭建好Hadoop及HDFS运行环境独立IP计算
计算思路
Map 在map中将每行的IP地址提取出来,并将(ip,1)输出Reduce 在reduce中相同的ip已经合并,所以只需统计出values的大小就是这个IP的数量了
代码
public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] lines = line.split(" "); String ip = lines[0]; context.write(new Text(ip), new IntWritable(1)); } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key, new IntWritable(sum)); } }
每个视频独立IP计算
计算思路
map 在map中将每行的视频及IP提出,以(id,ip)输出Reduce 在reduce中同一视频id被合并,只需统计values中不同IP的数量即可
代码
//从每行日志中获取视频ID public static IntWritable getVideoId(String line) { Matcher matcher = Pattern.compile("([\\d]+)\\.mp4").matcher(line); if (matcher.find()) { return new IntWritable(Integer.parseInt(matcher.group(1))); } else { return new IntWritable(0); } } public static class Map extends Mapper<LongWritable, Text, IntWritable, Text> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] lines = line.split(" "); String ip = lines[0]; context.write(getVideoId(line), new Text(ip)); } } public static class Reduce extends Reducer<IntWritable, Text, IntWritable, IntWritable> { @Override protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException { //统计出不同IP数量 Set<Text> ips = new HashSet<Text>(); for (Text value : values) { ips.add(value); } context.write(key, new IntWritable(ips.size())); } }
每个小时CDN流量计算
计算思路
map 将每行日志中的小时及请求大小提取出来以(hour,size)输出Reduce 将每小时的size相加,得出每小时总大小
代码
//提取小时 public static IntWritable getHour(String line) { Matcher matcher = Pattern.compile("2017:(\\d{2}):").matcher(line); if (matcher.find()) { return new IntWritable(Integer.parseInt(matcher.group(1))); } else { return new IntWritable(0); } } //提取大小 public static LongWritable getHttpSize(String line) { Matcher matcher = Pattern.compile(" (200|206|304) ([0-9]+) ").matcher(line); if (matcher.find()) { return new LongWritable(Integer.parseInt(matcher.group(2))); } else { return new LongWritable(0); } } public static class Map extends Mapper<LongWritable, Text, IntWritable, LongWritable> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); context.write(getHour(line), getHttpSize(line)); } } public static class Reduce extends Reducer<IntWritable, LongWritable, IntWritable, LongWritable> { @Override protected void reduce(IntWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { //统计每时cdn总和 Long size = 0L; for (LongWritable value : values) { size += value.get(); } context.write(key, new LongWritable(size)); } }
源码
http://git.oschina.net/whzhaochao/spark-learning原文地址
http://blog.csdn.net/whzhaochao/article/details/72770499相关文章推荐
- hadoop学习笔记之mapreduce 基于hbase日志数据的最频繁访问ip统计
- 使用SparkSQL 分析日志中IP数、流量等数据
- 什么是PV,什么是UV,什么是IP. 流量统计的各种数据!
- 使用shell脚本分析网站日志统计PV、404、500等数据
- Nginx环境使用CDN加速后网站日志获取真实用户IP地址方法
- SparkSQL结合SparkStreaming,使用SQL完成实时计算中的数据统计
- 使用xml描述的数据周期统计框架
- [python]使用python实现Hadoop MapReduce程序:计算一组数据的均值和方差
- Java使用极小的内存完成对超大数据的去重计数,用于实时计算中统计UV
- 海量数据统计:海量日志提取最常访问IP,最常使用的query
- 使用Matlab从Excel中读取数据并实现回归统计计算
- 使用IPTABLES实现对特定IP,端口流量的精确统计
- (第4篇)hadoop之魂--mapreduce计算框架,让收集的数据产生价值
- 使用Enumerable模块实现简单的测试框架并进行数据统计
- 使用Log日志 计算带宽流量峰值
- (第4篇)hadoop之魂--mapreduce计算框架,让收集的数据产生价值
- MapReduce之一——上网流量数据统计
- hadoop之魂--mapreduce计算框架,让收集的数据产生价值
- 使用MapReduce统计微博关注数据