您的位置:首页 > 大数据

大数据IMF传奇行动绝密课程第40课:CacheManager彻底解密

2016-08-26 10:10 501 查看

CacheManager彻底解密

1、CacheManager重大价值

2、CacheManager运行原理图

3、CacheManager源码解析

Spark厉害的原因

1、基于RDD构成了一体化多元化的大数据处理中心

2、迭代

一、CacheManager分析:

1、CacheManager管理的缓存,而缓存可以是基于内存的缓存,也可以是基于磁盘的缓存

2、CacheManager需要通过BlockManager来操作数据

3、当Task运行的时候会调用RDD的compute方法进行计算,而compute方法会调用iterator方法

/**
* Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
* This should ''not'' be called by users directly, but is available for implementors of custom
* subclasses of RDD.
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}


二、CacheManager源码详解

1、Cache在工作的时候会最大化的保留数据,但是数据不一定绝对完整,因为当前的计算如果需要内存空间的话,那么Cache在内存中的数据必须让出空间。此时如果在RDD持久化的时候同时指定了可以把数据放在Disk上,那么部分Cache的数据就可以从内存转入磁盘,否则的话,数据就会丢失!

2、具体CacheManager在获得缓存数据的时候会通过BlockManager来抓到数据

def get(blockId: BlockId): Option[BlockResult] = {
val local = getLocal(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
val remote = getRemote(blockId)
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
}
None
}


如果无法找到cache,需要加锁进行计算。straggleTask推测,对于一个Partition在两台机器启动两个任务?

/**
* Acquire a loading lock for the partition identified by the given block ID.
*
* If the lock is free, just acquire it and return None. Otherwise, another thread is already
* loading the partition, so we wait for it to finish and return the values loaded by the thread.
*/
private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
loading.synchronized {
if (!loading.contains(id)) {
// If the partition is free, acquire its lock to compute its value
loading.add(id)
None
} else {
// Otherwise, wait for another thread to finish and return its result
logInfo(s"Another thread is loading $id, waiting for it to finish...")
while (loading.contains(id)) {
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Exception while waiting for another thread to load $id", e)
}
}
logInfo(s"Finished waiting for $id")
val values = blockManager.get(id)
if (!values.isDefined) {
/* The block is not guaranteed to exist even after the other thread has finished.
* For instance, the block could be evicted after it was put, but before our get.
* In this case, we still need to load the partition ourselves. */
logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
loading.add(id)
}
values.map(_.data.asInstanceOf[Iterator[T]])
}
}
}


3、如果CacheManager没有通过BlockManager获得缓存内容的话,此时会通过RDD的如下方法来获得数据

val computedValues = rdd.computeOrReadCheckpoint(partition, context)

上述方法首先会查看当前的RDD是否进行了Checkpoint,如果进行了的话,就直接读取Checkpoint的数据,否则的话就必须进行计算。

4、计算之后通过putInBlockManager会把数据按照StorageLevel重新缓存起来,缓存的时候如果需要放在内存中
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark
相关文章推荐