第73课: 基于Spark 2.0.1项目 例如注册之后前10天访问我们的移动App最多的前五个人;注册之后前10天内购买商品总额排名前5位的人
2016-11-05 21:44
447 查看
大数据Spark “蘑菇云”行动第73课: 基于Spark 2.0.1项目实现之三
例如注册之后前10天访问我们的移动App最多的前五个人
或者注册之后前10天内购买商品总额排名前5位的人
package com.dt.spark200
import org.apache.spark.sql.SparkSession
object UserBehaviorsAnalysis {

  // One raw user-activity record as stored in userLog.json / the log parquet files.
  case class UserLog(logID: Long, userID: Long, time: String, typed: Int, location: String, consumed: Double)

  // A single visit weighted +1 (current week) or -1 (previous week); summing per user yields visit growth.
  case class LogOnce(logID: Long, userID: Long, count: Long)

  // A single purchase amount, negated for the previous week; summing per user yields spend growth.
  case class ConsumOnce(logID: Long, userID: Long, consumed: Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("UserBehaviorsAnalysis")
      .master("local")
      .config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFScalaWorkspace_spark200/Spark200/spark-warehouse")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Raw JSON sources; kept so the (commented) one-off parquet conversion below can be re-run.
    val userLogjson = spark.read.json("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userLog.json")
    val userInfojson = spark.read.json("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userInfo.json")
    // userLogjson.write.format("parquet").save("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\logparquet")
    // userInfojson.write.format("parquet").save("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userparquet")

    // Working datasets are read back from parquet (redundant .format("parquet") dropped).
    val userInfo = spark.read.parquet("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userparquet")
    val userLog = spark.read.parquet("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\logparquet")

    userLog.show()
    userLog.printSchema()

    // Top-5 users by visit count inside a fixed window,
    // e.g. 2016-11-1 00:00:00 ~ 2016-11-10 22:52:52.
    val startTime = "2016-11-1 00:00:00"
    val endTime = "2016-11-10 22:52:52"
    userLog.filter(s"time >= '$startTime' and time <= '$endTime'")
      .join(userInfo, userInfo("userID") === userLog("userID"))
      .groupBy(userInfo("userID"), userInfo("name"))
      .agg(count(userLog("logID")).alias("userlogCount"))
      .sort($"userlogCount".desc)
      .limit(5)
      .show()

    // TODO(homework): generate the parquet data and implement the time helpers.
    // Top-5 users by total amount spent inside the same window.
    userLog.filter(s"time >= '$startTime' and time <= '$endTime'")
      .join(userInfo, userInfo("userID") === userLog("userID"))
      .groupBy(userInfo("userID"), userInfo("name"))
      .agg(round(sum(userLog("consumed")), 2).alias("totalConsumed"))
      .sort($"totalConsumed".desc)
      .limit(5)
      .show()

    // Top-5 users whose visit count grew the most week-over-week:
    // tag this week's visits with +1 and last week's with -1, then sum per user.
    val userLogDS = userLog.as[UserLog].filter("time >= '2016-10-24' and time <= '2016-10-30'")
      .map(log => LogOnce(log.logID, log.userID, 1))
      .union(userLog.as[UserLog].filter("time >= '2016-10-17' and time <= '2016-10-23'")
        .map(log => LogOnce(log.logID, log.userID, -1)))
    // FIX: the join key must come from userLogDS (the weighted union), not from the
    // unfiltered userLog DataFrame, matching the consumer query below.
    userLogDS.join(userInfo, userLogDS("userID") === userInfo("userID"))
      .groupBy(userInfo("userID"), userInfo("name"))
      .agg(sum(userLogDS("count")).alias("viewCountIncreased"))
      .sort($"viewCountIncreased".desc)
      .limit(5)
      .show()

    // Top-5 users whose spend grew the most week-over-week (same +/- trick on amounts).
    val userLogConsumerDS = userLog.as[UserLog].filter("time >= '2016-10-24' and time <= '2016-10-30'")
      .map(log => ConsumOnce(log.logID, log.userID, log.consumed))
      .union(userLog.as[UserLog].filter("time >= '2016-10-17' and time <= '2016-10-23'")
        .map(log => ConsumOnce(log.logID, log.userID, -log.consumed)))
    userLogConsumerDS.join(userInfo, userLogConsumerDS("userID") === userInfo("userID"))
      .groupBy(userInfo("userID"), userInfo("name"))
      .agg(round(sum(userLogConsumerDS("consumed")), 2).alias("viewconsumedIncreased"))
      .sort($"viewconsumedIncreased".desc)
      .limit(5)
      .show()

    // Top-5 users with the most visits within 10 days of registering
    // (userInfo carries a registeredTime column).
    userLog.join(userInfo, userLog("userID") === userInfo("userID"))
      .filter(userInfo("registeredTime") >= "2016-11-1"
        && userInfo("registeredTime") <= "2016-11-20"
        && userLog("time") >= userInfo("registeredTime")
        && userLog("time") <= date_add(userInfo("registeredTime"), 10))
      .groupBy(userInfo("userID"), userInfo("name"))
      // FIX: this query ranks visits, so count log entries; the original rounded a
      // count to 2 decimals and misnamed it "totalconsumed".
      .agg(count(userLog("logID")).alias("visitCount"))
      .sort($"visitCount".desc)
      .limit(5)
      .show()

    // Top-5 users by total purchase amount within 10 days of registering.
    userLog.join(userInfo, userLog("userID") === userInfo("userID"))
      .filter(userInfo("registeredTime") >= "2016-11-1"
        && userInfo("registeredTime") <= "2016-11-20"
        && userLog("time") >= userInfo("registeredTime")
        && userLog("time") <= date_add(userInfo("registeredTime"), 10))
      .groupBy(userInfo("userID"), userInfo("name"))
      // FIX: sum the consumed amount. The original summed logID with the alias
      // misplaced inside sum(), so sort($"logTimes") could not even resolve.
      .agg(round(sum(userLog("consumed")), 2).alias("totalConsumed"))
      .sort($"totalConsumed".desc)
      .limit(5)
      .show()
  }
}
时间的处理
package com.dt.spark200
import java.text.SimpleDateFormat
import java.sql.Timestamp
import java.sql.Date
import org.apache.spark.sql.catalyst.expressions.CurrentDate
import org.apache.spark.sql.catalyst.util.DateTimeUtils
object UtilsTime {
  // Demonstrates parsing java.sql.Date / java.sql.Timestamp values and building the
  // time-range predicate strings used by the analysis jobs.
  def main(args: Array[String]): Unit = {
    val fullFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // Day-only formatter; unused below but kept for parity with the original demo.
    val dayFormat = new SimpleDateFormat("yyyy-MM-dd")

    // Round-trip two sample timestamps through the java.sql types.
    val sampleDate = new Date(fullFormat.parse("2015-04-08 13:10:15").getTime)
    val sampleTs = new Timestamp(fullFormat.parse("2013-11-08 13:10:15").getTime)
    println(sampleDate)
    println(sampleTs)

    // Build (and print) a filter-predicate string over a one-day window.
    val startTime = "2016-11-1 00:00:00"
    val endTime = "2016-11-1 23:59:59"
    val predicate = s"$$'time' >= '$startTime' and $$'time' <= '$endTime'"
    println(predicate)

    // Same conversion applied to the window's end boundary.
    val endDate = new Date(fullFormat.parse(endTime).getTime)
    val endTs = new Timestamp(fullFormat.parse(endTime).getTime)
    println(endDate)
    println(endTs)

    // Spark's epoch-millis -> epoch-days helper, evaluated twice on "now".
    val daysA = DateTimeUtils.millisToDays(System.currentTimeMillis())
    val daysB = DateTimeUtils.millisToDays(System.currentTimeMillis())
    println(daysA)
    println(daysB)
  }
}
例如注册之后前10天访问我们的移动App最多的前五个人
或者注册之后前10天内购买商品总额排名前5位的人
package com.dt.spark200
import org.apache.spark.sql.SparkSession
object UserBehaviorsAnalysis {

