您的位置:首页 > 其它

用spark实现count(distinct fieldname)形式的聚合

2015-03-12 16:47 127 查看
举个例子,比如要统计用户的总访问次数和去除访问同一个URL之后的总访问次数,随便造了几条样例数据(四个字段:id,name,vtm,url,vtm字段本例没用,不用管)如下:

id1,user1,2,http://www.hupu.com
id1,user1,2,http://www.hupu.com
id1,user1,3,http://www.hupu.com
id1,user1,100,http://www.hupu.com
id2,user2,2,http://www.hupu.com
id2,user2,1,http://www.hupu.com
id2,user2,50,http://www.hupu.com
id2,user2,2,http://touzhu.hupu.com

根据这个数据集,我们可以写hql 实现:select id,name, count(0) as ct,count(distinct url) as urlcount from table group by id,name.

得出结果应该是:

id1,user1,4,1

id2,user2,4,2

下面用spark实现这个聚合功能:

import org.apache.spark.SparkContext._
import org.apache.spark._

/**
* Created by xiaojun on 2015/3/9.
*/
object SparkDemo2 {
def main(args: Array[String]) {

case class User(id: String, name: String, vtm: String, url: String)
//val rowkey = (new RowKey).evaluate(_)
val HADOOP_USER = "hdfs"
// 设置访问spark使用的用户名
System.setProperty("user.name", HADOOP_USER);
// 设置访问hadoop使用的用户名
System.setProperty("HADOOP_USER_NAME", HADOOP_USER);

val conf = new SparkConf().setAppName("wordcount").setMaster("local").setExecutorEnv("HADOOP_USER_NAME", HADOOP_USER)
val sc = new SparkContext(conf)
val data = sc.textFile("test.txt")
val rdd1 = data.map(line => {
val r = line.split(",")
User(r(0), r(1), r(2), r(3))
})
val rdd2 = rdd1.map(r => ((r.id, r.name), r))

val seqOp = (a: (Int, List[String]), b: User) => a match {
case (0, List()) => (1, List(b.url))
case _ => (a._1 + 1, b.url :: a._2)
}

val combOp = (a: (Int, List[String]), b: (Int, List[String])) => {
(a._1 + b._1, a._2 ::: b._2)
}

println("-----------------------------------------")
val rdd3 = rdd2.aggregateByKey((0, List[String]()))(seqOp, combOp).map(a => {
(a._1, a._2._1, a._2._2.distinct.length)
})
rdd3.collect.foreach(println)
println("-----------------------------------------")
sc.stop()
}

}
输出结果如下:

((id1,user1),4,1)

((id2,user2),4,2)

与HQL输出一致.
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: