Two Ways to Get Data from Kafka into Spark Streaming

2018-01-28 22:42
Reposted from: Two Ways to Get Data from Kafka into Spark Streaming

1. Via a Receiver. This is the high-level API, so you do not have to maintain offsets yourself

Main code:
package cn.lijie

import org.apache.log4j.Level
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
  * User: lijie
  * Date: 2017/8/4
  * Time: 15:27
  */
object Kafka2SparkStreaming01 {

  // State update: add the counts from the current batch to each word's running total
  def myFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map(x => {
      (x._1, x._2.sum + x._3.getOrElse(0))
    })
  }

  def main(args: Array[String]): Unit = {
    MyLog.setLogLeavel(Level.ERROR)
    val conf = new SparkConf().setAppName("k2s").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))
    sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\checkpointks01")
    val zk = "192.168.80.123:2181"
    val groupId = "lijieGrp"
    // key is the topic, value is the number of threads that process it
    val topics = Map("lijietest001" -> 3)
    // Arguments: ssc, zk, groupId, topics, storageLevel. Note that topics is an immutable Map here.
    // Out of habit I used a mutable Map and kept getting errors before realizing an immutable Map is required:
    // type Map[A, +B] = scala.collection.immutable.Map[A, B]
    val ds = KafkaUtils.createStream(ssc, zk, groupId, topics)
    val res = ds.map(_._2).flatMap(_.split(" ")).map((_, 1)).updateStateByKey(myFunc, new HashPartitioner(sc.defaultParallelism), true)
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
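The comment about the immutable Map in the listing above can be illustrated without Spark. The following is a minimal sketch, assuming the topic map is first assembled in a mutable map; `TopicMapSketch` and `topicThreads` are hypothetical names used only for this illustration.

```scala
import scala.collection.mutable

object TopicMapSketch {
  // Collects topic -> thread-count pairs in a mutable map, then returns an
  // immutable copy, which is what the default `Map` alias in Predef refers to.
  def topicThreads(pairs: Seq[(String, Int)]): Map[String, Int] = {
    val builder = mutable.Map.empty[String, Int]
    pairs.foreach { case (topic, threads) => builder(topic) = threads }
    builder.toMap // immutable snapshot; later changes to builder do not affect it
  }
}
```

Calling `.toMap` on the mutable map is usually enough when a method is declared with the plain `Map[String, Int]` parameter type.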

Start Kafka:
[root@lijie kafka_2.11-0.10.1.1]# /usr/java/kafka_2.11-0.10.1.1/bin/kafka-server-start.sh -daemon ./config/server.properties

Create the topic:
[root@lijie kafka_2.11-0.10.1.1]# /usr/java/kafka_2.11-0.10.1.1/bin/kafka-topics.sh --create --zookeeper 192.168.80.123:2181 --replication-factor 1 --partitions 3 --topic lijietest001
Created topic "lijietest001".

Simulate a producer:
[root@lijie kafka_2.11-0.10.1.1]# /usr/java/kafka_2.11-0.10.1.1/bin/kafka-console-producer.sh --broker-list 192.168.80.123:9092 --topic lijietest001
lijie lijie lijie hehe haha hehe
hehe haha lijie hehe heihei

Result:
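As a rough cross-check that needs no cluster, the sketch below replays the two producer lines through the same kind of update function as `myFunc`. It assumes an empty initial state and that each line lands in its own batch; `WordCountReplay`, `applyBatch` and `replay` are hypothetical helpers, and the code only imitates what `updateStateByKey` does rather than calling Spark.

```scala
object WordCountReplay {
  // Same shape as myFunc: new counts for a key plus its previous total, if any.
  val updateFunc: Iterator[(String, Seq[Int], Option[Int])] => Iterator[(String, Int)] =
    it => it.map { case (word, newCounts, previous) =>
      (word, newCounts.sum + previous.getOrElse(0))
    }

  // Applies one batch of text lines to the running state.
  def applyBatch(state: Map[String, Int], lines: Seq[String]): Map[String, Int] = {
    // One 1 per occurrence of each word in this batch
    val ones: Map[String, Seq[Int]] =
      lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => w -> ws.map(_ => 1) }
    // Keys with existing state are visited even when the batch has no new data for them
    val keys = state.keySet ++ ones.keySet
    val input = keys.iterator.map(k => (k, ones.getOrElse(k, Seq.empty[Int]), state.get(k)))
    updateFunc(input).toMap
  }

  // Folds a sequence of batches into the final running totals.
  def replay(batches: Seq[Seq[String]]): Map[String, Int] =
    batches.foldLeft(Map.empty[String, Int])(applyBatch)
}
```

For the two lines typed into the console producer, `WordCountReplay.replay` gives lijie 4, hehe 4, haha 2 and heihei 1 after the second batch.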



2. Via the direct approach: connect straight to the Kafka brokers and manage offsets yourself

Based on: http://blog.csdn.net/ligt0610/article/details/47311771



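The KafkaCluster class is copied directly from the KafkaCluster class in the org.apache.spark.streaming.kafka package of the Spark source, with every private[spark] modifier removed. KafkaManager then builds on KafkaCluster (in the code below it holds a KafkaCluster instance as a field rather than extending it)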

KafkaCluster is as follows:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cn.lijie.kafka

import java.util.Properties

import kafka.api._
import kafka.common.{ErrorMapping, OffsetAndMetadata, OffsetMetadataAndError, TopicAndPartition}
import kafka.consumer.{ConsumerConfig, SimpleConsumer}
import org.apache.spark.SparkException

import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import scala.util.control.NonFatal

/**
* Convenience methods for interacting with a Kafka cluster.
*
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
*                    configuration parameters</a>.
*                    Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
*                    NOT zookeeper servers, specified in host1:port1,host2:port2 form
*/
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {

import KafkaCluster.{Err, LeaderOffset, SimpleConsumerConfig}

// ConsumerConfig isn't serializable
@transient var _config: SimpleConsumerConfig = null

def config: SimpleConsumerConfig = this.synchronized {
if (_config == null) {
_config = SimpleConsumerConfig(kafkaParams)
}
_config
}

def connect(host: String, port: Int): SimpleConsumer =
new SimpleConsumer(host, port, config.socketTimeoutMs,
config.socketReceiveBufferBytes, config.clientId)

def connectLeader(topic: String, partition: Int): Either[Err, SimpleConsumer] =
findLeader(topic, partition).right.map(hp => connect(hp._1, hp._2))

// Metadata api
// scalastyle:off
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-MetadataAPI
// scalastyle:on

def findLeader(topic: String, partition: Int): Either[Err, (String, Int)] = {
val req = TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
0, config.clientId, Seq(topic))
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp: TopicMetadataResponse = consumer.send(req)
resp.topicsMetadata.find(_.topic == topic).flatMap { tm: TopicMetadata =>
tm.partitionsMetadata.find(_.partitionId == partition)
}.foreach { pm: PartitionMetadata =>
pm.leader.foreach { leader =>
return Right((leader.host, leader.port))
}
}
}
Left(errs)
}

def findLeaders(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, (String, Int)]] = {
val topics = topicAndPartitions.map(_.topic)
val response = getPartitionMetadata(topics).right
val answer = response.flatMap { tms: Set[TopicMetadata] =>
val leaderMap = tms.flatMap { tm: TopicMetadata =>
tm.partitionsMetadata.flatMap { pm: PartitionMetadata =>
val tp = TopicAndPartition(tm.topic, pm.partitionId)
if (topicAndPartitions(tp)) {
pm.leader.map { l =>
tp -> (l.host -> l.port)
}
} else {
None
}
}
}.toMap

if (leaderMap.keys.size == topicAndPartitions.size) {
Right(leaderMap)
} else {
val missing = topicAndPartitions.diff(leaderMap.keySet)
val err = new Err
err.append(new SparkException(s"Couldn't find leaders for ${missing}"))
Left(err)
}
}
answer
}

def getPartitions(topics: Set[String]): Either[Err, Set[TopicAndPartition]] = {
getPartitionMetadata(topics).right.map { r =>
r.flatMap { tm: TopicMetadata =>
tm.partitionsMetadata.map { pm: PartitionMetadata =>
TopicAndPartition(tm.topic, pm.partitionId)
}
}
}
}

def getPartitionMetadata(topics: Set[String]): Either[Err, Set[TopicMetadata]] = {
val req = TopicMetadataRequest(
TopicMetadataRequest.CurrentVersion, 0, config.clientId, topics.toSeq)
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp: TopicMetadataResponse = consumer.send(req)
val respErrs = resp.topicsMetadata.filter(m => m.errorCode != ErrorMapping.NoError)

if (respErrs.isEmpty) {
return Right(resp.topicsMetadata.toSet)
} else {
respErrs.foreach { m =>
val cause = ErrorMapping.exceptionFor(m.errorCode)
val msg = s"Error getting partition metadata for '${m.topic}'. Does the topic exist?"
errs.append(new SparkException(msg, cause))
}
}
}
Left(errs)
}

// Leader offset api
// scalastyle:off
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI
// scalastyle:on

def getLatestLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime)

def getEarliestLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime)

def getLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition],
before: Long
): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
r.map { kv =>
// mapValues isnt serializable, see SI-7005
kv._1 -> kv._2.head
}
}
}

def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
m.groupBy(_._2).map { kv =>
kv._1 -> kv._2.keys.toSeq
}

def getLeaderOffsets(
topicAndPartitions: Set[TopicAndPartition],
before: Long,
maxNumOffsets: Int
): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>
val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
val leaders = leaderToTp.keys
var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
val errs = new Err
withBrokers(leaders, errs) { consumer =>
val partitionsToGetOffsets: Seq[TopicAndPartition] =
leaderToTp((consumer.host, consumer.port))
val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
}.toMap
val req = OffsetRequest(reqMap)
val resp = consumer.getOffsetsBefore(req)
val respMap = resp.partitionErrorAndOffsets
partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
if (por.error == ErrorMapping.NoError) {
if (por.offsets.nonEmpty) {
result += tp -> por.offsets.map { off =>
LeaderOffset(consumer.host, consumer.port, off)
}
} else {
errs.append(new SparkException(
s"Empty offsets for ${tp}, is ${before} before log beginning?"))
}
} else {
errs.append(ErrorMapping.exceptionFor(por.error))
}
}
}
if (result.keys.size == topicAndPartitions.size) {
return Right(result)
}
}
val missing = topicAndPartitions.diff(result.keySet)
errs.append(new SparkException(s"Couldn't find leader offsets for ${missing}"))
Left(errs)
}
}

// Consumer offset api
// scalastyle:off
// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
// scalastyle:on

// this 0 here indicates api version, in this case the original ZK backed api.
def defaultConsumerApiVersion: Short = 0

/** Requires Kafka >= 0.8.1.1 */
def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, Long]] =
getConsumerOffsets(groupId, topicAndPartitions, defaultConsumerApiVersion)

def getConsumerOffsets(
groupId: String,
topicAndPartitions: Set[TopicAndPartition],
consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Long]] = {
getConsumerOffsetMetadata(groupId, topicAndPartitions, consumerApiVersion).right.map { r =>
r.map { kv =>
kv._1 -> kv._2.offset
}
}
}

/** Requires Kafka >= 0.8.1.1 */
def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition]
): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] =
getConsumerOffsetMetadata(groupId, topicAndPartitions, defaultConsumerApiVersion)

def getConsumerOffsetMetadata(
groupId: String,
topicAndPartitions: Set[TopicAndPartition],
consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, OffsetMetadataAndError]] = {
var result = Map[TopicAndPartition, OffsetMetadataAndError]()
val req = OffsetFetchRequest(groupId, topicAndPartitions.toSeq, consumerApiVersion)
val errs = new Err
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.fetchOffsets(req)
val respMap = resp.requestInfo
val needed = topicAndPartitions.diff(result.keySet)
needed.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { ome: OffsetMetadataAndError =>
if (ome.error == ErrorMapping.NoError) {
result += tp -> ome
} else {
errs.append(ErrorMapping.exceptionFor(ome.error))
}
}
}
if (result.keys.size == topicAndPartitions.size) {
return Right(result)
}
}
val missing = topicAndPartitions.diff(result.keySet)
errs.append(new SparkException(s"Couldn't find consumer offsets for ${missing}"))
Left(errs)
}

/** Requires Kafka >= 0.8.1.1 */
def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long]
): Either[Err, Map[TopicAndPartition, Short]] =
setConsumerOffsets(groupId, offsets, defaultConsumerApiVersion)

def setConsumerOffsets(
groupId: String,
offsets: Map[TopicAndPartition, Long],
consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Short]] = {
val meta = offsets.map { kv =>
kv._1 -> OffsetAndMetadata(kv._2)
}
setConsumerOffsetMetadata(groupId, meta, consumerApiVersion)
}

/** Requires Kafka >= 0.8.1.1 */
def setConsumerOffsetMetadata(
groupId: String,
metadata: Map[TopicAndPartition, OffsetAndMetadata]
): Either[Err, Map[TopicAndPartition, Short]] =
setConsumerOffsetMetadata(groupId, metadata, defaultConsumerApiVersion)

def setConsumerOffsetMetadata(
groupId: String,
metadata: Map[TopicAndPartition, OffsetAndMetadata],
consumerApiVersion: Short
): Either[Err, Map[TopicAndPartition, Short]] = {
var result = Map[TopicAndPartition, Short]()
val req = OffsetCommitRequest(groupId, metadata, consumerApiVersion)
val errs = new Err
val topicAndPartitions = metadata.keySet
withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>
val resp = consumer.commitOffsets(req)
val respMap = resp.commitStatus
val needed = topicAndPartitions.diff(result.keySet)
needed.foreach { tp: TopicAndPartition =>
respMap.get(tp).foreach { err: Short =>
if (err == ErrorMapping.NoError) {
result += tp -> err
} else {
errs.append(ErrorMapping.exceptionFor(err))
}
}
}
if (result.keys.size == topicAndPartitions.size) {
return Right(result)
}
}
val missing = topicAndPartitions.diff(result.keySet)
errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))
Left(errs)
}

// Try a call against potentially multiple brokers, accumulating errors
def withBrokers(brokers: Iterable[(String, Int)], errs: Err)
(fn: SimpleConsumer => Any): Unit = {
brokers.foreach { hp =>
var consumer: SimpleConsumer = null
try {
consumer = connect(hp._1, hp._2)
fn(consumer)
} catch {
case NonFatal(e) =>
errs.append(e)
} finally {
if (consumer != null) {
consumer.close()
}
}
}
}
}

object KafkaCluster {
type Err = ArrayBuffer[Throwable]

/** If the result is right, return it, otherwise throw SparkException */
def checkErrors[T](result: Either[Err, T]): T = {
result.fold(
errs => throw new SparkException(errs.mkString("\n")),
ok => ok
)
}

case class LeaderOffset(host: String, port: Int, offset: Long)

/**
* High-level kafka consumers connect to ZK.  ConsumerConfig assumes this use case.
* Simple consumers connect directly to brokers, but need many of the same configs.
* This subclass won't warn about missing ZK params, or presence of broker params.
*/

class SimpleConsumerConfig(brokers: String, originalProps: Properties)
extends ConsumerConfig(originalProps) {
val seedBrokers: Array[(String, Int)] = brokers.split(",").map { hp =>
val hpa = hp.split(":")
if (hpa.size == 1) {
throw new SparkException(s"Broker not in the correct format of <host>:<port> [$brokers]")
}
(hpa(0), hpa(1).toInt)
}
}

object SimpleConsumerConfig {
/**
* Make a consumer config without requiring group.id or zookeeper.connect,
* since communicating with brokers also needs common settings such as timeout
*/
def apply(kafkaParams: Map[String, String]): SimpleConsumerConfig = {
// These keys are from other pre-existing kafka configs for specifying brokers, accept either
val brokers = kafkaParams.get("metadata.broker.list")
.orElse(kafkaParams.get("bootstrap.servers"))
.getOrElse(throw new SparkException(
"Must specify metadata.broker.list or bootstrap.servers"))

val props = new Properties()
kafkaParams.foreach { case (key, value) =>
// prevent warnings on parameters ConsumerConfig doesn't know about
if (key != "metadata.broker.list" && key != "bootstrap.servers") {
props.put(key, value)
}
}

Seq("zookeeper.connect", "group.id").foreach { s =>
if (!props.containsKey(s)) {
props.setProperty(s, "")
}
}

new SimpleConsumerConfig(brokers, props)
}
}

}
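Every lookup in KafkaCluster returns an `Either[Err, T]`, and `checkErrors` turns a `Left` into an exception. The sketch below shows that calling pattern in isolation. It reuses the `Err` alias from the listing but throws a plain `RuntimeException` so that it has no Spark dependency; `EitherErrSketch` and `lookup` are hypothetical stand-ins, not part of the copied class.

```scala
import scala.collection.mutable.ArrayBuffer

object EitherErrSketch {
  // Same alias as in the KafkaCluster companion object
  type Err = ArrayBuffer[Throwable]

  // Mirrors checkErrors: return the Right value, otherwise throw with all
  // accumulated errors joined into one message (RuntimeException is an assumption here).
  def checkErrors[T](result: Either[Err, T]): T =
    result.fold(
      errs => throw new RuntimeException(errs.mkString("\n")),
      ok => ok
    )

  // Hypothetical stand-in for a call such as getPartitions: returns the
  // partition count of a known topic or an accumulated error list.
  def lookup(known: Map[String, Int], topic: String): Either[Err, Int] =
    known.get(topic) match {
      case Some(partitions) => Right(partitions)
      case None =>
        val errs = new Err
        errs.append(new IllegalArgumentException(s"unknown topic: $topic"))
        Left(errs)
    }
}
```

A caller can either inspect the `Either` itself, as KafkaManager does with `isLeft`, or pass it through `checkErrors` when failing fast is acceptable.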

KafkaManager is as follows:
package cn.lijie.kafka

import cn.lijie.kafka.KafkaCluster.LeaderOffset
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import org.apache.spark.SparkException
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

import scala.reflect.ClassTag

/**
* Created by knowpigxia on 15-8-5.
*/
class KafkaManager(val kafkaParams: Map[String, String]) extends Serializable {

private val kc = new KafkaCluster(kafkaParams)

/**
* Create the data stream
*
* @param ssc
* @param kafkaParams
* @param topics
* @tparam K
* @tparam V
* @tparam KD
* @tparam VD
* @return
*/
def createDirectStream[K: ClassTag, V: ClassTag, KD <: Decoder[K] : ClassTag, VD <: Decoder[V] : ClassTag](
ssc: StreamingContext, kafkaParams: Map[String, String], topics: Set[String]): InputDStream[(K, V)] = {
val groupId = kafkaParams.get("group.id").get
// Before reading offsets from ZooKeeper, update them to match the actual state of the topic
setOrUpdateOffsets(topics, groupId)

// Start consuming messages from the offsets read from ZooKeeper
val messages = {
val partitionsE = kc.getPartitions(topics)
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft)
throw new SparkException(s"get kafka consumer offsets failed: ${consumerOffsetsE.left.get}")
val consumerOffsets = consumerOffsetsE.right.get
KafkaUtils.createDirectStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, consumerOffsets, (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message))
}
messages
}

/**
* Before creating the data stream, update the consumer offsets according to what has actually been consumed
*
* @param topics
* @param groupId
*/
private def setOrUpdateOffsets(topics: Set[String], groupId: String): Unit = {
topics.foreach(topic => {
var hasConsumed = true
val partitionsE = kc.getPartitions(Set(topic))
if (partitionsE.isLeft)
throw new SparkException(s"get kafka partition failed: ${partitionsE.left.get}")
val partitions = partitionsE.right.get
val consumerOffsetsE = kc.getConsumerOffsets(groupId, partitions)
if (consumerOffsetsE.isLeft) hasConsumed = false
if (hasConsumed) {
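// The group has consumed before
/**
* If the streaming job throws kafka.common.OffsetOutOfRangeException while running,
* the offsets saved in ZooKeeper are stale: Kafka's scheduled cleanup policy has already deleted the files that contained them.
* To handle this, compare the consumerOffsets stored in ZooKeeper with earliestLeaderOffsets.
* If consumerOffsets is smaller than earliestLeaderOffsets, consumerOffsets is stale,
* so update consumerOffsets to earliestLeaderOffsets
*/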
val earliestLeaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (earliestLeaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${earliestLeaderOffsetsE.left.get}")
val earliestLeaderOffsets = earliestLeaderOffsetsE.right.get
val consumerOffsets = consumerOffsetsE.right.get

// Possibly only some partitions have stale consumerOffsets, so only those partitions are updated to earliestLeaderOffsets
var offsets: Map[TopicAndPartition, Long] = Map()
consumerOffsets.foreach({ case (tp, n) =>
val earliestLeaderOffset = earliestLeaderOffsets(tp).offset
if (n < earliestLeaderOffset) {
println("consumer group:" + groupId + ",topic:" + tp.topic + ",partition:" + tp.partition +
" offsets are stale, updating to " + earliestLeaderOffset)
offsets += (tp -> earliestLeaderOffset)
}
})
if (!offsets.isEmpty) {
kc.setConsumerOffsets(groupId, offsets)
}
} else {
// The group has never consumed before
val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
var leaderOffsets: Map[TopicAndPartition, LeaderOffset] = null
if (reset == Some("smallest")) {
val leaderOffsetsE = kc.getEarliestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get earliest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
} else {
val leaderOffsetsE = kc.getLatestLeaderOffsets(partitions)
if (leaderOffsetsE.isLeft)
throw new SparkException(s"get latest leader offsets failed: ${leaderOffsetsE.left.get}")
leaderOffsets = leaderOffsetsE.right.get
}
val offsets = leaderOffsets.map {
case (tp, offset) => (tp, offset.offset)
}
kc.setConsumerOffsets(groupId, offsets)
}
})
}

/**
* Update the consumer offsets stored in ZooKeeper
*
* @param rdd
*/
def updateZKOffsets(rdd: RDD[(String, String)]): Unit = {
val groupId = kafkaParams.get("group.id").get
val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

for (offsets <- offsetsList) {
val topicAndPartition = TopicAndPartition(offsets.topic, offsets.partition)
val o = kc.setConsumerOffsets(groupId, Map((topicAndPartition, offsets.untilOffset)))
if (o.isLeft) {
println(s"Error updating the offset to Kafka cluster: ${o.left.get}")
}
}
}
}
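The stale-offset check described in the comments of `setOrUpdateOffsets` boils down to a comparison of two maps. Here is a minimal Spark-free sketch of that comparison, assuming plain `(topic, partition)` tuples in place of `TopicAndPartition`; `OffsetReconcileSketch` and `staleOffsets` are hypothetical names. Unlike the listing, it silently skips partitions that have no earliest offset instead of throwing.

```scala
object OffsetReconcileSketch {
  // (topic, partition), standing in for kafka.common.TopicAndPartition
  type Tp = (String, Int)

  // Returns only the partitions whose stored consumer offset is smaller than
  // the earliest offset still available, mapped to that earliest offset.
  def staleOffsets(consumer: Map[Tp, Long], earliest: Map[Tp, Long]): Map[Tp, Long] =
    consumer.collect {
      case (tp, n) if earliest.get(tp).exists(n < _) => tp -> earliest(tp)
    }
}
```

An empty result means no stored offset has fallen behind the log start, which corresponds to the branch where `setConsumerOffsets` is not called.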

Kafka2SparkStreaming02 is as follows:
import cn.lijie.MyLog
import cn.lijie.kafka.KafkaManager
import kafka.serializer.StringDecoder
import org.apache.log4j.Level
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object Kafka2SparkStreaming02 {

  // State update: add the counts from the current batch to each word's running total
  def myFunc = (it: Iterator[(String, Seq[Int], Option[Int])]) => {
    it.map(x => {
      (x._1, x._2.sum + x._3.getOrElse(0))
    })
  }

  def main(args: Array[String]): Unit = {
    MyLog.setLogLeavel(Level.WARN)
    val brokers = "192.168.80.123:9092"
    val topics = "lijietest001"
    val groupId = "groupId"
    val sparkConf = new SparkConf().setAppName("Direct").setMaster("local[2]")
    // Set these before creating the SparkContext: the context copies the conf,
    // so changes made to sparkConf afterwards are not picked up
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(5))
    sc.setCheckpointDir("C:\\Users\\Administrator\\Desktop\\checkpointks02")
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )
    val km = new KafkaManager(kafkaParams)
    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val ds = messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).updateStateByKey(myFunc, new HashPartitioner(sc.defaultParallelism), true)
    // Save the offsets
    messages.foreachRDD(rdd => {
      km.updateZKOffsets(rdd)
    })
    ds.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
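The `spark.streaming.kafka.maxRatePerPartition` setting in the listing limits how many records per second the direct stream reads from each Kafka partition. The sketch below works out the resulting ceiling per batch, assuming the setting is in effect when the context is created and using the 3 partitions the topic was created with; `RateCapSketch` and its parameter names are made up for this illustration.

```scala
object RateCapSketch {
  // Upper bound on records in one batch:
  // records/second/partition * seconds per batch * number of partitions.
  def maxRecordsPerBatch(maxRatePerPartition: Long, batchSeconds: Long, partitions: Int): Long =
    maxRatePerPartition * batchSeconds * partitions
}
```

With a rate of 5, a 5-second batch interval and 3 partitions, `RateCapSketch.maxRecordsPerBatch(5, 5, 3)` evaluates to 75, so a backlog in the topic is drained at no more than 75 records per batch.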

Start the program and produce data from the Kafka producer; the result is as follows:
