您的位置:首页 > 其它

Sparak-Streaming基于Offset消费Kafka数据

2017-03-29 15:16 393 查看
Sparak-Streaming基于Offset消费Kafka数据

原文http://blog.csdn.net/kwu_ganymede/article/details/50930962

Sparak-Streaming基于Offset消费Kafka数据

1、官方提供消费kafka的数据实例

[java] view
plain copy

 





// Hold a reference to the current offset ranges, so it can be used downstream  

var offsetRanges = Array[OffsetRange]()  

  

directKafkaStream.transform { rdd =>  

  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  

  rdd  

}.map {  

          ...  

}.foreachRDD { rdd =>  

  for (o <- offsetRanges) {  

    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")  

  }  

  ...  

}  

官网链接:
http:// href="http://lib.csdn.net/base/spark" target=_blank>Spark.apache.org/docs/latest/streaming-kafka-integration.html
核心思想就是在,创建kafkastream同时获取偏移量,消费完的同时,执行更新操作。

2、自定义操作kafka数据的offset的manager

这里注意,需要把类,放在 org.apache.spark.streaming.kafka下面,与源码操作的包一致

[java] view
plain copy

 





package org.apache.spark.streaming.kafka  

  

import kafka.common.TopicAndPartition  

import kafka.message.MessageAndMetadata  

import kafka.serializer.Decoder  

import scala.reflect.ClassTag  

import org.apache.spark.SparkException  

import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset  

import org.apache.spark.rdd.RDD  

import org.apache.spark.streaming.dstream.InputDStream  

import org.apache.spark.streaming.StreamingContext  

  

class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {  

  

  private val kc = new KafkaCluster(kafkaParams)  

  

  private val flag = 1150 * 10000l  

  

  /** 

   * 创建数据流 

   * @param ssc 

   * @param kafkaParams 

   * @param topics 

   * @tparam K 

   * @tparam V 

   * @tparam KD 

   * @tparam VD 

   * @return 

   */  

  def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag](ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] = {  

    val groupId = kafkaParams.get("group.id").get  

    // 在zookeeper上读取offsets前先根据实际情况更新offsets    

    setOrUpdateOffsets(topics, groupId)  

  

    //从zookeeper上读取offset开始消费message    

    val messages = {  

      val partitionsE = kc.getPartitions(topics)  

      if (partitionsE.isLeft)  

        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")  

      val partitions = partitionsE.right.get  

      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)  

      if (consumerOffsetsE.isLeft)  

        throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")  

      val consumerOffsets = consumerOffsetsE.right.get  

      KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](  

        ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))  

    }  

    messages  

  }  

  

  /** 

   * 创建数据流前,根据实际消费情况更新消费offsets 

   * @param topics 

   * @param groupId 

   */  

  private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {  

    topics.foreach(topic => {  

      var hasConsumed = true  

      val partitionsE = kc.getPartitions(Set(topic))  

      if (partitionsE.isLeft)  

        throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")  

      val partitions = partitionsE.right.get  

      val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)  

      if (consumerOffsetsE.isLeft) hasConsumed = false  

      if (hasConsumed) { // 消费过  

        /** 

         * 如果streaming程序执行的时候出现kafka.common.OffsetOutOfRangeException, 

         * 说明zk上保存的offsets已经过时了,即kafka的定时清理策略已经将包含该offsets的文件删除。 

         * 针对这种情况,只要判断一下zk上的consumerOffsets和earliestLeaderOffsets的大小, 

         * 如果consumerOffsets比earliestLeaderOffsets还小的话,说明consumerOffsets已过时, 

         * 这时把consumerOffsets更新为earliestLeaderOffsets 

         */  

        val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)  

        if (earliestLeaderOffsetsE.isLeft)  

          throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")  

        val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get  

        val consumerOffsets = consumerOffsetsE.right.get  

  

        // 可能只是存在部分分区consumerOffsets过时,所以只更新过时分区的consumerOffsets为earliestLeaderOffsets  

        var offsets: Map[TopicAndPartition, Long] = Map()  

        consumerOffsets.foreach({  

          case (tp, n) =>  

            val earliestLeaderOffset = earliestLeaderOffsets(tp).offset  

            if (n < earliestLeaderOffset) {  

              println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +  

                " offsets已经过时,更新为" + earliestLeaderOffset)  

              offsets += (tp -> earliestLeaderOffset)  

            }  

        })  

        if (!offsets.isEmpty) {  

          kc.setConsumerOffsets(groupId, offsets)  

        }  

      } else { // 没有消费过  

        val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)  

        var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null  

        if (reset == Some("smallest")) {  

          val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)  

          if (leaderOffsetsE.isLeft)  

            throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")  

          leaderOffsets = leaderOffsetsE.right.get  

        } else {  

          val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)  

          if (leaderOffsetsE.isLeft)  

            throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")  

          leaderOffsets = leaderOffsetsE.right.get  

        }  

        val offsets = leaderOffsets.map {  

          case (tp, offset) => (tp, offset.offset)  

        }  

        kc.setConsumerOffsets(groupId, offsets)  

      }  

    })  

  }  

  

  /** 

   * 更新zookeeper上的消费offsets 

   * 把当前的消费记录,写入zk 

   * 

   * @param rdd 

   */  

  def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {  

    val groupId = kafkaParams.get("group.id").get  

    val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  

  

    for (offsets <- offsetsList) {  

      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)  

      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))  

      if (o.isLeft) {  

        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")  

      }  

    }  

  }  

  

  /** 

   * 更新zookeeper上的消费offsets 

   * 把当前的消费记录的offset往前推 

   * 并写入zk 

   * 

   * @param rdd 

   * @param day 

   */  

  def updateZKOffsetsFromoffsetRanges(offsetRanges: Array[OffsetRange], day: Double): Unit = {  

    val groupId = kafkaParams.get("group.id").get  

  

    for (offsets <- offsetRanges) {  

      val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)  

  

      var offsetStreaming = 0l  

  

      println("offsets.untilOffset " + offsets.untilOffset)  

  

      /** 

       * 如果streaming挂掉,则从偏移量的前flag开始计算 

       * 由于在streaming里的window函数中进行了去重处理 

       * 因此不用担心数据重复的问题 

       */  

      if (offsets.untilOffset >= flag) {  

        offsetStreaming = offsets.untilOffset - (flag * day).toLong  

      } else {  

        offsetStreaming = 0  

      }  

  

      println("offsetStreaming " + offsetStreaming)  

  

      val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsetStreaming)))  

      if (o.isLeft) {  

        println(s"Error updating the offset to Kafka cluster: ${o.left.get}")  

      }  

    }  

  }  

}    

提供两个方法

1) 从哪里结束消费,再从那里开始继续

2) 从结束消费向前移动偏移量的操作,然后再重新消费

3、使用案例

[java] view
plain copy

 





val conf = new SparkConf().setAppName("NewsTopNRealRankOffset")//.setMaster("local[3]");  

  

conf.set("spark.streaming.blockInterval", "50ms");  

conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  

conf.set("spark.storage.memoryFraction", "0.4") //executor分配给缓存的内存比例,默认为0.6即60%,剩下40%为task运行的内存,实际上40%是偏小的  

conf.set("spark.locality.wait", "6000") //6000毫秒  

conf.set("spark.streaming.kafka.maxRatePerPartition", "35000") // 限制每秒钟从topic的每个partition最多消费的消息条数  

  

//shuffle优化  

conf.set("spark.shuffle.consolidateFiles", "true")  

conf.set("spark.reducer.maxSizeInFlight", "150m")  

conf.set("spark.shuffle.file.buffer", "128k")  

conf.set("spark.shuffle.io.maxRetries", "8")  

conf.set("spark.shuffle.io.retryWait", "6s")  

conf.set("spark.shuffle.memoryFraction", "0.3")  

  

val sc = new SparkContext(conf);  

val ssc = new StreamingContext(sc, Seconds(25));  

//缓存的数据  

ssc.remember(Minutes(60 * 24 * 2));  

  

val sqlContext = new HiveContext(sc);  

  

Logger.getLogger("org.apache.spark").setLevel(Level.WARN);  

Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.ERROR);  

  

//1.注册UDF  

val udf = UDFUtils();  

udf.registerUdf(sqlContext);  

  

//2.kafka数据处理  

val kafkaServiceOffset = KakfaServiceOffset();  

  

val brokers = "bdc46.hexun.com:9092,bdc53.hexun.com:9092,bdc54.hexun.com:9092";  

val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder",  

  "group.id" -> "tracklogPrdNewsTopN_TrackLogT", "auto.offset.reset" -> "largest"); //largest smallest  

  

val topics = Set("TrackLogT");  

  

//通过自定义的KafkaManager获取kafka数据源  

val km = new KafkaManager(kafkaParams);  

val kafkaStream = km.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics);  

var offsetRanges = Array[OffsetRange]()  

  

//创建kafkastream同时获取偏移量  

val kafkaStreamOffset = kafkaStream.transform { rdd =>  

  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges  

  rdd  

}  

  

val urlClickLogPairsDStream = kafkaServiceOffset.kafkaDStreamForNewsOffset(ssc, kafkaStreamOffset);  

  

//3.缓存hive中的数据  

val cacheUtils = CacheUtils();  

cacheUtils.cacheHiveData(sqlContext);  

  

//4.缓存窗口函数数据处理  

val urlClickCountsDStream = urlClickLogPairsDStream.reduceByKeyAndWindow(  

  (v1: Int, v2: Int) => {  

    v1 + v2  

  },  

  Seconds(3600 * 35),  

  Seconds(250));  

  

//5.处理业务逻辑  

urlClickCountsDStream.foreachRDD(urlClickCountsRDD => {  

  val urlClickCountRowRDD = urlClickCountsRDD.map(tuple => {  

    val datetime = tuple._1.split("\001")(0) + " " + tuple._1.split("\001")(1);  

    val cookieid = tuple._1.split("\001")(2);  

    val url = tuple._1.split("\001")(3);  

    val artId = tuple._1.split("\001")(4);  

    val click_count = tuple._2;  

    Row(datetime, cookieid, url, artId, click_count);  

  });  

  

  val structType = StructType(Array(  

    StructField("datetime", StringType, true),  

    StructField("cookieid", StringType, true),  

    StructField("url", StringType, true),  

    StructField("artId", StringType, true),  

    StructField("click_count", IntegerType, true)));  

  

  val categoryProductCountDF = sqlContext.createDataFrame(urlClickCountRowRDD, structType);  

  

  categoryProductCountDF.registerTempTable("url_click_log_day");  

  

  //消费完的同时,更新操作  

  km.updateZKOffsetsFromoffsetRanges(offsetRanges, 1);  

  

  cacuDayTopN(sqlContext);  

  cacuHourTopN(sqlContext);  

});  

  

//6.启动streaming任务  

ssc.start();  

ssc.awaitTermination();  

注意,设置"group.id" ,即为在kafka队列的偏移量标志在zookeeper中的名称。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: