RDD容错机制-checkpoint
2013-11-28 23:24
267 查看
1 checkpoint(本质是将RDD写入disk做检查点)。checkpoint是对lineage容错机制的辅助:血统过长会造成容错成本过高,此时不如在中间阶段先做checkpoint,之后的血统从checkpoint处开始算起。
在RDD
/**
 * Performs the checkpointing of this RDD (at most once per RDD — guarded by
 * `doCheckpointCalled`). If this RDD has checkpoint data attached, delegate to
 * it; otherwise recurse into the parent RDDs so a marked ancestor still gets
 * checkpointed.
 */
private[spark] def doCheckpoint() {
  if (!doCheckpointCalled) {
    doCheckpointCalled = true
    checkpointData match {
      case Some(data) => data.doCheckpoint()
      case None       => dependencies.foreach(_.rdd.doCheckpoint())
    }
  }
}
在RDDCheckpointData中
/**
 * Materializes the RDD's data to the checkpoint directory and rewires the RDD's
 * lineage to read from the saved files. Only one caller may perform the work:
 * the state transition MarkedForCheckpoint -> CheckpointingInProgress is done
 * atomically, and losers return without doing anything.
 */
def doCheckpoint() {
  // Atomically claim the right to checkpoint; anyone else observing a different
  // state (not marked, already in progress, or done) backs off.
  val shouldCheckpoint = RDDCheckpointData.synchronized {
    if (cpState == MarkedForCheckpoint) {
      cpState = CheckpointingInProgress
      true
    } else {
      false
    }
  }
  if (shouldCheckpoint) {
    // Create the per-RDD output directory for the checkpoint files.
    val checkpointPath = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
    val fs = checkpointPath.getFileSystem(new Configuration())
    if (!fs.mkdirs(checkpointPath)) {
      throw new SparkException("Failed to create checkpoint path " + checkpointPath)
    }
    // Run a job that writes every partition to disk, then reload the files as a
    // CheckpointRDD — the key step: the data now lives outside the lineage.
    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(checkpointPath.toString) _)
    val newRDD = new CheckpointRDD[T](rdd.context, checkpointPath.toString)
    // Swap the RDD's dependencies/partitions for the checkpointed data under the lock.
    RDDCheckpointData.synchronized {
      cpFile = Some(checkpointPath.toString)
      cpRDD = Some(newRDD)
      rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
      cpState = Checkpointed
      RDDCheckpointData.clearTaskCaches()
      logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
    }
  }
}
在CheckPointRDD
/**
 * Writes one partition of an RDD to a checkpoint file on a Hadoop-compatible
 * filesystem (e.g. HDFS). Runs as a Spark task: each task serializes its
 * partition's iterator into a per-split file under `path`, writing to a hidden
 * attempt-specific temp file first and then renaming it into place.
 *
 * @param path      directory holding the checkpoint files for this RDD
 * @param blockSize HDFS block size to use; negative (the default) means the
 *                  filesystem default — an explicit value is mainly for testing
 * @param ctx       task context, used for the split id and attempt id
 * @param iterator  the partition's elements to serialize
 */
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
// In essence this writes the RDD's data into Hadoop's distributed filesystem.
val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
// Final name is derived from the split id; writes go through a hidden
// attempt-specific temp path so concurrent attempts don't clobber each other.
val finalOutputName = splitIdToFile(ctx.splitId)
val finalOutputPath = new Path(outputDir, finalOutputName)
val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
if (fs.exists(tempOutputPath)) {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
// Buffer size for the HDFS write, configurable via the spark.buffer.size
// system property (default 64 KiB) so it can be tuned to the data volume.
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
} else {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
// The key write step: every object produced by the iterator is serialized
// straight to the file — unlike a database checkpoint, this persists the
// partition's data itself rather than a log position.
serializeStream.writeAll(iterator)
serializeStream.close()
// Promote the temp file to its final name. If the rename fails because another
// attempt already produced the final file, just discard our temp file;
// otherwise the checkpoint genuinely failed.
if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
logInfo("Deleting tempOutputPath " + tempOutputPath)
fs.delete(tempOutputPath, false)
throw new IOException("Checkpoint failed: failed to save output of task: "
+ ctx.attemptId + " and final output path does not exist")
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
fs.delete(tempOutputPath, false)
}
}
}
///////////////////写入操作
/**
 * Abstract stream for writing serialized objects. Concrete implementations
 * (e.g. Java- or Kryo-based) supply writeObject/flush/close.
 */
