[spark streaming] DStream 和 DStreamGraph 解析
2017-11-30 18:10
190 查看
看 spark streaming 源码解析之前最好先了解spark core的内容。
在Spark Streaming里,总体负责任务的动态调度是
Spark Streaming里的
下面看一个例子:
在创建
若
Spark Streaming记录DStream DAG 的方式就是通过
继续回到例子,这里通过ssc.socketTextStream 创建了一个
接着调用了flatMap方法:
创建了一个
这里我们再看看最后的 print() 操作:
在print() 方法里构建了一个foreachFunc方法:对一个rdd进行了take操作并打印(spark core中的action操作)。随后创建了ForEachDStream实例并调用了register()方法:
将 OutputStream 添加到
注意这里Spark Streaming的Job和Spark Core里的Job是不一样的,Streaming的Job执行的是前面构造的方法,方法里面是Core里的Job,方法可以定义多个core里的Job,也可以一个core里的job都没有。
前言
Spark Streaming 是基于Spark Core将流式计算分解成一系列的小批处理任务来执行。在Spark Streaming里,总体负责任务的动态调度是
JobScheduler,而
JobScheduler有两个很重要的成员:
JobGenerator和
ReceiverTracker。
JobGenerator负责将每个 batch 生成具体的 RDD DAG ,而
ReceiverTracker负责数据的来源。
Spark Streaming里的
DStream可以看成是Spark Core里的RDD的模板,
DStreamGraph是RDD DAG的模板。
跟着例子看流程
DStream 也和 RDD 一样有着转换(transformation)和 输出(output)操作,通过transformation操作会产生新的
DStream,典型的
transformation操作有map(), filter(), reduce(), join()等。RDD的输出操作会触发action,而DStream的输出操作也会新建一个
ForeachDStream,用一个函数func来记录所需要做的操作。
下面看一个例子:
val conf = new SparkConf().setMaster("local[2]") .setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.print() ssc.start() ssc.awaitTermination()
在创建
StreamingContext的时候实创建了 graph: DStreamGraph:
private[streaming] val graph: DStreamGraph = { if (isCheckpointPresent) { _cp.graph.setContext(this) _cp.graph.restoreCheckpointData() _cp.graph } else { require(_batchDur != null, "Batch duration for StreamingContext cannot be null") val newGraph = new DStreamGraph() newGraph.setBatchDuration(_batchDur) newGraph } }
若
checkpoint可用,会优先从 checkpoint 恢复 graph,否则新建一个。graph用来动态的创建RDD DAG,
DStreamGraph有两个重要的成员:
inputStreams和
outputStreams。
private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]()
Spark Streaming记录DStream DAG 的方式就是通过
DStreamGraph实例记录所有的
outputStreams,因为
outputStream会通过依赖
dependencies来和parent DStream形成依赖链,通过
outputStreams向前追溯遍历就可以得到所有上游的DStream,另外,
DStreamGraph还会记录所有的
inputStreams,避免每次为查找 input stream 而对 output steam 进行 BFS 的消耗。
继续回到例子,这里通过ssc.socketTextStream 创建了一个
ReceiverInputDStream,在其父类 InputDStream 中会将该
ReceiverInputDStream添加到
inputStream里。
接着调用了flatMap方法:
def flatMap[U: ClassTag](flatMapFunc: T => TraversableOnce[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) } -------------------------------------------------------------------- private[streaming] class FlatMappedDStream[T: ClassTag, U: ClassTag]( parent: DStream[T], flatMapFunc: T => TraversableOnce[U] ) extends DStream[U](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc)) } }
创建了一个
FlatMappedDStream,而该类的compute方法是在父 DStream(ReceiverInputDStream) 在对应batch时间的RDD上调用了flatMap方法,也就是构造了
rdd.flatMap(func)这样的代码,后面的操作类似,随后形成的是
rdd.flatMap(func1).map(func2).reduceByKey(func3).take(),这不就是我们spark core里的东西吗。另外其
dependencies是直接指向了其构造参数parent,也就是刚才的
ReceiverInputDStream,每个新建的DStream的dependencies都是指向了其父DStream,这样就构成了一个依赖链,也就是形成了DStream DAG。
这里我们再看看最后的 print() 操作:
---- def print(num: Int): Unit = ssc.withScope { def foreachFunc: (RDD[T], Time) => Unit = { (rdd: RDD[T], time: Time) => { val firstNum = rdd.take(num + 1) // scalastyle:off println println("-------------------------------------------") println(s"Time: $time") println("-------------------------------------------") firstNum.take(num).foreach(println) if (firstNum.length > num) println("...") println() // scalastyle:on println } } foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false) } ---- private def foreachRDD( foreachFunc: (RDD[T], Time) => Unit, displayInnerRDDOps: Boolean): Unit = { new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register() } ---- #ForEachDStream override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) { foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) case None => None } }
在print() 方法里构建了一个foreachFunc方法:对一个rdd进行了take操作并打印(spark core中的action操作)。随后创建了ForEachDStream实例并调用了register()方法:
private[streaming] def register(): DStream[T] = { ssc.graph.addOutputStream(this) this }
将 OutputStream 添加到
DStreamGraph的
outputStreams里。可以看到刚才构建的 foreachFunc 方法最终用在了
ForEachDStream实例的
generateJob方法里,并创建了一个Streaming 中的Job,在job中的run方法中会调用这个方法,也就是会触发action操作。
注意这里Spark Streaming的Job和Spark Core里的Job是不一样的,Streaming的Job执行的是前面构造的方法,方法里面是Core里的Job,方法可以定义多个core里的Job,也可以一个core里的job都没有。
相关文章推荐
- SparkException: org.apache.spark.streaming.dstream.MappedDStream has not been initialized
- Spark Streaming之二:DStream解析
- spark streaming 4: DStreamGraph JobScheduler
- Spark源码解析:DStream
- spark streaming 实现kafka的createDirectStream方式!!不坑
- DStream, DStreamGraph 详解
- 第1课:SparkStreaming 三板斧之一:解密SparkStreaming另类实验及SparkStreaming本质解析
- Kafka+ Spark Streaming 创建stream编译报错
- Spark组件之Spark Streaming学习6--如何调用Dstream里面的getOrCompute方法?
- SparkStreaming在启动执行步鄹和DStream的理解
- spark streaming 实现kafka的createDirectStream方式!!不坑
- Spark Streaming自定义数据源-实现自定义输入DStream和接收器
- Spark Streaming on Kafka解析和安装实战
- spark-streaming direct stream的部分实现原理
- spark streaming 实现kafka的createDirectStream方式!!不坑
- Spark源码解析之SparkStreaming数据处理及流动
- spark streaming 实现kafka的createDirectStream方式!!不坑
- spark streaming kafka1.4.1中的低阶api createDirectStream使用总结
- Spark Streaming之一:StreamingContext解析
- IMF传奇行动第84课:Spark Streaming第三课:StreamingContext、DStream、Receiver深度剖析