Caused by: kafka.common.OffsetOutOfRangeException
2017-04-20 16:01
405 查看
// Builds a Spark direct Kafka stream for `topic` that resumes from the offsets this
// consumer group saved in ZooKeeper. Each saved offset is raised to the partition's
// earliest available offset when Kafka has already purged the older messages, which
// avoids kafka.common.OffsetOutOfRangeException on start-up.

// Ask a seed broker for the topic metadata (first argument is the broker host, second the port).
val req = new TopicMetadataRequest(topics.toList, 0)
val getLeaderConsumer = new SimpleConsumer(brokers, 9092, 10000, 10000, "OffsetLookup")
// Close the lookup consumer as soon as the response is in hand so its socket is not leaked.
val res = try getLeaderConsumer.send(req) finally getLeaderConsumer.close()
val topicMetaOption = res.topicsMetadata.headOption
// partition id -> leader host. A partition that currently has no leader is left out of
// the map instead of blowing up on Option.get.
val partitions: Map[Int, String] = topicMetaOption match {
  case Some(tm) =>
    tm.partitionsMetadata
      .flatMap(pm => pm.leader.map(leader => pm.partitionId -> leader.host).toList)
      .toMap
  case None =>
    Map[Int, String]()
}
// Broker list handed to Spark: every distinct leader host on port 9092.
val bks = {
  val leaderHosts = partitions.values.toSet
  if (leaderHosts.nonEmpty) leaderHosts.mkString(":9092,") + ":9092"
  else brokers + ":9092" // no leader known: fall back to the seed broker rather than the invalid ":9092"
}
logger.info("-----------partitions:\n" + partitions)
logger.info("-----------bks:\n" + bks)
val kafkaParam = Map(
  "metadata.broker.list" -> bks,
  //"auto.offset.reset" -> "smallest", // default is largest to consume
  "group.id" -> groupID)
val topicDirs = new ZKGroupTopicDirs(groupID, topic)
// ZooKeeper directory holding this group's offsets for the topic,
// e.g. /consumers/test_spark_streaming_group/offsets/topic_name
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
// val wordDStream = KafkaSourceHelper.kafkaStream(ssc, brokers, new ZooKeeperOffsetsStore(zkHosts, zkPath), topic).cache()
val zkClient = new ZkClient(zkHosts, Integer.MAX_VALUE, 10000, ZKStringSerializer)
// Number of child nodes under the offset directory; the code below treats them as
// one node per partition, named 0 until children.
val children = zkClient.countChildren(zkTopicPath)
logger.info("------zkHosts:" + zkHosts)
// Starting offsets for the stream; filled in only when ZooKeeper holds saved offsets.
var fromOffsets: Map[TopicAndPartition, Long] = Map()
logger.info("------children:" + children)
// Create the input stream, either from the saved offsets or from Kafka's default position.
val kafkaStream: InputDStream[(String, String)] =
  if (children > 0) {
    for (i <- 0 until children) {
      val partitionOffset = zkClient.readData[String](s"$zkTopicPath/$i")
      val tp = TopicAndPartition(topic, i)
      val savedOffset = partitionOffset.toLong
      // The earliest-offset request must go to the partition's own leader, not to a fixed
      // broker; a non-leader broker cannot answer it.
      val earliestOffset: Option[Long] = partitions.get(i) match {
        case Some(leaderHost) =>
          val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
          val consumerMin = new SimpleConsumer(leaderHost, 9092, 10000, 10000, "getMinOffset")
          try {
            consumerMin.getOffsetsBefore(requestMin)
              .partitionErrorAndOffsets.get(tp)
              .flatMap(_.offsets.headOption)
          } finally {
            consumerMin.close() // one consumer per partition: always release its socket
          }
        case None =>
          logger.warn("no leader found for topic[" + topic + "] partition[" + i + "], using zk offset as is")
          None
      }
      // Keep the ZooKeeper offset unless it is older than the earliest offset still on the broker.
      val nextOffset = earliestOffset.filter(_ > savedOffset).getOrElse(savedOffset)
      fromOffsets += (tp -> nextOffset)
      logger.info("get from zk topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
      logger.info("----------fromOffsets:" + fromOffsets)
    }
    logger.info("------------------- createDirectStream from zk offset -------")
    // Turns every Kafka record into a (topic_name, message) tuple.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
  } else {
    logger.info("------------------- chilern <=0 createDirectStream -------")
    // Nothing saved yet: start from the latest or earliest offset as configured in kafkaParam.
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
  }
logger.warn("--------brokers:" + brokers)
// Reassigned for every batch inside transform, so it has to stay a var.
var offsetRanges = Array[OffsetRange]()
val disDStream = kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // Kafka offset ranges covered by this RDD
  rdd
}
其他信息请参阅:http://blog.csdn.net/kk303/article/details/52767260
Java 版本的 offset 获取方法:http://blog.csdn.net/rongyongfeikai2/article/details/50727661
以及 http://blog.csdn.net/rongyongfeikai2/article/details/49784785
// Builds a Spark direct Kafka stream for `topic` that resumes from the offsets this
// consumer group saved in ZooKeeper. Each saved offset is raised to the partition's
// earliest available offset when Kafka has already purged the older messages, which
// avoids kafka.common.OffsetOutOfRangeException on start-up.
// NOTE(review): `req` (the TopicMetadataRequest) is not defined in this snippet and is
// expected to be in scope already — confirm.

