第13课:Spark Streaming源码解读之Driver容错安全性
2016-05-22 21:41
423 查看
本期内容:
1. ReceivedBlockTracker容错安全性
2. DStream和JobGenerator容错安全性
ReceivedBlockTracker负责跟踪接收到的数据,它需要维护状态,因此需要容错。
DStream表达了依赖关系,在恢复数据的时候也需要恢复计算逻辑层面的依赖关系,因此同样需要容错。
JobGenerator:作业生成层面,表示基于ReceivedBlockTracker中的数据以及DStream构成的依赖关系,不断地产生job的过程。
/**
 * Add received block. This event will get written to the write ahead log (if enabled).
 *
 * The block is appended to its stream's unallocated queue only after `writeToLog` reports
 * success.
 *
 * @param receivedBlockInfo metadata of the block that was received
 * @return true if the addition event was written and the block queued; false if the log
 *         write reported failure or a non-fatal exception was thrown
 */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      // The queue is also drained by allocateBlocksToBatch, which holds the same lock.
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
allocateBlocksToBatch
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 *
 * The unallocated queues are drained only after the BatchAllocationEvent has been written
 * successfully. If the write fails, the blocks stay queued and are picked up by a later
 * batch instead of being dropped.
 *
 * @param batchTime time of the batch to allocate blocks to. It is only acted on when no batch
 *                  has been allocated yet or it is later than `lastAllocatedBatchTime`;
 *                  otherwise the call only logs.
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // Copy each queue instead of dequeuing from it. Dequeuing before the WAL write meant that
    // a failed write removed the blocks from the queue without recording them anywhere.
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).toList)
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      // The allocation has been recorded, so the queues can now be drained. This whole method
      // holds the lock that addBlock uses, so nothing was added after the copy was taken.
      streamIds.foreach(streamId => getReceivedBlockQueue(streamId).clear())
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      // Remember the time of the most recently allocated batch.
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
/**
 * Removes every element that satisfies `p` and returns them in queue order.
 * Matching elements at the head are popped directly; any others are unlinked by
 * removeAllFromList.
 */
def dequeueAll(p: A => Boolean): Seq[A] =
  if (first0.isEmpty) Seq.empty
  else {
    val removed = new ArrayBuffer[A]
    // Fast path: pop matching elements straight off the head of the list.
    while (first0.nonEmpty && p(first0.elem)) {
      removed += first0.elem
      first0 = first0.next
      decrementLength()
    }
    // Elements remain, so scan the rest of the list for further matches.
    if (first0.nonEmpty) removeAllFromList(p, removed) else removed
  }
/**
 * Unlinks every element after the head that satisfies `p`, appending each one to `res`.
 * The only visible caller, dequeueAll, invokes this only when the head `first0` is non-empty
 * and does not satisfy `p`, so the head itself is never tested here.
 *
 * @param p   predicate selecting the elements to remove
 * @param res buffer that receives the removed elements, in queue order
 * @return `res`
 */
private def
removeAllFromList(p: A => Boolean, res: ArrayBuffer[A]): ArrayBuffer[A] = {
// `leftlst` trails one node behind the candidate (`leftlst.next`) so the candidate can be unlinked.
var leftlst = first0
while (leftlst.next.nonEmpty) {
if (p(leftlst.next.elem)) {
res += leftlst.next.elem
// Removing the tail node: its predecessor becomes the new tail.
if (leftlst.next eq last0) last0 = leftlst
leftlst.next = leftlst.next.next
decrementLength()
// Do not advance: the new `leftlst.next` has not been tested yet.
} else leftlst = leftlst.next
}
res
}
// Per-stream queues of blocks not yet allocated to a batch, keyed by stream id
// (presumably what getReceivedBlockQueue reads and populates — confirm).
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
// Blocks already allocated, keyed by batch time (filled by allocateBlocksToBatch).
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
// Result of createWriteAheadLog(); presumably None when the WAL is disabled — confirm.
private val writeAheadLogOption = createWriteAheadLog()
/** Logs the event at debug level, then dispatches it to its handler. */
private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug(s"Got event $event")
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearLater) => doCheckpoint(time, clearLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
/**
 * Allocates received blocks to the batch at `time` and generates its jobs.
 * On success the jobs are submitted as a JobSet; on failure the error is reported.
 * In either case a DoCheckpoint event for `time` is posted afterwards.
 */
private def generateJobs(time: Time): Unit = {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  val attempt = Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  }
  attempt match {
    case Success(jobs) =>
      val inputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, inputInfos))
    case Failure(e) =>
      jobScheduler.reportError(s"Error generating jobs for time $time", e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // Worker thread that runs the timer loop; marked as a daemon via setDaemon(true).
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run(): Unit = loop()
  }

  /**
   * Repeatedly call the callback every interval.
   * It keeps triggering while `stopped` is false (presumably a flag set by a stop method
   * defined elsewhere — confirm), then triggers once more. An InterruptedException ends the
   * loop without being rethrown.
   */
  private def loop(): Unit = {
    try {
      while (!stopped) triggerActionForNextInterval()
      // One final trigger after the stop flag has been observed.
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException => // interrupted: exit quietly
    }
  }
}
/**
 * Waits on `clock` for `nextTime`, invokes the callback with it, records it as `prevTime`,
 * and advances `nextTime` by one period.
 */
private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime) // presumably blocks until nextTime is reached — confirm
  callback(nextTime)
  prevTime = nextTime
  nextTime = nextTime + period
  logDebug(s"Callback for $name called at time $prevTime")
}
// Fires once per batch duration and posts a GenerateJobs event for each firing time.
private val timer = {
  val postGenerateJobs: Long => Unit =
    longTime => eventLoop.post(GenerateJobs(new Time(longTime)))
  new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, postGenerateJobs, "JobGenerator")
}
再看看checkpoint
/**
 * Perform checkpoint for the given `time`, provided checkpointing is enabled and `time`
 * lies a whole number of checkpoint intervals after the graph's zero time.
 *
 * @param clearCheckpointDataLater forwarded unchanged to checkpointWriter.write
 */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean): Unit = {
  // shouldCheckpoint is evaluated first, so the interval arithmetic is skipped when it is false.
  val isCheckpointTime =
    shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)
  if (isCheckpointTime) {
    logInfo(s"Checkpointing graph for time $time")
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
/** Asks every output stream to update its checkpoint data for `time`, under this object's lock. */
def updateCheckpointData(time: Time): Unit = {
  logInfo(s"Updating checkpoint data for time $time")
  this.synchronized {
    outputStreams.foreach { stream => stream.updateCheckpointData(time) }
  }
  logInfo(s"Updated checkpoint data for time $time")
}
/** Processes all events: logs each one at debug level and dispatches it to its handler. */
private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug(s"Got event $event")
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearLater) => doCheckpoint(time, clearLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
1. ReceivedBlockTracker容错安全性
2. DStream和JobGenerator容错安全性
ReceivedBlockTracker负责跟踪接收到的数据,它需要维护状态,因此需要容错。
DStream表达了依赖关系,在恢复数据的时候也需要恢复计算逻辑层面的依赖关系,因此同样需要容错。
JobGenerator:作业生成层面,表示基于ReceivedBlockTracker中的数据以及DStream构成的依赖关系,不断地产生job的过程。
源码
ReceivedBlockTracker
/**
 * Add received block. This event will get written to the write ahead log (if enabled).
 *
 * The block is appended to its stream's unallocated queue only after `writeToLog` reports
 * success.
 *
 * @param receivedBlockInfo metadata of the block that was received
 * @return true if the addition event was written and the block queued; false if the log
 *         write reported failure or a non-fatal exception was thrown
 */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
  try {
    val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
    if (writeResult) {
      // The queue is also drained by allocateBlocksToBatch, which holds the same lock.
      synchronized {
        getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
      }
      logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId}")
    } else {
      logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
        s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
    }
    writeResult
  } catch {
    case NonFatal(e) =>
      logError(s"Error adding block $receivedBlockInfo", e)
      false
  }
}
allocateBlocksToBatch
/**
 * Allocate all unallocated blocks to the given batch.
 * This event will get written to the write ahead log (if enabled).
 *
 * The unallocated queues are drained only after the BatchAllocationEvent has been written
 * successfully. If the write fails, the blocks stay queued and are picked up by a later
 * batch instead of being dropped.
 *
 * @param batchTime time of the batch to allocate blocks to. It is only acted on when no batch
 *                  has been allocated yet or it is later than `lastAllocatedBatchTime`;
 *                  otherwise the call only logs.
 */
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // Copy each queue instead of dequeuing from it. Dequeuing before the WAL write meant that
    // a failed write removed the blocks from the queue without recording them anywhere.
    val streamIdToBlocks = streamIds.map { streamId =>
      (streamId, getReceivedBlockQueue(streamId).toList)
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      // The allocation has been recorded, so the queues can now be drained. This whole method
      // holds the lock that addBlock uses, so nothing was added after the copy was taken.
      streamIds.foreach(streamId => getReceivedBlockQueue(streamId).clear())
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      // Remember the time of the most recently allocated batch.
      lastAllocatedBatchTime = batchTime
    } else {
      logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
    }
  } else {
    // This situation occurs when:
    // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
    // possibly processed batch job or half-processed batch job need to be processed again,
    // so the batchTime will be equal to lastAllocatedBatchTime.
    // 2. Slow checkpointing makes recovered batch time older than WAL recovered
    // lastAllocatedBatchTime.
    // This situation will only occurs in recovery time.
    logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery")
  }
}
/**
 * Removes every element that satisfies `p` and returns them in queue order.
 * Matching elements at the head are popped directly; any others are unlinked by
 * removeAllFromList.
 */
