您的位置:首页 > 其它

[spark streaming] DStream 和 DStreamGraph 解析

2017-11-30 18:10 190 查看
看 spark streaming 源码解析之前最好先了解spark core的内容。

前言

Spark Streaming 是基于Spark Core将流式计算分解成一系列的小批处理任务来执行。

在Spark Streaming里,总体负责任务的动态调度是
JobScheduler
,而
JobScheduler
有两个很重要的成员:
JobGenerator
ReceiverTracker
JobGenerator
负责将每个 batch 生成具体的 RDD DAG ,而
ReceiverTracker
负责数据的来源。

Spark Streaming里的
DStream
可以看成是Spark Core里的RDD的模板,
DStreamGraph
是RDD DAG的模板。

跟着例子看流程

DStream 也和 RDD 一样有着转换(transformation)和 输出(output)操作,通过
transformation
操作会产生新的
DStream
,典型的
transformation
操作有map(), filter(), reduce(), join()等。RDD的输出操作会触发action,而DStream的输出操作也会新建一个
ForeachDStream
,用一个函数func来记录所需要做的操作。

下面看一个例子:

val conf = new SparkConf().setMaster("local[2]")
.setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()


在创建
StreamingContext
的时候实创建了 graph: DStreamGraph:

private[streaming] val graph: DStreamGraph = {
if (isCheckpointPresent) {
_cp.graph.setContext(this)
_cp.graph.restoreCheckpointData()
_cp.graph
} else {
require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
val newGraph = new DStreamGraph()
newGraph.setBatchDuration(_batchDur)
newGraph
}
}


checkpoint
可用,会优先从 checkpoint 恢复 graph,否则新建一个。graph用来动态的创建RDD DAG,
DStreamGraph
有两个重要的成员:
inputStreams
outputStreams


private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()


Spark Streaming记录DStream DAG 的方式就是通过
DStreamGraph
实例记录所有的
outputStreams
,因为
outputStream
会通过依赖

dependencies
来和parent DStream形成依赖链,通过
outputStreams
向前追溯遍历就可以得到所有上游的DStream,另外,
DStreamGraph
还会记录所有的
inputStreams
,避免每次为查找 input stream 而对 output steam 进行 BFS 的消耗。

继续回到例子,这里通过ssc.socketTextStream 创建了一个
ReceiverInputDStream
,在其父类 InputDStream 中会将该
ReceiverInputDStream
添加到
inputStream
里。

接着调用了flatMap方法:

def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}

--------------------------------------------------------------------

private[streaming]
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => TraversableOnce[U]
) extends DStream[U](parent.ssc) {

override def dependencies: List[DStream[_]] = List(parent)

override def slideDuration: Duration = parent.slideDuration

override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
}


创建了一个
FlatMappedDStream
,而该类的compute方法是在父 DStream(ReceiverInputDStream) 在对应batch时间的RDD上调用了flatMap方法,也就是构造了
rdd.flatMap(func)
这样的代码,后面的操作类似,随后形成的是
rdd.flatMap(func1).map(func2).reduceByKey(func3).take()
,这不就是我们spark core里的东西吗。另外其
dependencies
是直接指向了其构造参数parent,也就是刚才的
ReceiverInputDStream
,每个新建的DStream的dependencies都是指向了其父DStream,这样就构成了一个依赖链,也就是形成了DStream DAG。

这里我们再看看最后的 print() 操作:

----
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) => Unit = {
(rdd: RDD[T], time: Time) => {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println("-------------------------------------------")
println(s"Time: $time")
println("-------------------------------------------")
firstNum.take(num).foreach(println)
if (firstNum.length > num) println("...")
println()
// scalastyle:on println
}
}
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
}
----
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
----
#ForEachDStream
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}


在print() 方法里构建了一个foreachFunc方法:对一个rdd进行了take操作并打印(spark core中的action操作)。随后创建了ForEachDStream实例并调用了register()方法:

private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}


将 OutputStream 添加到
DStreamGraph
outputStreams
里。可以看到刚才构建的 foreachFunc 方法最终用在了
ForEachDStream
实例的
generateJob
方法里,并创建了一个Streaming 中的Job,在job中的run方法中会调用这个方法,也就是会触发action操作。

注意这里Spark Streaming的Job和Spark Core里的Job是不一样的,Streaming的Job执行的是前面构造的方法,方法里面是Core里的Job,方法可以定义多个core里的Job,也可以一个core里的job都没有。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: