
Lesson 97: Using Spark Streaming + Spark SQL to Dynamically Compute Online Rankings of Hot Products per Category Over a Specific Time Window

2016-05-04 20:03

Building on the earlier in-depth study of Spark SQL and DataFrames, this lesson uses Spark Streaming + Spark SQL to compute, online and dynamically, the ranking of hot products within each product category over a specific time window, and then dives into the relevant source code. Pairing theory with hands-on practice, backed by the source code itself, is the true royal road to learning Spark! Join us tonight at 8 pm on YY channel 68917580, or watch in the browser on Pandatv: http://panda.tv/340992.

2. Implementation approach:

On the official Spark website we find the following:

You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SQLContext. This is shown in the following example. It modifies the earlier word count example to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary table and then queried using SQL.

The example code can be found at:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SqlNetworkWordCount {

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // StreamingExamples.setStreamingLogLevels()

    // Create the context with a 15-second batch interval
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(15))

    // Create a socket stream on the target ip:port and count the words
    // in the input stream of \n-delimited text (e.g. generated by 'nc').
    // The storage level is unreplicated only because we are running locally;
    // replication is necessary in a distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))

    // Convert the RDDs of the words DStream to DataFrames and run a SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as a temporary table
      wordsDataFrame.registerTempTable("words")

      // Do the word count on the table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    // awaitTermination blocks until the streaming context is stopped,
    // so no additional busy-wait loop is needed after it.
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
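The word-count listing above only demonstrates the foreachRDD + temporary-table pattern; it does not yet show the hot-product ranking named in the title of this lesson. Below is a minimal sketch, not the lesson's actual implementation, of how the same pattern could be combined with a DStream window to rank hot products per category. Everything specific to it is an assumption made for illustration: the input format (one event per line, "user item category"), the host and port, the window and slide lengths, the top-3 cutoff, and the ItemHit and TopNItemsPerCategory names. It reuses the SQLContextSingleton helper from the listing above.

package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/** Hypothetical event record: one click/purchase per input line, "user item category" */
case class ItemHit(item: String, category: String)

object TopNItemsPerCategory {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("TopNItemsPerCategory").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Assumed input source: text events on localhost:9999, e.g. fed by 'nc -lk 9999'
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)

    // Look at the last 60 seconds of events, re-evaluated every 20 seconds (illustrative values)
    val windowedLines = lines.window(Seconds(60), Seconds(20))

    windowedLines.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Reuse the lazily instantiated SQLContext singleton from the listing above
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Parse "user item category" lines into ItemHit records and register them as a table
      val hits = rdd.map(_.split(" "))
        .filter(_.length >= 3)
        .map(fields => ItemHit(fields(1), fields(2)))
        .toDF()
      hits.registerTempTable("itemHits")

      // Aggregate hit counts per (category, item) with Spark SQL ...
      val counts = sqlContext.sql(
        "select category, item, count(*) as hits from itemHits group by category, item")

      // ... then keep the three hottest items in each category
      val top3 = counts.rdd
        .map(row => (row.getString(0), (row.getString(1), row.getLong(2))))
        .groupByKey()
        .mapValues(_.toSeq.sortBy(-_._2).take(3))

      println(s"========= $time =========")
      top3.collect().foreach { case (category, items) =>
        println(s"$category -> ${items.mkString(", ")}")
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

To try this locally you could feed lines such as "user1 iphone6 electronics" through nc -lk 9999 and watch the per-category top 3 refresh every 20 seconds. The ranking itself could also be expressed entirely in SQL with row_number() over (partition by category order by hits desc), but in Spark 1.x that window function requires a HiveContext rather than the plain SQLContext used here, so the sketch keeps the top-N step in RDD code.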