trait SerializationStream {
  def writeObject[T](t: T): SerializationStream
  def flush(): Unit
  def close(): Unit

  /** Serializes every element of `iter` to this stream; returns this stream for chaining. */
  def writeAll[T](iter: Iterator[T]): SerializationStream = {
    iter.foreach(writeObject)
    this
  }
}
kryo的序列化
/**
 * A SerializationStream backed by Kryo. Each object is written with its class
 * identifier (via writeClassAndObject) so it can be read back without knowing
 * the type in advance.
 *
 * @param kryo      the Kryo instance used to serialize objects
 * @param outStream the underlying output stream receiving the serialized bytes
 */
private[spark]
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
// Kryo's buffered Output wrapper around the raw stream
val output = new KryoOutput(outStream)
def writeObject[T](t: T): SerializationStream = {
kryo.writeClassAndObject(output, t)
this
}
def flush() { output.flush() }
def close() { output.close() }
}
在RDD
/**
 * Performs the checkpointing of this RDD (at most once per RDD — guarded by
 * `doCheckpointCalled`). If this RDD has checkpoint data attached, delegate to
 * it; otherwise recurse into the parent RDDs so a marked ancestor still gets
 * checkpointed.
 */
private[spark] def doCheckpoint() {
  if (!doCheckpointCalled) {
    doCheckpointCalled = true
    checkpointData match {
      case Some(data) => data.doCheckpoint()
      case None       => dependencies.foreach(_.rdd.doCheckpoint())
    }
  }
}
在RDDCheckpointData中
/**
 * Materializes the RDD's data to the checkpoint directory and rewires the RDD's
 * lineage to read from the saved files. Only one caller may perform the work:
 * the state transition MarkedForCheckpoint -> CheckpointingInProgress is done
 * atomically, and losers return without doing anything.
 */
def doCheckpoint() {
  // Atomically claim the right to checkpoint; anyone else observing a different
  // state (not marked, already in progress, or done) backs off.
  val shouldCheckpoint = RDDCheckpointData.synchronized {
    if (cpState == MarkedForCheckpoint) {
      cpState = CheckpointingInProgress
      true
    } else {
      false
    }
  }
  if (shouldCheckpoint) {
    // Create the per-RDD output directory for the checkpoint files.
    val checkpointPath = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
    val fs = checkpointPath.getFileSystem(new Configuration())
    if (!fs.mkdirs(checkpointPath)) {
      throw new SparkException("Failed to create checkpoint path " + checkpointPath)
    }
    // Run a job that writes every partition to disk, then reload the files as a
    // CheckpointRDD — the key step: the data now lives outside the lineage.
    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(checkpointPath.toString) _)
    val newRDD = new CheckpointRDD[T](rdd.context, checkpointPath.toString)
    // Swap the RDD's dependencies/partitions for the checkpointed data under the lock.
    RDDCheckpointData.synchronized {
      cpFile = Some(checkpointPath.toString)
      cpRDD = Some(newRDD)
      rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
      cpState = Checkpointed
      RDDCheckpointData.clearTaskCaches()
      logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
    }
  }
}
在CheckPointRDD
/**
 * Writes one partition of an RDD to a checkpoint file on a Hadoop-compatible
 * filesystem (e.g. HDFS). Runs as a Spark task: each task serializes its
 * partition's iterator into a per-split file under `path`, writing to a hidden
 * attempt-specific temp file first and then renaming it into place.
 *
 * @param path      directory holding the checkpoint files for this RDD
 * @param blockSize HDFS block size to use; negative (the default) means the
 *                  filesystem default — an explicit value is mainly for testing
 * @param ctx       task context, used for the split id and attempt id
 * @param iterator  the partition's elements to serialize
 */
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
val outputDir = new Path(path)
// In essence this writes the RDD's data into Hadoop's distributed filesystem.
val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
// Final name is derived from the split id; writes go through a hidden
// attempt-specific temp path so concurrent attempts don't clobber each other.
val finalOutputName = splitIdToFile(ctx.splitId)
val finalOutputPath = new Path(outputDir, finalOutputName)
val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
if (fs.exists(tempOutputPath)) {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
// Buffer size for the HDFS write, configurable via the spark.buffer.size
// system property (default 64 KiB) so it can be tuned to the data volume.
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
} else {
// This is mainly for testing purpose
fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
}
val serializer = env.serializer.newInstance()
val serializeStream = serializer.serializeStream(fileOutputStream)
// The key write step: every object produced by the iterator is serialized
// straight to the file — unlike a database checkpoint, this persists the
// partition's data itself rather than a log position.
serializeStream.writeAll(iterator)
serializeStream.close()
// Promote the temp file to its final name. If the rename fails because another
// attempt already produced the final file, just discard our temp file;
// otherwise the checkpoint genuinely failed.
if (!fs.rename(tempOutputPath, finalOutputPath)) {
if (!fs.exists(finalOutputPath)) {
logInfo("Deleting tempOutputPath " + tempOutputPath)
fs.delete(tempOutputPath, false)
throw new IOException("Checkpoint failed: failed to save output of task: "
+ ctx.attemptId + " and final output path does not exist")
} else {
// Some other copy of this task must've finished before us and renamed it
logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
fs.delete(tempOutputPath, false)
}
}
}
///////////////////写入操作
/**
 * Abstract stream for writing serialized objects. Concrete implementations
 * (e.g. Java- or Kryo-based) supply writeObject/flush/close.
 */