def dequeueAll(p: A => Boolean): Seq[A] =
  if (first0.isEmpty) Seq.empty
  else {
    val removed = new ArrayBuffer[A]
    // Fast path: pop matching elements straight off the head of the list.
    while (first0.nonEmpty && p(first0.elem)) {
      removed += first0.elem
      first0 = first0.next
      decrementLength()
    }
    // Elements remain, so scan the rest of the list for further matches.
    if (first0.nonEmpty) removeAllFromList(p, removed) else removed
  }
/**
 * Unlinks every element after the head that satisfies `p`, appending each one to `res`.
 * The only visible caller, dequeueAll, invokes this only when the head `first0` is non-empty
 * and does not satisfy `p`, so the head itself is never tested here.
 *
 * @param p   predicate selecting the elements to remove
 * @param res buffer that receives the removed elements, in queue order
 * @return `res`
 */
private def
removeAllFromList(p: A => Boolean, res: ArrayBuffer[A]): ArrayBuffer[A] = {
// `leftlst` trails one node behind the candidate (`leftlst.next`) so the candidate can be unlinked.
var leftlst = first0
while (leftlst.next.nonEmpty) {
if (p(leftlst.next.elem)) {
res += leftlst.next.elem
// Removing the tail node: its predecessor becomes the new tail.
if (leftlst.next eq last0) last0 = leftlst
leftlst.next = leftlst.next.next
decrementLength()
// Do not advance: the new `leftlst.next` has not been tested yet.
} else leftlst = leftlst.next
}
res
}
// Per-stream queues of blocks not yet allocated to a batch, keyed by stream id
// (presumably what getReceivedBlockQueue reads and populates — confirm).
private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]
// Blocks already allocated, keyed by batch time (filled by allocateBlocksToBatch).
private val timeToAllocatedBlocks = new mutable.HashMap[Time, AllocatedBlocks]
// Result of createWriteAheadLog(); presumably None when the WAL is disabled — confirm.
private val writeAheadLogOption = createWriteAheadLog()
JobGenerator.scala
/** Processes all events: logs each one at debug level and dispatches it to its handler. */
private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug(s"Got event $event")
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearLater) => doCheckpoint(time, clearLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
/**
 * Allocates received blocks to the batch at `time` and generates its jobs.
 * On success the jobs are submitted as a JobSet; on failure the error is reported.
 * In either case a DoCheckpoint event for `time` is posted afterwards.
 */
private def generateJobs(time: Time): Unit = {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  val attempt = Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  }
  attempt match {
    case Success(jobs) =>
      val inputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, inputInfos))
    case Failure(e) =>
      jobScheduler.reportError(s"Error generating jobs for time $time", e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
RecurringTimer
private[streaming] class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {

  // Worker thread that runs the timer loop; marked as a daemon via setDaemon(true).
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run(): Unit = loop()
  }

  /**
   * Repeatedly call the callback every interval.
   * It keeps triggering while `stopped` is false (presumably a flag set by a stop method
   * defined elsewhere — confirm), then triggers once more. An InterruptedException ends the
   * loop without being rethrown.
   */
  private def loop(): Unit = {
    try {
      while (!stopped) triggerActionForNextInterval()
      // One final trigger after the stop flag has been observed.
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException => // interrupted: exit quietly
    }
  }
}
/**
 * Waits on `clock` for `nextTime`, invokes the callback with it, records it as `prevTime`,
 * and advances `nextTime` by one period.
 */
