Writing SparkStreaming Aggregation Results to MySQL
MySQL database connection pool (the data source used is Alibaba's Druid)
<!-- Alibaba Druid connection pool -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.23</version>
</dependency>
package cn._51doit.spark.utils

import java.sql.Connection
import java.util.Properties

import com.alibaba.druid.pool.DruidDataSourceFactory
import javax.sql.DataSource

object DruidConnectionPool {

  private val props = new Properties()
  props.put("driverClassName", "com.mysql.jdbc.Driver")
  props.put("url", "jdbc:mysql://localhost:3306/doit_16?characterEncoding=UTF-8")
  props.put("username", "root")
  props.put("password", "123456")

  private val source: DataSource = DruidDataSourceFactory.createDataSource(props)

  def getConnection: Connection = {
    source.getConnection
  }
}
Utility class for reading offsets (reads the offsets stored in MySQL)
package cn._51doit.spark.utils

import java.sql.{Connection, PreparedStatement, ResultSet}

import org.apache.kafka.common.TopicPartition

import scala.collection.mutable

object OffsetUtils {

  /**
   * Query the historical offsets from MySQL
   * @param appId
   * @param groupId
   * @return Map
   */
  def queryHistoryOffsetFromMySQL(appId: String, groupId: String): Map[TopicPartition, Long] = {
    val offsetMap = new mutable.HashMap[TopicPartition, Long]()
    // query MySQL
    var connection: Connection = null
    var statement: PreparedStatement = null
    var resultSet: ResultSet = null
    try {
      connection = DruidConnectionPool.getConnection
      statement = connection.prepareStatement(
        "SELECT topic_partition, offset FROM t_kafka_offset WHERE app_gid = ?")
      statement.setString(1, appId + "_" + groupId)
      // assign to the outer variable (do not shadow it, or the ResultSet is never closed in finally)
      resultSet = statement.executeQuery()
      // iterate over the result set
      while (resultSet.next()) {
        val topic_partition = resultSet.getString(1)
        val offset = resultSet.getLong(2)
        val fields = topic_partition.split("_")
        val topic = fields(0)
        val partition = fields(1).toInt
        val topicPartition = new TopicPartition(topic, partition)
        offsetMap(topicPartition) = offset
      }
    } catch {
      case e: Exception =>
        throw new RuntimeException("Failed to query the historical offsets", e)
    } finally {
      if (resultSet != null) {
        resultSet.close()
      }
      if (statement != null) {
        statement.close()
      }
      if (connection != null) {
        connection.close()
      }
    }
    offsetMap.toMap
  }
}
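For reference, here is a minimal sketch of the t_kafka_offset table this utility queries, inferred from the column names used in the code; the column types and lengths are assumptions:

CREATE TABLE t_kafka_offset (
  app_gid         VARCHAR(200) NOT NULL,   -- appId + "_" + groupId
  topic_partition VARCHAR(200) NOT NULL,   -- topic + "_" + partition
  `offset`        BIGINT NOT NULL,         -- end offset of the last successfully processed batch
  PRIMARY KEY (app_gid, topic_partition)
);

The composite primary key on (app_gid, topic_partition) is what lets the streaming job below upsert each partition's offset with ON DUPLICATE KEY UPDATE.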
SparkStreaming program
(queries the historical offsets, processes the data from Kafka, and writes both the results and the offsets to MySQL)
package cn._51doit.spark.day15

import java.sql.{Connection, PreparedStatement}

import cn._51doit.spark.utils.{DruidConnectionPool, OffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

// An aggregation-type job: the computed results and the Kafka offsets are saved to MySQL in a single transaction
object KafkaWordCountStoreDataAndOffsetInMySQL {

  def main(args: Array[String]): Unit = {

    val appId = args(0)
    val groupId = args(1)

    // Create the StreamingContext for real-time computation
    // (StreamingContext is an enhanced wrapper around SparkContext and holds a reference to it)
    val conf = new SparkConf().setAppName(appId).setMaster("local[*]")
    val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // do not let the consumer commit offsets automatically
    )

    val topics = Array("wordcount")

    // Read the historical offsets (queried on the Driver side)
    val offset: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromMySQL(appId, groupId)

    // Integrate SparkStreaming with Kafka using the officially recommended direct approach,
    // which uses Kafka's low-level consumer API and is more efficient
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,                                 // the StreamingContext
      LocationStrategies.PreferConsistent, // location strategy
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offset) // consumer strategy (topics, Kafka params, historical offsets)
    )

    kafkaDStream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Get the offset ranges of this batch
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        // Perform the aggregation
        val lines = rdd.map(_.value())
        val reduced: RDD[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

        // Collect the aggregated data to the Driver (only suitable for aggregation-type computations)
        val results: Array[(String, Int)] = reduced.collect()

        // Get a database connection (from the connection pool)
        var connection: Connection = null
        var pstm1: PreparedStatement = null
        var pstm2: PreparedStatement = null
        try {
          connection = DruidConnectionPool.getConnection
          // Start the transaction
          connection.setAutoCommit(false)

          // Write the aggregated results to MySQL; the t_wordcount table has two columns: word (primary key) and counts (long)
          pstm1 = connection.prepareStatement(
            "INSERT INTO t_wordcount (word, counts) VALUES (?, ?) ON DUPLICATE KEY UPDATE counts = counts + ?")
          // Set the parameters
          for (t <- results) {
            pstm1.setString(1, t._1) // the word
            pstm1.setLong(2, t._2)
            pstm1.setLong(3, t._2)
            pstm1.executeUpdate()
            //pstm1.addBatch()
          }
          //pstm1.executeBatch()

          // Write the offsets to the t_kafka_offset table (app/group ID, topic and partition, end offset)
          pstm2 = connection.prepareStatement(
            "INSERT INTO t_kafka_offset (app_gid, topic_partition, offset) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset = ?")
          for (range <- offsetRanges) {
            val topic = range.topic
            val partition = range.partition
            val offset = range.untilOffset
            // Set the parameters
            pstm2.setString(1, appId + "_" + groupId)
            pstm2.setString(2, topic + "_" + partition)
            pstm2.setLong(3, offset)
            pstm2.setLong(4, offset)
            // Execute the update
            pstm2.executeUpdate()
          }

          // Commit the transaction
          connection.commit()
        } catch {
          case e: Exception =>
            e.printStackTrace()
            // Roll back the transaction
            connection.rollback()
            // Stop the SparkStreaming application
            ssc.stop(true)
        } finally {
          // Release resources
          if (pstm2 != null) {
            pstm2.close()
          }
          if (pstm1 != null) {
            pstm1.close()
          }
          if (connection != null) {
            connection.close()
          }
        }
      }
    })

    // Start the computation
    ssc.start()
    // Keep the program running (block the Driver)
    ssc.awaitTermination()
  }
}
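For completeness, a possible definition of the t_wordcount result table, following the comment in the code above (word is the primary key, counts is a long); the VARCHAR length is an assumption:

CREATE TABLE t_wordcount (
  word   VARCHAR(200) NOT NULL,        -- the word; primary key so repeated batches accumulate via ON DUPLICATE KEY UPDATE
  counts BIGINT NOT NULL DEFAULT 0,
  PRIMARY KEY (word)
);

The program is submitted with two arguments, the application ID and the consumer group ID; concatenated as appId + "_" + groupId they form the app_gid key used both when querying the historical offsets and when upserting the new ones.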