  // One raw user-activity record as stored in userLog.json / the log parquet files.
  case class UserLog(logID: Long, userID: Long, time: String, typed: Int, location: String, consumed: Double)

  // A single visit weighted +1 (current week) or -1 (previous week); summing per user yields visit growth.
  case class LogOnce(logID: Long, userID: Long, count: Long)

  // A single purchase amount, negated for the previous week; summing per user yields spend growth.
  case class ConsumOnce(logID: Long, userID: Long, consumed: Double)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("UserBehaviorsAnalysis")
      .master("local")
      .config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFScalaWorkspace_spark200/Spark200/spark-warehouse")
      .getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.functions._

    // Raw JSON sources; kept so the (commented) one-off parquet conversion below can be re-run.
    val userLogjson = spark.read.json("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userLog.json")
    val userInfojson = spark.read.json("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userInfo.json")
    // userLogjson.write.format("parquet").save("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\logparquet")
    // userInfojson.write.format("parquet").save("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userparquet")

    // Working datasets are read back from parquet (redundant .format("parquet") dropped).
    val userInfo = spark.read.parquet("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userparquet")
    val userLog = spark.read.parquet("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\logparquet")

    userLog.show()
    userLog.printSchema()

    // Top-5 users by visit count inside a fixed window,
    // e.g. 2016-11-1 00:00:00 ~ 2016-11-10 22:52:52.
    val startTime = "2016-11-1 00:00:00"
    val endTime = "2016-11-10 22:52:52"
    userLog.filter(s"time >= '$startTime' and time <= '$endTime'")
      .join(userInfo, userInfo("userID") === userLog("userID"))
      .groupBy(userInfo("userID"), userInfo("name"))
      .agg(count(userLog("logID")).alias("userlogCount"))
      .sort($"userlogCount".desc)
      .limit(5)
      .show()

