您的位置:首页 > 大数据

大数据IMF传奇行动绝密课程第97课:使用SparkStreaming+SparkSQL实现在线动态计算出特定时间窗口

2017-04-03 21:05 1016 查看

使用SparkStreaming+SparkSQL实现在线动态计算出特定时间窗口下的不同种类商品中的热门商品排名

1、Streaming+SQL技术实现解析

2、Streaming+SQL实现实战

启动hive metastore

hive --service metastore &


package com.tom.spark.sparkstreaming

import org.apache.spark.{SparkConf, rdd}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Durations, Seconds, StreamingContext}

/**
* 使用SparkStreaming+Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三种手机
* 电视这个类别下最热门的三种电视,该实例在实际生产环境下具有非常重大的意义
*
* 实现技术:SparkStreaming+Spark SQL,之所以Spark Streaming能够使用ML、SQL、graphX等功能是因为有foreachRDD和transform
* 等接口,这些接口中其实是基于RDD进行操作的,所以以RDD为基石,就可以直接使用Spark其他所有的功能,就像直接调用API一样简单。
* 假设说这里的数据的格式:user item category,例如Rocky Samsung Android
*/
object OnlineTop3ItemForEachCategory2DB {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("OnlineTop3ItemForEachCategory2DB").setMaster("local[2]")
//此处设置Batch Interval是在Spark Streaming中生成基本Job的时间单位,窗口和滑动时间间隔
// 一定是该Batch Interval的整数倍
val ssc = new StreamingContext(conf, Durations.seconds(5))
ssc.checkpoint("/root/Documents/sparkApps/checkpoint")
val userClickLogDStream = ssc.socketTextStream("Master", 9999)
//用户搜索的格式简化为name item,在这里我们由于要计算出热点内容,所以只需要提取出item即可,
//提取出的item然后通过map转换为(item, 1)格式
val formattedUserClickLogsDStream = userClickLogDStream.filter(_.split(" ").length == 3).map(clickLog => {
(clickLog.split(" ")(2) + "_" + clickLog.split(" ")(2), 1)
})
val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_, _-_, Seconds(60), Seconds(20))
categoryUserClickLogsDStream.foreachRDD( rdd => {
if(rdd.isEmpty()){
println("No data inputted!!!")
} else {
val categoryItemRow = rdd.map(reducedItem => {
val category = reducedItem._1.split("_")(0)
val item = reducedItem._1.split("_")(1)
val click_count = reducedItem._2
Row(category, item, click_count)
})
val structType = StructType(Array(
new StructField("category", StringType, true),
new StructField("item", StringType, true),
new StructField("click_count", IntegerType, true)
))
val hiveContext = new HiveContext(rdd.context)
val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
categoryItemDF.registerTempTable("categoryItemTable")
val resultDataFrame = hiveContext.sql("SELECT category, item, click_count FROM " +
"(SELECT category, item, click_count, row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank " +
"FROM categoryItemTable) subquery" +
"WHERE rank <=3")
val resultRowRDD = resultDataFrame.rdd

resultRowRDD.foreachPartition( partitionOfRecords => {
// ConnectionPool is a static, lazily initialized pool of connections
if(partitionOfRecords.isEmpty) {
println("This RDD is not null, but partition is null!!!")
}else {
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => {
val sql = "insert into categorytop3(category, item, click_count) values ('" + record.getAs("category") + "','" +
record.getAs("item") + "'," + record.getAs("click_count") + ")"
val stmt = connection.createStatement()
stmt.executeUpdate(sql)
})
ConnectionPool.returnConnection(connection)
}
})
}
})

//计算后的有效数据一般都会写入Kafka中,下游的计费系统会从Kafka中Pull到有效数据进行计费
ssc.start()
ssc.awaitTermination()
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark
相关文章推荐