
Lesson 13: Spark Streaming Source Code Analysis - Driver Fault Tolerance

Topics in this lesson:

1. Fault tolerance of the ReceivedBlockTracker

2. Fault tolerance of DStream and the JobGenerator

 

The ReceivedBlockTracker tracks the received data, so it holds state that must survive a Driver failure.

DStreams express the dependency relationships of the computation; when data is recovered, the dependency structure at the level of computation logic must be recovered along with it, so DStreams also need fault tolerance.

The JobGenerator sits at the job-generation level: it records how far the Driver has got in the ongoing process of producing jobs from the data tracked by the ReceivedBlockTracker and the dependency graph formed by the DStreams.
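Before going into the source, note that none of this machinery is active unless the application turns it on. Below is a minimal setup sketch; the path, app name and batch interval are placeholders I invented for illustration, not values from this article:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DriverFaultToleranceSetup {
  def main(args: Array[String]): Unit = {
    val checkpointDir = "hdfs://namenode:8020/streaming/checkpoint"   // placeholder path

    def createContext(): StreamingContext = {
      val conf = new SparkConf()
        .setAppName("driver-ft-demo")
        // Ask receivers to also write received data to a write ahead log on reliable storage.
        .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint(checkpointDir)   // enables the Driver-side WAL and graph checkpointing
      // ... set up DStreams and output operations here ...
      ssc
    }

    // On a restart the Driver rebuilds the DStream graph and JobGenerator state
    // from the checkpoint instead of calling createContext again.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}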

Source code

ReceivedBlockTracker

 

/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
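The order of operations here is the whole point: the block metadata is made durable in the write ahead log first, and only if that write succeeds is the in-memory queue updated. A minimal, self-contained sketch of that log-then-mutate pattern, using simplified stand-in types rather than Spark's internal API:

import scala.collection.mutable
import scala.util.control.NonFatal

sealed trait Event
case class BlockAdded(streamId: Int, blockId: String) extends Event

trait SimpleLog {
  // Returns false if the event could not be made durable.
  def write(e: Event): Boolean
}

class SimpleTracker(log: SimpleLog) {
  private val queues = mutable.HashMap.empty[Int, mutable.Queue[String]]

  def addBlock(streamId: Int, blockId: String): Boolean = {
    try {
      val written = log.write(BlockAdded(streamId, blockId)) // 1. log first
      if (written) {
        synchronized {                                       // 2. only then mutate memory
          queues.getOrElseUpdate(streamId, mutable.Queue.empty) += blockId
        }
      }
      written
    } catch {
      case NonFatal(_) => false                              // caller treats the block as not added
    }
  }
}

Because nothing is recorded in memory that is not already in the log, a restarted Driver can never "know about" a block whose existence would be lost on the next failure.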
allocateBlocksToBatch
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    val streamIdToBlocks = streamIds.map { streamId =>
        (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime // batch time of the most recent allocation
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
 

The dequeueAll(x => true) call above drains every queued block for a stream; it is the standard method from scala.collection.mutable.Queue:

def dequeueAll(p: A => Boolean): Seq[A] = {
  if (first0.isEmpty)
    Seq.empty
  else {
    val res = new ArrayBuffer[A]
    while ((first0.nonEmpty) && p(first0.elem)) {
      res += first0.elem
      first0 = first0.next
      decrementLength()
    }
    if (first0.isEmpty) res
    else removeAllFromList(p, res)
  }
}
private def removeAllFromList(p: A => Boolean, res: ArrayBuffer[A]): ArrayBuffer[A] = {
  var leftlst = first0
  while (leftlst.next.nonEmpty) {
    if (p(leftlst.next.elem)) {
      res += leftlst.next.elem
      if (leftlst.next eq last0) last0 = leftlst
      leftlst.next = leftlst.next.next
      decrementLength()
    } else leftlst = leftlst.next
  }
  res
}
The state that ReceivedBlockTracker has to keep safe therefore lives in two in-memory maps, backed by the optional write ahead log:

private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
private val writeAheadLogOption = createWriteAheadLog()
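When the Driver restarts, these maps start out empty and are rebuilt by replaying the events that were logged ahead of every mutation. A rough sketch of that replay idea, again with simplified stand-in types rather than Spark's actual recovery code:

import scala.collection.mutable

// Simplified stand-ins for BlockAdditionEvent / BatchAllocationEvent.
sealed trait TrackerEvent
case class BlockAddition(streamId: Int, blockId: String) extends TrackerEvent
case class BatchAllocation(batchTime: Long, blocks: Map[Int, Seq[String]]) extends TrackerEvent

class RecoveredState {
  val unallocated = mutable.HashMap.empty[Int, mutable.Queue[String]]
  val allocated   = mutable.HashMap.empty[Long, Map[Int, Seq[String]]]

  // Replaying the log in order reproduces the state the old Driver had,
  // because every mutation was logged before it was applied.
  def replay(log: Seq[TrackerEvent]): Unit = log.foreach {
    case BlockAddition(streamId, blockId) =>
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]) += blockId
    case BatchAllocation(batchTime, blocks) =>
      blocks.keys.foreach(unallocated.remove)   // these blocks are no longer unallocated
      allocated(batchTime) = blocks
  }
}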

JobGenerator.scala

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
 

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

RecurringTimer

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }

  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
}
private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}
 

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
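So job generation is driven by a recurring timer whose only responsibility is to post a GenerateJobs event once per batch interval; processEvent then turns each event into real work. A self-contained sketch of that timer-plus-event-queue pattern (all names here are simplified stand-ins, not Spark's internals):

import java.util.concurrent.LinkedBlockingQueue

sealed trait GeneratorEvent
case class GenerateJobs(time: Long) extends GeneratorEvent
case class DoCheckpoint(time: Long) extends GeneratorEvent

object TimerEventLoopSketch {
  def main(args: Array[String]): Unit = {
    val events = new LinkedBlockingQueue[GeneratorEvent]()
    val periodMs = 1000L  // stand-in for the batch duration

    // The "timer": posts a GenerateJobs event every period, like RecurringTimer's callback.
    val timer = new Thread("timer") {
      setDaemon(true)
      override def run(): Unit = {
        var nextTime = System.currentTimeMillis() / periodMs * periodMs + periodMs
        while (true) {
          Thread.sleep(math.max(0, nextTime - System.currentTimeMillis()))
          events.put(GenerateJobs(nextTime))
          nextTime += periodMs
        }
      }
    }

    // The "event loop": pulls events off the queue and handles them one at a time.
    timer.start()
    for (_ <- 1 to 3) {
      events.take() match {
        case GenerateJobs(t) =>
          println(s"generate jobs for batch $t")
          events.put(DoCheckpoint(t))    // mirrors generateJobs posting DoCheckpoint afterwards
        case DoCheckpoint(t) =>
          println(s"checkpoint for batch $t")
      }
    }
  }
}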

Note also ReceivedBlockTracker.createWriteAheadLog, which initializes writeAheadLogOption above: it only creates a write ahead log when a checkpoint directory has been configured, so all of the Driver-side logging shown here depends on checkpointing being enabled.

 

Now let's look at checkpointing.

JobGenerator

 

/** Perform checkpoint for the given `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
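The guard means a graph checkpoint is only written when the time elapsed since graph.zeroTime is an exact multiple of the checkpoint interval. A tiny worked example of that check, with invented millisecond values (plain Longs standing in for Time and Duration):

val zeroTime           = 1000L    // when the graph started (graph.zeroTime)
val checkpointDuration = 5000L    // ssc.checkpointDuration
val batchTime          = 16000L   // the batch being checkpointed
val shouldWrite = (batchTime - zeroTime) % checkpointDuration == 0  // 15000 % 5000 == 0, so true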
ssc.graph.updateCheckpointData(time) goes to DStreamGraph, which pushes the request down to every output DStream; this is how the DStream-level dependency information makes it into the checkpoint:

def updateCheckpointData(time: Time) {
  logInfo("Updating checkpoint data for time " + time)
  this.synchronized {
    outputStreams.foreach(_.updateCheckpointData(time))
  }
  logInfo("Updated checkpoint data for time " + time)
}
Finally, the DoCheckpoint event posted at the end of generateJobs comes back through the same processEvent shown earlier, which dispatches it to doCheckpoint and so closes the loop for each batch.
 