您的位置:首页 > 移动开发

第73课: 基于Spark 2.0.1项目 例如注册之后前10天访问我们的移动App最多的前五个人;注册之后前10天内购买商品总额排名前5为的人

2016-11-05 21:44 447 查看
大数据Spark “蘑菇云”行动第73课: 基于Spark 2.0.1项目实现之三

 

例如注册之后前10天访问我们的移动App最多的前五个人

或者注册之后前10天内购买商品总额排名前5为的人

 

 

package com.dt.spark200

import org.apache.spark.sql.SparkSession

object UserBehaviorsAnalysis {

  case class UserLog(logID: Long, userID: Long, time: String, typed: Int, location: String, consumed: Double)

  case class LogOnce(logID: Long, userID: Long, count: Long)

  case class ConsumOnce(logID: Long, userID: Long, consumed: Double)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession

      .builder()

      .appName("UserBehaviorsAnalysis")

      .master("local")

      .config("spark.sql.warehouse.dir", "file:///G:/IMFBigDataSpark2016/IMFScalaWorkspace_spark200/Spark200/spark-warehouse")

      .getOrCreate()

    import spark.implicits._

    import org.apache.spark.sql.types._

    import org.apache.spark.sql.functions._

    val userLogjson = spark.read.json("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userLog.json")

    val userInfojson = spark.read.json("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userInfo.json")

    //   userLogjson.write.format("parquet").save("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\logparquet")

    // userInfojson.write.format("parquet").save("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userparquet")

    val userInfo = spark.read.format("parquet").parquet("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\userparquet")

    val userLog = spark.read.format("parquet").parquet("G:\\IMFBigDataSpark2016\\tesdata\\spark200\\logparquet")

    //统计今天访问次数最多的top5 例如 2016-11-1-00:00:00  ~2016-11-1-23:59:59

    userLog.show()

    userLog.printSchema()

    val startTime = "2016-11-1 00:00:00"

    val endTime = "2016-11-10 22:52:52"

    userLog.filter("time >= '" + startTime + "' and time <= '" + endTime + "'")

      //   userLog.filter("$'time' >= '" + startTime + "' and $'time' <= '" + endTime +"'")

      .join(userInfo, userInfo("userID") === userLog("userID"))

      .groupBy(userInfo("userID"), userInfo("name"))

      .agg(count(userLog("logID")).alias("userlogCount"))

      .sort($"userlogCount".desc)

      .limit(5)

      .show()

    //作业:生成parquet方式的数据,自己实现时间函数

    //统计今天购买次数最多的  Top5

    userLog.filter("time >= '" + startTime + "' and time <= '" + endTime + "'")

      .join(userInfo, userInfo("userID") === userLog("userID"))

      .groupBy(userInfo("userID"), userInfo("name"))

      .agg(round(sum(userLog("consumed")), 2).alias("totalConsumed"))

      .sort($"totalConsumed".desc)

      .limit(5)

      .show

    //统计特定时间段里访问次数增多最多的TOP5用户,例如这一周比上一周增长最快的5位用户

    //实现思路:计算这周用户的访问次数,同时计算上周用户的访问次数,相减以后排名

    //思路2 join

    //("time >= '2016-10-24' and time <= '2016-10-30'")

    val userLogDS = userLog.as[UserLog].filter("time >= '2016-10-24' and time <= '2016-10-30'")

      //  val userLogDS=userLog.as[UserLog].filter("time >= '2016-11-1 22:00:00' and time <= '2016-11-2 22:00:00'")

      .map(log => LogOnce(log.logID, log.userID, 1))

      .union(userLog.as[UserLog].filter("time >= '2016-10-17' and time <= '2016-10-23'")

        .map(log => LogOnce(log.logID, log.userID, -1)))

    userLogDS.join(userInfo, userLog("userID") === userInfo("userID"))

      .groupBy(userInfo("userID"), userInfo("name"))

      .agg(sum(userLogDS("count")).alias("viewCountIncreased"))

      .sort($"viewCountIncreased".desc)

      .limit(5)

      .show()

    //////////////////

    //购买金额前5的用户

    val userLogConsumerDS = userLog.as[UserLog].filter("time >= '2016-10-24' and time <= '2016-10-30'")

      //  val userLogDS=userLog.as[UserLog].filter("time >= '2016-11-1 22:00:00' and time <= '2016-11-2 22:00:00'")

      .map(log => ConsumOnce(log.logID, log.userID, log.consumed))

      .union(userLog.as[UserLog].filter("time >= '2016-10-17' and time <= '2016-10-23'")

        .map(log => ConsumOnce(log.logID, log.userID, -log.consumed)))

    userLogConsumerDS.join(userInfo, userLogConsumerDS("userID") === userInfo("userID"))

      .groupBy(userInfo("userID"), userInfo("name"))

      .agg(round(sum(userLogConsumerDS("consumed")), 2).alias("viewconsumedIncreased"))

      .sort($"viewconsumedIncreased".desc)

      .limit(5)

      .show()

    ///例如注册之后前10天访问我们的移动App最多的前五个人, userinfo加一个 字段 registeredTime

    userLog.join(userInfo, userLog("userID") === userInfo("userID"))

      .filter(userInfo("registeredTime") >= "2016-11-1"

        && userInfo("registeredTime") <= "2016-11-20"

        && userLog("time") >= userInfo("registeredTime")

        && userLog("time") <= date_add(userInfo("registeredTime"), 10))

      .groupBy(userInfo("userID"), userInfo("name"))

      .agg(round(count(userLog("consumed")),2).alias("totalconsumed"))

      .sort($"totalconsumed".desc)

      .limit(5)

      .show()

    ///或者注册之后前10天内购买商品总额排名前5为的人

    userLog.join(userInfo, userLog("userID") === userInfo("userID"))

      .filter(userInfo("registeredTime") >= "2016-11-1"

        && userInfo("registeredTime") <= "2016-11-20"

        && userLog("time") >= userInfo("registeredTime")

        && userLog("time") <= date_add(userInfo("registeredTime"), 10))

      .groupBy(userInfo("userID"), userInfo("name"))

      .agg(sum(userLog("logID").alias("logTimes")))

      .sort($"logTimes".desc)

      .limit(5)

      .show()

  }

}

 

 

 

 

时间的处理

package com.dt.spark200

import java.text.SimpleDateFormat

import java.sql.Timestamp

import java.sql.Date

import org.apache.spark.sql.catalyst.expressions.CurrentDate

import org.apache.spark.sql.catalyst.util.DateTimeUtils

object UtilsTime {

  def main(args: Array[String]): Unit = {

    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    val sdfDate = new SimpleDateFormat("yyyy-MM-dd")

    val d = new Date(sdf.parse("2015-04-08 13:10:15").getTime)

    val ts = new Timestamp(sdf.parse("2013-11-08 13:10:15").getTime)

    println(d)

    println(ts)

    val startTime = "2016-11-1 00:00:00"

    val endTime = "2016-11-1 23:59:59"

    val test = "$'time' >= '" + startTime + "' and $'time' <= '" + endTime + "'"

    println(test)

    val imfd = new Date(sdf.parse(endTime).getTime)

    val imfts = new Timestamp(sdf.parse(endTime).getTime)

    println(imfd)

    println(imfts)

    val d0 = DateTimeUtils.millisToDays(System.currentTimeMillis())

    val d1 = DateTimeUtils.millisToDays(System.currentTimeMillis())

    println(d0)

    println(d1)

  }

}

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