Lesson 97: Using Spark Streaming + Spark SQL to Dynamically Compute Online Rankings of Hot Items per Product Category over a Specific Time Window
2016-05-04 20:03
1. Lesson overview:
Building on the earlier in-depth lessons on Spark SQL and DataFrames, this lesson uses Spark Streaming + Spark SQL to dynamically compute, online, the ranking of hot items within each product category over a specific time window, and then decodes the source code behind this part. Theory and practice in parallel, backed by source code: that is the true royal road to learning Spark! Tonight at 8 pm, YY channel 68917580; see you there! You can also watch in the browser on Pandatv: http://panda.tv/340992.
2. Implementation approach:
The official Spark documentation describes the approach as follows:
You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.
For the example code, see:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SqlNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 15 second batch interval
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(15))

    // Create a socket stream on the target ip:port and count the words in the
    // input stream of \n delimited text (e.g. generated by 'nc').
    // Skipping replication in the storage level is acceptable only when running
    // locally; in a distributed deployment replication is necessary for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert each RDD of the words DStream to a DataFrame and run a SQL query on it
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as a temporary table
      wordsDataFrame.registerTempTable("words")

      // Do word count on the table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    // awaitTermination() already blocks until the streaming context is stopped,
    // so no extra busy-wait loop is needed after it
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
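To try the word-count example locally, feed it newline-delimited text with netcat, e.g. run nc -lk 9999 in one terminal and start the program with the arguments localhost 9999 (the port is an arbitrary choice).

This skeleton maps directly onto the lesson's goal of ranking hot items per product category within a time window: window the DStream, turn each windowed RDD into a DataFrame, and let SQL do the grouping and ranking. Below is a minimal sketch of that adaptation, not the course's reference implementation. It assumes each input line has the form "category item"; the table name userClickLog, the ItemClick case class, and the 60-second window sliding every 30 seconds are all illustrative choices. Note that the row_number() window function requires a HiveContext (and the spark-hive dependency) on Spark 1.x, so the sketch swaps the SQLContext singleton for a HiveContext one. The DStream code is meant to replace the words/foreachRDD section inside the main method above, with ssc and args taken from there.

// Additional import needed on top of the ones in the example above
import org.apache.spark.sql.hive.HiveContext

// Hypothetical input format: each line is "category item", e.g. "phone iphone6".
// Rank over the last 60 seconds of clicks, recomputed every 30 seconds
// (both durations must be multiples of the 15 second batch interval).
val clickLines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val windowedClicks = clickLines.window(Seconds(60), Seconds(30))

windowedClicks.foreachRDD { (rdd: RDD[String], time: Time) =>
  // Window functions such as row_number() need a HiveContext on Spark 1.x
  val hiveContext = HiveContextSingleton.getInstance(rdd.sparkContext)
  import hiveContext.implicits._

  // Parse "category item" lines, drop malformed ones, and register a temp table
  val clicksDataFrame = rdd
    .map(_.split(" "))
    .filter(_.length == 2)
    .map(fields => ItemClick(fields(0), fields(1)))
    .toDF()
  clicksDataFrame.registerTempTable("userClickLog")

  // Count clicks per (category, item), rank the items inside each category
  // by click count, and keep the top 3 per category
  val topItems = hiveContext.sql(
    """SELECT category, item, clickCount FROM (
      |  SELECT category, item, clickCount,
      |         row_number() OVER (PARTITION BY category ORDER BY clickCount DESC) AS rn
      |  FROM (SELECT category, item, COUNT(*) AS clickCount
      |        FROM userClickLog GROUP BY category, item) counted
      |) ranked
      |WHERE rn <= 3""".stripMargin)
  println(s"========= top items per category at $time =========")
  topItems.show()
}

/** Case class describing one parsed click record (illustrative) */
case class ItemClick(category: String, item: String)

/** Lazily instantiated singleton HiveContext, mirroring SQLContextSingleton above */
object HiveContextSingleton {
  @transient private var instance: HiveContext = _
  def getInstance(sparkContext: SparkContext): HiveContext = {
    if (instance == null) {
      instance = new HiveContext(sparkContext)
    }
    instance
  }
}

Because the ranking is recomputed over the full window on every slide, each output reflects the latest 60 seconds of clicks, and the singleton HiveContext keeps the pattern restartable on driver failure, exactly as the documentation quoted above requires.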