trait SerializationStream {
  def writeObject[T](t: T): SerializationStream
  def flush(): Unit
  def close(): Unit

  /** Serializes every element of `iter` to this stream; returns this stream for chaining. */
  def writeAll[T](iter: Iterator[T]): SerializationStream = {
    iter.foreach(writeObject)
    this
  }
}
kryo的序列化
/**
 * A SerializationStream backed by Kryo. Each object is written with its class
 * identifier (via writeClassAndObject) so it can be read back without knowing
 * the type in advance.
 *
 * @param kryo      the Kryo instance used to serialize objects
 * @param outStream the underlying output stream receiving the serialized bytes
 */
private[spark]
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
// Kryo's buffered Output wrapper around the raw stream
val output = new KryoOutput(outStream)
def writeObject[T](t: T): SerializationStream = {
kryo.writeClassAndObject(output, t)
this
}
def flush() { output.flush() }
def close() { output.close() }
}
相关文章推荐
- RDD的依赖关系、窄依赖、宽依赖、RDD的缓存、RDD缓存方式、DAG的生成、RDD容错机制之Checkpoint
- Spark RDD概念学习系列之RDD的容错机制(十七)
- RDD之七:Spark容错机制
- spark 容错机制-checkpoint
- 大数据框架对比:Hadoop、Storm、Samza、Spark和Flink--容错机制(ACK,RDD,基于log和状态快照),消息处理at least once,exactly once两个是关键
- 架构师之路--搜索业务和技术介绍及容错机制
- Hadoop2容错机制
- 第3课:SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错
- RDD:基于内存的集群计算容错抽象
- 常见容错机制:failover、failfast、failback、failsafe
- spark源码学习(十二)--- checkpoint机制分析
- Storm入门与实践(4)Storm的容错机制
- RDD论文翻译:基于内存的集群计算容错抽象
- spark源码之RDD(3)checkpoint
- InnoDB脏页刷新机制Checkpoint
- Spark RDD概念学习系列之RDD的checkpoint(九)
- RDD机制实现模型Spark初识
- 【Spark】Spark容错机制
- Spark定制班第3课:通过案例对SparkStreaming 透彻理解三板斧之三:解密SparkStreaming运行机制和架构进阶之Job和容错
- hadoop namenode的工作机制 (checkpoint过程、元数据合并一个意思)