分别用Java和Scala求PV,UV测试程序
2017-08-27 17:23
246 查看
求 PV,UV的程序还是比较简单 ,在mapreduce阶段也有相应的Java代码,其实本质都是一样的,
首先要准备一下要分析的数据,这里就不把数据贴上去了 !
其实流程都是差不多,大致就是读入>>分析>>读出
下面的Spark是运行在Spark集群上,
首先要准备一下要分析的数据,这里就不把数据贴上去了 !
//Java代码 package com.zyf.myhadoop; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class PVmapreduce extends Configured implements Tool { /** * LongWritable,Text文件每一行的偏移量 * @author Administrator * */ public static class WCmapper extends Mapper<LongWritable, Text,Text, IntWritable>{ /** * map阶段,数据是一行一行的读进来的,作为map输入 * map映射 */ private Text mapOutKey = new Text(); private IntWritable mapOutput = new IntWritable(1); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException{ //将每一行内容转换String数组 String[] str = value.toString().split(" "); for( String word : str){ //将每个单词转换成文本类型 mapOutKey.set(word); System.out.println("<"+mapOutKey+"--"+mapOutput+">"); //map输出的<key,vaule> context.write(mapOutKey, mapOutput); } } } public static class WCReducer extends Reducer<Text,IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context contex) throws IOException, InterruptedException { int sum = 0; for(IntWritable value : values){ System.out.println(value.toString()+"\t"); sum += value.get(); } contex.write(key, new IntWritable(sum)); } } /** * 相当于yarn的客户端, * @param args * @return * @throws Exception */ public int run(String[] args) throws Exception { //1.获取hadoop的配置信息 Configuration conf = new Configuration(); //2.生成对应的job Job job = Job.getInstance( conf,this.getClass().getCanonicalName()); // job.setJarByClass(getClass()); // //4. 1设置具体的内容 Path inPath = new Path(args[0]); FileInputFormat.setInputPaths(job, inPath); //4.2设置map阶段 job.setMapperClass(WCmapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //Set the number of reduce tasks for the job job.setNumReduceTasks(3); //4.3设置reduce阶段 job.setReducerClass(WCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //4.4设置输出目录 Path outPath = new Path(args[1]); FileSystem fs = outPath.getFileSystem(conf); //设置输出目录如果存在就自动删除 if(fs.exists(outPath)){ fs.delete(outPath, true); } FileOutputFormat.setOutputPath(job, outPath); //提交job boolean isSuccess = job.waitForCompletion(true); return isSuccess ? 0 : 1; } public static void main(String[]args) throws Exception{ Configuration conf = new Configuration(); args = new String[]{ // "hdfs://hadoop.dimensoft:8020/input/xx.txt", // "hdfs://hadoop.dimensoft:8020/output" "E:\\wc.txt", "E:\\mapReduce\\output" }; int status = ToolRunner.run(conf, new PVmapreduce(), args); System.exit(status); } }
其实流程都是差不多,大致就是读入>>分析>>读出
下面的Spark是运行在Spark集群上,
//Scala代码 import org.apache.spark.{SparkContext, SparkConf} /** * Created by Administrator on 2017/8/25. */ object PVAndUV { def main(args: Array[String]) { val inputPath = "hdfs://hadoop.dimensoft:8020/input/2015082818" val outputPath = "hdfs://hadoop.dimensoft:8020/user/zyf/output2" val config = new SparkConf().setAppName("FistSparkApp").setMaster("local[2]") val sc = new SparkContext(config) val trackrdd=sc.textFile(inputPath) trackrdd.count val baserdd=trackrdd.filter(line=>line.length>0).map(line=>{ val arr=line.split("\t") val date=arr(17).substring(0,10) val guid=arr(5) val url=arr(1) (date,guid,url) }).filter(tuple=>tuple._3.length>0) //缓存 baserdd.cache baserdd.count baserdd.take(1) // 求每天的pv val pvrdd=baserdd.map(tuple=>(tuple._1,1)).reduceByKey((a,b)=>(a+b)) pvrdd.collect //求每天的uv val uvrdd=baserdd.map(tuple=>(tuple._1+"_"+tuple._2,1)).distinct.map(tuple=>{ val arr=tuple._1.split("_") (arr(0),1) }).reduceByKey((a,b)=>(a+b)) uvrdd.collect //合并 val pvuvunion=pvrdd.union(uvrdd) pvuvunion.collect pvuvunion.saveAsTextFile(outputPath) //连接 val pvuvjoin=pvrdd.join(uvrdd) pvuvjoin.collect } }
相关文章推荐
- 分别用Java、Scala、spark-shell开发wordcount程序及测试代码
- Java程序基础测试题目
- Java程序基础测试题目
- Java程序基础测试题目
- PV、UV、IP分别是什么意思?
- java程序环境的测试
- JAVA 学习日志 测试抽象类的程序,每天进步/退步一点点,变化很大哦,努力!
- java程序测试时间
- Java PTA上机测试题目小结之程序填空题
- jmeter:对Java程序进行测试
- LoadRunner调用Java程序—性能测试
- Spark实战----(1)使用Scala开发本地测试的Spark WordCount程序
- java application应用程序 使用JDBC和proxool两种方式连接数据库 的测试程序代码
- Linux上测试C及Java程序
- 在测试java程序时,控制台显示的检测数据结果显示不全,怎么办?
- Java程序基础测试题目
- selenium2 java第一个自动化测试程序
- java 使用图片代理程序,解决网站图片防盗链机制(测试百度,QQ空间有效)
- Zookeeper的集群配置和Java测试程序
- 按 字节截取分别以GBK 和 utf-8 编码的 字符串的java程序。