大数据IMF传奇行动绝密课程第97课:使用SparkStreaming+SparkSQL实现在线动态计算出特定时间窗口
2017-04-03 21:05
1016 查看
使用SparkStreaming+SparkSQL实现在线动态计算出特定时间窗口下的不同种类商品中的热门商品排名
1、Streaming+SQL技术实现解析2、Streaming+SQL实现实战
启动hive metastore
hive --service metastore &
package com.tom.spark.sparkstreaming import org.apache.spark.{SparkConf, rdd} import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.streaming.{Durations, Seconds, StreamingContext} /** * 使用SparkStreaming+Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三种手机 * 电视这个类别下最热门的三种电视,该实例在实际生产环境下具有非常重大的意义 * * 实现技术:SparkStreaming+Spark SQL,之所以Spark Streaming能够使用ML、SQL、graphX等功能是因为有foreachRDD和transform * 等接口,这些接口中其实是基于RDD进行操作的,所以以RDD为基石,就可以直接使用Spark其他所有的功能,就像直接调用API一样简单。 * 假设说这里的数据的格式:user item category,例如Rocky Samsung Android */ object OnlineTop3ItemForEachCategory2DB { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("OnlineTop3ItemForEachCategory2DB").setMaster("local[2]") //此处设置Batch Interval是在Spark Streaming中生成基本Job的时间单位,窗口和滑动时间间隔 // 一定是该Batch Interval的整数倍 val ssc = new StreamingContext(conf, Durations.seconds(5)) ssc.checkpoint("/root/Documents/sparkApps/checkpoint") val userClickLogDStream = ssc.socketTextStream("Master", 9999) //用户搜索的格式简化为name item,在这里我们由于要计算出热点内容,所以只需要提取出item即可, //提取出的item然后通过map转换为(item, 1)格式 val formattedUserClickLogsDStream = userClickLogDStream.filter(_.split(" ").length == 3).map(clickLog => { (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(2), 1) }) val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_, _-_, Seconds(60), Seconds(20)) categoryUserClickLogsDStream.foreachRDD( rdd => { if(rdd.isEmpty()){ println("No data inputted!!!") } else { val categoryItemRow = rdd.map(reducedItem => { val category = reducedItem._1.split("_")(0) val item = reducedItem._1.split("_")(1) val click_count = reducedItem._2 Row(category, item, click_count) }) val structType = StructType(Array( new StructField("category", StringType, true), new StructField("item", StringType, true), new StructField("click_count", IntegerType, true) )) val hiveContext = new HiveContext(rdd.context) val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType) categoryItemDF.registerTempTable("categoryItemTable") val resultDataFrame = hiveContext.sql("SELECT category, item, click_count FROM " + "(SELECT category, item, click_count, row_number() OVER (PARTITION BY category ORDER BY click_count DESC) rank " + "FROM categoryItemTable) subquery" + "WHERE rank <=3") val resultRowRDD = resultDataFrame.rdd resultRowRDD.foreachPartition( partitionOfRecords => { // ConnectionPool is a static, lazily initialized pool of connections if(partitionOfRecords.isEmpty) { println("This RDD is not null, but partition is null!!!") }else { val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => { val sql = "insert into categorytop3(category, item, click_count) values ('" + record.getAs("category") + "','" + record.getAs("item") + "'," + record.getAs("click_count") + ")" val stmt = connection.createStatement() stmt.executeUpdate(sql) }) ConnectionPool.returnConnection(connection) } }) } }) //计算后的有效数据一般都会写入Kafka中,下游的计费系统会从Kafka中Pull到有效数据进行计费 ssc.start() ssc.awaitTermination() } }
相关文章推荐
- 第97课: 使用Spark Streaming+Spark SQL+mysql 实现在线动态计算出特定时间窗口下的不同种类商品中的热门商品排名(详细内幕版本)
- 第97课: 使用Spark Streaming+Spark SQL实现在线动态计算出特定时间窗口下的不同种类商品中的热门商品排名
- 大数据IMF传奇行动绝密课程第100-101课:使用Spark Streaming+Spark SQL+Kafka+FileSystem综合案例
- 大数据IMF传奇行动绝密课程第103课:动手实战Spark Streaming Broadcast、Accumulator实现在线黑名单过滤和计数
- 大数据IMF传奇行动绝密课程第98-99课:使用Spark Streaming实战对论坛网站动态行为的多维度分析
- 大数据IMF传奇行动绝密课程第118课:Spark Streaming性能优化:如何获得和持续使用足够的集群计算资源
- 大数据IMF传奇行动绝密课程第94课:SparkStreaming实现广告计费系统中在线黑名单过滤实战
- 大数据IMF传奇行动绝密课程第62课:Spark SQL下的Parquet使用最佳实践和代码实战
- 大数据IMF传奇行动绝密课程第116课:Spark Streaming性能优化:如何在毫秒内处理大吞吐量和数据波动比较大的流计算
- 第97课:Spark Streaming在线动态计算特定时间窗口下热门商品排名
- 大数据IMF传奇行动绝密课程第83课:透彻讲解使用Scala和Java两种方式实战Spark Streaming开发
- 大数据IMF传奇行动绝密课程第92课:SparkStreaming中Transformations和状态管理解密
- 大数据IMF传奇行动绝密课程第85课:基于HDFS的SparkStreaming案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第88课:SparkStreaming从Flume Poll数据案例实战和内幕源码解密
- 大数据IMF传奇行动绝密课程第75-79课:Spark SQL基于网站Log的综合案例实战
- 大数据IMF传奇行动绝密课程第64课:Spark SQL下Parquet的数据切分和压缩内幕详解
- 大数据IMF传奇行动绝密课程第81课:一节课贯通Spark SQL工作源码流程
- 大数据IMF传奇行动绝密课程第65课:Spark SQL下Parquet深入进阶
- 大数据IMF传奇行动绝密课程第87课:Flume推送数据到Spark Streaming案例实战和内幕源码解密