
Writing Spark Streaming Aggregation Results to MySQL

2020-08-28 16:40

MySQL database connection pool (the data source used is Alibaba's Druid)

<!-- Alibaba Druid connection pool -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.23</version>
</dependency>
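
Because the connection pool below references com.mysql.jdbc.Driver, the MySQL JDBC driver must also be on the classpath. A minimal sketch, assuming MySQL Connector/J 5.x (the 5.x series is what provides the com.mysql.jdbc.Driver class used here; the version is an example):

<!-- MySQL JDBC driver (Connector/J 5.x ships com.mysql.jdbc.Driver) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>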
package cn._51doit.spark.utils

import java.sql.Connection
import java.util.Properties
import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

object DruidConnectionPool {

  // JDBC properties for the Druid data source
  private val props = new Properties()
  props.put("driverClassName", "com.mysql.jdbc.Driver")
  props.put("url", "jdbc:mysql://localhost:3306/doit_16?characterEncoding=UTF-8")
  props.put("username", "root")
  props.put("password", "123456")

  // A single shared connection pool for the whole JVM
  private val source: DataSource = DruidDataSourceFactory.createDataSource(props)

  def getConnection: Connection = {
    source.getConnection
  }
}
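
A connection borrowed from the pool is returned by calling close(), which hands it back to Druid instead of physically closing it. A minimal usage sketch (the query here is only an illustration):

val conn = DruidConnectionPool.getConnection
try {
  val stmt = conn.prepareStatement("SELECT 1")
  val rs = stmt.executeQuery()
  while (rs.next()) {
    println(rs.getInt(1))
  }
  rs.close()
  stmt.close()
} finally {
  conn.close() // returns the connection to the Druid pool
}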

Utility class for reading offsets

(reads the historical offsets stored in MySQL)

package cn._51doit.spark.utils

import java.sql.{Connection, PreparedStatement, ResultSet}
import org.apache.kafka.common.TopicPartition
import scala.collection.mutable

object OffsetUtils {

  /**
   * Query the historical offsets from MySQL
   * @param appId   application ID
   * @param groupId Kafka consumer group ID
   * @return a Map from TopicPartition to the committed offset
   */
  def queryHistoryOffsetFromMySQL(appId: String, groupId: String): Map[TopicPartition, Long] = {

    val offsetMap = new mutable.HashMap[TopicPartition, Long]()

    // Query MySQL
    var connection: Connection = null
    var statement: PreparedStatement = null
    var resultSet: ResultSet = null
    try {
      connection = DruidConnectionPool.getConnection
      statement = connection.prepareStatement("SELECT topic_partition, offset FROM t_kafka_offset WHERE app_gid = ?")
      statement.setString(1, appId + "_" + groupId)
      // assign to the outer var (the original shadowed it with a local val, so the ResultSet was never closed)
      resultSet = statement.executeQuery()
      // Iterate over the result set
      while (resultSet.next()) {
        val topic_partition = resultSet.getString(1)
        val offset = resultSet.getLong(2)
        // Split on the last "_" so that topic names containing underscores are handled correctly
        val splitIndex = topic_partition.lastIndexOf("_")
        val topic = topic_partition.substring(0, splitIndex)
        val partition = topic_partition.substring(splitIndex + 1).toInt
        val topicPartition = new TopicPartition(topic, partition)
        offsetMap(topicPartition) = offset
      }
    } catch {
      case e: Exception =>
        throw new RuntimeException("Failed to query the historical offsets", e)
    } finally {
      if (resultSet != null) {
        resultSet.close()
      }
      if (statement != null) {
        statement.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
    offsetMap.toMap
  }

}
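
The code assumes two tables exist in the doit_16 database; the article does not show their DDL, so the following is only a sketch of a schema consistent with the SQL statements used here and in the program below (column types and lengths are assumptions):

-- Offsets table: one row per application/group and topic/partition combination.
-- The composite primary key makes "INSERT ... ON DUPLICATE KEY UPDATE offset = ?" an upsert.
CREATE TABLE t_kafka_offset (
    app_gid         VARCHAR(255) NOT NULL,
    topic_partition VARCHAR(255) NOT NULL,
    offset          BIGINT       NOT NULL,
    PRIMARY KEY (app_gid, topic_partition)
);

-- Word-count table: word is the primary key, counts is incremented on duplicate key.
CREATE TABLE t_wordcount (
    word   VARCHAR(255) NOT NULL PRIMARY KEY,
    counts BIGINT       NOT NULL DEFAULT 0
);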

The Spark Streaming program

(queries the historical offsets, processes the data from Kafka, and writes both the results and the offsets to MySQL)

package cn._51doit.spark.day15

import java.sql.{Connection, PreparedStatement}

import cn._51doit.spark.utils.{DruidConnectionPool, OffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

// This is an aggregation-type job: the computed results and the offsets are saved to MySQL in a single transaction
object KafkaWordCountStoreDataAndOffsetInMySQL {

  def main(args: Array[String]): Unit = {

    val appId = args(0)
    val groupId = args(1)

    // Create the StreamingContext for real-time computation (StreamingContext is an enhanced wrapper around SparkContext and holds a reference to it)
    val conf = new SparkConf().setAppName(appId).setMaster("local[*]")

    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Kafka-related parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // do not let the consumer auto-commit offsets
    )

    val topics = Array("wordcount")

    // Read the historical offsets (queried on the Driver side)
    val offset: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromMySQL(appId, groupId)

    // Integrate Spark Streaming with Kafka using the officially recommended direct approach, which uses Kafka's underlying consumer API and is more efficient
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc, // the StreamingContext
      LocationStrategies.PreferConsistent, // location strategy
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offset) // consumer strategy (subscribed topics, Kafka parameters, historical offsets)
    )

    kafkaDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Get the offset ranges of this batch
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // Perform the aggregation
        val lines = rdd.map(_.value())
        val reduced: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        // Collect the aggregated data to the Driver
        val results: Array[(String, Int)] = reduced.collect() // only suitable for aggregation-type operations, where the result is small

        // Get a database connection (from the connection pool)
        var connection: Connection = null
        var pstm1: PreparedStatement = null
        var pstm2: PreparedStatement = null
        try {
          connection = DruidConnectionPool.getConnection
          // Start the transaction
          connection.setAutoCommit(false)

          // Write the aggregated results to MySQL; the t_wordcount table has two columns: word (primary key) and counts (long)
          pstm1 = connection.prepareStatement("INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?")
          // Set the parameters
          for (t <- results) {
            pstm1.setString(1, t._1) // the word
            pstm1.setLong(2, t._2)
            pstm1.setLong(3, t._2)
            pstm1.executeUpdate()
            //pstm1.addBatch()
          }
          //pstm1.executeBatch()

          pstm2 = connection.prepareStatement("INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?")
          // Write the offsets to the t_kafka_offset table (app/group ID, topic, partition, until-offset)
          for (range <- offsetRanges) {

            val topic = range.topic
            val partition = range.partition
            val offset = range.untilOffset

            // Set the parameters
            pstm2.setString(1, appId + "_" + groupId)
            pstm2.setString(2, topic + "_" + partition)
            pstm2.setLong(3, offset)
            pstm2.setLong(4, offset)

            // Execute the upsert
            pstm2.executeUpdate()
          }

          // Commit the transaction
          connection.commit()

        } catch {
          case e: Exception =>
            e.printStackTrace()
            // Roll back the transaction (the connection may be null if getConnection itself failed)
            if (connection != null) {
              connection.rollback()
            }
            // Stop the Spark Streaming application
            ssc.stop(true)
        } finally {
          // Release resources
          if (pstm2 != null) {
            pstm2.close()
          }
          if (pstm1 != null) {
            pstm1.close()
          }
          if (connection != null) {
            connection.close()
          }
        }
      }
    })

    // Start the streaming job
    ssc.start()
    // Keep the program running; block the Driver
    ssc.awaitTermination()

  }
}
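
The program takes two command-line arguments, appId and groupId, which are also combined into the app_gid key stored in MySQL. A minimal launch sketch, assuming the project has been packaged into a jar with its dependencies available on the classpath (the jar name and the two argument values below are placeholders):

# the application sets master to local[*] in code, so no --master flag is needed here
spark-submit \
  --class cn._51doit.spark.day15.KafkaWordCountStoreDataAndOffsetInMySQL \
  spark-demo.jar \
  wordcount-app g001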