您的位置:首页 > 其它

使用Spark计算PV、UV

2016-07-24 22:52 375 查看
版权声明:本文为博主原创文章,未经博主允许不得转载。

日志字段格式:

id,ip,url,ref,cookie,time_stamp

把日志文件放到HDFS。仅取了1000行。

[plain] view
plain copy

 





hadoop fs -put 1000_log hdfs://localhost:9000/user/root/input  

直接在Scala Shell中读取文件并计算PV。

[plain] view
plain copy

 





scala> val textFile = sc.textFile("hdfs://localhost:9000/user/root/input/1000_log")  

scala> val textRDD = textFile.map(_.split("\t")).filter(_.length == 6)  

scala> val result = textRDD.map(w => ((new java.net.URL(w(2))).getHost,1)).reduceByKey(_ + _).map(item => item.swap).sortByKey(false).map(item => item.swap)  

scala> result.saveAsTextFile("hdfs://localhost:9000/user/root/out8.txt")  

从HDFS上取回结果:

[plain] view
plain copy

 





hadoop fs -get hdfs://localhost:9000/user/root/out8.txt  

查看结果:

[plain] view
plain copy

 





$ more out8.txt/part-00000   

(www.j1.com,126)  

(tieba.baidu.com,65)  

(113.105.174.5,60)  

(www.baidu.com,54)  

(mp.weixin.qq.com,46)  

(119.147.106.188,40)  

(bbs.caoav.net,31)  

(i.ifeng.com,19)  

(www.amazon.de,18)  

(m.zhiyoula.com,18)  

(www.360doc.com,16)  

(br.pps.tv.iqiyi.com,14)  

(www.1905.com,14)  

(xa.btfs.ftn.qq.com,14)  

如果是生成.snappy压缩格式的文件,则可以按如下方法重定向到本地文本文件。
hadoop fs -text part-r-00001.snappy > filename.txt

下面对同一日志文件计算UV(Unique View)。

UV一般认为不同cookie的访问则算不同的独立用户。

[plain] view
plain copy

 





package org.asiainfo  

  

import org.apache.spark.SparkConf  

import org.apache.spark.SparkContext  

import org.apache.spark.SparkContext._  

  

/**  

 * @author:zhaohf@asiainfo.com  

 * @date:2015年1月27日 下午5:54:39  

 * @Description: TODO  

 */  

object UniqueViewCount {  

  

  def main(args: Array[String]): Unit = {  

    if(args.length < 4){  

      System.err.println("Usage:<input_file> <url_column_index> <output_file>")  

      System.exit(1)  

    }  

    val conf = new SparkConf().setAppName("UniqueViewApp")  

    val sc = new SparkContext(conf)  

    val url_index = args(1).toInt  

    val cookie_index = args(2).toInt  

    val textRDD = sc.textFile(args(0))  

        .map(_.split("\t"))  

        .map(line => (new java.net.URL(line(url_index)).getHost) + "\t" + line(cookie_index))  

        .distinct()  

        .map(line => (line.split("\t")(0),1))  

        .reduceByKey(_ + _)  

        .map(item => item.swap)  

        .sortByKey(false)  

        .map(item => item.swap)  

    textRDD.saveAsTextFile(args(3))  

  }  

  

}  

sbt package 编译打包。

生成jar文件,提交spark应用。

[plain] view
plain copy

 





spark-submit --class main.UniqueViewCount target/scala-2.11/spark_2.11-1.0.jar hdfs://localhost:9000/user/root/intput/1000_log 2 4 hdfs://localhost:9000/user/root/output  

结果:

[plain] view
plain copy

 





more result.txt  

(bbs.caoav.net,31)  

(www.baidu.com,28)  

(www.amazon.de,15)  

(m.zhiyoula.com,15)  

(www.360doc.com,14)  

(m.sohu.com,11)  

(mp.weixin.qq.com,11)  

(www.kaixin001.com,10)  

(www.zhiyoula.com,7)  

(lolbox.duowan.com,7)  

下面用shell来验证正确性:

先用python解析出url中的host:

[python] view
plain copy

 





#!/usr/bin/python  

from urlparse import urlparse  

import sys  

with open(sys.argv[1],'r') as f:  

        for line in f.readlines():  

                splits = line.split('\t')  

                url,cookie = urlparse(splits[2]).netloc,splits[4]  

                print url + '\t' + cookie  

[plain] view
plain copy

 





$ python check.py 1000_log > 1000_log_pre  

[plain] view
plain copy

 





$ cat 1000_log_pre | sort | uniq | awk -F '\t' '{print $1}' | sort | uniq -c | sort -nr -k1|  head  

    31 bbs.caoav.net  

    28 www.baidu.com  

    15 www.amazon.de  

    15 m.zhiyoula.com  

    14 www.360doc.com  

    11 m.sohu.com  

    11 mp.weixin.qq.com  

    10 www.kaixin001.com  

     7 www.zhiyoula.com  



结果正确!
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: