RDD fault-tolerance mechanism: checkpoint

1. checkpoint (essentially writing the RDD to disk as a check point). Checkpointing exists to complement lineage: when the lineage chain grows too long, recovering a lost partition by recomputation becomes too expensive, so it is better to checkpoint at an intermediate point and let recovery start from the checkpoint instead.
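
From the user's side, the mechanism is driven by two calls: set a checkpoint directory on the SparkContext, mark the RDD, then run an action. A minimal sketch (the directory path and RDD contents are made-up examples):

import org.apache.spark.SparkContext

val sc = new SparkContext("local", "checkpoint-demo")
// The directory must live on a reliable store, typically HDFS
sc.setCheckpointDir("hdfs://namenode:9000/tmp/checkpoints")

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint()   // only marks the RDD; nothing is written yet
rdd.count()        // the first action triggers the actual checkpoint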

In RDD:

private[spark] def doCheckpoint() {
  if (!doCheckpointCalled) {
    doCheckpointCalled = true
    if (checkpointData.isDefined) {
      checkpointData.get.doCheckpoint()
    } else {
      dependencies.foreach(_.rdd.doCheckpoint())
    }
  }
}
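
doCheckpoint() is not called by user code. At the end of every job, SparkContext.runJob invokes it on the final RDD, and the method recursively walks up the dependency chain until it reaches RDDs whose checkpointData is defined. Roughly (a sketch of the Spark 0.8 call site, argument list simplified, not the verbatim source):

// Inside SparkContext.runJob, after the DAGScheduler has finished the job:
val results = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler)
rdd.doCheckpoint()   // now materialize any RDDs marked for checkpointing
results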

In RDDCheckpointData:

def doCheckpoint() {
  // If it is marked for checkpointing AND checkpointing is not already in progress,
  // then set it to be in progress, else return
  RDDCheckpointData.synchronized {
    if (cpState == MarkedForCheckpoint) {
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }

  // Create the output path for the checkpoint
  val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
  val fs = path.getFileSystem(new Configuration())
  if (!fs.mkdirs(path)) {
    throw new SparkException("Failed to create checkpoint path " + path)
  }

  // Save to file, and reload it as an RDD -- this is the key step
  rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
  val newRDD = new CheckpointRDD[T](rdd.context, path.toString)

  // Change the dependencies and partitions of the RDD
  RDDCheckpointData.synchronized {
    cpFile = Some(path.toString)
    cpRDD = Some(newRDD)
    rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
    cpState = Checkpointed
    RDDCheckpointData.clearTaskCaches()
    logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
  }
}
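
Once this finishes, the lineage has been cut: the RDD's only parent is the new CheckpointRDD backed by files on disk. This is observable from user code (a sketch; the printed path is illustrative):

rdd.count()                      // the action that triggers the checkpoint
println(rdd.isCheckpointed)      // true
println(rdd.getCheckpointFile)   // e.g. Some(hdfs://.../checkpoints/rdd-12)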

In CheckpointRDD:

def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
  val env = SparkEnv.get
  val outputDir = new Path(path)
  // Essentially this writes the RDD's partitions into Hadoop's distributed file system
  val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())

  val finalOutputName = splitIdToFile(ctx.splitId)
  val finalOutputPath = new Path(outputDir, finalOutputName)
  val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
  if (fs.exists(tempOutputPath)) {
    throw new IOException("Checkpoint failed: temporary path " +
      tempOutputPath + " already exists")
  }

  // The buffer size used when writing to HDFS; it can be tuned to the data
  // volume via the spark.buffer.size property
  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
  val fileOutputStream = if (blockSize < 0) {
    fs.create(tempOutputPath, false, bufferSize)
  } else {
    // This is mainly for testing purpose
    fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
  }

  val serializer = env.serializer.newInstance()
  val serializeStream = serializer.serializeStream(fileOutputStream)
  // The actual write: each object the iterator yields is serialized straight
  // to HDFS, which is slightly different from a database-style checkpoint
  serializeStream.writeAll(iterator)
  serializeStream.close()

  if (!fs.rename(tempOutputPath, finalOutputPath)) {
    if (!fs.exists(finalOutputPath)) {
      logInfo("Deleting tempOutputPath " + tempOutputPath)
      fs.delete(tempOutputPath, false)
      throw new IOException("Checkpoint failed: failed to save output of task: "
        + ctx.attemptId + " and final output path does not exist")
    } else {
      // Some other copy of this task must've finished before us and renamed it
      logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
      fs.delete(tempOutputPath, false)
    }
  }
}
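
Writing to a hidden temp file and then renaming it into place makes the commit atomic when multiple attempts of the same task race. The counterpart read path mirrors the writer; the real method is CheckpointRDD.readFromFile, and the sketch below is simplified from the same era of the source (its exact signature is an assumption, not the verbatim code):

def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
  val env = SparkEnv.get
  val fs = path.getFileSystem(env.hadoop.newConfiguration())
  val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
  val fileInputStream = fs.open(path, bufferSize)
  val serializer = env.serializer.newInstance()
  val deserializeStream = serializer.deserializeStream(fileInputStream)

  // Close the stream when the task finishes
  context.addOnCompleteCallback(() => deserializeStream.close())

  deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}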

/////////////////// The write operation

trait SerializationStream {
  def writeObject[T](t: T): SerializationStream
  def flush(): Unit
  def close(): Unit

  def writeAll[T](iter: Iterator[T]): SerializationStream = {
    while (iter.hasNext) {
      writeObject(iter.next())
    }
    this
  }
}
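
writeAll simply drains the iterator through writeObject, so a concrete stream only has to implement the three abstract methods. Spark's default Java-based implementation looks roughly like this (a simplified sketch, not the verbatim source):

import java.io.{ObjectOutputStream, OutputStream}

class JavaSerializationStream(out: OutputStream) extends SerializationStream {
  val objOut = new ObjectOutputStream(out)
  def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
  def flush() { objOut.flush() }
  def close() { objOut.close() }
}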

Kryo serialization:

private[spark]
class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
  val output = new KryoOutput(outStream)

  def writeObject[T](t: T): SerializationStream = {
    kryo.writeClassAndObject(output, t)
    this
  }

  def flush() { output.flush() }
  def close() { output.close() }
}
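
Which stream the checkpoint writer gets depends on the configured serializer. To write checkpoints through Kryo instead of Java serialization, switch the serializer via a system property (class names as in Spark 0.8; the registrator class is a hypothetical example):

System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Optionally register application classes for more compact output
System.setProperty("spark.kryo.registrator", "com.example.MyKryoRegistrator")
val sc = new SparkContext("local", "kryo-demo")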