    // TODO(homework): generate the parquet data and implement the time helpers.
    // Top-5 users by total amount spent inside the same window.
    userLog.filter(s"time >= '$startTime' and time <= '$endTime'")
      .join(userInfo, userInfo("userID") === userLog("userID"))
      .groupBy(userInfo("userID"), userInfo("name"))
      .agg(round(sum(userLog("consumed")), 2).alias("totalConsumed"))
      .sort($"totalConsumed".desc)
      .limit(5)
      .show()

    // Top-5 users whose visit count grew the most week-over-week:
    // tag this week's visits with +1 and last week's with -1, then sum per user.
    val userLogDS = userLog.as[UserLog].filter("time >= '2016-10-24' and time <= '2016-10-30'")
      .map(log => LogOnce(log.logID, log.userID, 1))
      .union(userLog.as[UserLog].filter("time >= '2016-10-17' and time <= '2016-10-23'")
        .map(log => LogOnce(log.logID, log.userID, -1)))
    // FIX: the join key must come from userLogDS (the weighted union), not from the
    // unfiltered userLog DataFrame, matching the consumer query below.
    userLogDS.join(userInfo, userLogDS("userID") === userInfo("userID"))
      .groupBy(userInfo("userID"), userInfo("name"))
      .agg(sum(userLogDS("count")).alias("viewCountIncreased"))
      .sort($"viewCountIncreased".desc)
      .limit(5)
      .show()

    // Top-5 users whose spend grew the most week-over-week (same +/- trick on amounts).
    val userLogConsumerDS = userLog.as[UserLog].filter("time >= '2016-10-24' and time <= '2016-10-30'")
      .map(log => ConsumOnce(log.logID, log.userID, log.consumed))
      .union(userLog.as[UserLog].filter("time >= '2016-10-17' and time <= '2016-10-23'")
        .map(log => ConsumOnce(log.logID, log.userID, -log.consumed)))
    userLogConsumerDS.join(userInfo, userLogConsumerDS("userID") === userInfo("userID"))
      .groupBy(userInfo("userID"), userInfo("name"))
      .agg(round(sum(userLogConsumerDS("consumed")), 2).alias("viewconsumedIncreased"))
      .sort($"viewconsumedIncreased".desc)
      .limit(5)
      .show()

    // Top-5 users with the most visits within 10 days of registering
    // (userInfo carries a registeredTime column).
    userLog.join(userInfo, userLog("userID") === userInfo("userID"))
      .filter(userInfo("registeredTime") >= "2016-11-1"
        && userInfo("registeredTime") <= "2016-11-20"
        && userLog("time") >= userInfo("registeredTime")
        && userLog("time") <= date_add(userInfo("registeredTime"), 10))
      .groupBy(userInfo("userID"), userInfo("name"))
      // FIX: this query ranks visits, so count log entries; the original rounded a
      // count to 2 decimals and misnamed it "totalconsumed".
      .agg(count(userLog("logID")).alias("visitCount"))
      .sort($"visitCount".desc)
      .limit(5)
      .show()

