
Spark SQL Notes (14) - Web Log Analysis in Practice (4): Refactoring the Code to Delete Existing Data for a Given Date
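This installment refactors the job so that, before a day's statistics are inserted, any rows already stored for that date are deleted first; re-running the job for the same day then replaces the previous results instead of duplicating them. The code below also relies on three entity case classes defined in earlier installments of this series; a minimal sketch of them, with field names and types inferred from the accessors and JDBC setters used below:

// Minimal sketch of the entity classes used by StatDAO; fields inferred
// from the getAs[...] calls and JDBC setters in the listings below.
case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)

case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String,
                                  times: Long, timesRank: Int)

case class DayVideoTrafficsStat(day: String, cmsId: Long, traffics: Long)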


1. StatDAO.scala

package com.weblog.cn

import java.sql.{Connection, PreparedStatement}

import scala.collection.mutable.ListBuffer

/*
 * DAO operations for the various statistics dimensions
 */
object StatDAO {

  /*
   * Batch-insert DayVideoAccessStat records into the database
   */
  def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]) = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false) // switch to manual commit

      val sql = "insert into day_video_access_topn_stat(day,cms_id,times) values (?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)

        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit() // commit manually

    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }

  }

  /*
   * Batch-insert DayCityVideoAccessStat records into the database
   */
  def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]) = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false) // switch to manual commit

      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit() // commit manually

    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }

  }

  /*
   * Batch-insert DayVideoTrafficsStat records into the database
   */
  def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]) = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()

      connection.setAutoCommit(false) // switch to manual commit

      val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batch
      connection.commit() // commit manually

    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }

  }

  /*
   * Delete the data for the specified day
   */
  def deleteData(day: String) = {
    val tables = Array(
      "day_video_access_topn_stat",
      "day_video_city_access_topn_stat",
      "day_video_traffics_topn_stat")

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnection()
      for (table <- tables) {
        // a table name cannot be bound as a JDBC parameter, so it is
        // interpolated here; the names come from the fixed array above
        val deleteSQL = s"delete from $table where day=?"
        pstmt = connection.prepareStatement(deleteSQL)
        pstmt.setString(1, day)
        pstmt.executeUpdate()
        pstmt.close() // close each statement; release() below only sees the last one
      }

    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }

  }

}
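StatDAO depends on a MySQLUtils helper that is not shown in this post. A minimal sketch of what it needs to provide, assuming a local MySQL instance; the JDBC URL, database name, user, and password are placeholders, not values from the original post:

package com.weblog.cn

import java.sql.{Connection, DriverManager, PreparedStatement}

object MySQLUtils {

  // open a connection; the URL and credentials below are placeholders
  def getConnection() =
    DriverManager.getConnection("jdbc:mysql://localhost:3306/weblog?user=root&password=root")

  // release resources in the reverse order of acquisition
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

}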

2. TopNStatJob.scala

package com.weblog.cn

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object TopNStatJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TopNStatJobApp")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()

    val accessDF = spark.read.format("parquet").load("d://weblog_clean")

    val day = "20170511"
    // delete any rows already stored for this day, so that re-running
    // the job does not produce duplicates
    StatDAO.deleteData(day)

    //    accessDF.printSchema()
    //    accessDF.show(false)

    // most popular TopN courses
    videoAccessTopNStat(spark, accessDF, day)

    // TopN courses per city
    cityAccessTopNStat(spark, accessDF, day)

    // TopN courses by traffic
    videoTrafficTopNStat(spark, accessDF, day)

    spark.stop()

  }

  /*
   * TopN by traffic
   */
  def videoTrafficTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val trafficTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc) //.show(false)

    /*
     * Write the statistics to MySQL
     */
    try {
      trafficTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")

          list.append(DayVideoTrafficsStat(day, cmsId, traffics))

        })

        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }

  }

  /*
   * TopN courses per city
   */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String) = {
    import spark.implicits._

    val cityAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId", "city")
      .agg(count("cmsId").as("times"))

    //cityAccessTopNDF.show(false)

    /*
     * Window function usage in Spark SQL
     * (see the standalone sketch after this listing)
     */
    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc) // rank by access count within each city
      ).as("times_rank")

    ).filter("times_rank <= 3") //.show(false) // Top3 per city

    /*
     * Write the statistics to MySQL
     */
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")

          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))

        })

        StatDAO.insertDayCityVideoAccessTopN(list)

      })
    } catch {
      case e: Exception => e.printStackTrace()
    }

  }

  /*
   * Most popular TopN courses
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String) = {

    // DataFrame API approach
    import spark.implicits._

    val videoAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times"))
      .orderBy($"times".desc)

    videoAccessTopNDF.show(false)

    /*// SQL approach
    accessDF.createOrReplaceTempView("access_logs")
    val videoAccessTopNDF = spark.sql("select day,cmsId, count(1) as times from access_logs where day='20170511' and cmsType='video'" +
      " group by day,cmsId order by times desc")*/

    //videoAccessTopNDF.show(false)

    /*
     * Write the statistics to MySQL
     */
    try {
      videoAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")

          list.append(DayVideoAccessStat(day, cmsId, times))

        })

        StatDAO.insertDayVideoAccessTopN(list)

      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

}
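The per-city Top 3 in cityAccessTopNStat hinges on the row_number window function. A self-contained sketch of the same pattern on hypothetical data, showing how Window.partitionBy/orderBy plus row_number produce a per-group rank that can then be filtered down to a Top N:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object WindowFunctionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WindowFunctionSketch")
      .master("local[2]").getOrCreate()
    import spark.implicits._

    // hypothetical (city, cmsId, times) aggregates
    val df = Seq(
      ("BJ", 4500L, 80L), ("BJ", 14390L, 60L), ("BJ", 4000L, 50L), ("BJ", 146L, 20L),
      ("SZ", 4500L, 70L), ("SZ", 146L, 30L)
    ).toDF("city", "cmsId", "times")

    // rank the courses inside each city by access count and keep the Top 3
    df.select($"city", $"cmsId", $"times",
        row_number().over(
          Window.partitionBy($"city").orderBy($"times".desc)
        ).as("times_rank"))
      .filter("times_rank <= 3")
      .show(false)

    spark.stop()
  }
}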