Spark RDD概念学习系列之RDD的checkpoint(九)
2016-07-31 19:51
471 查看
RDD的检查点
[b] 首先,要清楚。为什么spark要引入检查点机制?引入RDD的检查点?[/b]
[b][b] [/b][/b][b]答[/b]:如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,Spark又引入检查点机制。
RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。但是,如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,Spark又引入检查点(checkpoint)机制。
[b]RDD的缓存和RDD的checkpoint的区别[/b]
RDD的缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。
而RDD的检查点不同,它是在计算完成后,重新建立一个Job来计算。
为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。
RDD的checkpoint的处理
在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。为了理解checkpoint的RDD是如何读取计算结果的,需要先看一下checkpoint的数据是如何写入的。
首先在Job结束后,会判断是否需要checkpoint。如果需要,就调用org.apache.spark.rdd.RDDCheckpointData#doCheckpoint。doCheckpoint首先为数据创建一个目录;然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;接着创建一个org.apache.spark.rdd.CheckpointRDD;最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。这个处理逻辑中,数据写入的实现在org.apache.spark.rdd.CheckpointRDD$#writeToFile。
简要的核心逻辑如下:
至此,RDD的checkpoint完成,其中checkpoint的数据可以通过checkpointRDD的readFromFile读取。但是,上述逻辑在清除了RDD的依赖后,并没有和check-pointRDD建立联系。
[b] 那么Spark是如何确定一个RDD是否被checkpoint了,而且正确读取checkpoint的数据呢?[/b]
答案就在org.apache.spark.rdd.RDD#dependencies的实现,它会首先判断当前的RDD是否已经Checkpoint过,如果有,那么RDD的依赖就变成了对应的Ch
理解了Checkpoint的实现过程。
接下来看一下[b]computeOrReadCheckpoint的实现[/b]。前面提到了,它一共在两个地方被调用,org.apache.spark.rdd.RDD#iterator和org.apache.spark.CacheManager#getOrCompute。它实现的逻辑比较简单,首先检查当前RDD是否被Checkpoint过,如果有,读取Checkpoint的数据;否则开始计算。
实现如下:
同时,大家可以关注我的个人博客:
http://www.cnblogs.com/zlslch/ 和 http://www.cnblogs.com/lchzls/
人生苦短,我愿分享。本公众号将秉持活到老学到老学习无休止的交流分享开源精神,汇聚于互联网和个人学习工作的精华干货知识,一切来于互联网,反馈回互联网。
目前研究领域:大数据、机器学习、深度学习、人工智能、数据挖掘、数据分析。 语言涉及:Java、Scala、Python、Shell、Linux等 。同时还涉及平常所使用的手机、电脑和互联网上的使用技巧、问题和实用软件。 只要你一直关注和呆在群里,每天必须有收获
以及对应本平台的QQ群:161156071(大数据躺过的坑)
[b] 首先,要清楚。为什么spark要引入检查点机制?引入RDD的检查点?[/b]
[b][b] [/b][/b][b]答[/b]:如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,Spark又引入检查点机制。
RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。但是,如果缓存丢失了,则需要重新计算。如果计算特别复杂或者计算耗时特别多,那么缓存丢失对于整个Job的影响是不容忽视的。为了避免缓存丢失重新计算带来的开销,Spark又引入检查点(checkpoint)机制。
[b]RDD的缓存和RDD的checkpoint的区别[/b]
RDD的缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。
而RDD的检查点不同,它是在计算完成后,重新建立一个Job来计算。
为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。
RDD的checkpoint的处理
在缓存没有命中的情况下,首先会判断是否保存了RDD的checkpoint,如果有,则读取checkpoint。为了理解checkpoint的RDD是如何读取计算结果的,需要先看一下checkpoint的数据是如何写入的。
首先在Job结束后,会判断是否需要checkpoint。如果需要,就调用org.apache.spark.rdd.RDDCheckpointData#doCheckpoint。doCheckpoint首先为数据创建一个目录;然后启动一个新的Job来计算,并且将计算结果写入新创建的目录;接着创建一个org.apache.spark.rdd.CheckpointRDD;最后,原始RDD的所有依赖被清除,这就意味着RDD的转换的计算链(compute chain)等信息都被清除。这个处理逻辑中,数据写入的实现在org.apache.spark.rdd.CheckpointRDD$#writeToFile。
简要的核心逻辑如下:
//创建一个保存checkpoint数据的目录 val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id) val fs = path.getFileSystem(rdd.context.hadoopConfiguration) if (!fs.mkdirs(path)) { throw new SparkException("Failed to create checkpoint path " + path) } //创建广播变量 val broadcastedConf = rdd.context.broadcast( new SerializableWritable(rdd.context.hadoopConfiguration)) //开始一个新的Job进行计算,计算结果存入路径path中 rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _) //根据结果的路径path来创建CheckpointRDD val newRDD = new CheckpointRDD[T](rdd.context, path.toString) //保存结果,清除原始RDD的依赖、Partition信息等 RDDCheckpointData.synchronized { cpFile = Some(path.toString) cpRDD = Some(newRDD) // RDDCheckpointData对应的CheckpointRDD rdd.markCheckpointed(newRDD) //清除原始RDD的依赖,Partition cpState = Checkpointed //标记checkpoint的状态为完成 }
至此,RDD的checkpoint完成,其中checkpoint的数据可以通过checkpointRDD的readFromFile读取。但是,上述逻辑在清除了RDD的依赖后,并没有和check-pointRDD建立联系。
[b] 那么Spark是如何确定一个RDD是否被checkpoint了,而且正确读取checkpoint的数据呢?[/b]
答案就在org.apache.spark.rdd.RDD#dependencies的实现,它会首先判断当前的RDD是否已经Checkpoint过,如果有,那么RDD的依赖就变成了对应的Ch
eckpointRDD: privatedefcheckpointRDD: Option[RDD[T]]=checkpointData.flatMap(_.checkpointRDD) final def dependencies: Seq[Dependency[_]] = { checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { //没有checkpoint dependencies_ = getDependencies } dependencies_ } }
理解了Checkpoint的实现过程。
接下来看一下[b]computeOrReadCheckpoint的实现[/b]。前面提到了,它一共在两个地方被调用,org.apache.spark.rdd.RDD#iterator和org.apache.spark.CacheManager#getOrCompute。它实现的逻辑比较简单,首先检查当前RDD是否被Checkpoint过,如果有,读取Checkpoint的数据;否则开始计算。
实现如下:
private[spark] def [b]computeOrReadCheckpoint[/b](split: Partition, context: TaskContext) : Iterator[T] = { if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context) } firstParent[T].iterator(split,context)会调用对应CheckpointRDD的iterator,最终调用到它的compute: override def compute(split: Partition, context: TaskContext): Iterator[T] = { val file=new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)) CheckpointRDD.readFromFile(file, broadcastedConf, context) // 读取Checkpoint的数据 }
RDD的缓存
欢迎大家,加入我的微信公众号:大数据躺过的坑 免费给分享同时,大家可以关注我的个人博客:
http://www.cnblogs.com/zlslch/ 和 http://www.cnblogs.com/lchzls/
人生苦短,我愿分享。本公众号将秉持活到老学到老学习无休止的交流分享开源精神,汇聚于互联网和个人学习工作的精华干货知识,一切来于互联网,反馈回互联网。
目前研究领域:大数据、机器学习、深度学习、人工智能、数据挖掘、数据分析。 语言涉及:Java、Scala、Python、Shell、Linux等 。同时还涉及平常所使用的手机、电脑和互联网上的使用技巧、问题和实用软件。 只要你一直关注和呆在群里,每天必须有收获
以及对应本平台的QQ群:161156071(大数据躺过的坑)
相关文章推荐
- Spark RDD概念学习系列之RDD的依赖关系(宽依赖和窄依赖)(三)
- Spark RDD概念学习系列之RDD是什么?(四)
- Spark RDD概念学习系列之RDD的依赖关系(宽依赖和窄依赖)(三)
- Spark RDD概念学习系列之Pair RDD的action操作
- Spark RDD概念学习系列之Pair RDD的分区控制
- Spark RDD概念学习系列之为什么会引入RDD?(一)
- Spark RDD概念学习系列之RDD的缺点(二)
- Spark RDD概念学习系列之RDD的5大特点(五)
- Spark RDD概念学习系列之RDD的重要内部属性(十五)
- Spark RDD概念学习系列之RDD的缓存(八)
- Spark RDD概念学习系列之RDD的转换图解
- Spark RDD概念学习系列之Spark Hash Shuffle内幕彻底解密(二十)
- spark RDD系列------2.HadoopRDD分区的创建以及计算
- Spark RDD概念学习系列之action操作
- Spark RDD概念学习系列之Spark的算子的分类(十一)
- Spark RDD概念学习系列之什么是Pair RDD
- Spark RDD概念学习系列之RDD的5大特点
- Spark RDD概念学习系列之如何创建Pair RDD
- Spark RDD概念学习系列之Pair RDD的transformation操作
- Spark RDD概念学习系列之RDD的容错机制(十七)