
Spark 09 Spark SQL in Practice: Log Analysis (Part 2): Implementing the Requirements

1 Requirements
  • Top N most popular courses by access count
  • Top N most popular courses per city
  • Top N most popular courses by traffic
2 Create the Database and Tables in MySQL
-- create the database
create database sparkSql_project;

-- create the tables
create table day_vedio_access_topn_stat(
  day varchar(8) not null,
  class_id bigint(10) not null,
  times bigint(10) not null,
  primary key (day, class_id)
);

create table day_vedio_city_access_topn_stat(
  day varchar(8) not null,
  class_id bigint(10) not null,
  city varchar(20) not null,
  times bigint(10) not null,
  times_rank int not null,
  primary key (day, class_id, city)
);

create table day_vedio_traffics_topn_stat(
  day varchar(8) not null,
  class_id bigint(10) not null,
  traffics bigint(20) not null,
  primary key (day, class_id)
);
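
A quick sanity check in the mysql client (a sketch) confirms the three tables exist:

-- verify the schema just created
use sparkSql_project;
show tables;
desc day_vedio_access_topn_stat;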
3 Code

1) Add the MySQL driver dependency in pom.xml

<!-- MySQL JDBC driver -->
<dependency>
  <groupId>mysql</groupId>
  <artifactId>mysql-connector-java</artifactId>
  <version>5.1.38</version>
</dependency>
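
The Spark dependencies themselves are assumed to be in place from the earlier parts of this series; for reference, a typical spark-sql declaration (the artifact and version shown here are assumptions, not taken from the original project):

<!-- assumed from earlier parts of the series; version is an assumption -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.0</version>
</dependency>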

2) TopNStatJob.scala -> compute the statistics and write them to the database

package log

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

/**
 * Spark job that computes the statistics
 */
object TopNStatJob {

  /**
   * Top N most popular courses
   *
   * @param spark
   * @param accessDF
   * @param day
   */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    /**
     * Statistics via the DataFrame API
     */
    import spark.implicits._

    val videoAccessTopDF = accessDF.filter($"day" === day && $"classType" === "video")
      .groupBy("day", "classId").agg(count("classId").as("times")).orderBy($"times".desc)

    //    videoAccessTopDF.show(false)

    /**
     * Statistics via SQL
     */
    //    accessDF.createOrReplaceTempView("access_logs")
    //    val videoAccessTopDF = spark.sql("select day, classId, count(1) as times from access_logs " +
    //      "where day='" + day + "' and classType='video' " +
    //      "group by day, classId order by times desc")
    //
    //    videoAccessTopDF.show(false)

    /**
     * Write the results to MySQL
     */
    try {
      videoAccessTopDF.foreachPartition(partitionOfRecords => {
        // buffer the rows of this partition for one batched insert
        val list = new ListBuffer[DayVedioAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val classId = info.getAs[Long]("classId")
          val times = info.getAs[Long]("times")

          list.append(DayVedioAccessStat(day, classId, times))
        })

        // insert the batch into the database
        StatDAO.insertDayVedioAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * Top N courses per city
   *
   * @param spark
   * @param accessDF
   * @param day
   */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {

    import spark.implicits._

    val cityAccessTopDF = accessDF.filter($"day" === day && $"classType" === "video")
      .groupBy("day", "city", "classId").agg(count("classId").as("times"))

    //    cityAccessTopDF.show(false)

    // window function in Spark SQL: rank courses within each city by access count
    val top3DF = cityAccessTopDF.select(
      cityAccessTopDF("day"),
      cityAccessTopDF("city"),
      cityAccessTopDF("classId"),
      cityAccessTopDF("times"),
      row_number().over(Window.partitionBy(cityAccessTopDF("city"))
        .orderBy(cityAccessTopDF("times").desc))
        .as("times_rank")
    ).filter("times_rank <= 3")
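
    // The same ranking expressed in SQL instead of the DataFrame API (a sketch,
    // left commented out like the SQL variant above; the temp-view name
    // "city_access_tmp" is an assumption, not from the original):
    //    cityAccessTopDF.createOrReplaceTempView("city_access_tmp")
    //    val top3DF = spark.sql("select day, city, classId, times, " +
    //      "row_number() over (partition by city order by times desc) as times_rank " +
    //      "from city_access_tmp").filter("times_rank <= 3")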

    /**
     * Write the results to MySQL
     */
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val classId = info.getAs[Long]("classId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")

          list.append(DayCityVideoAccessStat(day, classId, city, times, timesRank))
        })

        // insert the batch into the database
        StatDAO.insertDayCityVedioAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  /**
   * Top N courses by traffic
   *
   * @param spark
   * @param accessDF
   * @param day
   */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {

    import spark.implicits._

    val videoAccessTopDF = accessDF.filter($"day" === day && $"classType" === "video")
      .groupBy("day", "classId").agg(sum("traffic").as("traffics")).orderBy($"traffics".desc)

    //    videoAccessTopDF.show(false)

    /**
     * Write the results to MySQL
     */
    try {
      videoAccessTopDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayTrafficVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val classId = info.getAs[Long]("classId")
          val traffics = info.getAs[Long]("traffics")

          list.append(DayTrafficVideoAccessStat(day, classId, traffics))
        })

        // insert the batch into the database
        StatDAO.insertDayVedioTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("TopNStatJob")
      // keep the partition column "day" as a string instead of inferring an integer type
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]").getOrCreate()

    val accessDF = spark.read.format("parquet").load("/Users/Mac/testdata/log_clean")

    //    accessDF.printSchema()
    //    accessDF.show(30, false)

    val day = "20161110"

    // make the job re-runnable: drop any rows already written for this day
    StatDAO.deleteData(day)

    // Top N most popular courses
    videoAccessTopNStat(spark, accessDF, day)

    // Top N courses per city
    cityAccessTopNStat(spark, accessDF, day)

    // Top N courses by traffic
    videoTrafficsTopNStat(spark, accessDF, day)

    spark.stop()
  }
}
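
As a side note, for plain appends Spark's built-in JDBC writer could replace the foreachPartition + DAO path above. A minimal sketch, assuming the same connection settings as MySqlUtils below; it cannot express the per-day delete-then-insert pattern, so StatDAO.deleteData would still be needed before re-runs:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "rootroot")

videoAccessTopDF
  .withColumnRenamed("classId", "class_id") // match the MySQL column name
  .write.mode("append")
  .jdbc("jdbc:mysql://localhost:3306/sparkSql_project", "day_vedio_access_topn_stat", props)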

3) StatDAO.scala -> insert the data into the database

package log

import java.sql.{Connection, PreparedStatement}

import scala.collection.mutable.ListBuffer

/**
 * DAO operations for each statistics dimension
 */
object StatDAO {

  /**
   * Batch-save DayVedioAccessStat records to the database
   */
  def insertDayVedioAccessTopN(list: ListBuffer[DayVedioAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection() // obtain a database connection
      connection.setAutoCommit(false) // disable auto-commit so the batch commits as one unit

      val sql = "insert into day_vedio_access_topn_stat(day,class_id,times) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.classId)
        pstmt.setLong(3, ele.times)

        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batched inserts
      connection.commit() // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }

  /**
   * Batch-save DayCityVideoAccessStat records to the database
   */
  def insertDayCityVedioAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection() // obtain a database connection
      connection.setAutoCommit(false) // disable auto-commit so the batch commits as one unit

      val sql = "insert into day_vedio_city_access_topn_stat(day,class_id,city,times,times_rank) values(?,?,?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.classId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)

        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batched inserts
      connection.commit() // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }

  /**
   * Batch-save DayTrafficVideoAccessStat records to the database
   */
  def insertDayVedioTrafficsAccessTopN(list: ListBuffer[DayTrafficVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection() // obtain a database connection
      connection.setAutoCommit(false) // disable auto-commit so the batch commits as one unit

      val sql = "insert into day_vedio_traffics_topn_stat(day,class_id,traffics) values(?,?,?)"
      pstmt = connection.prepareStatement(sql)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.classId)
        pstmt.setLong(3, ele.traffics)

        pstmt.addBatch()
      }

      pstmt.executeBatch() // execute the batched inserts
      connection.commit() // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }

  /**
   * Delete the data for a given day from all three result tables
   */
  def deleteData(day: String): Unit = {

    val tables = Array("day_vedio_access_topn_stat",
      "day_vedio_city_access_topn_stat",
      "day_vedio_traffics_topn_stat")

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySqlUtils.getConnection()
      for (table <- tables) {
        val deleteSQL = s"delete from $table where day=?"
        pstmt = connection.prepareStatement(deleteSQL)
        pstmt.setString(1, day)
        pstmt.executeUpdate()
        pstmt.close() // close each statement here; the finally block only sees the last one
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySqlUtils.release(connection, pstmt)
    }
  }

}
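
Because (day, class_id) is the primary key of day_vedio_access_topn_stat, MySQL's upsert syntax is a possible alternative to the delete-then-insert pattern: it makes re-runs idempotent without deleteData. A sketch of the statement insertDayVedioAccessTopN could prepare instead:

insert into day_vedio_access_topn_stat(day, class_id, times)
values (?, ?, ?)
on duplicate key update times = values(times);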

4) MySqlUtils.scala -> obtain and release database connections

package log

import java.sql.{Connection, DriverManager, PreparedStatement}

/**
 * MySQL utility object
 */
object MySqlUtils {

  // obtain a database connection
  def getConnection() = {
    DriverManager.getConnection("jdbc:mysql://localhost:3306/sparkSql_project?user=root&password=rootroot")
  }

  // release database resources
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

  // quick connectivity check
  def main(args: Array[String]): Unit = {
    println(getConnection())
  }
}
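
With mysql-connector-java 5.1.38 the driver registers itself through the JDBC 4 service-provider mechanism, so getConnection works without any explicit driver loading. In environments where that lookup fails, the driver can be registered by hand before the first connection (a one-line sketch):

Class.forName("com.mysql.jdbc.Driver")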

5) Entity classes

DayVedioAccessStat.scala

package log

/**
 * Entity for per-day course access counts
 * @param day
 * @param classId
 * @param times
 */
case class DayVedioAccessStat(day: String, classId: Long, times: Long)

DayCityVideoAccessStat.scala

package log

/**
 * Entity for per-day, per-city course access counts
 */
case class DayCityVideoAccessStat(day: String, classId: Long, city: String, times: Long, timesRank: Int)

DayTrafficVideoAccessStat.scala

package log

/**
 * Entity for per-day course traffic totals
 */
case class DayTrafficVideoAccessStat(day: String, classId: Long, traffics: Long)
4 Results
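
The loaded results can be inspected directly in MySQL, for example the per-city top 3 for the processed day:

select * from day_vedio_city_access_topn_stat
where day = '20161110'
order by city, times_rank;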


