Spark RDD概念学习系列之RDD的缓存(八)
2016-07-31 19:50
471 查看
RDD的缓存
[b]RDD的缓存和RDD的checkpoint的区别[/b]
缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。
而检查点不同,它是在计算完成后,重新建立一个Job来计算。
为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。
RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。
Spark速度非常快的原因之一,就是在不同操作中在内存中持久化(或缓存)一个数据集。当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其他动作(action)中重用。这使得后续的动作变得更加迅速(通常快10倍)。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。
通过persist()或cache()方法可以标记一个要被持久化的RDD,一旦首次被触发,该RDD将会被保留在计算节点的内存中并重用。实际上,cache()是使用persist()的快捷方法,它们的实现如下:
图1中,假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升。
图1 RDD缓存过的Partition可以加快下一次的计算速度
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除。RDD的缓存的容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列的转换,丢失的数据会被重算。RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
RDD的缓存的处理
如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算。什么是存储级别?从用户的角度来看就是缓存保存到不同的存储位置,比如内存、硬盘、Tachyon;还有缓存的数据是否需要序列化等。
cacheManager对Storage模块进行了封装,使得RDD可以更加简单地从Storage模块读取或者写入数据。RDD的每个Partition对应Storage模块的一个Block,只不过Block是Partition经过处理后的数据。在系统实现的层面上,可以认为Partition和Block是一一对应的。cacheManager会通过getOrCompute来判断当前的RDD是否需要进行计算。
首先,cacheManager会通过RDD的ID和当前计算的Partition的ID向Storage模块的BlockManager发起查询请求,如果能够获得Block的信息,会直接返回Block的信息。否则,代表该RDD是需要计算的。这个RDD以前可能计算过并且被存储到了内存中,但是后来由于内存紧张,这部分内存被清理了。在计算结束后,计算结果会根据用户定义的存储级别,写入BlockManager中。这样,下次就可以不经过计算而直接读取该RDD的计算结果了。
核心实现逻辑如下:
欢迎大家,加入我的微信公众号:大数据躺过的坑 免费给分享
同时,大家可以关注我的个人博客:
http://www.cnblogs.com/zlslch/ 和 http://www.cnblogs.com/lchzls/
人生苦短,我愿分享。本公众号将秉持活到老学到老学习无休止的交流分享开源精神,汇聚于互联网和个人学习工作的精华干货知识,一切来于互联网,反馈回互联网。
目前研究领域:大数据、机器学习、深度学习、人工智能、数据挖掘、数据分析。 语言涉及:Java、Scala、Python、Shell、Linux等 。同时还涉及平常所使用的手机、电脑和互联网上的使用技巧、问题和实用软件。 只要你一直关注和呆在群里,每天必须有收获
以及对应本平台的QQ群:161156071(大数据躺过的坑)
[b]RDD的缓存和RDD的checkpoint的区别[/b]
缓存是在计算结束后,直接将计算结果通过用户定义的存储级别(存储级别定义了缓存存储的介质,现在支持内存、本地文件系统和Tachyon)写入不同的介质。
而检查点不同,它是在计算完成后,重新建立一个Job来计算。
为了避免重复计算,推荐先将RDD缓存,这样就能保证检查点的操作可以快速完成。
RDD的缓存能够在第一次计算完成后,将计算结果保存到内存、本地文件系统或者Tachyon(分布式内存文件系统)中。通过缓存,Spark避免了RDD上的重复计算,能够极大地提升计算速度。
Spark速度非常快的原因之一,就是在不同操作中在内存中持久化(或缓存)一个数据集。当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其他动作(action)中重用。这使得后续的动作变得更加迅速(通常快10倍)。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。
通过persist()或cache()方法可以标记一个要被持久化的RDD,一旦首次被触发,该RDD将会被保留在计算节点的内存中并重用。实际上,cache()是使用persist()的快捷方法,它们的实现如下:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()
图1中,假设首先进行了RDD0→RDD1→RDD2的计算作业,那么计算结束时,RDD1就已经缓存在系统中了。在进行RDD0→RDD1→RDD3的计算作业时,由于RDD1已经缓存在系统中,因此RDD0→RDD1的转换不会重复进行,计算作业只须进行RDD1→RDD3的计算就可以了,因此计算速度可以得到很大提升。
图1 RDD缓存过的Partition可以加快下一次的计算速度
缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除。RDD的缓存的容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列的转换,丢失的数据会被重算。RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。
RDD的缓存的处理
如果存储级别不是NONE,那么先检查是否有缓存;没有缓存则要进行计算。什么是存储级别?从用户的角度来看就是缓存保存到不同的存储位置,比如内存、硬盘、Tachyon;还有缓存的数据是否需要序列化等。
cacheManager对Storage模块进行了封装,使得RDD可以更加简单地从Storage模块读取或者写入数据。RDD的每个Partition对应Storage模块的一个Block,只不过Block是Partition经过处理后的数据。在系统实现的层面上,可以认为Partition和Block是一一对应的。cacheManager会通过getOrCompute来判断当前的RDD是否需要进行计算。
首先,cacheManager会通过RDD的ID和当前计算的Partition的ID向Storage模块的BlockManager发起查询请求,如果能够获得Block的信息,会直接返回Block的信息。否则,代表该RDD是需要计算的。这个RDD以前可能计算过并且被存储到了内存中,但是后来由于内存紧张,这部分内存被清理了。在计算结束后,计算结果会根据用户定义的存储级别,写入BlockManager中。这样,下次就可以不经过计算而直接读取该RDD的计算结果了。
核心实现逻辑如下:
def getOrCompute[T]( rdd: RDD[T], partition: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = { //获取RDD的BlockId val key = RDDBlockId(rdd.id, partition.index) logDebug(s"Looking for partition $key") blockManager.get(key) match { //向BlockManager查询是否有缓存 case Some(blockResult) => //缓存命中 //更新统计信息,将缓存作为结果返回 context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics) new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) case None => //没有缓存命中,需要计算 //判断当前是否有线程在处理当前的Partition,如果有那么等待它结束后,直接从Block // Manager中读取处理结果如果没有线程在计算,那么storedValues就是None,否则 //就是计算的结果 val storedValues = acquireLockForPartition[T](key) if (storedValues.isDefined) { //已经被其他线程处理了,直接返回结果 return new InterruptibleIterator[T](context, storedValues.get) } //需要计算 try { //如果被checkpoint过,那么读取checkpoint的数据;否则调用rdd的compute()开始计算 val computedValues = rdd.computeOrReadCheckpoint(partition,context) // Task是在Driver端执行的话就不需要缓存结果,这个主要是为了first()或者take() //这种仅仅有一个执行阶段的任务的快速执行。这类任务由于没有Shuffle阶段,直接运行 //在Driver端可能会更省时间 if (context.isRunningLocally) { return computedValues } //将计算结果写入到BlockManager val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) //更新任务的统计信息 val metrics = context.taskMetrics val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse( Seq[(BlockId, BlockStatus)]()) metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq) new InterruptibleIterator(context, cachedValues) } finally { loading.synchronized { loading.remove(key) //如果有其他的线程在等待该Partition的处理结果,那么通知它们计算已经完成,结果已 //经存到BlockManager中(注意前面那类不会写入BlockManager的本地任务) // loading.notifyAll() } } } }
欢迎大家,加入我的微信公众号:大数据躺过的坑 免费给分享
同时,大家可以关注我的个人博客:
http://www.cnblogs.com/zlslch/ 和 http://www.cnblogs.com/lchzls/
人生苦短,我愿分享。本公众号将秉持活到老学到老学习无休止的交流分享开源精神,汇聚于互联网和个人学习工作的精华干货知识,一切来于互联网,反馈回互联网。
目前研究领域:大数据、机器学习、深度学习、人工智能、数据挖掘、数据分析。 语言涉及:Java、Scala、Python、Shell、Linux等 。同时还涉及平常所使用的手机、电脑和互联网上的使用技巧、问题和实用软件。 只要你一直关注和呆在群里,每天必须有收获
以及对应本平台的QQ群:161156071(大数据躺过的坑)
相关文章推荐
- Spark RDD概念学习系列之Pair RDD的action操作
- Spark RDD概念学习系列之Pair RDD的分区控制
- Spark RDD概念学习系列之RDD的依赖关系(宽依赖和窄依赖)(三)
- Spark RDD概念学习系列之RDD的5大特点(五)
- Spark RDD概念学习系列之为什么会引入RDD?(一)
- Spark RDD概念学习系列之RDD的checkpoint(九)
- Spark RDD概念学习系列之RDD的重要内部属性(十五)
- Spark RDD概念学习系列之RDD的缺点(二)
- Spark RDD概念学习系列之RDD的依赖关系(宽依赖和窄依赖)(三)
- Spark RDD概念学习系列之RDD的转换图解
- Spark RDD概念学习系列之RDD是什么?(四)
- Spark RDD概念学习系列之如何创建Pair RDD
- Spark RDD系列-------1. 决定Spark RDD分区算法因素的总结
- Spark RDD概念学习系列之Spark的算子的分类(十一)
- Spark RDD概念学习系列之Spark的算子的分类
- Spark RDD概念学习系列之Pair RDD的transformation操作
- Spark RDD的缓存 rdd.cache() 和 rdd.persist()
- Spark RDD概念学习系列之Spark的算子的分类(十一)
- Spark RDD概念学习系列之Spark Hash Shuffle内幕彻底解密(二十)
- Spark RDD概念学习系列之RDD的五大特征