// Ask a seed broker for the topic metadata (first argument is the broker host, second the port).
val getLeaderConsumer = new SimpleConsumer(brokers, 9092, 10000, 10000, "OffsetLookup")
// Close the lookup consumer as soon as the response is in hand so its socket is not leaked.
val res = try getLeaderConsumer.send(req) finally getLeaderConsumer.close()
val topicMetaOption = res.topicsMetadata.headOption
// partition id -> leader host. A partition that currently has no leader is left out of
// the map instead of blowing up on Option.get.
val partitions: Map[Int, String] = topicMetaOption match {
  case Some(tm) =>
    tm.partitionsMetadata
      .flatMap(pm => pm.leader.map(leader => pm.partitionId -> leader.host).toList)
      .toMap
  case None =>
    Map[Int, String]()
}
// Broker list handed to Spark: every distinct leader host on port 9092.
val bks = {
  val leaderHosts = partitions.values.toSet
  if (leaderHosts.nonEmpty) leaderHosts.mkString(":9092,") + ":9092"
  else brokers + ":9092" // no leader known: fall back to the seed broker rather than the invalid ":9092"
}
logger.info("-----------partitions:\n" + partitions)
logger.info("-----------bks:\n" + bks)
val kafkaParam = Map(
  "metadata.broker.list" -> bks,
  //"auto.offset.reset" -> "smallest", // default is largest to consume
  "group.id" -> groupID)
