SparkStreaming与kafka通过直连方式读取数据
2018-11-10 22:51
711 查看
1、Spark-Streaming的receive的方式和直连方式有什么区别:
Receive接收固定时间间隔的数据(放在内存中),达到固定的时间才进行处理,效率低并且容易丢失数据(Kafka高级API),自动维护偏移量
Direct直连方式,相当于直接连接到Kafka的分区上,相当于Kafka底层API,效率很高,需要自己维护偏移量,读一条处理一条(把指定的时间间隔当做一个批次)。
2、直接连到kafka的分区上读取,一个RDD的分区对应一个kafka的分区,一个分区会生成一个Task,这个Task不会消失,会一直盯着这个分区,不停的读取数据。
3、在用Reciver方式,消费消费者时,不用指定broker,在直连的方式,需要指定broker,因为这种方式相当于直接练到Kafka的分区中,需要broker
4、zookeeper的作用,zookeeper中记录的是,以组名和topic名作为唯一标识,不同的组可以读取同一topic中的数据,记偏移量是从前面记录
package day01.Dirctor import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import kafka.utils.{ZKGroupTopicDirs, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Duration, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} object DrictorDemoV4 { def main(args: Array[String]): Unit = { val group = "groupTT"//指定组名 val conf = new SparkConf().setAppName("kafkaWC").setMaster("local[2]") val sc = new SparkContext(conf) //创建SparkStreaming,并设置时间间隔 val ssc = new StreamingContext(conf,Duration(5000)) //指定消费的topic名字 val topic = "tt2" //指定kafka的broker地址,Streaming的Task直连到kafka的分区上,用底层的API,效率更高 val brokerList = "hadoop01:9092,hadoop02:9092,hadoop03:9092" //指定zk地址,后期更新消费的偏移量时使用(以后可以Redis、MySQL) val zkQuorum = "hadoop01:2181,hadoop02:2181,hadoop03:2181" //创建DStream时使用topic名字的集合,SparkStreaming可以同时消费多少topic val topics:Set[String] = Set(topic) //创建一个ZKGroupTopicDirs对象,其实就是指定往zk中写入数据的目录,用于保存偏移量 val topicDirs = new ZKGroupTopicDirs(group,topic) //获取zookeeper中的路径"groupTT/offsets/tt01" val zkTopicPath:String = s"${topicDirs.consumerOffsetDir}" //准备kafka参数 val kafkaParams = Map( "metadata.broker.list"->brokerList, "group.id"->group, "auto.offset.reset"->kafka.api.OffsetRequest.SmallestTimeString ) //zookeeper的host和ip,创建一个Client,用于更新偏移量 //是zookeeper的一个客户端,可以从zk中读取,偏移量的数据,并跟新偏移量 val zkClient: ZkClient = new ZkClient(zkQuorum) //查询该路径下是否字节点(默认有字节点为我们自己保存不同Partition生成的) // /consumers/组名 27209 /offsets/topic名/分区名/偏移量, 可以zkClient.sh 插询 val children = zkClient.countChildren(zkTopicPath) //创建一个InputDStream, 要是var,因为不去定是不是以前读过,要先判断,再赋值 //key 是 kafka的Key,默认不设置是null,value是读取的内容 var kafkaStream:InputDStream [(String,String)] = null //如果zookeeper中保存offset,我们会利用这个Offset作为kafkaStream的读取位置 var fromOffsets:Map[TopicAndPartition,Long] = Map() //如果保存过Offset,以前读取过 if(children >0){ for (i<- 0 until children){ //zkClient根据文件位置读取偏移量( /consumers/组名/offsets/topic名/分区名/偏移量,) val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/${i}") val tp: TopicAndPartition = TopicAndPartition(topic,i) //将不同 Partition对应的Offset增加到fromOffset中( // fromOffsets += (tp-> partitionOffset.toLong) //这个会将kafka的消息进行transform,最终的kafka的数据会变成kafka的key,message)这样的Tuple val messageHandler = (mam:MessageAndMetadata[String,String])=>(mam.key(),mam.message()) //通过KafkaUtils创建直连的DStream,fromOffset参数的作用是按照之间计算好的偏移量继续读取 //[String,String,StringDecoder,StringDecoder,(String,String)] // key value key的解码, value的解码 kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,(String,String)]( ssc,kafkaParams,fromOffsets,messageHandler ) } }else{ //从头开始读,之前没有读取过 kafkaStream = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics) } var offsetRanges = Array[OffsetRange]() //从kafka中读取消息,DStream的Transform方法,可以将当前批次RDD取出来来 //该transform方法计算获取当前批次RDD,然后将RDD的偏移量取出来,然后将RDD返回DStream val transformed: DStream[(String, String)] = kafkaStream.transform(rdd => { //得到该rdd对应的kafka的消息的offset //该RDD是个kafkaRDD,可以获取偏移量的范围 offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges rdd }) val messages: DStream[String] = transformed.map((_._2)) //一次迭代DStream中的RDD messages.foreachRDD(rdd=>{ //对RDD进行操作,触发Action rdd.foreachPartition(partition=>{ partition.foreach(x=>{ println(x) }) }) }) for(off <- offsetRanges){ //获取zk 中记录偏移量的目录, // /consumers/组名/offsets/topic名/分区名 val zkPath = s"${topicDirs.consumerOffsetDir}/${off.partition}" //更新偏移量 ZkUtils.updatePersistentPath(zkClient,zkPath,off.untilOffset.toString) } ssc.start() ssc.awaitTermination() } }阅读更多
相关文章推荐
- Spark Streaming通过直连的方式消费Kafka中的数据
- SparkStreaming通过Kafka获取数据(Receiver方式)
- Spark Streaming场景应用-Kafka数据读取方式
- SparkStreaming直连方式读取kafka数据,使用MySQL保存偏移量
- SparkStreaming采用直连方式(Direct Approach)获取Kafka数据的研究心得
- Spark Streaming场景应用-Kafka数据读取方式
- spark streaming接kafka数据方式汇总
- spark streaming读取kafka数据,记录offset
- Spark Streaming消费Kafka Direct方式数据零丢失实现
- Spark Streaming结合 Kafka 两种不同的数据接收方式比较
- sparkstreaming读取kafka的两种方式
- Spark Streaming kafka实现数据零丢失的几种方式
- kafka SparkStreaming读取数据笔记
- Spark Streaming 读取Kafka数据写入Elasticsearch
- spark读取kafka数据(两种方式比较及flume配置文件)
- SparkStreaming读取Kafka数据
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
- Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式
- Spark Streaming kafka实现数据零丢失的几种方式