
Counting UV (Unique Visitors) with a MapReduce Program

2018-08-12 21:08

UV (Unique Visitor) is the number of distinct users who visit a site within one day, identified by cookie: each client machine that visits the site counts as one visitor, so UV can be thought of as the number of distinct computers that accessed the site. The site recognizes a visiting machine by its cookies. If a user changes IP address but keeps the same cookies and visits the site again, the UV count does not change; if the user visits without saving cookies, clears the cookies, or switches devices, the count increases by 1. Multiple visits from the same client between 00:00 and 24:00 are counted as a single visitor.
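To make the counting rule concrete, here is a minimal plain-Java sketch (the cookie ids are made up for illustration) showing that repeated visits from the same cookie within one day contribute a single UV:

import java.util.HashSet;
import java.util.Set;

public class UvExample {
    public static void main(String[] args) {
        // Hypothetical cookie ids observed during one day (00:00-24:00)
        String[] visits = {"guid1", "guid1", "guid2", "guid3", "guid2"};

        // The same cookie id visiting several times still counts as one visitor
        Set<String> uniqueVisitors = new HashSet<String>();
        for (String guid : visits) {
            uniqueVisitors.add(guid);
        }
        System.out.println("UV = " + uniqueVisitors.size()); // prints UV = 3
    }
}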

1. Writing the WebLogUVMapper class

package com.huadian.webloguvs;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WebLogUVMapper extends Mapper<LongWritable, Text, Text, Text> {

    private Text outputKey = new Text();
    private Text outputValue = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split the line into tab-separated fields
        String line = value.toString();
        String[] items = line.split("\t");

        /**
         * (1) A valid record has 36 fields; if the split yields fewer than 36,
         *     the record is dirty data and is discarded.
         * (2) If the guid field (index 5) is blank ("", null, "null"), the record is discarded.
         *     The city id is at index 24.
         * Output: (cityId, guid)
         */
        if (items.length >= 36) {
            if (StringUtils.isBlank(items[5])) {
                return;
            }
            outputKey.set(items[24]);
            outputValue.set(items[5]);
            context.write(outputKey, outputValue);
        } else {
            return;
        }
    }
}
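The parsing rule in the mapper can be tried out without Hadoop. The following standalone sketch builds a hypothetical 36-field tab-separated line (only index 5, the guid, and index 24, the city id, carry sample values) and prints the (cityId, guid) pair the map method would emit; a plain emptiness check stands in for StringUtils.isBlank:

import java.util.Arrays;

public class MapperParseExample {
    public static void main(String[] args) {
        // Build a hypothetical 36-field log line; real field contents will differ
        String[] fields = new String[36];
        Arrays.fill(fields, "-");
        fields[5] = "guid1";   // cookie/guid field checked by the mapper
        fields[24] = "101";    // city id field used as the output key
        String line = String.join("\t", fields);

        String[] items = line.split("\t");
        if (items.length >= 36 && !items[5].trim().isEmpty()) {
            System.out.println("(" + items[24] + ", " + items[5] + ")"); // (101, guid1)
        }
    }
}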

2. Writing the WebLogUVMapReduce (driver) class

package com.huadian.webloguvs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WebLogUVMapReduce extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {

        // Create the job
        Job job = Job.getInstance(this.getConf(), "WebLogUVMapReduce");
        // Set the main class the job runs
        job.setJarByClass(WebLogUVMapReduce.class);

        // a. input
        Path inputPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job, inputPath);

        // b. map
        job.setMapperClass(WebLogUVMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // All records with the same city key go to the same reducer,
        // so 2 reduce tasks simply split the result into 2 output files
        job.setNumReduceTasks(2);

        // c. reduce
        job.setReducerClass(WebLogUVReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // d. output
        Path outputPath = new Path(args[1]);

        // Delete the output directory if it already exists
        FileSystem hdfs = FileSystem.get(this.getConf());
        if (hdfs.exists(outputPath)) {
            hdfs.delete(outputPath, true);
        }
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit the job and wait for it to finish
        boolean isSuccess = job.waitForCompletion(true);

        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        // public static int run(Configuration conf, Tool tool, String[] args)
        try {
            int status = ToolRunner.run(configuration, new WebLogUVMapReduce(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
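Assuming the project has been packaged into a jar (the jar name and HDFS paths below are only placeholders), the driver can be submitted with something like: hadoop jar weblog-uv.jar com.huadian.webloguvs.WebLogUVMapReduce /input/weblog /output/uv. Because the driver extends Configured and is launched through ToolRunner, generic options such as -D properties are also accepted on the command line.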

3. Writing the WebLogUVReducer class

package com.huadian.webloguvs;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class WebLogUVReducer extends Reducer<Text, Text, Text, Text> {
    private Text outputValue = new Text();

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws InterruptedException, IOException {
        // key: city id;  values: <guid1, guid1, guid2, guid3, ...>
        // Deduplicate the guids with a HashSet; its size is the UV for this city.
        // Copy each value with toString(): Hadoop reuses the same Text instance
        // while iterating over values, so the raw references must not be stored.
        Set<String> set = new HashSet<String>();
        for (Text value : values) {
            set.add(value.toString());
        }
        outputValue.set(String.valueOf(set.size()));
        context.write(key, outputValue);
    }
}
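In each reduce call, all guids emitted for one city arrive together, so the size of the HashSet is exactly that city's UV. The guid is copied with value.toString() before going into the set because Hadoop reuses a single Text object across the values iterable; keeping the Text references themselves would not give a reliable distinct count. Each output line is a city id and its UV separated by a tab, for example 101 followed by 3 (values here are hypothetical), and with two reduce tasks the results are split across part-r-00000 and part-r-00001.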

 
