Spark 09 Spark SQL in Action: Log Analysis (2) - Implementing the Requirements
1 Requirements
- TopN most popular courses by access count
- TopN most popular courses by city
- TopN most popular courses by traffic
2 Create the database and tables in MySQL
```sql
-- Create the database
create database sparkSql_project;

-- TopN courses by access count
create table day_vedio_access_topn_stat(
    day varchar(8) not null,
    class_id bigint(10) not null,
    times bigint(10) not null,
    primary key (day, class_id)
);

-- TopN courses per city
create table day_vedio_city_access_topn_stat(
    day varchar(8) not null,
    class_id bigint(10) not null,
    city varchar(20) not null,
    times bigint(10) not null,
    times_rank int not null,
    primary key (day, class_id, city)
);

-- TopN courses by traffic
create table day_vedio_traffics_topn_stat(
    day varchar(8) not null,
    class_id bigint(10) not null,
    traffics bigint(20) not null,
    primary key (day, class_id)
);
```
3 Code
1) Add the MySQL driver dependency to pom.xml
```xml
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.38</version>
</dependency>
```
2) TopNStatJob.scala -> compute the statistics and write them to the database
```scala
package log

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer

/**
 * Spark job for the TopN statistics
 */
object TopNStatJob {

  /**
   * TopN most popular courses by access count
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {

    // DataFrame API version
    import spark.implicits._

    val videoAccessTopDF = accessDF.filter($"day" === day && $"classType" === "video")
      .groupBy("day", "classId")
      .agg(count("classId").as("times"))
      .orderBy($"times".desc)

    // videoAccessTopDF.show(false)

    // Equivalent SQL version
    // accessDF.createOrReplaceTempView("access_logs")
    // val videoAccessTopDF = spark.sql("select day, classId, count(1) as times from access_logs " +
    //   "where day=" + day + " and classType='video' " +
    //   "group by day, classId order by times desc")
    //
    // videoAccessTopDF.show(false)

    // Write the statistics to MySQL
    try {
      videoAccessTopDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVedioAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val classId = info.getAs[Long]("classId")
          val times = info.getAs[Long]("times")

          list.append(DayVedioAccessStat(day, classId, times))
        })

        // Insert this partition's rows in one batch
        StatDAO.insertDayVedioAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * TopN courses per city
   */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val cityAccessTopDF = accessDF.filter($"day" === day && $"classType" === "video")
      .groupBy("day", "city", "classId")
      .agg(count("classId").as("times"))

    // cityAccessTopDF.show(false)

    // Window function in Spark SQL: rank courses within each city, keep the top 3
    val top3DF = cityAccessTopDF.select(
      cityAccessTopDF("day"),
      cityAccessTopDF("city"),
      cityAccessTopDF("classId"),
      cityAccessTopDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopDF("city"))
        .orderBy(cityAccessTopDF("times").desc))
        .as("times_rank")
    ).filter("times_rank <= 3")

    // Write the statistics to MySQL
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val classId = info.getAs[Long]("classId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")

          list.append(DayCityVideoAccessStat(day, classId, city, times, timesRank))
        })

        // Insert this partition's rows in one batch
        StatDAO.insertDayCityVedioAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * TopN courses by traffic
   */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val videoAccessTopDF = accessDF.filter($"day" === day && $"classType" === "video")
      .groupBy("day", "classId")
      .agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)

    // videoAccessTopDF.show(false)

    // Write the statistics to MySQL
    try {
      videoAccessTopDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayTrafficVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val classId = info.getAs[Long]("classId")
          val traffics = info.getAs[Long]("traffics")

          list.append(DayTrafficVideoAccessStat(day, classId, traffics))
        })

        // Insert this partition's rows in one batch
        StatDAO.insertDayVedioTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("/Users/Mac/testdata/log_clean")

    // accessDF.printSchema()
    // accessDF.show(30, false)

    val day = "20161110"

    // Delete any existing rows for this day so the job can be rerun safely
    StatDAO.deleteData(day)

    // TopN most popular courses
    videoAccessTopNStat(spark, accessDF, day)

    // TopN courses per city
    cityAccessTopNStat(spark, accessDF, day)

    // TopN courses by traffic
    videoTrafficsTopNStat(spark, accessDF, day)

    spark.stop()
  }
}
```
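The trickiest piece above is the window function in cityAccessTopNStat: row_number() numbers the rows inside each city partition by descending access count, and the filter keeps the top 3 per city. A minimal, self-contained sketch of that pattern against made-up data (the object name and rows below are illustrative only, not part of the project):

```scala
package log

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical demo object: shows the row_number-over-partition
// pattern used in cityAccessTopNStat on a handful of fake rows.
object WindowDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("WindowDemo").master("local[2]").getOrCreate()
    import spark.implicits._

    // Made-up (city, classId, times) rows
    val df = Seq(
      ("beijing", 1L, 100L), ("beijing", 2L, 80L),
      ("beijing", 3L, 60L), ("beijing", 4L, 40L),
      ("shanghai", 1L, 90L), ("shanghai", 5L, 70L)
    ).toDF("city", "classId", "times")

    // Rank courses within each city by times, keep the top 3 per city
    df.select($"city", $"classId", $"times",
        row_number().over(Window.partitionBy($"city").orderBy($"times".desc)).as("times_rank"))
      .filter($"times_rank" <= 3)
      .show(false)

    spark.stop()
  }
}
```

Note also the design choice on the write path: foreachPartition lets each partition open a single connection and send its rows as one batch, instead of paying the connection and round-trip cost once per row.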
3) StatDAO.scala -> insert the statistics into the database
```scala
package log

import java.sql.{Connection, PreparedStatement}

import scala.collection.mutable.ListBuffer

/**
 * DAO for the per-dimension statistics
 */
object StatDAO {

  /**
   * Batch-save DayVedioAccessStat records
   */
  def insertDayVedioAccessTopN(list: ListBuffer[DayVedioAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection() // acquire a connection
      connection.setAutoCommit(false)         // switch to manual commit

      val sql = "insert into day_vedio_access_topn_stat(day,class_id,times) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.classId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // run the whole batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }

  /**
   * Batch-save DayCityVideoAccessStat records
   */
  def insertDayCityVedioAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection() // acquire a connection
      connection.setAutoCommit(false)         // switch to manual commit

      val sql = "insert into day_vedio_city_access_topn_stat(day,class_id,city,times,times_rank) values(?,?,?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.classId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // run the whole batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }

  /**
   * Batch-save DayTrafficVideoAccessStat records
   */
  def insertDayVedioTrafficsAccessTopN(list: ListBuffer[DayTrafficVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection() // acquire a connection
      connection.setAutoCommit(false)         // switch to manual commit

      val sql = "insert into day_vedio_traffics_topn_stat(day,class_id,traffics) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.classId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch()
      }

      pstmt.executeBatch() // run the whole batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }

  /**
   * Delete all rows for the given day, so reruns do not hit duplicate-key errors
   */
  def deleteData(day: String): Unit = {
    val tables = Array("day_vedio_access_topn_stat",
      "day_vedio_city_access_topn_stat",
      "day_vedio_traffics_topn_stat")

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection()

      for (table <- tables) {
        val deleteSQL = s"delete from $table where day=?"
        pstmt = connection.prepareStatement(deleteSQL)
        pstmt.setString(1, day)
        pstmt.executeUpdate()
        pstmt.close() // close each statement before preparing the next, so none leak
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }
}
```
4) MySqlUtils.scala -> acquire and release database connections
```scala
package log

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * MySQL helper
 */
object MySqlUtils {

  // Acquire a database connection
  def getConnection() = {
    DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/sparkSql_project?user=root&password=rootroot")
  }

  // Release the statement and connection resources
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

  // Quick connectivity check
  def main(args: Array[String]): Unit = {
    println(getConnection())
  }
}
```
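One optional tweak related to the batch inserts above (an assumption about your setup, not something the original code does): MySQL Connector/J only rewrites addBatch() inserts into true multi-row INSERT statements when rewriteBatchedStatements=true is set on the JDBC URL. A sketch of the adjusted connection, with a hypothetical object name:

```scala
package log

import java.sql.DriverManager

// Hypothetical variant of getConnection(): with rewriteBatchedStatements=true
// (a standard Connector/J option), executeBatch() becomes multi-row INSERTs
// instead of one round trip per row.
object FastBatchConnection {
  def getConnection() =
    DriverManager.getConnection(
      "jdbc:mysql://localhost:3306/sparkSql_project" +
        "?user=root&password=rootroot&rewriteBatchedStatements=true")
}
```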
5) Entity classes
DayVedioAccessStat.scala
```scala
package log

/**
 * Daily course access count entity
 */
case class DayVedioAccessStat(day: String, classId: Long, times: Long)
```
DayCityVideoAccessStat.scala
```scala
package log

/**
 * Daily per-city course access count entity
 */
case class DayCityVideoAccessStat(day: String, classId: Long, city: String, times: Long, timesRank: Int)
```
DayTrafficVideoAccessStat.scala
```scala
package log

/**
 * Daily course traffic entity
 */
case class DayTrafficVideoAccessStat(day: String, classId: Long, traffics: Long)
```
4 Results
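After the job runs for day = 20161110, the three MySQL tables hold the per-day statistics. A quick way to eyeball them is to read a result table back through Spark's JDBC data source; a sketch (the object name is illustrative, and the connection details mirror MySqlUtils above, so adjust them for your environment):

```scala
package log

import java.util.Properties

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.desc

// Hypothetical helper, not part of the project: reads one result
// table back through Spark's JDBC source as a sanity check.
object ResultCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ResultCheck").master("local[2]").getOrCreate()

    val props = new Properties()
    props.put("user", "root")
    props.put("password", "rootroot")

    // Show the 10 most-accessed courses recorded by the job
    spark.read
      .jdbc("jdbc:mysql://localhost:3306/sparkSql_project", "day_vedio_access_topn_stat", props)
      .orderBy(desc("times"))
      .show(10, false)

    spark.stop()
  }
}
```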