
Spark Streaming: Writing Aggregation Results to Redis

2020-08-29 17:43

Add the Jedis dependency for working with Redis

<!-- Jedis client for Redis -->
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.3.0</version>
</dependency>
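If the project is built with sbt rather than Maven, a sketch of the equivalent dependency (same coordinates as above) would be:

// sbt equivalent of the Maven dependency above
libraryDependencies += "redis.clients" % "jedis" % "3.3.0"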

The Redis connection pool

JedisConnectionPool

package cn._51doit.utils

import redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}

object JedisConnectionPool {

  // Pool configuration: at most 5 connections in total, at most 3 idle,
  // and validate a connection before handing it out
  val config = new JedisPoolConfig()
  config.setMaxTotal(5)
  config.setMaxIdle(3)
  config.setTestOnBorrow(true)

  // host, port, connection timeout (ms), password
  val pool = new JedisPool(config, "linux", 6379, 10000, "123456")

  def getConnection: Jedis = {
    pool.getResource
  }

}
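A quick sketch of how the pool is meant to be used: borrow a connection, work with it, and return it to the pool by calling close(). The key written here is only an illustration.

val jedis = JedisConnectionPool.getConnection
try {
  jedis.select(7)           // the database index used throughout this post
  jedis.set("ping", "pong") // any Redis command
} finally {
  jedis.close()             // returns the connection to the pool
}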

A utility object for querying offsets

It reads the historical offsets from the Redis database.

package cn._51doit.utils

import java.util

import org.apache.kafka.common.TopicPartition

import scala.collection.mutable

/** Utility object for reading offsets (queries the database for the saved offsets)
 * @Author Zhang
 * @Date 2020/8/29
 */
object OffsetUtils {

  /**
   * Query the historical offsets from Redis
   * @param appId
   * @param groupId
   * @return
   */
  def queryHistoryOffsetFromRedis(appId: String, groupId: String): Map[TopicPartition, Long] = {

    val offsetMap = new mutable.HashMap[TopicPartition, Long]

    val jedis = JedisConnectionPool.getConnection
    jedis.select(7)
    // offsets are stored in a hash keyed by "<appId>_<groupId>", one field per "<topic>_<partition>"
    val mp: util.Map[String, String] = jedis.hgetAll(appId + "_" + groupId)
    // bring in the Java-to-Scala collection converters
    import scala.collection.JavaConverters._
    for (tp <- mp.asScala) {
      val topic_partition = tp._1
      val offset = tp._2.toLong
      val fields = topic_partition.split("_")
      val topicPartition = new TopicPartition(fields(0), fields(1).toInt)

      offsetMap(topicPartition) = offset
    }
    jedis.close()
    offsetMap.toMap
  }
}
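For concreteness, with appId = "app1" and groupId = "g1" the offsets live in a hash named "app1_g1". A hypothetical snippet that seeds such a hash by hand looks like this (field names and offset values are only illustrative):

val jedis = JedisConnectionPool.getConnection
jedis.select(7)
// one field per topic-partition, value = next offset to read
jedis.hset("app1_g1", "wordcount_0", "100")
jedis.hset("app1_g1", "wordcount_1", "98")
jedis.close()

// queryHistoryOffsetFromRedis("app1", "g1") would then return
// Map(wordcount-0 -> 100, wordcount-1 -> 98)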

The Streaming program (reads the offsets from Redis and writes the results back to Redis)

package cn._51doit.day16

import cn._51doit.utils.{JedisConnectionPool, OffsetUtils}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.{Jedis, Pipeline}

/** Save the computed results and the offsets to Redis in one transaction (aggregation-style job)
 * @Author Zhang
 * @Date 2020/8/29
 */
object KafkaWordCountStoreDataAndOffsetInRedis {
  def main(args: Array[String]): Unit = {

    val appId = "app1"
    val groupId = "g1"

    // Create the StreamingContext for the real-time job
    // (StreamingContext is an enriched wrapper around SparkContext and holds a reference to it)
    val conf = new SparkConf().setAppName(appId).setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.sparkContext.setLogLevel("WARN")

    // Kafka parameters
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "linux01:9092,linux02:9092,linux03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean) // do not let the consumer auto-commit offsets
    )

    // Topics to consume
    val topics = Array("wordcount")

    // Read the historical offsets (queried on the Driver side)
    val offset: Map[TopicPartition, Long] = OffsetUtils.queryHistoryOffsetFromRedis(appId, groupId)

    // Integrate Spark with Kafka (the officially recommended direct approach, which is more efficient)
    val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      ssc,                                 // the StreamingContext
      LocationStrategies.PreferConsistent, // location strategy
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, offset) // consumer strategy (topics, Kafka params, historical offsets)
    )

    kafkaDStream.foreachRDD(rdd => {
      // Get the offset ranges of this batch
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // Do the aggregation
      val lines = rdd.map(_.value())
      val reduced = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
      // Collect the aggregated data to the Driver
      val results: Array[(String, Int)] = reduced.collect() // only suitable for aggregation-style results

      var jedis: Jedis = null
      var pipeline: Pipeline = null

      try {
        // Work with Redis: get a Jedis connection and select the database
        jedis = JedisConnectionPool.getConnection
        jedis.select(7)

        // Open a pipeline (multi is similar to a transaction in MySQL)
        pipeline = jedis.pipelined()
        pipeline.multi()

        // Write the computed results
        for (tp <- results) {
          // upsert: increment the existing count or create the field
          pipeline.hincrBy("WORD_COUNT", tp._1, tp._2)
        }
        // Write the offsets
        for (range <- offsetRanges) {
          val topic = range.topic
          val partition = range.partition
          val offset = range.untilOffset
          pipeline.hset(appId + "_" + groupId, topic + "_" + partition, offset.toString)
        }

        // Commit the transaction: exec() queues EXEC, sync() flushes the pipeline and reads the replies
        pipeline.exec()
        pipeline.sync()
      } catch {
        case e: Exception => {
          e.printStackTrace()
          if (pipeline != null) {
            pipeline.discard() // roll back
          }
          ssc.stop(true) // stop the current job
        }
      } finally {
        if (pipeline != null) {
          pipeline.close()
        }
        if (jedis != null) {
          jedis.close()
        }
      }
    })

    // Start the computation
    ssc.start()
    // Keep the program running by blocking the Driver
    ssc.awaitTermination()

  }
}
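Because the word counts and the offsets are committed inside the same MULTI/EXEC block, each batch is either applied in full or not at all, which is what keeps the incremental hincrBy updates consistent with the stored offsets. A minimal sketch for inspecting the state after a few batches (key names taken from the job above):

val jedis = JedisConnectionPool.getConnection
jedis.select(7)
println(jedis.hgetAll("WORD_COUNT")) // accumulated word counts
println(jedis.hgetAll("app1_g1"))    // last committed offset per topic_partition
jedis.close()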