    // Top-5 users by total purchase amount within 10 days of registering.
    userLog.join(userInfo, userLog("userID") === userInfo("userID"))
      .filter(userInfo("registeredTime") >= "2016-11-1"
        && userInfo("registeredTime") <= "2016-11-20"
        && userLog("time") >= userInfo("registeredTime")
        && userLog("time") <= date_add(userInfo("registeredTime"), 10))
      .groupBy(userInfo("userID"), userInfo("name"))
      // FIX: sum the consumed amount. The original summed logID with the alias
      // misplaced inside sum(), so sort($"logTimes") could not even resolve.
      .agg(round(sum(userLog("consumed")), 2).alias("totalConsumed"))
      .sort($"totalConsumed".desc)
      .limit(5)
      .show()
  }
}
时间的处理
package com.dt.spark200
import java.text.SimpleDateFormat
import java.sql.Timestamp
import java.sql.Date
import org.apache.spark.sql.catalyst.expressions.CurrentDate
import org.apache.spark.sql.catalyst.util.DateTimeUtils
object UtilsTime {
  // Demonstrates parsing java.sql.Date / java.sql.Timestamp values and building the
  // time-range predicate strings used by the analysis jobs.
  def main(args: Array[String]): Unit = {
    val fullFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // Day-only formatter; unused below but kept for parity with the original demo.
    val dayFormat = new SimpleDateFormat("yyyy-MM-dd")

    // Round-trip two sample timestamps through the java.sql types.
    val sampleDate = new Date(fullFormat.parse("2015-04-08 13:10:15").getTime)
    val sampleTs = new Timestamp(fullFormat.parse("2013-11-08 13:10:15").getTime)
    println(sampleDate)
    println(sampleTs)

    // Build (and print) a filter-predicate string over a one-day window.
    val startTime = "2016-11-1 00:00:00"
    val endTime = "2016-11-1 23:59:59"
    val predicate = s"$$'time' >= '$startTime' and $$'time' <= '$endTime'"
    println(predicate)

    // Same conversion applied to the window's end boundary.
    val endDate = new Date(fullFormat.parse(endTime).getTime)
    val endTs = new Timestamp(fullFormat.parse(endTime).getTime)
    println(endDate)
    println(endTs)

    // Spark's epoch-millis -> epoch-days helper, evaluated twice on "now".
    val daysA = DateTimeUtils.millisToDays(System.currentTimeMillis())
    val daysB = DateTimeUtils.millisToDays(System.currentTimeMillis())
    println(daysA)
    println(daysB)
  }
}
相关文章推荐
- 【备忘】2017Spark 2.0大型项目实战:移动电商app交互式数据分析
- Spark 2.0大型项目实战:移动电商app交互式数据分析平台
- 第74课:基于spark 2.0.1项目测试与分析
- Spark 2.0大型项目实战:移动电商app交互式数据分析平台(大数据高端课程) 下载
- 基于Ionic2 的移动app开发<2>(从项目的新建到打包)
- 大数据Spark “蘑菇云”行动第72课: 基于Spark 2.0.1项目实现之二. 实战 各种小bug修复及性能调优 200并行度调整为2个task
- 大数据Spark “蘑菇云”行动第71课: 基于Spark 2.0.1项目开发分析与实战
- Sharepoint 2010:基于当前用户判断访问列表项目的权限 --Determine access to SPListItem based on a Current User
- 星火杯项目笔记——iOS调用有道API翻译句子、在AppDelegate里实现全局访问的sqlite实例【13-10-14】
- 分享我们项目中基于EF事务机制的架构
- 滴滴打车,打车软件app实现。小车在地图上平滑移动的实现,Android,基于高德地图开发实现
- 最近做一个安卓的手机APP项目涉及到手机号注册登录,怎么实现
- 2017年8月大数据-基于Spark的机器学习-智能客户系统项目实战
- [Phonegap+Sencha Touch] 移动开发24 打包wp8.1的App,执行时输入框聚焦弹出软键盘之后,界面上移而不恢复原位的解决的方法
- 基于HTML5移动app开发教程一
- Android下基于Iptables的一种app网络访问控制方案(二)
- 基于Server-Sent Event的简单聊天室 Web 2.0时代,即时通信已经成为必不可少的网站功能,那实现Web即时通信的机制有哪些呢?在这门项目课中我们将一一介绍。最后我们将会实现一个基于Server-Sent Event和Flask简单的在线聊天室。
- 记录第一个小项目,基于django的登录注册页面
- 基于Android移动终端的搜索客户端应用【团队项目】
- 基于Spark的移动用户主要活动地点的挖掘算法实现以及JavaEE技术整合