SparkStreaming和Kafka整合
Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个单独的对应Spark Streaming包可用。请为您的代理选择正确的包和所需的特性;请注意,0.8集成与后来的0.9和0.10代理兼容,但是0.10集成与前面的代理不兼容。
maven依赖:
[code] groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.11 version = 2.3.1
SparkStreaming和Kafka整合也分为两种模式(后述为0.8版本):
1: Receiver-based Approach
[code]package com.ruoze.spark.SparkStreaming_integration import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.KafkaUtils object ssk { def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("ssk") val ssc = new StreamingContext(sparkConf, Seconds(10)) val topics = "ruoze_kafka_streaming".split(",").map((_,1)).toMap val lines = KafkaUtils.createStream(ssc, "hadoop000:2181","ruoze_group",topics) //业务处理 lines.map(_._2).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }
注意点:
该模式下Kafka中的主题分区与SparkStreaming中产生的RDDS的分区不相关。因此,增加KafkaUtils.createStream()中特定于主题的分区的数量只会增加线程的数量,它不会增加数据处理过程中Spark的并行性。
可以使用不同的组和主题创建多个Kafka输入DStream,以便使用多个接收器并行接收数据。
如果已经启用了带有HDFS等复制文件系统的提前写入日志,则接收的数据已经在日志中复制。储存级别为
StorageLevel.MEMORY_AND_DISK_SER
2: Direct Approach (No Receivers)
[code]package com.ruozedata.spark.streaming.day03 import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext} object StreamingKafkaApp02 { def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingKafkaApp02") val ssc = new StreamingContext(sparkConf, Seconds(10)) val kafkaParams = Map[String, String]( "metadata.broker.list"->"ruozehadoop000:9092" ) val topics = "ruoze_kafka_streaming".split(",").toSet val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics) //业务处理代码 lines.map(_._2).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print() ssc.start() ssc.awaitTermination() } }
该模式的优点:
监护并行:使用directStream,Spark Streaming将创建与需要使用的Kafka分区一样多的RDD分区,这些分区将并行地从Kafka读取数据。因此,卡夫卡和RDD分区之间有一对一的映射,这更易于理解和优化。
更具效率性:在第一种方法中实现零数据丢失需要将数据WAL存储。这实际上是低效的,因为数据被有效地复制了两次——一次由Kafka复制,第二次由Write Ahead Log复制。第二种方法消除了问题,因为没有接收器,因此不需要提前写入日志。只要你有足够的Kafka保留,消息可以从卡夫卡中恢复。
语义(仅消费一次):第一种方法使用Kafka的高级API来存储Zookeeper消耗的偏移量。这是传统上从Kafka消费数据的方法。虽然这种方法(与WAL相结合)可以确保零数据丢失,但是在某些故障下,一些记录可能被消耗两次。这是因为Spark Streaming接收到的数据和由Zookeeper跟踪的偏移量不一致。因此,在第二种方法中,我们使用SimpleKafkaAPI而不使用Zookeeper,偏移量由SparkStreaming在CheckPoint跟踪,这消除了SparkStreaming和Zookeeper/Kafka之间的不一致性,因此每个记录都被SparkStreaming有效地接收一次。尽管失败了,为了精确实现输出结果的一次语义,将数据保存到外部数据存储的输出操作必须是幂等的,或者以原子事务的形式保存结果和偏移量。
阅读更多- sparkstreaming整合kafka参数设置,message偏移量写入redis
- 整合Kafka到Spark Streaming——代码示例和挑战
- kafka flume sparkStreaming整合
- spark streaming 整合kafka
- 整合Kafka到Spark Streaming——代码示例和挑战
- <转>整合Kafka到Spark Streaming——代码示例和挑战
- SparkStreaming整合Kafka
- 整合Kafka到Spark Streaming——代码示例和挑战
- Kafka+Spark Streaming+Redis实时计算整合实践
- 整合Kafka到Spark Streaming——代码示例和挑战
- Kafka+Spark Streaming+Redis实时计算整合实践
- Spark Streaming + Kafka整合指南
- flume+kafka+spark streaming整合
- Kafka+Spark Streaming+Redis实时计算整合实践
- Flume+Kafka+SparkStreaming整合
- zookeeper+kafka安装以及kafka+spark streaming 的简单整合
- kafka生产者消费者API 与sparkStreaming 整合(scala版)
- Maven+Eclipse+SparkStreaming+Kafka整合
- Kafka+Spark Streaming+Redis实时计算整合实践
- Kafka+Spark Streaming+Redis实时计算整合实践