1.2 DStream 生成 RDD 实例详解
2016-05-20 11:05
260 查看
转自:https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/1.2%20DStream%20%E7%94%9F%E6%88%90%20RDD%20%E5%AE%9E%E4%BE%8B%E8%AF%A6%E8%A7%A3.md
[酷玩 Spark] Spark Streaming 源码解析系列 ,返回目录请 猛戳这里
「腾讯·广点通」技术团队荣誉出品
本文内容适用范围:
2016.02.25 update, Spark 2.0 全系列 √ (2.0.0-SNAPSHOT 尚未正式发布)
2016.03.10 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1)
2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2)
2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1)
阅读本文前,请一定先阅读 Spark
Streaming 实现思路与模块概述 一文,其中概述了 Spark Streaming 的 4 大模块的基本作用,有了全局概念后再看本文对
我们在前面的文章讲过,Spark Streaming 的
DAG 的实例。
在 Spark Streaming 里,这个 RDD “模板”对应的具体的类是
本文我们就来详解
我们在前文 DStream,
DStreamGraph 详解 中引用了 Spark
Streaming 官方的 quick example 的这段对 DStream DAG 的定义,注意看代码中的注释讲解内容:
这里我们找到
也就是
然后我们继续找到下一行
也就是
后面几行也是如此,所以我们如果用 DStream DAG 图来表示之前那段 quick example 的话,就是这个样子:
也即,我们给出的那段代码,用具体的实现来替换的话,结果如下:
—— 如每 15s 生成一个 batch 的话,那么这里的 key 的时间就是
batch。
需要注意,每一个不同的
最主要还是调用了一个 abstract 的
abstract 的,但在每个具体的子类里都提供了实现。
而
所以,结合以上
(1) 先通过一个 findNewFiles() 方法,找到 validTime 以后产生的多个新 file
(2) 对每个新 file,都将其作为参数调用 sc.newAPIHadoopFile(file),生成一个 RDD 实例
(3) 将 (2) 中的多个新 file 对应的多个 RDD 实例进行 union,返回一个 union 后的 UnionRDD
其它
前一小节的
batch 产生
batch 产生
具体的,我们看两个具体
的实现:
可以看到,首先在构造函数里传入了两个重要内容:
parent,是本
mapFunc,是本次 map() 转换的具体函数
在前文 DStream,
DStreamGraph 详解 中的 quick example 里的
所以在
(1) 获取 parent
(2) 在这个 parent
完全相当于用 RDD API 写了这样的代码:
再看看
同
parent,是本
filterFunc,是本次 filter() 转换的具体函数
所以在
(1) 获取 parent
(2) 在这个 parent
完全相当于用 RDD API 写了这样的代码:
总结上面
batch 里生成
——
batch 里生成
——
在最开始,
得到的新
batch 的
这也就是
batch 里实例化
上面分析了
我们前面讲过,对一个
同前面一样,
parent,是本
foreachFunc,是本次 output 的具体函数
所以在
(1) 获取 parent
(2) 以这个 parent
例如,我们看看
就可以知道,如果对着
batch 里,都会在
driver 端,然后再
在前文 Spark
Streaming 实现思路与模块概述 中,我们曾经讲过,在每个 batch 时,都由
“模板” 来创建
具体的,是
那么翻出来
也就是说,是
—— 而我们知道,只有 ForEachDStream 是 outputStream,所以将调用
举个例子,如上图,由于我们在代码里的两次 print() 操作产生了两个
Job。
但是……
Job 到底是啥?那我们先插播一下
Spark Streaming 里重新定义了一个
所以其实
—— 就像
下面我们继续来看
就是这里牵扯到了
用一个时序图来表达这里的调用关系会清晰很多:
所以最后的时候,由于对
同样的,
同样,最后的时候,由于对
所以当
比如在上图里,
至此,在给定的 batch 里,
尽快运行起来。
而且,每个新 batch 生成时,都会调用
到此,整个
“模板” 为每个 batch 实例化
的 “模板” 为每个 batch 实例化
DStream 生成 RDD 实例详解
[酷玩 Spark] Spark Streaming 源码解析系列 ,返回目录请 猛戳这里「腾讯·广点通」技术团队荣誉出品
本文内容适用范围:
2016.02.25 update, Spark 2.0 全系列 √ (2.0.0-SNAPSHOT 尚未正式发布)
2016.03.10 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1)
2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2)
2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1)
阅读本文前,请一定先阅读 Spark
Streaming 实现思路与模块概述 一文,其中概述了 Spark Streaming 的 4 大模块的基本作用,有了全局概念后再看本文对
模块 1 DAG 静态定义细节的解释。
引言
我们在前面的文章讲过,Spark Streaming 的 模块 1 DAG 静态定义要解决的问题就是如何把计算逻辑描述为一个 RDD DAG 的“模板”,在后面 Job 动态生成的时候,针对每个 batch,都将根据这个“模板”生成一个 RDD
DAG 的实例。
在 Spark Streaming 里,这个 RDD “模板”对应的具体的类是
DStream,RDD DAG “模板”对应的具体类是
DStreamGraph。
DStream 的全限定名是:org.apache.spark.streaming.dstream.DStream DStreamGraph 的全限定名是:org.apache.spark.streaming.DStreamGraph
本文我们就来详解
DStream最主要的功能:为每个 batch 生成
RDD实例。
Quick
Example
我们在前文 DStream,DStreamGraph 详解 中引用了 Spark
Streaming 官方的 quick example 的这段对 DStream DAG 的定义,注意看代码中的注释讲解内容:
// ssc.socketTextStream() 将创建一个 SocketInputDStream;这个 InputDStream 的 SocketReceiver 将监听本机 9999 端口 val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) // DStream transformation val pairs = words.map(word => (word, 1)) // DStream transformation val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation wordCounts.print() // DStream output
这里我们找到
ssc.socketTextStream("localhost", 9999)的源码实现:
def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T] = { new SocketInputDStream[T](this, hostname, port, converter, storageLevel) }
也就是
ssc.socketTextStream()将
new出来一个
DStream具体子类
SocketInputDStream的实例。
然后我们继续找到下一行
lines.flatMap(_.split(" "))的源码实现:
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) }
也就是
lines.flatMap(_.split(" "))将
new出来一个
DStream具体子类
FlatMappedDStream的实例。
后面几行也是如此,所以我们如果用 DStream DAG 图来表示之前那段 quick example 的话,就是这个样子:
也即,我们给出的那段代码,用具体的实现来替换的话,结果如下:
val lines = new SocketInputDStream("localhost", 9999) // 类型是 SocketInputDStream val words = new FlatMappedDStream(lines, _.split(" ")) // 类型是 FlatMappedDStream val pairs = new MappedDStream(words, word => (word, 1)) // 类型是 MappedDStream val wordCounts = new ShuffledDStream(pairs, _ + _) // 类型是 ShuffledDStream new ForeachDStream(wordCounts, cnt => cnt.print()) // 类型是 ForeachDStream
DStream
通过 generatedRDD
管理已生成的 RDD
DStream内部用一个类型是
HashMap的变量
generatedRDD来记录已经生成过的
RDD:
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
generatedRDD的 key 是一个
Time;这个
Time是与用户指定的
batchDuration对齐了的时间
—— 如每 15s 生成一个 batch 的话,那么这里的 key 的时间就是
08h:00m:00s,
08h:00m:15s这种,所以其实也就代表是第几个
batch。
generatedRDD的 value 就是
RDD的实例。
需要注意,每一个不同的
DStream实例,都有一个自己的
generatedRDD。如在下图中,
DStream a, b, c, d各有自己的
generatedRDD变量;图中也示意了
DStream a的
generatedRDD变量。
DStream对这个
HashMap的存取主要是通过
getOrCompute(time: Time)方法,实现也很简单,就是一个 —— 查表,如果有就直接返回,如果没有就生成了放入表、再返回 —— 的逻辑:
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = { // 从 generatedRDDs 里 get 一下:如果有 rdd 就返回,没有 rdd 就进行 orElse 下面的 rdd 生成步骤 generatedRDDs.get(time).orElse { // 验证 time 需要是 valid if (isTimeValid(time)) { // 然后调用 compute(time) 方法获得 rdd 实例,并存入 rddOption 变量 val rddOption = createRDDWithLocalProperties(time) { PairRDDFunctions.disableOutputSpecValidation.withValue(true) { compute(time) } } rddOption.foreach { case newRDD => if (storageLevel != StorageLevel.NONE) { newRDD.persist(storageLevel) logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel") } if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { newRDD.checkpoint() logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing") } // 将刚刚实例化出来的 rddOption 放入 generatedRDDs 对应的 time 位置 generatedRDDs.put(time, newRDD) } // 返回刚刚实例化出来的 rddOption rddOption } else { None } } }
最主要还是调用了一个 abstract 的
compute(time)方法。这个方法用于生成
RDD实例,生成后被放进
generatedRDD里供后续的查询和使用。这个
compute(time)方法在
DStream类里是
abstract 的,但在每个具体的子类里都提供了实现。
(a) InputDStream
的 compute(time)
实现
InputDStream是个有很多子类的抽象类,我们看一个具体的子类
FileInputDStream。
// 来自 FileInputDStream override def compute(validTime: Time): Option[RDD[(K, V)]] = { // 通过一个 findNewFiles() 方法,找到 validTime 以后产生的新 file 的数据 val newFiles = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) batchTimeToSelectedFiles += ((validTime, newFiles)) recentlySelectedFiles ++= newFiles // 找到了一些新 file;以新 file 的数组为参数,通过 filesToRDD() 生成单个 RDD 实例 rdds val rdds = Some(filesToRDD(newFiles)) val metadata = Map( "files" -> newFiles.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n")) val inputInfo = StreamInputInfo(id, 0, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) // 返回生成的单个 RDD 实例 rdds rdds }
而
filesToRDD()实现如下:
// 来自 FileInputDStream private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { // 对每个 file,都 sc.newAPIHadoopFile(file) 来生成一个 RDD val fileRDDs = files.map { file => val rdd = serializableConfOpt.map(_.value) match { case Some(config) => context.sparkContext.newAPIHadoopFile( file, fm.runtimeClass.asInstanceOf[Class[F]], km.runtimeClass.asInstanceOf[Class[K]], vm.runtimeClass.asInstanceOf[Class[V]], config) case None => context.sparkContext.newAPIHadoopFile[K, V, F](file) } if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. " + "Refer to the streaming programming guide for more details.") } rdd } // 将每个 file 对应的 RDD 进行 union,返回一个 union 后的 UnionRDD new UnionRDD(context.sparkContext, fileRDDs) }
所以,结合以上
compute(validTime: Time)和
filesToRDD(files: Seq[String])方法,我们得出
FileInputDStream为每个 batch 生成 RDD 的实例过程如下:
(1) 先通过一个 findNewFiles() 方法,找到 validTime 以后产生的多个新 file
(2) 对每个新 file,都将其作为参数调用 sc.newAPIHadoopFile(file),生成一个 RDD 实例
(3) 将 (2) 中的多个新 file 对应的多个 RDD 实例进行 union,返回一个 union 后的 UnionRDD
其它
InputDStream的为每个 batch 生成
RDD实例的过程也比较类似了。
(b)
一般 DStream
的 compute(time)
实现
前一小节的 InputDStream没有上游依赖的
DStream,可以直接为每个
batch 产生
RDD实例。一般
DStream都是由transofrmation 生成的,都有上游依赖的
DStream,所以为了为
batch 产生
RDD实例,就需要在
compute(time)方法里先获取上游依赖的
DStream产生的
RDD实例。
具体的,我们看两个具体
DStream——
MappedDStream,
FilteredDStream——
的实现:
MappedDStream
的 compute(time)
实现
MappedDStream很简单,全类实现如下:
package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], mapFunc: T => U ) extends DStream[U](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.map[U](mapFunc)) } }
可以看到,首先在构造函数里传入了两个重要内容:
parent,是本
MappedDStream上游依赖的
DStream
mapFunc,是本次 map() 转换的具体函数
在前文 DStream,
DStreamGraph 详解 中的 quick example 里的
val pairs = words.map(word => (word, 1))的
mapFunc就是
word => (word, 1)
所以在
compute(time)的具体实现里,就很简单了:
(1) 获取 parent
DStream在本 batch 里对应的
RDD实例
(2) 在这个 parent
RDD实例上,以
mapFunc为参数调用
.map(mapFunc)方法,将得到的新
RDD实例返回
完全相当于用 RDD API 写了这样的代码:
return parentRDD.map(mapFunc)
FilteredDStream
的 compute(time)
实现
再看看 FilteredDStream的全部实现:
package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag private[streaming] class FilteredDStream[T: ClassTag]( parent: DStream[T], filterFunc: T => Boolean ) extends DStream[T](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { parent.getOrCompute(validTime).map(_.filter(filterFunc)) } }
同
MappedDStream一样,
FilteredDStream也在构造函数里传入了两个重要内容:
parent,是本
FilteredDStream上游依赖的
DStream
filterFunc,是本次 filter() 转换的具体函数
所以在
compute(time)的具体实现里,就很简单了:
(1) 获取 parent
DStream在本 batch 里对应的
RDD实例
(2) 在这个 parent
RDD实例上,以
filterFunc为参数调用
.filter(filterFunc)方法,将得到的新
RDD实例返回
完全相当于用 RDD API 写了这样的代码:
return parentRDD.filter(filterFunc)
总结一般 DStream
的 compute(time)
实现
总结上面 MappedDStream和
FilteredDStream的实现,可以看到:
DStream的
.map()操作生成了
MappedDStream,而
MappedDStream在每个
batch 里生成
RDD实例时,将对
parentRDD调用
RDD的
.map()操作
——
DStream.map()操作完美复制为每个 batch 的
RDD.map()操作
DStream的
.filter()操作生成了
FilteredDStream,而
FilteredDStream在每个
batch 里生成
RDD实例时,将对
parentRDD调用
RDD的
.filter()操作
——
DStream.filter()操作完美复制为每个 batch 的
RDD.filter()操作
在最开始,
DStream的 transformation 的 API 设计与
RDD的 transformation 设计保持了一致,就使得,每一个
dStreamA.transformation()
得到的新
dStreamB能将
dStreamA.transformation() 操作完美复制为每个
batch 的
rddA.transformation() 操作。
这也就是
DStream能够作为
RDD模板,在每个
batch 里实例化
RDD的根本原因。
(c) ForEachDStream
的 compute(time)
实现
上面分析了 DStream的 transformation 如何在
compute(time)里复制为
RDD的 transformation,下面我们分析
DStream的output 如何在
compute(time)里复制为
RDD的 action。
我们前面讲过,对一个
DStream进行 output 操作,将生成一个新的
ForEachDStream,这个
ForEachDStream用一个
foreachFunc成员来记录 output 的具体内容。
ForEachDStream全部实现如下:
package org.apache.spark.streaming.dstream import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.streaming.scheduler.Job import scala.reflect.ClassTag private[streaming] class ForEachDStream[T: ClassTag] ( parent: DStream[T], foreachFunc: (RDD[T], Time) => Unit ) extends DStream[Unit](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[Unit]] = None override def generateJob(time: Time): Option[Job] = { parent.getOrCompute(time) match { case Some(rdd) => val jobFunc = () => createRDDWithLocalProperties(time) { ssc.sparkContext.setCallSite(creationSite) foreachFunc(rdd, time) } Some(new Job(time, jobFunc)) case None => None } } }
同前面一样,
ForEachDStream也在构造函数里传入了两个重要内容:
parent,是本
ForEachDStream上游依赖的
DStream
foreachFunc,是本次 output 的具体函数
所以在
compute(time)的具体实现里,就很简单了:
(1) 获取 parent
DStream在本 batch 里对应的
RDD实例
(2) 以这个 parent
RDD和本次 batch 的 time 为参数,调用
foreachFunc(parentRDD, time)方法
例如,我们看看
DStream.print()里
foreachFunc(rdd, time)的具体实现:
def foreachFunc: (RDD[T], Time) => Unit = { val firstNum = rdd.take(num + 1) println("-------------------------------------------") println("Time: " + time) println("-------------------------------------------") firstNum.take(num).foreach(println) if (firstNum.length > num) println("...") println() }
就可以知道,如果对着
rdd调用上面这个
foreachFunc的话,就会在每个
batch 里,都会在
rdd上执行
.take()获取一些元素到
driver 端,然后再
.foreach(println);也就形成了在 driver 端打印这个
DStream的一些内容的效果了!
DStreamGraph
生成 RDD DAG 实例
在前文 SparkStreaming 实现思路与模块概述 中,我们曾经讲过,在每个 batch 时,都由
JobGenerator来要求
RDDDAG
“模板” 来创建
RDDDAG 实例,即下图中的第 (2) 步。
具体的,是
JobGenerator来调用
DStreamGraph的
generateJobs(time)方法。
那么翻出来
generateJobs()的实现:
// 来自 DStreamGraph def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time) val jobs = this.synchronized { outputStreams.flatMap(outputStream => outputStream.generateJob(time)) } logDebug("Generated " + jobs.length + " jobs for time " + time) jobs }
也就是说,是
DStreamGraph继续调用了每个
outputStream的
generateJob(time)方法
—— 而我们知道,只有 ForEachDStream 是 outputStream,所以将调用
ForEachDStream的
generateJob(time)方法。
举个例子,如上图,由于我们在代码里的两次 print() 操作产生了两个
ForEachDStream节点
x和
y,那么
DStreamGraph.generateJobs(time)就将先后调用
x.generateJob(time)和
y.generateJob(time)方法,并将各获得一个
Job。
但是……
x.generateJob(time)和
y.generateJob(time)的返回值
Job 到底是啥?那我们先插播一下
Job。
Spark
Streaming 的 Job
Spark Streaming 里重新定义了一个 Job类,功能与
Java的
Runnable差不多:一个
Job能够自定义一个
func() 函数,而
Job的
.run()方法实现就是执行这个
func()。
// 节选自 org.apache.spark.streaming.scheduler.Job private[streaming] class Job(val time: Time, func: () => _) { ... def run() { _result = Try(func()) } ... }
所以其实
Job的本质是将实际的
func()定义和
func()被调用分离了
—— 就像
Runnable是将
run()的具体定义和
run()的被调用分离了一样。
下面我们继续来看
x.generateJob(time)和
y.generateJob(time)实现。
x.generateJob(time)
过程
x是一个
ForEachDStream,其
generateJob(time)的实现如下:
// 来自 ForEachDStream override def generateJob(time: Time): Option[Job] = { // 【首先调用 parentDStream 的 getOrCompute() 来获取 parentRDD】 parent.getOrCompute(time) match { case Some(rdd) => // 【然后定义 jobFunc 为在 parentRDD 上执行 foreachFun() 】 val jobFunc = () => createRDDWithLocalProperties(time) { ssc.sparkContext.setCallSite(creationSite) foreachFunc(rdd, time) } // 【最后将 jobFunc 包装为 Job 返回】 Some(new Job(time, jobFunc)) case None => None } }
就是这里牵扯到了
x的
parentDStream.getOrCompute(time),即
d.getOrCompute(time);而
d.getOrCompute(time)会牵扯
c.getOrCompute(time),乃至
a.getOrCompute(time),
b.getOrCompute(time)
用一个时序图来表达这里的调用关系会清晰很多:
所以最后的时候,由于对
x.generateJob(time)形成的递归调用, 将形成一个 Job,其内容
func如下图:
y.generateJob(time)
过程
同样的,y节点生成 Job 的过程,与
x节点的过程非常类似,只是在
b.getOrCompute(time)时,会命中
get(time)而不需要触发
compute(time)了,这是因为该
RDD实例已经在
x节点的生成过程中被实例化过一次,所以在这里只需要取出来用就可以了。
同样,最后的时候,由于对
y.generateJob(time)形成的递归调用, 将形成一个 Job,其内容
func如下图:
返回
Seq[Job]
所以当 DStreamGraph.generateJobs(time)结束时,会返回多个
Job,是因为作为
output stream的每个
ForEachDStream都通过
generateJob(time)方法贡献了一个
Job。
比如在上图里,
DStreamGraph.generateJobs(time)会返回一个
Job的序列,其大小为
2,其内容分别为:
至此,在给定的 batch 里,
DStreamGraph.generateJobs(time)的工作已经全部完成,
Seq[Job]作为结果返回给
JobGenerator后,
JobGenerator也会尽快提交到
JobSheduler那里尽快调用
Job.run()使得这
2个
RDDDAG
尽快运行起来。
而且,每个新 batch 生成时,都会调用
DStreamGraph.generateJobs(time),也进而触发我们之前讨论这个
Job生成过程,周而复始。
到此,整个
DStream作为
RDD的
“模板” 为每个 batch 实例化
RDD,
DStreamGraph作为
RDDDAG
的 “模板” 为每个 batch 实例化
RDDDAG,就分析完成了。
相关文章推荐
- git SSH keys
- Nginx架构解析
- 520 | 用c语言程序撩妹
- 根据ios或者安卓扫描二维码进行相应下载
- Java设计模式之工厂方法设计模式
- Web请求过程
- 23种设计模式(17)--Memento模式
- topic 和queue的区别和应用 activeMQ
- CentOS让一个用户没有登录权限
- C++第7次上机作业
- ASP.NET对象
- Hive原理及查询优化
- 1.1 DStream, DStreamGraph 详解
- border-radius图解
- 360浏览器兼容模式默认显示ie最高版本
- C语言--位运算
- WPF 之 布局(二)
- git windows 记住用户名密码
- 一个带有Kruskal、Prim、Dijkstra算法的图类型 - C++ for C Programmers
- 圆桌问题[网络流24题之5]