
Using the MapReduce Framework to Compute Unique IPs, Traffic, and Other Statistics from CDN Logs

2017-05-26 15:31

Preface

In the previous two articles, we used Spark RDDs and Spark Streaming to compute the number of unique IPs, the unique IPs per video, and the hourly CDN traffic from a set of CDN logs. Here we perform the same computations over the same logs with the MapReduce framework. A working Hadoop and HDFS environment is assumed.

Unique IP Count

Approach

Map: extract the IP address from each log line and emit (ip, 1).

Reduce: by the time reduce runs, records with the same IP have been grouped together, so summing the values gives the request count for that IP; the number of distinct keys in the output is the unique IP count.

Code

public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // The client IP is the first space-separated field of each log line
        String line = value.toString();
        String[] fields = line.split(" ");
        String ip = fields[0];
        context.write(new Text(ip), new IntWritable(1));
    }
}

public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // All records for the same IP arrive together; sum them for this IP's request count
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
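The post omits the driver that wires these classes into a job. A minimal sketch might look like the following; the class name `UniqueIpCount` and the command-line paths are assumptions for illustration, not part of the original code.

```java
// Hypothetical driver for the unique-IP job; class and path names are illustrative.
// Map and Reduce refer to the inner classes defined above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class UniqueIpCount {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "unique-ip-count");
        job.setJarByClass(UniqueIpCount.class);
        job.setMapperClass(Map.class);
        // Summing counts is associative, so the reducer can double as a combiner
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Submitted with `hadoop jar`, this reads log files from the first path and writes (ip, count) pairs to the second.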


Unique IPs per Video

Approach

Map: extract the video ID and IP address from each line and emit (id, ip).

Reduce: records for the same video ID have been grouped together, so counting the distinct IPs among the values gives the result.

Code

// Extract the video ID from a log line (e.g. "141.mp4" -> 141)
public static IntWritable getVideoId(String line) {
    Matcher matcher = Pattern.compile("([\\d]+)\\.mp4").matcher(line);
    if (matcher.find()) {
        return new IntWritable(Integer.parseInt(matcher.group(1)));
    } else {
        return new IntWritable(0);
    }
}
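As a quick sanity check of the regex, the same extraction can be exercised in plain Java without Hadoop; the sample request lines below are made up for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class VideoIdRegexDemo {
    // Same pattern as getVideoId above, returning a plain int instead of an IntWritable
    public static int extractVideoId(String line) {
        Matcher matcher = Pattern.compile("([\\d]+)\\.mp4").matcher(line);
        return matcher.find() ? Integer.parseInt(matcher.group(1)) : 0;
    }

    public static void main(String[] args) {
        System.out.println(extractVideoId("GET /video/141.mp4 HTTP/1.1")); // 141
        System.out.println(extractVideoId("GET /index.html HTTP/1.1"));    // 0 (no match)
    }
}
```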

public static class Map extends Mapper<LongWritable, Text, IntWritable, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(" ");
        String ip = fields[0];
        context.write(getVideoId(line), new Text(ip));
    }
}

public static class Reduce extends Reducer<IntWritable, Text, IntWritable, IntWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Count the distinct IPs for this video. Note that Hadoop reuses the same
        // Text instance across iterations of the values iterable, so each value
        // must be copied (here via toString()) before being added to the set.
        Set<String> ips = new HashSet<String>();
        for (Text value : values) {
            ips.add(value.toString());
        }
        context.write(key, new IntWritable(ips.size()));
    }
}


Hourly CDN Traffic

Approach

Map: extract the hour and the response size from each log line and emit (hour, size).

Reduce: sum the sizes for each hour to get the hourly total.

Code

// Extract the hour from the request timestamp
public static IntWritable getHour(String line) {
    Matcher matcher = Pattern.compile("2017:(\\d{2}):").matcher(line);
    if (matcher.find()) {
        return new IntWritable(Integer.parseInt(matcher.group(1)));
    } else {
        return new IntWritable(0);
    }
}

// Extract the response size for successful requests (status 200/206/304);
// parsed as long, since total or individual sizes can exceed the int range
public static LongWritable getHttpSize(String line) {
    Matcher matcher = Pattern.compile(" (200|206|304) ([0-9]+) ").matcher(line);
    if (matcher.find()) {
        return new LongWritable(Long.parseLong(matcher.group(2)));
    } else {
        return new LongWritable(0);
    }
}
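Both extractors can likewise be checked against a representative line in plain Java. The sample line below assumes a common-log-style format with a 2017 timestamp; it is not taken from the actual data set:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TrafficRegexDemo {
    // Mirrors getHour: the hour field follows "2017:" in the timestamp
    public static int extractHour(String line) {
        Matcher m = Pattern.compile("2017:(\\d{2}):").matcher(line);
        return m.find() ? Integer.parseInt(m.group(1)) : 0;
    }

    // Mirrors getHttpSize: the byte count follows a 200/206/304 status code
    public static long extractHttpSize(String line) {
        Matcher m = Pattern.compile(" (200|206|304) ([0-9]+) ").matcher(line);
        return m.find() ? Long.parseLong(m.group(2)) : 0L;
    }

    public static void main(String[] args) {
        // Hypothetical log line in common-log style
        String line = "10.0.0.1 - - [26/May/2017:14:03:12 +0800] \"GET /video/141.mp4 HTTP/1.1\" 200 3204 \"-\" \"curl\"";
        System.out.println(extractHour(line));     // 14
        System.out.println(extractHttpSize(line)); // 3204
    }
}
```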

public static class Map extends Mapper<LongWritable, Text, IntWritable, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        context.write(getHour(line), getHttpSize(line));
    }
}

public static class Reduce extends Reducer<IntWritable, LongWritable, IntWritable, LongWritable> {
    @Override
    protected void reduce(IntWritable key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the response sizes for this hour to get the total CDN traffic
        long size = 0L;
        for (LongWritable value : values) {
            size += value.get();
        }
        context.write(key, new LongWritable(size));
    }
}


Source Code

http://git.oschina.net/whzhaochao/spark-learning

Original Post

http://blog.csdn.net/whzhaochao/article/details/72770499
Tags: mapreduce hadoop big data