您的位置:首页 > 其它

spark streaming 实现kafka的createDirectStream方式!!不坑

2017-03-09 16:58 405 查看
网上搜了很多spark streaming 用createDirectStream方式消费kafka的,信息是有很多,但是照着做都遇到了坑,最大的坑就是KafkaCluster是private的!根本就new不了,折腾了一会终于搞定了,也不复杂

1.

新建一个包org.apache.spark.streaming.kafka,就是在你的project建一个这个目录的包,在这个包下面的类里,就可以new出KafkaCluster了!

2.

new出KafkaCluster,后面的就都是小问题了,但是网上给的例子都太复杂,又或者太简单,我们只需要实现重启接着上次的offset消费,消费完保存offset这么简单的功能就行了。

下面是我根据网上找的精简的代码

class KafkaManager(val kafkaParams: HashMap[String, String]) extends Serializable {

private val kc = new KafkaCluster(kafkaParams)

/**
* 创建数据流
*
* @param ssc
* @param kafkaParams
* @param topics
* @tparam K
* @tparam V
* @tparam KD
* @tparam VD
* @return
*/
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](

ssc: StreamingContext, kafkaParams: HashMap[String, String], topics: Set[String]): InputDStream[(K, V)] = {
val groupId = kafkaParams.get("group.id").get
//从zookeeper上读取offset开始消费message
//    val messages = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (!consumerOffsetsE.isLeft) {
val consumerOffsets = consumerOffsetsE.right.get
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
} else {
val p = kafkaParams + ("auto.offset.reset" -> "largest")
KafkaUtils.createDirectStream(ssc, p, topics)
}
//    }
//      messages
}

/**
* 更新zookeeper上的消费offsets
*
* @param rdd
*/
def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
val groupId = kafkaParams.get("group.id").get
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, HashMap((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
}


只有两个方法 cteate 和 update

create方法比较简单,当保存的offset在kafka上不存在时会出异常,因为我们集群kafka保留时间是7天,也就是说我程序停了7天再启动才会报错,都停7天了,肯定是使用新的groupid了,也没多大影响,想实现比较完善的话,可以百度下,有复杂的实现。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark kafka