您的位置:首页 > 数据库

Spark Streaming结合Spark SQL开发案例:电商中不同类别中最热门的商品排名

2017-10-24 12:51 495 查看
object StreamingCombineSqlTmp {
def main(args: Array[String]): Unit = {
/**
* 创建SparkConf
*/
val conf = new SparkConf().setAppName(this.getClass.getName)

/**
* 设置batchDuration时间间隔来控制Job生成的频率并且创建Spark Streaming执行的入口
*
* 参数说明:
* Seconds(5) 流数据的时间间隔将被分成若干组 数据流分割成批的时间间隔
* 为什么要这个参数? sparkStreaming是微批的处理,把流数据分割为非常小的时间间隔分成一批一批的数据
* 时间间隔设置为5,那么每5秒会从数据源创建一个RDD,最小值可以配置500毫秒
*/
val batchInterval = 5
/**
* 通过SparkConf创建StreamingContext
*/
val ssc = new StreamingContext(conf, Seconds(batchInterval))

/**
* 原:
* Set the context to periodically checkpoint the DStream operations for driver
* 设置上下文,定期检查驱动程序的DStream操作
* 容错系统
* 参数:directory
* HDFS-compatible directory where the checkpoint data will be reliably stored.
* 与hdfs兼容的目录,其中检查点数据将被可靠地存储。
* Note that this must be a fault-tolerant file system like HDFS for
* 注意,这必须是一个容错的文件系统,比如HDFS
*
* 检查点:
* 它会上spark streaming定定期创建检查点数据,参数是一个目录的名称,这个目录是hdfs目录
*/
ssc.checkpoint("/root/Documents/SparkApps?checkpoint")

/**
* 输出源:
*
* 从TCP源套接字连接接收数据流的DStream
*
* 创建DStream 返回ReceiverInputDStream[String]
*
* (参数 主机名:端口)
* 主机名:数据源的主机名, 端口:接收数据的连接的端口号
*
* 模拟机制:通过nc
* nc -lk 8888
* 如果linux没有nc可以安装一个yum install nc
*/
val userClickLogsStream = ssc.socketTextStream("hadoop01", 8888)
//val dstreamForHdfs = ssc.textFileStream("hdfs//:8020/tmp/dsteam")

/**
* 返回 MappedDStream[T, U] 备:MappedDStream[T, U]是DStream[U]的子类
* 读取数据流并进行map转换操作返回DStream[U]
*
* 如源数据:假设说这里的数据的格式:user item category,例如Rocky Samsung Android
* 那么这里返回的数据格式 (Android_Samsung, 1)
*
*/
/*val formattedUserClickLogsDStream = userClickLogsStream.map(clickLog => {
( clickLog.split(" ")(2) + "_" +  clickLog.split(" ")(1), 1)
})*/
val formattedUserClickLogsDStream = userClickLogsStream.map(clickLog => {
val clickLogSplitArr = clickLog.split(" ")
if (clickLogSplitArr.length >= 3) {
(clickLogSplitArr(2) + "_" + clickLogSplitArr(1), 1)
} else {
println("-------------------------------------------------------------------------clickLogSplitArr.length less 3")
null
}
})

/**
* DStream的转换操作
* reduceByKeyAndWindow函数:进行窗口计算,时间窗口是60s,每隔20s更新一次滑动一次,中间可以复用40s效率比较高
* _ + _在list上加上新的, _ - _然后减去旧的
*
* (
* reduceFunc: (V, V) => V,
* windowDuration: Duration
* )
* 返回 DStream[(K, V)] 得出任意一种类型的商品在过去60s点击多少次
*/
/*val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow((v1:Int, v2:Int) => v1+v2,
(v1:Int, v2:Int) => v1-v2 , Seconds(60), Seconds(20))*/
val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(20))

/**
* DStream调用foreachRDD转换为RDD进行操作
* 数据格式 (Android_Samsung, n)
*/
if (categoryUserClickLogsDStream == null) {
return
}
categoryUserClickLogsDStream.foreachRDD(rdd =>
if (rdd == null || rdd.isEmpty()) {
print("No data inputted")
} else {
/**
* 构建DataFram需要的RDD[ROW]一行的数据
*/
val categoryItemRow = rdd.map(reducedItem => {
val category = reducedItem._1.split("_")(0)
val item = reducedItem._1.split("_")(1)
val click_count = reducedItem._2
Row(category, item, click_count)
})

/**
* 构建DataFrame需要 schema: StructType
* dataFrame每一列的类型
*/
val structType = StructType(Array(
StructField("category", StringType, nullable = true),
StructField("item", StringType, nullable = true),
StructField("click_count", IntegerType, nullable = true)
))

/**
* 创建HiveContext
*/
val hiveContext = new HiveContext(rdd.context)
/**
* 根据RDD[ROW]和StruckType构建DataFrame
*/
val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)

/**
* DataFrame注册为一个临时表,可以通过SparkSql去操作
*/
categoryItemDF.registerTempTable("categoryItemTable")

/**
* Spark sql
*/
val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM (SELECT category,item,click_count,row_number()" +
"OVER(PARTITION BY category ORDER BY click_count DESC) rank FROM categoryItemTable) subquery " +
"WHERE rank <= 3")

/**
* 展示DataFrame前20行的表格式的数据
*/
resultDataFrame.show()

/**
* DataFrame的rdd方法,返回其RDD[ROW]
*/
val resultRowRDD = resultDataFrame.rdd

/**
* 循环resultRowRDD (RDD[ROW])遍历写入Oracle的categorytop3表中
*/
if (resultRowRDD == null || resultRowRDD.isEmpty()) {
println("resultDataFrame.rdd is null ")
} else {
resultRowRDD.foreachPartition { partitionOfRecords => {

if (partitionOfRecords == null || partitionOfRecords.isEmpty) {
println("this is RDD is not null but partition is null")
} else {
val connection = ConnectionPool.getConnection
partitionOfRecords.foreach(record => {
val sql = "insert into categorytop3(category,item,client_count) values('" + record.getAs("category") + "','" +
record.getAs("item") + "'," + record.getAs("click_count") + ")"
if (connection != null) {
val stmt = connection.createStatement()
val sql1 = new String(sql.getBytes, "UTF-8")
stmt.executeUpdate(sql1)
}
})
if (connection != null)
ConnectionPool.returnConnection(connection)
}
}
}
}

}
)

ssc.start()

ssc.awaitTermination()
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