Writing Spark Streaming Aggregation Results to a Redis Database
2020-08-29 17:43
Add the Jedis dependency for working with Redis
<!-- Import the Jedis dependency -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>
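If the project is built with sbt rather than Maven, the equivalent dependency line would be (assuming the same Jedis version):

libraryDependencies += "redis.clients" % "jedis" % "3.3.0"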
A Jedis connection pool for Redis
JedisConnectionPool
package cn._51doit.utils

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConnectionPool {

  // Pool configuration: at most 5 connections in total, at most 3 idle, validate connections on borrow
  val config = new JedisPoolConfig()
  config.setMaxTotal(5)
  config.setMaxIdle(3)
  config.setTestOnBorrow(true)

  // Connect to the Redis server "linux" on port 6379, with a 10000 ms timeout and password "123456"
  val pool = new JedisPool(config, "linux", 6379, 10000, "123456")

  def getConnection: Jedis = {
    pool.getResource
  }
}
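A quick smoke test confirms the pool hands out working connections. The sketch below is illustrative (the object name JedisConnectionPoolDemo and the key pool_test are made up); note that close() on a pooled Jedis returns the connection to the pool instead of tearing it down.

package cn._51doit.utils

import redis.clients.jedis.Jedis

object JedisConnectionPoolDemo {
  def main(args: Array[String]): Unit = {
    // Borrow a connection from the pool
    val jedis: Jedis = JedisConnectionPool.getConnection
    try {
      jedis.select(7)                  // the same DB index the streaming job uses
      jedis.set("pool_test", "ok")     // write a throwaway key
      println(jedis.get("pool_test"))  // prints "ok"
    } finally {
      jedis.close()                    // returns the connection to the pool
    }
  }
}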
Utility class for querying offsets
Queries the historical offsets from the Redis database
package cn._51doit.utils

import java.util

import org.apache.kafka.common.TopicPartition

import scala.collection.mutable

/** Utility class for fetching offsets (queries the database for the historical offsets)
 * @Auther Zhang
 * @Date 2020/8/29
 */
object OffsetUtils {

  /**
   * Query the historical offsets from Redis
   * @param appId
   * @param groupId
   * @return
   */
  def queryHistoryOffsetFromRedis(appId: String, groupId: String): Map[TopicPartition, Long] = {
    val offsetMap = new mutable.HashMap[TopicPartition, Long]
    val jedis = JedisConnectionPool.getConnection
    jedis.select(7)
    // Offsets are stored in a hash keyed by appId_groupId, with one field per topic_partition
    val mp: util.Map[String, String] = jedis.hgetAll(appId + "_" + groupId)
    jedis.close() // return the connection to the pool
    // Import the implicit conversions between Java and Scala collections
    import scala.collection.JavaConverters._
    for (tp <- mp.asScala) {
      val topic_partition = tp._1
      val offset = tp._2.toLong
      val fields = topic_partition.split("_")
      val topicPartition = new TopicPartition(fields(0), fields(1).toInt)
      offsetMap(topicPartition) = offset
    }
    offsetMap.toMap
  }
}
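To see the query in action, one can seed Redis with an entry in the same hash layout the streaming job writes (key appId_groupId, one field per topic_partition) and read it back. This is a minimal sketch with made-up values (the object name OffsetUtilsDemo is hypothetical):

package cn._51doit.utils

import org.apache.kafka.common.TopicPartition

object OffsetUtilsDemo {
  def main(args: Array[String]): Unit = {
    // Seed a fake offset so the query has something to return (illustrative values only)
    val jedis = JedisConnectionPool.getConnection
    jedis.select(7)
    jedis.hset("app1_g1", "wordcount_0", "42") // hash: appId_groupId -> {topic_partition: offset}
    jedis.close()

    val offsets: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromRedis("app1", "g1")
    offsets.foreach { case (tp, offset) =>
      println(s"${tp.topic()}-${tp.partition()} -> $offset") // wordcount-0 -> 42
    }
  }
}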
The Streaming program (queries the offsets from Redis and writes the results back to Redis)
package cn._51doit.day16

import cn._51doit.utils.{JedisConnectionPool, OffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.{Jedis, Pipeline}

/** Save both the computed results and the offsets to Redis in a single transaction (aggregation-style job)
 * @Auther Zhang
 * @Date 2020/8/29
 */
object KafkaWordCountStoreDataAndOffsetInRedis {

  def main(args: Array[String]): Unit = {

    val appId = "app1"
    val groupId = "g1"

    // Create the StreamingContext (StreamingContext is an enhanced wrapper around SparkContext and holds a reference to it)
    val conf = new SparkConf().setAppName(appId).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // do not let the consumer commit offsets automatically
    )

    // The topics to consume
    val topics = Array("wordcount")

    // Read the historical offsets (queried on the Driver side)
    val offset: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromRedis(appId, groupId)

    // Integrate Spark with Kafka (the officially recommended direct approach, which is more efficient)
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,                                 // the StreamingContext
      LocationStrategies.PreferConsistent, // location strategy
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offset) // consumer strategy (topics, Kafka params, historical offsets)
    )

    kafkaDStream.foreachRDD(rdd => {
      // Get the offset ranges of this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Perform the aggregation
      val lines = rdd.map(_.value())
      val reduced = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)

      // Collect the aggregated data to the Driver (only feasible for aggregation-style computations)
      val results: Array[(String, Int)] = reduced.collect()

      var jedis: Jedis = null
      var pipeline: Pipeline = null
      try {
        // Get a Jedis connection
        jedis = JedisConnectionPool.getConnection
        jedis.select(7)
        // Open a pipeline (MULTI is similar to a transaction in MySQL)
        pipeline = jedis.pipelined()
        pipeline.multi()

        // Write the computed results
        for (tp <- results) {
          // upsert
          pipeline.hincrBy("WORD_COUNT", tp._1, tp._2)
        }

        // Write the offsets
        for (range <- offsetRanges) {
          val topic = range.topic
          val partition = range.partition
          val offset = range.untilOffset
          pipeline.hset(appId + "_" + groupId, topic + "_" + partition, offset.toString)
        }

        // Commit the transaction, then flush the pipeline
        pipeline.exec()
        pipeline.sync()
      } catch {
        case e: Exception =>
          e.printStackTrace()
          if (pipeline != null) {
            pipeline.discard() // roll back
          }
          ssc.stop(true) // stop the current job
      } finally {
        if (pipeline != null) {
          pipeline.close()
        }
        if (jedis != null) {
          jedis.close()
        }
      }
    })

    // Start the job
    ssc.start()
    // Keep the program running; block the Driver
    ssc.awaitTermination()
  }
}
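Because the HINCRBY updates and the HSET of the offsets are queued inside the same MULTI/EXEC block, they commit atomically: either a batch's word counts and its offsets both land in Redis, or neither does, so replaying a failed batch will not double-count. After a few batches have run, the stored state can be inspected with a small helper; this is a minimal sketch (the object name RedisResultInspector is made up; the keys WORD_COUNT and app1_g1 are the ones the job above writes):

package cn._51doit.utils

import scala.collection.JavaConverters._

object RedisResultInspector {
  def main(args: Array[String]): Unit = {
    val jedis = JedisConnectionPool.getConnection
    jedis.select(7)
    // Word counts accumulated across batches via HINCRBY
    jedis.hgetAll("WORD_COUNT").asScala.foreach { case (word, count) =>
      println(s"$word: $count")
    }
    // Last committed offset for each topic_partition
    jedis.hgetAll("app1_g1").asScala.foreach { case (tp, offset) =>
      println(s"$tp -> $offset")
    }
    jedis.close()
  }
}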