您的位置:首页 > 运维架构

《Spark商业案例与性能调优实战100课》第14课:商业案例之纯粹通过DataSet进行电商交互式分析系统中特定时段购买金额Top10 和访问次数增长Top10

2017-01-22 20:54 1036 查看
《Spark商业案例与性能调优实战100课》第14课:商业案例之纯粹通过DataSet进行电商交互式分析系统中特定时段购买金额Top10
和访问次数增长Top10

 

结合生产实际,json格式换成parquet格式,数据有问题,改回json

val userInfo=spark.read.format("parquet").parquet("parquet file's path ...")

val userAccessLog=spark.read.format("parquet").parquet("parquet file's path ...")

val userInfo=spark.read.format("json").json("json file's path ...")

val userAccessLog=spark.read.format("json").json("json file's path ...")

val userInfo=spark.read.json("json file's path ...")

val userAccessLog=spark.read.json("json file's path ...")

//检查数据

usersInfo.select("time").show()

usersInfo.show()

//检查schema

userInfo.printSchema()

userAccessLog.printSchema()

userAccessLog.filter("time >= 2017-1-1 and time <=2017-1-10")

.join(userInfo,userAccessLog("UserID")===userInfo("UserID"))

.groupby(usersInfo("UserID"),usersInfo("name"))

.agg(count(userAccessLog("time")).alias("userCount"))

.sort($"usercount".desc)

.limit(10)

.show()

功能二:特定时段购买金额Top10

userAccessLog.filter("time >= 2017-1-1 and time <=2017-1-10")

.join(userInfo,userAccessLog("UserID")===userInfo("UserID"))

.groupby(usersInfo("UserID"),usersInfo("name"))

.agg(round(sum(userAccessLog("consumed")),2).alias("totalCount"))

.sort($"totalCount".desc)

.limit(10)

.show()

功能3:访问次数增长Top10

case class UserLog(logID:Long,userID:Long,time:String,typed:Long,consumed:Double )

case class LogOnce(logID:Long,userID:Long,count:Long)

//Row转换为dataset

val userAccessTemp= userAccessLog.as[userLog].filter("time >= 2017-1-1 and time <=2017-1-10 and typed =0")

.map(log =>LogOnce(log.logID,log.userID,1))

.union(userAccessLog.as[userLog].filter("time >= 2017-1-20 and time <=2017-1-30 and typed =0")

.map(log =>LogOnce(log.logID,log.userID,-1)))

// 这里有个小技巧,两个时间段分别统计,count计数一个为正数,一个为负数,然后通过agg进行求和,就能统计出

后一个时间段比前一个时间段访问次数的增长数

userAccessTemp.join(userInfo,userAccessTemp("userID")===usersInfo("userID"))

.groupBy("userID"),userInfo("name"))

.agg(sum(userAccessTemp("count")).alias("viewIncreasedTmp"))

.sort("userAccessTemp".desc)

.limit(10)

.show()









内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