您的位置:首页 > 其它

spark源码学习(十二)--- checkpoint机制分析

2016-03-03 21:33 417 查看
checkpoint原理:

上篇cacheManager源码分析文章中提到,当RDD使用cache机制从内存中读取数据,如果数据没有读到,会使用checkpoint机制读取数据。此时如果没有checkpoint机制,那么就需要找到父RDD重新计算数据了,因此checkpoint是个很重要的容错机制。checkpoint就是对于一个RDD chain,如果中间某些中间结果RDD,后面需要反复使用该数据,可能因为一些故障导致该中间数据丢失,那么就可以针对该RDD启动checkpoint机制,checkpoint,首先需要调用sparkContext的setCheckpoint方法,设置一个容错文件系统目录,比如hdfs,然后对RDD调用checkpoint方法。之后再RDD所处的job运行结束后,会启动一个单独的job,来讲checkpoint过的数据写入之前设置的文件系统持久化,进行高可用。所以后面的计算在使用该RDD时,如果数据丢失了,但是还是可以从它的checkpoint中读取数据,不需要重新计算。

checkpoint与persist或者cache的区别在于,持久化只是将数据保存在BlockManager中但是其lineage是不变的,但是checkpoint执行完后,rdd已经没有依赖RDD,只有一个checkpointRDD,checkpoint之后,RDD的lineage就改变了。而且,持久化的数据丢失的可能性更大,因为可能磁盘或内存被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。

checkpoint的分析还是从RDD的读取开始,正常情况下会调用cacheManager的方法获取持久化数据,但是当RDD不能从持久化的磁盘或者缓存中读取时候,就寻找checkpoint来读取数据:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}

computeOrReadCheckpoint首先判断是否checkpoint,如果有,那么调用父RDD的iterator方法,由于此时该RDD的lineage已经没有了,他的父rdd就是checkpointRDD。否则重新计算,进入computeOrReadCheckpoint方法:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}此处调用其父RDD即checkpointRDD的compute方法来从Hadoop读取数据:
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
CheckpointRDD.readFromFile(file, broadcastedConf, context)
}

最后使用Hadoop的filesystem类来读取hdfs上的数据:
def readFromFile[T](
path: Path,
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
context: TaskContext
): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(broadcastedConf.value.value)
val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
//调用Hadoop的fileSystem打开输入流
val fileInputStream = fs.open(path, bufferSize)
//读取数据
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)

// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => deserializeStream.close())

deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: