您的位置:首页 > 其它

Spark应用开发中使用缓存或checkpoint实现容错

2016-02-21 21:30 281 查看
Spark应用开发中使用缓存或checkpoint实现容错,并且避免了多次重复进行RDD的计算。

1、如果程序中,对某一个RDD,基于它进行了多次transformation或者action操作。那么就非常有必要对其进行持久化操作,以避免对一个RDD反复进行计算。

使用方法:

val file = sc.textFile("/tmp/spark/1.data")
file.persist()
file.count()


2、如果要保证在RDD的持久化数据可能丢失的情况下,还要保证高性能,那么可以对RDD进行Checkpoint操作。checkpoint是针对外部存储系统的可靠性,比cache更高,通常为hdfs

使用方法:

val data = sc.parallelize(1 to 1000000, 15)
sc.setCheckpointDir("/tmp/spark/checkpoint")
import org.apache.spark.storage._
data.persist(StorageLevel.DISK_ONLY)
data.checkpoint
data.count


在hdfs上查看数据

hadoop fs -ls /tmp/spark/checkpoint/7c9c00ea-2bb1-4e35-8722-5f6331e03c3b/rdd-0



3、注意:如果需要进行checkpoint操作之前,给要checkpoint的RDD,先进行persist(StorageLevel.DISK_ONLY)

 * 先persist(),再用checkpoint()的原理如下:

 * 那么首先执行到该rdd的iterator()之后,会先发现storageLevel != StorageLevel.NONE

 * 那么就会通过CacheManager去获取数据,此时会发现通过BlockManager获取不到数据(因为是第一次执行)

 * 那么就会第一次还是会计算一次rdd的数据,然后通过CacheManager的putInBlockManager()将其通过BlockMananger进行持久化

 * rdd所在的job运行结束,然后启动单独job进行checkpoint操作,此时是不是又会执行到该rdd的iterator()方法

 * 那么,就会发现持久化级别不为空,默认BlockManager()直接读取持久化数据

 * 如果持久化数据丢,则会调用computeOrReadCheckpoint进行rdd partition的计算

 * 判断isCheckpointed为true,那么就会从外部分件系统读取数据
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: