您的位置:首页 > 其它

Caused by: kafka.common.OffsetOutOfRangeException

2017-04-20 16:01 405 查看
val req = new TopicMetadataRequest(topics.toList, 0)

    

    val getLeaderConsumer = new SimpleConsumer(brokers, 9092, 10000, 10000, "OffsetLookup") // 第一个参数是 kafka broker 的host,第二个是 port

    val res = getLeaderConsumer.send(req)

    val topicMetaOption = res.topicsMetadata.headOption

    val partitions = topicMetaOption match {

      case Some(tm) =>

        tm.partitionsMetadata.map(pm => (pm.partitionId, pm.leader.get.host)).toMap[Int, String] // 将结果转化为 partition -> leader 的映射关系

      case None =>

        Map[Int, String]()

    }

    val bks = partitions.map(pair => pair._2).toSet.mkString(":9092,") + ":9092"

    logger.info("-----------partitions:\n" + partitions)

    logger.info("-----------bks:\n" + bks)

    val kafkaParam = Map(

      "metadata.broker.list" -> bks,

      //"auto.offset.reset" -> "smallest", // default is largest  to consume

      "group.id" -> groupID);

   

    val topicDirs = new ZKGroupTopicDirs(groupID, topic) // 创建一个 ZKGroupTopicDirs 对象,对保存 第一个参数是 kafka broker 的host,第二个是 port

    val zkTopicPath = s"${topicDirs.consumerOffsetDir}" // 获取 zookeeper 中的路径,这里会变成 /consumers/test_spark_streaming_group/offsets/topic_name

    // val wordDStream = KafkaSourceHelper.kafkaStream(ssc, brokers, new ZooKeeperOffsetsStore(zkHosts, zkPath), topic).cache() 

    val zkClient = new ZkClient(zkHosts, Integer.MAX_VALUE, 10000, ZKStringSerializer) // zookeeper 的host 和 ip,创建一个 client

    val children = zkClient.countChildren(s"${topicDirs.consumerOffsetDir}") //查询该路径下是否字节点(默认有字节点为我们自己保存不同 partition 时生成的)

    logger.info("------zkHosts:" + zkHosts)

    var fromOffsets: Map[TopicAndPartition, Long] = Map() //如果 zookeeper 中有保存 offset,我们会利用这个 offset 作为 kafkaStream 的起始位置

    logger.info("------children:" + children)

    // 获取offset 创建kafkaStream: InputDStream 

    var kafkaStream: InputDStream[(String, String)] = null

    if (children > 0) { //如果保存过 offset,这里更好的做法,还应该和  kafka 上最小的 offset 做对比,不然会报 OutOfRange 的错误

      for (i <- 0 until children) {

        val partitionOffset = zkClient.readData[String](s"${topicDirs.consumerOffsetDir}/${i}")

        val tp = TopicAndPartition(topic, i)

        val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))

        // Manager Offset:

        // this default use the smallest offset from where to consume ,if you want to manager 

        // offset ,another choice is manually commit offset by hand in zookeeper client or use java API 

        // Here SimpleConsumer constructor need every partition leader not fixed broker.

        val consumerMin = new SimpleConsumer( partitions(i), 9092, 10000, 10000, "getMinOffset") //注意这里的 broker_host,因为这里会导致查询不到,解决方法在下面

        val curOffsets = consumerMin.getOffsetsBefore(requestMin).partitionErrorAndOffsets(tp).offsets

        var nextOffset = partitionOffset.toLong

        // if offset is less than zk store ,use zk offset otherwise use the minimum offset get from each partition

        // 通过比较从 kafka 上该 partition 的最小 offset 和 zk 上保存的 offset,进行选择

        if (curOffsets.length > 0 && nextOffset < curOffsets.head) {

        // logger.info("----------curOffsets.head:" + curOffsets.head +"---nextOffset:" +nextOffset )

        nextOffset = curOffsets.head

        }

        fromOffsets += (tp -> nextOffset) //设置正确的 offset,这里将 nextOffset 设置为 0(0 只是一个特殊值),可以观察到 offset 过期的现象

        logger.info("get from zk topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")

        logger.info("----------fromOffsets:" + fromOffsets)

      }

      logger.info("------------------- createDirectStream from zk offset -------")

      //这个会将 kafka 的消息进行 transform,最终 kafak 的数据都会变成 (topic_name, message) 这样的 tuple

      val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())

      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)

    } else {

      logger.info("------------------- chilern <=0 createDirectStream -------")

      kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics) //如果未保存,根据 kafkaParam 的配置使用最新或者最旧的 offset

    }

     logger.warn("--------brokers:" + brokers)

    var offsetRanges = Array[OffsetRange]()

    val disDStream = kafkaStream.transform { rdd =>

      offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //得到该 rdd 对应 kafka 的消息的 offset

      rdd
    }

其他信息参阅 :http://blog.csdn.net/kk303/article/details/52767260

java版本offset 获取 :http://blog.csdn.net/rongyongfeikai2/article/details/50727661
http://blog.csdn.net/rongyongfeikai2/article/details/49784785
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: