
Lesson 15: Spark Streaming Source Code Analysis: A Thorough Look at No Receivers

2016-05-30 23:31
Background:

The No Receivers (Direct) approach is used more and more in enterprises. It gives you a greater degree of control over consumption and better semantic consistency. No Receivers is also the natural way to operate on a data source: the source is accessed through a wrapper, and that wrapper is an RDD. This is why Spark Streaming introduces a custom RDD: KafkaRDD.

Source code analysis:
1. KafkaRDD:
/**
* A batch-oriented interface for consuming from Kafka.
* Starting and ending offsets are specified in advance,
* so that you can control exactly-once semantics.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" to be set
* with Kafka broker(s) specified in host1:port1,host2:port2 form.
* @param offsetRanges offset ranges that define the Kafka data belonging to this RDD
* @param messageHandler function for translating each message into the desired type
 */
private[kafka]
class KafkaRDD[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag, // messages are encoded on the wire, so a Decoder is needed
  T <: Decoder[_]: ClassTag,
  R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange], // offsetRanges specify the data range to read
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
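KafkaRDD itself is private[kafka], but the public KafkaUtils.createRDD builds one for you. A minimal batch-read sketch, assuming hypothetical broker addresses and a topic named "logs":

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

val sc = new SparkContext(new SparkConf().setAppName("KafkaRDDExample"))
// Brokers, not ZooKeeper; "broker1:9092,broker2:9092" is a placeholder.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
// Read offsets 0 (inclusive) until 100 (exclusive) of partition 0 of topic "logs".
val ranges = Array(OffsetRange("logs", 0, 0L, 100L))
val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](sc, kafkaParams, ranges)
println(rdd.count()) // exactly 100 messages, provided the range still exists in Kafka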
2. HasOffsetRanges: an RDD is a list of partitions; this trait exposes the offset ranges behind those partitions.
/**
* Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the
* offset ranges in RDDs generated by the direct Kafka DStream (see
* [[KafkaUtils.createDirectStream()]]).
* {{{
*   // foreachRDD gives you the offset ranges of the RDDs produced in the current batch duration
*   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>
*      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
*      ...
*   }
* }}}
*/

trait HasOffsetRanges {
  def offsetRanges: Array[OffsetRange]
}
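Concretely, the pattern from the scaladoc looks like this inside an application (a sketch; `stream` is assumed to be the DStream returned by createDirectStream):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

stream.foreachRDD { rdd =>
  // The cast must be done on the RDD produced by the direct stream itself,
  // before any transformation changes the RDD type.
  val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition}: ${o.fromOffset} -> ${o.untilOffset}")
  }
}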
3. OffsetRange and its count(): the range here means a number of messages. An OffsetRange specifies the range of data to read from one partition of one topic.
/**
* Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class
* can be created with `OffsetRange.create()`.
* @param topic Kafka topic name
* @param partition Kafka partition id
* @param fromOffset Inclusive starting offset
* @param untilOffset Exclusive ending offset
 */
final class OffsetRange private(
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long) extends Serializable {
  import OffsetRange.OffsetRangeTuple // an offset is the position of a message within a partition

  /** Number of messages this OffsetRange refers to */
  def count(): Long = untilOffset - fromOffset
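For instance, a range of 500 messages on one partition (a sketch with made-up offsets):

import org.apache.spark.streaming.kafka.OffsetRange

// Offsets 1000 (inclusive) until 1500 (exclusive) of partition 3 of topic "logs".
val range = OffsetRange.create("logs", 3, 1000L, 1500L)
assert(range.count() == 500L) // untilOffset - fromOffset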
4. getPartitions in KafkaRDD:

override def getPartitions: Array[Partition] = {
  offsetRanges.zipWithIndex.map { case (o, i) =>
    // Look up the leader broker for this topic/partition, recorded at RDD creation time.
    val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
    new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
  }.toArray
}
5. KafkaRDDPartition: effectively a pointer to, or a reference into, the Kafka data source; it states exactly where the data lives.
/** @param topic kafka topic name
* @param partition kafka partition id
* @param fromOffset inclusive starting offset
* @param untilOffset exclusive ending offset
* @param host preferred kafka host, i.e. the leader at the time the rdd was created
* @param port preferred kafka host's port
 */
private[kafka]
class KafkaRDDPartition(
    val index: Int,
    val topic: String,
    val partition: Int,
    val fromOffset: Long,
    val untilOffset: Long,
    val host: String, // the host the data is read from, and likewise the port below
    val port: Int
  ) extends Partition { // a KafkaRDD partition belongs to exactly one topic/partition

  /** Number of messages this partition refers to */
  def count(): Long = untilOffset - fromOffset
}
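The host and port are not just documentation: in the same source file, KafkaRDD overrides getPreferredLocations roughly as follows, so tasks can be scheduled next to the broker holding the data:

override def getPreferredLocations(thePart: Partition): Seq[String] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // Prefer the leader recorded when the RDD was created.
  Seq(part.host)
}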
6. compute in KafkaRDD produces the iterator for each data split:
override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  // If fromOffset equals untilOffset, the range is empty for this batch.
  if (part.fromOffset == part.untilOffset) {
    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}
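An empty range is normal: if no new messages arrived for a partition during a batch, fromOffset equals untilOffset and the task returns immediately. A sketch with a made-up offset:

// A batch in which partition 0 of "logs" received no new data.
val empty = OffsetRange("logs", 0, 42000L, 42000L)
assert(empty.count() == 0L) // compute() returns Iterator.empty for this partition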
7. KafkaRDDIterator: fetches the actual data
private class KafkaRDDIterator(
    part: KafkaRDDPartition,
    context: TaskContext) extends NextIterator[R] {

  context.addTaskCompletionListener{ context => closeIfNeeded() }

  log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
    s"offsets ${part.fromOffset} -> ${part.untilOffset}")

  val kc = new KafkaCluster(kafkaParams)
  val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[K]]
  val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
    .newInstance(kc.config.props)
    .asInstanceOf[Decoder[V]]
  val consumer = connectLeader
  var requestOffset = part.fromOffset
  var iter: Iterator[MessageAndOffset] = null

  // The idea is to use the provided preferred host, except on task retry attempts,
  // to minimize the number of kafka metadata requests
  private def connectLeader: SimpleConsumer = {
    if (context.attemptNumber > 0) {
      kc.connectLeader(part.topic, part.partition).fold(
        errs => throw new SparkException(
          s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
            errs.mkString("\n")),
        consumer => consumer
      )
    } else {
      kc.connect(part.host, part.port)
    }
  }

  private def handleFetchErr(resp: FetchResponse) {
    if (resp.hasError) {
      val err = resp.errorCode(part.topic, part.partition)
      if (err == ErrorMapping.LeaderNotAvailableCode ||
        err == ErrorMapping.NotLeaderForPartitionCode) {
        log.error(s"Lost leader for topic ${part.topic} partition ${part.partition}, " +
          s" sleeping for ${kc.config.refreshLeaderBackoffMs}ms")
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
      }
      // Let normal rdd retry sort out reconnect attempts
      throw ErrorMapping.exceptionFor(err)
    }
  }

  private def fetchBatch: Iterator[MessageAndOffset] = {
    val req = new FetchRequestBuilder()
      .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
      .build()
    val resp = consumer.fetch(req)
    handleFetchErr(resp)
    // kafka may return a batch that starts before the requested offset
    resp.messageSet(part.topic, part.partition)
      .iterator
      .dropWhile(_.offset < requestOffset)
  }

  override def close(): Unit = {
    if (consumer != null) {
      consumer.close()
    }
  }

  override def getNext(): R = {
    if (iter == null || !iter.hasNext) {
      iter = fetchBatch
    }
    if (!iter.hasNext) {
      assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
      finished = true
      null.asInstanceOf[R]
    } else {
      val item = iter.next()
      if (item.offset >= part.untilOffset) {
        assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
        finished = true
        null.asInstanceOf[R]
      } else {
        requestOffset = item.nextOffset
        messageHandler(new MessageAndMetadata(
          part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
      }
    }
  }
}
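getNext relies on the contract of Spark's NextIterator utility: set `finished = true` to signal exhaustion instead of throwing. Spark's NextIterator is a private helper class, so the sketch below only mirrors enough of its behavior to make that contract clear:

// A minimal sketch of the NextIterator contract (not Spark's actual class).
abstract class SimpleNextIterator[U] extends Iterator[U] {
  private var gotNext = false
  private var nextValue: U = _
  protected var finished = false

  /** Produce the next value, or set `finished = true` when exhausted. */
  protected def getNext(): U

  /** Release resources (e.g. the Kafka SimpleConsumer). */
  protected def close(): Unit = {}

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      gotNext = true
      if (finished) close()
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}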
8. KafkaCluster: encapsulates the interaction with the Kafka cluster
/**
* Convenience methods for interacting with a Kafka cluster.
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
*   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
*   NOT zookeeper servers, specified in host1:port1,host2:port2 form
 */
private[spark]
class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {
Example: data is usually read with KafkaUtils.createDirectStream:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
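A fuller, self-contained sketch of a Direct application (broker addresses and the topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectStreamExample {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("DirectStreamExample"), Seconds(10))
    // Point at the Kafka brokers themselves, NOT ZooKeeper.
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val topics = Set("logs")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)
    stream.map(_._2).count().print() // number of messages per 10-second batch
    ssc.start()
    ssc.awaitTermination()
  }
}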
9. When operating on data in Kafka's Direct mode, you go through createDirectStream (which ultimately builds the DirectKafkaInputDStream of item 11):
/**
* Create an input stream that directly pulls messages from Kafka Brokers
* without using any receiver. This stream can guarantee that each message
* from Kafka is included in transformations exactly once (see points below).
*
* Points to note:
*  - No receivers: This stream does not use any receiver. It directly queries Kafka
*  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
*    by the stream itself. For interoperability with Kafka monitoring tools that depend on
*    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
*    You can access the offsets used in each batch from the generated RDDs (see
*    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
*  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
*    in the [[StreamingContext]]. The information on consumed offset can be
*    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
*  - End-to-end semantics: This stream ensures that every record is effectively received and
*    transformed exactly once, but gives no guarantees on whether the transformed data are
*    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
*    that the output operation is idempotent, or use transactions to output records atomically.
*    See the programming guide for more details.
*
* @param ssc StreamingContext object
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
*   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
*   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
*   host1:port1,host2:port2 form.
*   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
*   to determine where the stream starts (defaults to "largest")
* @param topics Names of the topics to consume
* @tparam K type of Kafka message key
* @tparam V type of Kafka message value
* @tparam KD type of Kafka message key decoder
* @tparam VD type of Kafka message value decoder
* @return DStream of (Kafka message key, Kafka message value)
*/
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag] (
    ssc: StreamingContext,
    kafkaParams: Map[String, String],
    topics: Set[String]
): InputDStream[(K, V)] = {
  val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
  val kc = new KafkaCluster(kafkaParams) // kc represents the Kafka cluster
  // fromOffsets obtains the concrete starting offsets
  val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
  new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
    ssc, kafkaParams, fromOffsets, messageHandler)
}
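There is also an overload that takes explicit fromOffsets and a messageHandler, which lets an application resume exactly where its own offset store says it stopped. A sketch (the offsets are made up; `ssc` and `kafkaParams` are as in the earlier example):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical offsets read back from your own offset store.
val fromOffsets = Map(
  TopicAndPartition("logs", 0) -> 42000L,
  TopicAndPartition("logs", 1) -> 42500L)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)
val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)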
10. getFromOffsets:
private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]
): Map[TopicAndPartition, Long] = {
  val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
  val result = for {
    topicPartitions <- kc.getPartitions(topics).right
    leaderOffsets <- (if (reset == Some("smallest")) {
      kc.getEarliestLeaderOffsets(topicPartitions)
    } else {
      kc.getLatestLeaderOffsets(topicPartitions)
    }).right
  } yield {
    leaderOffsets.map { case (tp, lo) =>
      // When the direct Kafka DStream is created, it talks to the Kafka cluster
      // to obtain the partition and offset information (via DirectKafkaInputDStream).
      (tp, lo.offset)
    }
  }
  KafkaCluster.checkErrors(result)
}
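So whether a fresh stream starts at the earliest or the latest offsets is controlled purely by kafkaParams, for example:

// Start from the earliest offsets still retained by Kafka when no explicit
// fromOffsets are supplied; the default ("largest") starts from the latest.
val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092", // placeholder brokers
  "auto.offset.reset" -> "smallest")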
11. Each Kafka topic/partition corresponds to one partition of the KafkaRDD we generate.
/**
*  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
* each given Kafka topic/partition corresponds to an RDD partition.
* The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
*  of messages
* per second that each '''partition''' will accept.
* Starting offsets are specified in advance,
* and this DStream is not responsible for committing offsets,
* so that you can control exactly-once semantics.
* For an easy interface to Kafka-managed offsets,
*  see {@link org.apache.spark.streaming.kafka.KafkaCluster}
* @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
* configuration parameters</a>.
*   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
*   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
* @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
*  starting point of the stream
* @param messageHandler function for translating each message into the desired type
*/
private[streaming]
class DirectKafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[K]: ClassTag,
  T <: Decoder[V]: ClassTag,
  R: ClassTag](
    ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends InputDStream[R](ssc_) with Logging {
12. The relevant DirectKafkaInputDStream source is below; every compute call produces a KafkaRDD:
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  // untilOffsets is the data range to fetch; it tells you how many records this batch computes.
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
  // Create the RDD instance; the DirectKafkaInputDStream produces exactly one KafkaRDD per batch.
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")

  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}
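The clamp call is where spark.streaming.kafka.maxRatePerPartition, mentioned in the scaladoc above, takes effect: it caps untilOffsets so no partition contributes more than the configured number of messages per second to a batch. Setting it is plain configuration:

import org.apache.spark.SparkConf

// Cap each Kafka partition at 1000 messages per second per batch.
val conf = new SparkConf()
  .setAppName("DirectStreamExample")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")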
The overall flow: createDirectStream constructs a DirectKafkaInputDStream; on every batch its compute builds a KafkaRDD whose partitions (KafkaRDDPartition) map one-to-one onto Kafka topic/partitions; each task then uses KafkaRDDIterator to read its offset range straight from the partition's leader broker.
What are the benefits of the Direct approach?
1. Direct keeps no data cache, so there is no risk of overflowing memory with buffered input; the Receiver approach has to cache incoming data.
2. The Receiver approach is awkward to scale out, while with Direct the data sits on multiple machines by default (one RDD partition per Kafka partition).
3. In practice, a drawback of Receivers is that data may arrive faster than it can be processed; Direct does not have this problem, because each batch reads exactly the range it is going to compute.
4. Semantic consistency: with Direct, the data of each batch is guaranteed to be processed; see the output sketch below for the end-to-end part.
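As the createDirectStream scaladoc notes, Direct only guarantees exactly-once up to the transformations; exactly-once output still requires an idempotent or transactional sink. A sketch of the usual pattern (saveResultsAndOffsets is a hypothetical function that writes the results and the offset ranges in one atomic transaction):

import org.apache.spark.streaming.kafka.HasOffsetRanges

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  val counts = rdd.map(_._2).countByValue() // some driver-side result
  // Hypothetical: commit results and offsets in a single transaction,
  // so that on retry either both or neither become visible.
  saveResultsAndOffsets(counts, offsetRanges)
}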
Reposted from http://blog.csdn.net/snail_gesture/article/details/51525422