您的位置:首页 > 其它

sprak 容错机制-checkpoint

2018-01-11 23:36 162 查看
   我们知道spark具有很强的数据容错机制,为了保证RDD的完整性,RDD 通过血统(Lineage)的关系,它采用粗粒度的方式记录了RDD的演变过程,这种方式相比于细粒度的方式确实限制了spark的运用场景,但是它却提高了spark的性能。



当RDD在运行的过程中,出现错误导致数据不完整,这时spark会根据血统的关系,重新从头计算RDD的方式来恢复数据,这样在RDD的迭代次数比较少时,性能不会有太大差别,但是通常在使用spark执行机器学习算法时,往往需要迭代上百次,假如一个机器学习算法需要迭代RDD100次,但是在执行第100次时,spark出现故障,为保证数据的完整性,spark需要从头开始重新计算RDD,这样会导致spark的性能下降,为了应对这种情况,spark提供了另外一个机制-Checkpoint。

checkPoint可以将中间执行的RDD缓存到磁盘,当后面的RDD在执行时出现问题,spark运行机制就不必从头重新计算RDD,只需在checkPoint点获取数据重新计算后面的RDD即可,这样对于迭代次数比较多的spark任务,可以很好的提高其运行性能。下面看一下checkPoint的spark源码。

/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}在源码的注释中可以看出,checkpoint的RDD将会保存到通过SparkContext设置的CheckPoint的目录下面,并且会移除checkpoint的RDD之前所有的RDD,还有就是checkpoint方法要在RDD执行action方法之前调用。注释的后半句也是相当重要,强烈建议RDD持久化到内存中在进行checkpoint操作,不然在checkpoint操作时,将会重新计算RDD,这样会很影响性能。
温馨提示:在进行checkpoint操作时 ,请先设置checkpoint保存的目录

具体设置方式如:sc.setCheckpointDir("hdfs://data/checkpoint20180122")

否则将会如源码所写抛出checkpoint目录在SparkContext中没有设置异常。 throw new SparkException("Checkpoint directory has not been set in the SparkContext")
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息