您的位置:首页 > 其它

Spark源码之checkpoint方法解析

2017-07-11 13:10 477 查看
今天在阅读Spark源码的时候看到了checkpoint方法,之前也在处理数据的的时候用到过,但是没有深入理解这个方法,今天结合官方文档以及网上博客重新认识了一下这个方法,这里做个总结。主要从两个方面讲解:

1.官方对这个方法的解释

2.这个方法的使用场景

checkpoint官方源码以及解释

/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its  parent RDDs will be removed.
* 给这个RDD做checkpoint,之后这个RDD会以file的形式保存在之前设定的checkpoint目录下面并且
* 此RDD之前的父RDD们会被删除
* This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
* 这个方法必须在RDD执行任何Job之前被使用。并且这里强烈推荐在checkpoint之前应该将该RDD持久化到内存中去,否则将它保存为一个文件会做重复计算的多余工作
*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
//这里可以看出,在使用checkpoint方法之前必须要设置checkpoint的保存目录,否则会抛出异常
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}


从上面的解释可以看出,checkpoint方法是一个action算子,会触发一个job,所以在执行该方法前需要将RDD持久化,不然执行下一个job的时候会重新计算该RDD,造成时间效率较低。

应用场景

checkpoint方法其实是将RDD之前的血统lineage关系持久化到磁盘上去,这样在计算过程中如果出现错误情况,可以直接读取磁盘上的持久化的RDD,而避免了之前的许多重复计算,也是容错的一种手段,特别适用于迭代次数很多,数据量较大的计算场景。试想一下,如果你对大量的数据做10000此迭代处理,在第9998次迭代出现错误,如果之前没有做checkpoint处理的话,那么根据RDD的自我容错机制会重新计算出现错误的分区,这样也就是说需要重新进行之前的9998次迭代计算,势必会造成时间效率低下;如果之前在9997次做了checkpoint的话,那么即使在第9998次出现错误,程序会加载第9997次保存的RDD然后再从第9998次重新计算即可,不需要再计算前面那么多次的迭代了,极大的节省的计算时间,所以当迭代次数很多,数据量较大的时候,适当的做checkpoint是很有必要的。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark