Big Data IMF Legendary Action Secret Course, Lesson 40: CacheManager Fully Demystified
2016-08-26 10:10
1. The major value of CacheManager
2. A diagram of how CacheManager works
3. CacheManager source-code analysis
Why Spark is so powerful:
1. Built on RDDs, it forms a unified, multi-purpose big-data processing platform.
2. It handles iterative computation well.
I. CacheManager analysis
1. CacheManager manages caching; the cache can live in memory or on disk.
2. CacheManager does not hold data itself: it operates on the data through BlockManager.
3. When a Task runs, it calls the RDD's iterator method, and it is iterator that decides whether to read from the cache or fall through to computation:
```scala
/**
 * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
 * This should ''not'' be called by users directly, but is available for implementors of custom
 * subclasses of RDD.
 */
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
```
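This dispatch can be simulated in plain Scala with no Spark dependency. In the sketch below, all names are hypothetical: a shared map stands in for BlockManager's block store, and a boolean stands in for "StorageLevel is set":

```scala
import scala.collection.mutable

// Plain-Scala sketch of the dispatch in RDD.iterator (hypothetical names, no Spark).
object CacheDispatchSketch {
  val blockStore   = mutable.Map.empty[String, Seq[Int]] // simulated block store
  var computeCalls = 0                                   // counts real computations

  // Mirrors CacheManager.getOrCompute: a hit returns cached data, a miss computes and caches.
  def getOrCompute(blockId: String, compute: () => Seq[Int]): Seq[Int] =
    blockStore.getOrElseUpdate(blockId, { computeCalls += 1; compute() })

  // Mirrors RDD.iterator: consult the cache only when a StorageLevel is set.
  def iterator(blockId: String, cached: Boolean, compute: () => Seq[Int]): Seq[Int] =
    if (cached) getOrCompute(blockId, compute)
    else { computeCalls += 1; compute() }                // the StorageLevel.NONE path
}
```

Calling `iterator("rdd_0_0", cached = true, …)` twice runs the computation only once; with `cached = false`, every call recomputes, just as with `StorageLevel.NONE`.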
II. CacheManager source code in detail
1. While it works, the cache retains as much data as possible, but the data is not guaranteed to be complete: if the current computation needs memory, cached data must give up its space. If the RDD was persisted with a StorageLevel that also allows disk, the evicted part of the cache can be moved from memory to disk; otherwise that data is simply lost!
2. Concretely, when CacheManager retrieves cached data, it fetches it through BlockManager:
```scala
def get(blockId: BlockId): Option[BlockResult] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
```
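The local-then-remote fallback above is essentially a chained Option lookup. A plain-Scala sketch (hypothetical stores; in Spark these are BlockManager internals):

```scala
import scala.collection.mutable

// Hypothetical local and remote stores standing in for BlockManager's lookup path.
object GetSketch {
  val localStore  = mutable.Map.empty[String, String]
  val remoteStore = mutable.Map.empty[String, String]

  // Equivalent to the method above: try locally first, then remotely, else None.
  def get(blockId: String): Option[String] =
    localStore.get(blockId).orElse(remoteStore.get(blockId))
}
```

A block present only on a remote executor is still found, while a missing block yields `None`, which is what sends CacheManager down the recompute path.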
If the cache cannot be found, a loading lock must be acquired before computing. A side question: with straggler-task speculation, can two tasks for one Partition be launched on two machines? (Yes; the lock below only coordinates threads within a single executor.)
```scala
/**
 * Acquire a loading lock for the partition identified by the given block ID.
 *
 * If the lock is free, just acquire it and return None. Otherwise, another thread is already
 * loading the partition, so we wait for it to finish and return the values loaded by the thread.
 */
private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
  loading.synchronized {
    if (!loading.contains(id)) {
      // If the partition is free, acquire its lock to compute its value
      loading.add(id)
      None
    } else {
      // Otherwise, wait for another thread to finish and return its result
      logInfo(s"Another thread is loading $id, waiting for it to finish...")
      while (loading.contains(id)) {
        try {
          loading.wait()
        } catch {
          case e: Exception =>
            logWarning(s"Exception while waiting for another thread to load $id", e)
        }
      }
      logInfo(s"Finished waiting for $id")
      val values = blockManager.get(id)
      if (!values.isDefined) {
        /* The block is not guaranteed to exist even after the other thread has finished.
         * For instance, the block could be evicted after it was put, but before our get.
         * In this case, we still need to load the partition ourselves. */
        logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
        loading.add(id)
      }
      values.map(_.data.asInstanceOf[Iterator[T]])
    }
  }
}
```
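The loading-set protocol can be exercised without Spark. The sketch below (hypothetical names, deliberately simplified) lets the first thread claiming a block ID compute it while later threads wait on the monitor and then read the winner's result; unlike the real code, it does not handle the "block evicted before our get" case:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Plain-Scala sketch of the loading-set lock (hypothetical names, no Spark).
object LoadingLockSketch {
  private val loading = mutable.HashSet.empty[String]  // block IDs being computed
  private val store   = mutable.Map.empty[String, Int] // stands in for BlockManager
  val computeCalls    = new AtomicInteger(0)

  def getOrLoad(id: String, compute: () => Int): Int = {
    loading.synchronized {
      while (loading.contains(id)) loading.wait()      // someone else is loading it
      store.get(id) match {
        case Some(v) => return v                       // loaded while we waited
        case None    => loading.add(id)                // we win: mark it as loading
      }
    }
    val v = { computeCalls.incrementAndGet(); compute() } // expensive work, lock not held
    loading.synchronized {
      store(id) = v
      loading.remove(id)
      loading.notifyAll()                              // wake up any waiters
    }
    v
  }
}
```

Two threads racing on the same partition ID both get the value, but the computation runs exactly once.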
3. If CacheManager cannot obtain the cached content through BlockManager, it falls back to the following RDD method to get the data:

```scala
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
```
This method first checks whether the current RDD has been checkpointed; if so, it reads the checkpointed data directly, otherwise it must compute the partition.
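The checkpoint-first rule is a one-line decision. A hypothetical sketch (not Spark's actual class):

```scala
// Hypothetical sketch of computeOrReadCheckpoint: once an RDD is checkpointed,
// its data is read back directly and the compute function is never invoked.
final case class SketchRDD(checkpointData: Option[Seq[Int]], compute: () => Seq[Int]) {
  def computeOrReadCheckpoint(): Seq[Int] =
    checkpointData.getOrElse(compute())  // no recomputation once checkpointed
}
```

For example, `SketchRDD(Some(Seq(1, 2, 3)), () => sys.error("never called"))` returns the checkpointed data without touching `compute`.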
4. After computing, putInBlockManager re-caches the data according to the StorageLevel. If the data should go into memory but there is not enough room and the level also allows disk, it is written to disk instead; with a memory-only level, the computed values are returned to the caller without being cached.
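That fallback order can be sketched in a few lines. The sketch below is hypothetical and heavily simplified (capacity is counted in blocks, not bytes, and there is no unrolling):

```scala
import scala.collection.mutable

// Simplified, hypothetical sketch of putInBlockManager's fallback: try the memory
// store first; if the block does not fit and the StorageLevel allows disk, spill it
// there; with a memory-only level the data is simply not cached.
object PutSketch {
  val memoryCapacity = 2                                 // in blocks, for brevity
  val memoryStore = mutable.Map.empty[String, Seq[Int]]
  val diskStore   = mutable.Map.empty[String, Seq[Int]]

  def put(id: String, values: Seq[Int], useDisk: Boolean): String =
    if (memoryStore.size < memoryCapacity) { memoryStore(id) = values; "memory" }
    else if (useDisk)                      { diskStore(id)   = values; "disk" }
    else "uncached"  // like MEMORY_ONLY with no room: values returned but not kept
}
```

With a full memory store, a `useDisk = true` put lands on disk, while a `useDisk = false` put leaves the block uncached, matching the eviction discussion in section II.1.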