val topicDirs = new ZKGroupTopicDirs(groupID, topic)
// ZooKeeper directory holding this group's offsets for the topic,
// e.g. /consumers/test_spark_streaming_group/offsets/topic_name
val zkTopicPath = s"${topicDirs.consumerOffsetDir}"
// val wordDStream = KafkaSourceHelper.kafkaStream(ssc, brokers, new ZooKeeperOffsetsStore(zkHosts, zkPath), topic).cache()
val zkClient = new ZkClient(zkHosts, Integer.MAX_VALUE, 10000, ZKStringSerializer)
// Number of child nodes under the offset directory; the code below treats them as
// one node per partition, named 0 until children.
val children = zkClient.countChildren(zkTopicPath)
logger.info("------zkHosts:" + zkHosts)
// Starting offsets for the stream; filled in only when ZooKeeper holds saved offsets.
var fromOffsets: Map[TopicAndPartition, Long] = Map()
logger.info("------children:" + children)
// Create the input stream, either from the saved offsets or from Kafka's default position.
val kafkaStream: InputDStream[(String, String)] =
  if (children > 0) {
    for (i <- 0 until children) {
      val partitionOffset = zkClient.readData[String](s"$zkTopicPath/$i")
      val tp = TopicAndPartition(topic, i)
      val savedOffset = partitionOffset.toLong
      // The earliest-offset request must go to the partition's own leader, not to a fixed
      // broker; a non-leader broker cannot answer it.
      val earliestOffset: Option[Long] = partitions.get(i) match {
        case Some(leaderHost) =>
          val requestMin = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.EarliestTime, 1)))
          val consumerMin = new SimpleConsumer(leaderHost, 9092, 10000, 10000, "getMinOffset")
          try {
            consumerMin.getOffsetsBefore(requestMin)
              .partitionErrorAndOffsets.get(tp)
              .flatMap(_.offsets.headOption)
          } finally {
            consumerMin.close() // one consumer per partition: always release its socket
          }
        case None =>
          logger.warn("no leader found for topic[" + topic + "] partition[" + i + "], using zk offset as is")
          None
      }
      // Keep the ZooKeeper offset unless it is older than the earliest offset still on the broker.
      val nextOffset = earliestOffset.filter(_ > savedOffset).getOrElse(savedOffset)
      fromOffsets += (tp -> nextOffset)
      logger.info("get from zk topic[" + topic + "] partition[" + i + "] offset[" + partitionOffset + "] @@@@@@")
      logger.info("----------fromOffsets:" + fromOffsets)
    }
    logger.info("------------------- createDirectStream from zk offset -------")
    // Turns every Kafka record into a (topic_name, message) tuple.
    val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParam, fromOffsets, messageHandler)
  } else {
    logger.info("------------------- chilern <=0 createDirectStream -------")
    // Nothing saved yet: start from the latest or earliest offset as configured in kafkaParam.
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)
  }
logger.warn("--------brokers:" + brokers)
// Reassigned for every batch inside transform, so it has to stay a var.
var offsetRanges = Array[OffsetRange]()
val disDStream = kafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // Kafka offset ranges covered by this RDD
  rdd
}
其他信息请参阅:http://blog.csdn.net/kk303/article/details/52767260
Java 版本的 offset 获取方法:http://blog.csdn.net/rongyongfeikai2/article/details/50727661
以及 http://blog.csdn.net/rongyongfeikai2/article/details/49784785
相关文章推荐
- java.lang.NoSuchMethodError和kafka.common.OffsetOutOfRangeException(Spark)
- Kafka kafka.common.OffsetOutOfRangeException 问题处理
- kafka.common.OffsetOutOfRangeException 问题处理
- Spark streaming kafka OffsetOutOfRangeException 异常分析与解决
- Kafka 副本OffsetOutOfRangeException
- kafka的OffsetOutOfRangeException
- spark streaming kafka OffsetOutOfRangeException 异常分析与解决
- kafka-storm spout拉取数据问题offset out of range
- kafka-storm spout拉取数据问题offset out of range(todo)
- Caused by: java.lang.RuntimeException: by java.lang.OutOfMemoryError: PermGen space(tomcat 启动时提示内存溢出)
- Kafka--Caused by: kafka.common.ConsumerRebalanceFailedException
- kafka的OffsetOutOfRangeError
- Caused by: java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0
- java.lang.IllegalArgumentException: timeout arguments out of range异常
- Android 多点触控错误处理(java.lang.IllegalArgumentException: pointerIndex out of range)
- C# B/S程序中使用DropDownList出现System.ArgumentOutOfRangeException: “DropDownList1”有一个无效 SelectedValue,因为它不在项目列表中的解决方法
- System.IndexOutOfRangeException: 无法找到表 0解决办法
- System.ArgumentOutOfRangeException: 年、月和日参数描述无法表示的 DateTime。
- Caused by: java.lang.OutOfMemoryError: Java heap space
- System.ArgumentOutOfRangeException: 容量超出了最大容量