private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime) // presumably blocks until nextTime is reached — confirm
  callback(nextTime)
  prevTime = nextTime
  nextTime = nextTime + period
  logDebug(s"Callback for $name called at time $prevTime")
}
// Fires once per batch duration and posts a GenerateJobs event for each firing time.
private val timer = {
  val postGenerateJobs: Long => Unit =
    longTime => eventLoop.post(GenerateJobs(new Time(longTime)))
  new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, postGenerateJobs, "JobGenerator")
}
JobGenerator.scala
createWriteAheadLog

再看看checkpoint:
JobGenerator
/**
 * Perform checkpoint for the given `time`, provided checkpointing is enabled and `time`
 * lies a whole number of checkpoint intervals after the graph's zero time.
 *
 * @param clearCheckpointDataLater forwarded unchanged to checkpointWriter.write
 */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean): Unit = {
  // shouldCheckpoint is evaluated first, so the interval arithmetic is skipped when it is false.
  val isCheckpointTime =
    shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)
  if (isCheckpointTime) {
    logInfo(s"Checkpointing graph for time $time")
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
/** Asks every output stream to update its checkpoint data for `time`, under this object's lock. */
def updateCheckpointData(time: Time): Unit = {
  logInfo(s"Updating checkpoint data for time $time")
  this.synchronized {
    outputStreams.foreach { stream => stream.updateCheckpointData(time) }
  }
  logInfo(s"Updated checkpoint data for time $time")
}
/** Processes all events: logs each one at debug level and dispatches it to its handler. */
private def processEvent(event: JobGeneratorEvent): Unit = {
  logDebug(s"Got event $event")
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearLater) => doCheckpoint(time, clearLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
相关文章推荐
- 【MVC】初识
- Dos命令编译C#文件
- 聊一聊这三个月
- SQL命令和常用语句大全
- java设计对象处理
- Android图片压缩(包含拍照或从相册选取图片,PopupWindow的使用)
- 转转 iOS多线程的初步研究
- 守护进程的作用
- Android系统文件夹组织结构
- Android中使用HttpConnection发送中文到服务器端乱码解决办法
- Poj 2790:迷宫
- shell学习-特殊变量
- 关于在服务器上发布网站遇到的两个问题之解决方案
- 学习进度条
- Android Notification的基本应用 (8.1)
- 枚举enum
- 大小端模式
- 极限学习机学习笔记
- (Android studio)关于drawable文件夹的问题
- 各类排序算法总结