Spark SQL 笔记(14)——实战网站日志分析(4)代码重构之删除指定日期已有的数据
2018-11-15 17:20
591 查看
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u012292754/article/details/84106798
## 1. StatDAO.scala
package com.weblog.cn

import java.sql.{Connection, PreparedStatement}

import scala.collection.mutable.ListBuffer

/**
 * DAO for persisting the per-dimension daily statistics to MySQL.
 *
 * All insert methods share the same pattern: open a connection, disable
 * auto-commit so the whole batch is applied atomically, add every row to a
 * JDBC batch, execute it, commit, and release resources in `finally`.
 */
object StatDAO {

  /**
   * Batch-insert the day/video access Top-N rows into
   * `day_video_access_topn_stat`.
   *
   * @param list rows to persist (one per cmsId for the day)
   */
  def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // manual commit: the batch is all-or-nothing

      val sql = "insert into day_video_access_topn_stat(day,cms_id,times) values (?,?,?) "
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the whole batch in one round trip
      connection.commit()  // manual commit
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  /**
   * Batch-insert the day/city/video access Top-N rows into
   * `day_video_city_access_topn_stat`.
   *
   * @param list rows to persist (one per cmsId per city for the day)
   */
  def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // manual commit: the batch is all-or-nothing

      val sql = "insert into day_video_city_access_topn_stat(day,cms_id,city,times,times_rank) values (?,?,?,?,?) "
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the whole batch in one round trip
      connection.commit()  // manual commit
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  /**
   * Batch-insert the day/video traffic Top-N rows into
   * `day_video_traffics_topn_stat`.
   *
   * Fixes from the original: the SQL literal was broken across lines by the
   * blog formatting, and a stray `20000` token had been pasted after
   * `executeBatch()`.
   *
   * @param list rows to persist (one per cmsId for the day)
   */
  def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // manual commit: the batch is all-or-nothing

      val sql = "insert into day_video_traffics_topn_stat(day,cms_id,traffics) values (?,?,?) "
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the whole batch in one round trip
      connection.commit()  // manual commit
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  /**
   * Delete any existing rows for the given day from all three result tables,
   * so a re-run of the job does not duplicate data.
   *
   * The table name cannot be a JDBC bind parameter, so it is interpolated
   * from a fixed whitelist; the `day` value is always bound with `?`.
   *
   * @param day the day partition to purge, e.g. "20170511"
   */
  def deleteData(day: String): Unit = {
    val tables = Array(
      "day_video_access_topn_stat",
      "day_video_city_access_topn_stat",
      "day_video_traffics_topn_stat")

    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()

      for (table <- tables) {
        val deleteSQL = s"delete from $table where day=?"
        pstmt = connection.prepareStatement(deleteSQL)
        pstmt.setString(1, day)
        pstmt.executeUpdate()
        // Close each statement before preparing the next one; the original
        // reassigned `pstmt` in the loop and leaked all but the last statement.
        pstmt.close()
        pstmt = null
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
## 2. TopNStatJob.scala
package com.weblog.cn

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

/**
 * Top-N statistics job over the cleaned web-log parquet data.
 *
 * For a given day it computes: the most-accessed video courses, the Top-3
 * courses per city, and the courses ranked by traffic — each written to
 * MySQL via [[StatDAO]]. Existing rows for the day are deleted first so the
 * job is re-runnable.
 */
object TopNStatJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TopNStatJobApp")
      // keep partition column `day` as a string instead of inferring an int
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()

    val accessDF = spark.read.format("parquet").load("d://weblog_clean")

    val day = "20170511"

    // purge any previous results for this day so re-runs are idempotent
    StatDAO.deleteData(day)

    // accessDF.printSchema()
    // accessDF.show(false)

    // most popular Top-N courses
    videoAccessTopNStat(spark, accessDF, day)

    // Top-N courses per city
    cityAccessTopNStat(spark, accessDF, day)

    // Top-N courses by traffic
    videoTrafficTopNStat(spark, accessDF, day)

    spark.stop()
  }

  /**
   * Top-N courses of the day ranked by total traffic, persisted to MySQL.
   */
  def videoTrafficTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val trafficTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
    //.show(false)

    // write the statistics to MySQL, one batch per partition
    try {
      trafficTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
          list.append(DayVideoTrafficsStat(day, cmsId, traffics))
        })

        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * Top-3 courses per city for the day, ranked with a window function and
   * persisted to MySQL.
   *
   * Fix from the original: the window ordered by `city` — the same column it
   * partitions by — which made `row_number` arbitrary. Ranking within a city
   * must order by access count (`times`) descending.
   */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val cityAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId", "city")
      .agg(count("cmsId").as("times"))
    //cityAccessTopNDF.show(false)

    // window function: rank courses inside each city by access count
    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopNDF("city"))
        .orderBy(cityAccessTopNDF("times").desc)
      ).as("times_rank")
    ).filter("times_rank <= 3") //.show(false)  // keep only each city's Top 3

    // write the statistics to MySQL, one batch per partition
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })

        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * Most popular Top-N courses of the day by access count, persisted to MySQL.
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    // DataFrame API variant
    import spark.implicits._
    val videoAccessTopNDF = accessDF.filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId").agg(count("cmsId").as("times"))
      .orderBy($"times".desc)

    videoAccessTopNDF.show(false)

    /*// SQL variant
    accessDF.createOrReplaceTempView("access_logs")
    val videoAccessTopNDF = spark.sql("select day,cmsId, count(1) as times from access_logs where day='20170511' and cmsType='video'" +
      " group by day,cmsId order by times desc")*/

    //videoAccessTopNDF.show(false)

    // write the statistics to MySQL, one batch per partition
    try {
      videoAccessTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")
          list.append(DayVideoAccessStat(day, cmsId, times))
        })

        StatDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}
相关文章推荐
- Spark SQL 笔记(15)——实战网站日志分析(5)数据可视化
- Spark 08 Spark SQL 实战:日志分析(一)介绍、数据清洗
- 大数据IMF传奇行动绝密课程第80课:Spark SQL网站搜索综合案例实战
- spark搜狗日志数据分析实战
- Hadoop之网站日志分析项目案例(二)数据清洗(笔记22)
- Hadoop学习笔记—20.网站日志分析项目案例(二)数据清洗
- 大数据IMF传奇行动绝密课程第62课:Spark SQL下的Parquet使用最佳实践和代码实战
- 大数据技术学习笔记之网站流量日志分析项目:Flume日志采集系统1
- Hadoop学习笔记—20.网站日志分析项目案例(二)数据清洗
- 第61课:SparkSQl数据加载和保存内幕深度解密实战学习笔记
- Hadoop学习笔记—20.网站日志分析项目案例(二)数据清洗
- 大数据Spark “蘑菇云”行动补充内容第70课: Spark SQL代码实战和性能调优 4个spark sql调优技巧有用!!!!
- 大数据Spark企业级实战版【学习笔记】-----Spark Streaming案例分析
- python数据分析与挖掘学习笔记(6)-电商网站数据分析及商品自动推荐实战与关联规则算法
- Spark Streaming 实战 日志分析(二)数据可视化
- CK2255-以慕课网日志分析为例 进入大数据 Spark SQL 的世界
- 大数据Spark “蘑菇云”行动第65课: 页面跳转功能代码骨架分析与实战 放scala代码
- Hadoop学习笔记—20.网站日志分析项目案例(二)数据清洗
- Hadoop学习笔记—20.网站日志分析项目案例(二)数据清洗
- hadoop 实战——网站日志数据分析