用spark实现count(distinct fieldname)形式的聚合
2015-03-12 16:47
127 查看
举个例子,比如要统计用户的总访问次数和去除访问同一个URL之后的总访问次数,随便造了几条样例数据(四个字段:id,name,vtm,url,vtm字段本例没用,不用管)如下:
根据这个数据集,我们可以写hql 实现:select id,name, count(0) as ct,count(distinct url) as urlcount from table group by id,name.
得出结果应该是:
id1,user1,4,1
id2,user2,4,2
下面用spark实现这个聚合功能:
((id1,user1),4,1)
((id2,user2),4,2)
与HQL输出一致.
id1,user1,2,http://www.hupu.com id1,user1,2,http://www.hupu.com id1,user1,3,http://www.hupu.com id1,user1,100,http://www.hupu.com id2,user2,2,http://www.hupu.com id2,user2,1,http://www.hupu.com id2,user2,50,http://www.hupu.com id2,user2,2,http://touzhu.hupu.com
根据这个数据集,我们可以写hql 实现:select id,name, count(0) as ct,count(distinct url) as urlcount from table group by id,name.
得出结果应该是:
id1,user1,4,1
id2,user2,4,2
下面用spark实现这个聚合功能:
import org.apache.spark.SparkContext._ import org.apache.spark._ /** * Created by xiaojun on 2015/3/9. */ object SparkDemo2 { def main(args: Array[String]) { case class User(id: String, name: String, vtm: String, url: String) //val rowkey = (new RowKey).evaluate(_) val HADOOP_USER = "hdfs" // 设置访问spark使用的用户名 System.setProperty("user.name", HADOOP_USER); // 设置访问hadoop使用的用户名 System.setProperty("HADOOP_USER_NAME", HADOOP_USER); val conf = new SparkConf().setAppName("wordcount").setMaster("local").setExecutorEnv("HADOOP_USER_NAME", HADOOP_USER) val sc = new SparkContext(conf) val data = sc.textFile("test.txt") val rdd1 = data.map(line => { val r = line.split(",") User(r(0), r(1), r(2), r(3)) }) val rdd2 = rdd1.map(r => ((r.id, r.name), r)) val seqOp = (a: (Int, List[String]), b: User) => a match { case (0, List()) => (1, List(b.url)) case _ => (a._1 + 1, b.url :: a._2) } val combOp = (a: (Int, List[String]), b: (Int, List[String])) => { (a._1 + b._1, a._2 ::: b._2) } println("-----------------------------------------") val rdd3 = rdd2.aggregateByKey((0, List[String]()))(seqOp, combOp).map(a => { (a._1, a._2._1, a._2._2.distinct.length) }) rdd3.collect.foreach(println) println("-----------------------------------------") sc.stop() } }输出结果如下:
((id1,user1),4,1)
((id2,user2),4,2)
与HQL输出一致.
相关文章推荐
- Mongodb聚合函数count、distinct、group如何实现数据聚合操作
- 学习关于聚合FUNCTION(count(*|fieldname))
- Mongodb聚合函数count、distinct、group如何实现数据聚合操作
- Mongodb中数据聚合之基本聚合函数count、distinct、group
- java实现kafka整合spark streaming完成wordCount,updateStateByKey完成实时状态更新
- MongoDB教程之聚合(count、distinct和group)
- java8实现spark wordcount并且按照value排序输出
- SparkSQL如何实现聚合下推
- Spark SQL中的聚合(Aggregate)实现
- 基于Jupyter平台通过python实现Spark的应用程序之wordCount
- Java正则表达式实现${name}形式的字符串模板
- spark graphx实现共同好友的聚合
- Spark:用Scala和Java实现WordCount
- IDEA搭建spark-scala的sbt编辑环境实现WorldCount练习
- MongoDB聚合(count、distinct、group、MapReduce)
- Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率
- 10. 如何在XPages里实现典型的Notes权限控制:@IsMember(@UserNamesList; FieldName)
- lucene中facet实现统计分析的思路——本质上和word count计数无异,像splunk这种层层聚合(先filed1统计,再field2统计,最后field3统计)lucene是排序实现
- java8实现spark streaming的wordcount
- spark【例子】count(distinct 字段) 简易版 使用groupByKey和zip