
Test programs for computing PV and UV in Java and Scala

Computing PV (page views) and UV (unique visitors) is fairly simple. There is corresponding Java code for the MapReduce stage as well; the underlying idea is the same in both cases.

First, prepare the data to be analyzed; the data itself is not reproduced here.
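A note on the expected input (inferred from the parsing code below, since the sample data is not shown): the Java job simply splits each line on spaces and counts every token, while the Spark job expects tab-separated tracking-log lines and reads the URL from field 1, the visitor GUID from field 5, and a timestamp whose first ten characters are the date (for example 2015-08-28) from field 17.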

// Java code
package com.zyf.myhadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class PVmapreduce extends Configured implements Tool {

    /**
     * LongWritable is the byte offset of each line in the file; Text is the line content.
     * @author Administrator
     */
    public static class WCmapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
         * Map phase: the data is read in line by line as the map input,
         * and each line is mapped to intermediate <key, value> pairs.
         */

        private Text mapOutKey = new Text();
        private IntWritable mapOutput = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // split each line into an array of words
            String[] str = value.toString().split(" ");

            for (String word : str) {
                // wrap each word as a Text key
                mapOutKey.set(word);
                System.out.println("<" + mapOutKey + "--" + mapOutput + ">");
                // emit the map output <key, value>
                context.write(mapOutKey, mapOutput);
            }
        }
    }

    public static class WCReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                System.out.println(value.toString() + "\t");
                sum += value.get();
            }

            context.write(key, new IntWritable(sum));
        }
    }

    /**
     * Acts as the YARN client: configures and submits the job.
     * @param args input path and output path
     * @return 0 on success, 1 on failure
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        // 1. get the Hadoop configuration (set by ToolRunner)
        Configuration conf = getConf();
        // 2. create the corresponding job
        Job job = Job.getInstance(
                conf, this.getClass().getCanonicalName());
        // 3. set the jar by the driver class
        job.setJarByClass(getClass());

        // 4.1 set the input path
        Path inPath = new Path(args[0]);
        FileInputFormat.setInputPaths(job, inPath);
        // 4.2 set the map phase
        job.setMapperClass(WCmapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // set the number of reduce tasks for the job
        job.setNumReduceTasks(3);

        // 4.3 set the reduce phase
        job.setReducerClass(WCReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // 4.4 set the output directory
        Path outPath = new Path(args[1]);
        FileSystem fs = outPath.getFileSystem(conf);
        // delete the output directory automatically if it already exists
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);

        // submit the job and wait for completion
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hard-coded test paths; the commented lines show the HDFS equivalents
        args = new String[]{
//                "hdfs://hadoop.dimensoft:8020/input/xx.txt",
//                "hdfs://hadoop.dimensoft:8020/output"
                "E:\\wc.txt",
                "E:\\mapReduce\\output"
        };
        int status = ToolRunner.run(conf,
                new PVmapreduce(),
                args);

        System.exit(status);
    }
}
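The main method above hard-codes local Windows test paths, with the HDFS equivalents commented out. To run against the cluster, one would remove that override (or switch to the hdfs:// paths) and submit the packaged jar in the usual way, for example hadoop jar pv-uv.jar com.zyf.myhadoop.PVmapreduce <input> <output> (the jar name here is just a placeholder). Since setNumReduceTasks(3) is used, the output directory will contain three part-r-* files.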


The overall flow is much the same in both cases: read the data in, analyze it, and write the results out.

The Spark program below reads from and writes to HDFS on the cluster; note that the SparkConf sets the master to local[2], so as written it runs in local mode.

// Scala code
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by Administrator on 2017/8/25.
 */
object PVAndUV {
  def main(args: Array[String]) {
    val inputPath = "hdfs://hadoop.dimensoft:8020/input/2015082818"
    val outputPath = "hdfs://hadoop.dimensoft:8020/user/zyf/output2"

    val config = new SparkConf().setAppName("FistSparkApp").setMaster("local[2]")
    val sc = new SparkContext(config)
    val trackrdd = sc.textFile(inputPath)
    trackrdd.count

    // keep (date, guid, url) for every non-empty line whose url is non-empty
    val baserdd = trackrdd.filter(line => line.length > 0).map(line => {
      val arr = line.split("\t")
      val date = arr(17).substring(0, 10)
      val guid = arr(5)
      val url = arr(1)
      (date, guid, url)
    }).filter(tuple => tuple._3.length > 0)

    // cache the base RDD since it is reused below
    baserdd.cache
    baserdd.count
    baserdd.take(1)

    // PV per day: count one for every record of a given date
    val pvrdd = baserdd.map(tuple => (tuple._1, 1)).reduceByKey((a, b) => a + b)
    pvrdd.collect

    // UV per day: deduplicate (date, guid) pairs, then count distinct guids per date
    val uvrdd = baserdd.map(tuple => (tuple._1 + "_" + tuple._2, 1)).distinct.map(tuple => {
      val arr = tuple._1.split("_")
      (arr(0), 1)
    }).reduceByKey((a, b) => a + b)
    uvrdd.collect

    // union of the two result RDDs
    val pvuvunion = pvrdd.union(uvrdd)
    pvuvunion.collect

    pvuvunion.saveAsTextFile(outputPath)

    // join PV and UV by date
    val pvuvjoin = pvrdd.join(uvrdd)
    pvuvjoin.collect
  }
}
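
The union above mixes PV and UV rows into one output without labelling which count is which. As an optional extension (not part of the original program; the output path below is a made-up example), the joined RDD could be saved as one line per day:

// optional: write the joined result as date<TAB>pv<TAB>uv, one line per day
// (the output path here is a hypothetical example)
pvuvjoin
  .map { case (date, (pv, uv)) => s"$date\t$pv\t$uv" }
  .saveAsTextFile("hdfs://hadoop.dimensoft:8020/user/zyf/output3")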
Tags: spark