Apache Flink fault tolerance源码剖析(二)
2016-05-29 21:14
525 查看
继续Flink Fault Tolerance机制剖析。上一篇文章我们结合代码讲解了Flink中检查点是如何应用的(如何根据快照做失败恢复,以及检查点被应用的场景),这篇我们来谈谈检查点的触发机制以及基于
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
该类可以理解为检查点的协调器,用来协调
方法的实现包含两个主要动作:
启动检查点ID计数器
启动触发检查点的定时任务
首先,方法中会去判断一个flag:
如果不能被立即执行,则直接返回。
不能被立即执行的原因是:还有其他处理没有完成。
接着检查正在并发处理的未完成的检查点:
如果未完成的检查点过多,大于允许的并发处理的检查点数目的阈值,则将当前检查点的触发请求设置为不能立即执行,如果定时任务已经启动,则取消定时任务的执行,并返回。
以上这些检查处于基于锁机制实现的同步代码块中。
接着检查需要被触发检查点的
只要有一个
然后检查是否所有需要ack检查点的
如果有一个
以上条件都满足(即没有
下一步,获得
这依赖于该方法的另一个参数
接着创建一个
该类表示一个待处理的检查点。
与此同时,会定义一个针对当前检查点超时进行资源清理的取消器
然后会再次进入同步代码段,对上面的是否新建检查点的判断条件做二次检查,防止产生竞态条件。这里做二次检查的原因是,中间有一段关于获得
检查后,如果触发检查点的条件仍然是满足的,那么将上面创建的
同时会启动针对当前检查点的超时取消器:
接下来会发送消息给
类
该
该
既然,是基于消息驱动机制,那么就需要各种类型的消息对应不同的业务逻辑。这些消息在Flink中被定义在package:
类图如下:
job:
taskExecutionId:
checkpointId:当前消息协调的检查点ID
除此之外,该实现仅仅override了
主要是触发检查点屏障
它除了
具体的实现在
首先从接收的消息中(
接下来的逻辑是判断当前检查点是否是未完成的检查点:
接下来分为三种情况对待:
如果是未完成的检查点,并且相关资源没有被释放(检查点没有被
置
接下来查找是否还有待处理的检查点,根据检查点时间戳来判断:
根据标识
如果有需要处理的检查点,并且当前能立即处理,则立即触发检查点定时任务;如果有需要处理的检查点,但不能立即处理,则触发入队的定时任务。
如果是未完成的检查点,并且检查点已经被
抛出
如果不是未完成的检查点
如果在最近未完成的检查点列表中找到,则有可能表示消息来迟了,将
最后返回
state
stateSize
检查点首先应答相关的
最后,如果该检查点被转化为已完成的检查点,则:
迭代所有待commit的
实现:
主要是触发
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)
Actor的消息驱动的协同机制。这篇涉及到一个非常关键的类——
CheckpointCoordinator。
org.apache.flink.runtime.checkpoint.CheckpointCoordinator
该类可以理解为检查点的协调器,用来协调
operator和
state的分布式快照。
周期性的检查点触发机制
检查点的触发机制是基于定时器的周期性触发。这涉及到一个定时器的实现类ScheduledTrigger
ScheduledTrigger
触发检查点的定时任务类。其实现就是调用triggerCheckpoint方法。这个方法后面会具体介绍。
public void run() { try { triggerCheckpoint(System.currentTimeMillis()); } catch (Exception e) { LOG.error("Exception while triggering checkpoint", e); } }
startCheckpointScheduler
启动触发检查点的定时任务的方法实现:public void startCheckpointScheduler() { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } // make sure all prior timers are cancelled stopCheckpointScheduler(); try { // Multiple start calls are OK checkpointIdCounter.start(); } catch (Exception e) { String msg = "Failed to start checkpoint ID counter: " + e.getMessage(); throw new RuntimeException(msg, e); } periodicScheduling = true; currentPeriodicTrigger = new ScheduledTrigger(); timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval); } }
方法的实现包含两个主要动作:
启动检查点ID计数器
checkpointIdCounter
启动触发检查点的定时任务
stopCheckpointScheduler
关闭定时任务的方法,用来释放资源,重置一些标记变量。triggerCheckpoint
该方法是触发一个新的检查点的核心逻辑。首先,方法中会去判断一个flag:
triggerRequestQueued。该标识表示是否一个检查点的触发请求不能被立即执行。
// sanity check: there should never be more than one trigger request queued if (triggerRequestQueued) { LOG.warn("Trying to trigger another checkpoint while one was queued already"); return false; }
如果不能被立即执行,则直接返回。
不能被立即执行的原因是:还有其他处理没有完成。
接着检查正在并发处理的未完成的检查点:
// if too many checkpoints are currently in progress, we need to mark that a request is queued if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) { triggerRequestQueued = true; if (currentPeriodicTrigger != null) { currentPeriodicTrigger.cancel(); currentPeriodicTrigger = null; } return false; }
如果未完成的检查点过多,大于允许的并发处理的检查点数目的阈值,则将当前检查点的触发请求设置为不能立即执行,如果定时任务已经启动,则取消定时任务的执行,并返回。
以上这些检查处于基于锁机制实现的同步代码块中。
接着检查需要被触发检查点的
task是否都处于运行状态:
ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); if (ee != null && ee.getState() == ExecutionState.RUNNING) { triggerIDs[i] = ee.getAttemptId(); } else { LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.", tasksToTrigger[i].getSimpleName()); return false; } }
只要有一个
task不满足条件,则不会触发检查点,并立即返回。
然后检查是否所有需要ack检查点的
task都处于运行状态:
Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length); for (ExecutionVertex ev : tasksToWaitFor) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ackTasks.put(ee.getAttemptId(), ev); } else { LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", ev.getSimpleName()); return false; } }
如果有一个
task不满足条件,则不会触发检查点,并立即返回。
以上条件都满足(即没有
return false;),才具备触发一个检查点的基本条件。
下一步,获得
checkpointId:
final long checkpointID; if (nextCheckpointId < 0) { try { // this must happen outside the locked scope, because it communicates // with external services (in HA mode) and may block for a while. checkpointID = checkpointIdCounter.getAndIncrement(); } catch (Throwable t) { int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers; LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t); return false; } } else { checkpointID = nextCheckpointId; }
这依赖于该方法的另一个参数
nextCheckpointId,如果其值为
-1,则起到标识的作用,指示
checkpointId将从外部获取(比如
Zookeeper,后续文章会谈及检查点ID的生成机制)。
接着创建一个
PendingCheckpoint对象:
final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
该类表示一个待处理的检查点。
与此同时,会定义一个针对当前检查点超时进行资源清理的取消器
canceller。该取消器主要是针对检查点没有释放资源的情况进行资源释放操作,同时还会调用
triggerQueuedRequests方法启动一个触发检查点的定时任务,如果有的话(取决于
triggerRequestQueued是否为true)。
然后会再次进入同步代码段,对上面的是否新建检查点的判断条件做二次检查,防止产生竞态条件。这里做二次检查的原因是,中间有一段关于获得
checkpointId的代码,不在同步块中。
检查后,如果触发检查点的条件仍然是满足的,那么将上面创建的
PendingCheckpoint对象加入集合中:
pendingCheckpoints.put(checkpointID, checkpoint);
同时会启动针对当前检查点的超时取消器:
timer.schedule(canceller, checkpointTimeout);
接下来会发送消息给
task以真正触发检查点(基于消息驱动的协同机制):
for (int i = 0; i < tasksToTrigger.length; i++) { ExecutionAttemptID id = triggerIDs[i]; TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp); tasksToTrigger[i].sendMessageToCurrentExecution(message, id); }
基于Actor的消息驱动的协同机制
上面已经谈到了检查点的触发机制是基于定时任务的周期性触发,那么定时任务的启停机制又是什么?Flink使用的是基于AKKA的Actor模型的消息驱动机制。类
CheckpointCoordinatorDeActivator是一个
Actor的实现,它用于基于消息来驱动检查点的定时任务的启停:
public void handleMessage(Object message) { if (message instanceof ExecutionGraphMessages.JobStatusChanged) { JobStatus status = ((ExecutionGraphMessages.JobStatusChanged) message).newJobStatus(); if (status == JobStatus.RUNNING) { // start the checkpoint scheduler coordinator.startCheckpointScheduler(); } else { // anything else should stop the trigger for now coordinator.stopCheckpointScheduler(); } } // we ignore all other messages }
该
Actor会收到
Job状态的变化通知:
JobStatusChanged。一旦变成
RUNNING,那么检查点的定时任务会被立即启动;否则会被立即关闭。
该
Actor被创建的代码是
CheckpointCoordinator中的
createActivatorDeactivator方法:
public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) { synchronized (lock) { if (shutdown) { throw new IllegalArgumentException("Checkpoint coordinator is shut down"); } if (jobStatusListener == null) { Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID); // wrap the ActorRef in a AkkaActorGateway to support message decoration jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID); } return jobStatusListener; } }
既然,是基于消息驱动机制,那么就需要各种类型的消息对应不同的业务逻辑。这些消息在Flink中被定义在package:
org.apache.flink.runtime.messages.checkpoint中。
类图如下:
AbstractCheckpointMessage
检查点消息的基础抽象类,提供了三个公共属性(从构造器注入):job:
JobID的实例,表示当前这条消息实例的归属;
taskExecutionId:
ExecutionAttemptID的实例,表示检查点的源/目的任务
checkpointId:当前消息协调的检查点ID
除此之外,该实现仅仅override了
hashCode和
equals方法。
TriggerCheckpoint
该消息由JobManager发送给
TaskManager,用于告诉一个
task触发它的检查点。
触发消息
位于CheckpointCoordinator类的
triggerCheckpoint中,上面已经提及过。
for (int i = 0; i < tasksToTrigger.length; i++) { ExecutionAttemptID id = triggerIDs[i]; TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp); tasksToTrigger[i].sendMessageToCurrentExecution(message, id); }
消息处理
TaskManager的
handleCheckpointingMessage实现:
case message: TriggerCheckpoint => val taskExecutionId = message.getTaskExecutionId val checkpointId = message.getCheckpointId val timestamp = message.getTimestamp log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.") val task = runningTasks.get(taskExecutionId) if (task != null) { task.triggerCheckpointBarrier(checkpointId, timestamp) } else { log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.") }
主要是触发检查点屏障
Barrier。
DeclineCheckpoint
该消息由TaskManager发送给
JobManager,用于告诉检查点协调器:检查点的请求还没有能够被处理。这种情况通常发生于:某
task已处于
RUNNING状态,但在内部可能还没有准备好执行检查点。
它除了
AbstractCheckpointMessage需要的三个属性外,还需要用于关联检查点的
timestamp。
触发消息
位于Task类的
triggerCheckpointBarrier方法中:
Runnable runnable = new Runnable() { @Override public void run() { try { boolean success = statefulTask.triggerCheckpoint(checkpointID, checkpointTimestamp); if (!success) { DeclineCheckpoint decline = new DeclineCheckpoint(jobId, getExecutionId(), checkpointID, checkpointTimestamp); jobManager.tell(decline); } } catch (Throwable t) { if (getExecutionState() == ExecutionState.RUNNING) { failExternally(new RuntimeException( "Error while triggering checkpoint for " + taskName, t)); } } } };
消息处理
位于JobManager的
handleCheckpointMessage中
具体的实现在
CheckpointCoordinator的
receiveDeclineMessage中:
首先从接收的消息中(
DeclineCheckpoint)获得检查点编号:
final long checkpointId = message.getCheckpointId();
接下来的逻辑是判断当前检查点是否是未完成的检查点:
isPendingCheckpoint
接下来分为三种情况对待:
如果是未完成的检查点,并且相关资源没有被释放(检查点没有被
discarded)
isPendingCheckpoint = true; pendingCheckpoints.remove(checkpointId); checkpoint.discard(userClassLoader); rememberRecentCheckpointId(checkpointId);
置
isPendingCheckpoint为
true,根据检查点编号,将检查点从未完成的检查点集合中移除,
discard检查点,记住最近的检查点(将其保持到到一个最近的检查点列表中)。
接下来查找是否还有待处理的检查点,根据检查点时间戳来判断:
boolean haveMoreRecentPending = false; Iterator<Map.Entry<Long, PendingCheckpoint>> entries = pendingCheckpoints.entrySet().iterator(); while (entries.hasNext()) { PendingCheckpoint p = entries.next().getValue(); if (!p.isDiscarded() && p.getCheckpointTimestamp() >= checkpoint.getCheckpointTimestamp()) { haveMoreRecentPending = true; break; } }
根据标识
haveMoreRecentPending来进入不同的处理逻辑:
if (!haveMoreRecentPending && !triggerRequestQueued) { LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId); triggerCheckpoint(System.currentTimeMillis()); } else if (!haveMoreRecentPending) { LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId); triggerQueuedRequests(); }
如果有需要处理的检查点,并且当前能立即处理,则立即触发检查点定时任务;如果有需要处理的检查点,但不能立即处理,则触发入队的定时任务。
如果是未完成的检查点,并且检查点已经被
discarded
抛出
IllegalStateException异常
如果不是未完成的检查点
如果在最近未完成的检查点列表中找到,则有可能表示消息来迟了,将
isPendingCheckpoint置为
true,否则将
isPendingCheckpoint置为
false.
最后返回
isPendingCheckpoint。
AcknowledgeCheckpoint
该消息是一个应答信号,表示某个独立的task的检查点已经完成。也是由
TaskManager发送给
JobManager。该消息会携带
task的状态:
state
stateSize
触发消息
RuntimeEnvironment类的
acknowledgeCheckpoint方法。
消息处理
具体的实现在CheckpointCoordinator的
receiveAcknowledgeMessage中,开始的实现同
receiveDeclineMessage,也是判断当前接收到的消息中包含的检查点是否是待处理的检查点。如果是,并且也没有
discard掉,则执行如下逻辑:
if (checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getState(), message.getStateSize())) { if (checkpoint.isFullyAcknowledged()) { completed = checkpoint.toCompletedCheckpoint(); completedCheckpointStore.addCheckpoint(completed); LOG.info("Completed checkpoint " + checkpointId + " (in " + completed.getDuration() + " ms)"); LOG.debug(completed.getStates().toString()); pendingCheckpoints.remove(checkpointId); rememberRecentCheckpointId(checkpointId); dropSubsumedCheckpoints(completed.getTimestamp()); onFullyAcknowledgedCheckpoint(completed); triggerQueuedRequests(); } }
检查点首先应答相关的
task,如果检查点已经完全应答完成,则将检查点转换成
CompletedCheckpoint,然后将其加入
completedCheckpointStore列表,并从
pendingCheckpoints中移除。然后调用
dropSubsumedCheckpoints它会从
pendingCheckpoints中
diacard所有时间戳小于当前检查点的时间戳,并从集合中移除。
最后,如果该检查点被转化为已完成的检查点,则:
if (completed != null) { final long timestamp = completed.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ExecutionAttemptID attemptId = ee.getAttemptId(); NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp); ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId()); } } statsTracker.onCompletedCheckpoint(completed); }
迭代所有待commit的
task,发送
NotifyCheckpointComplete消息。同时触发状态跟踪器的
onCompletedCheckpoint回调方法。
NotifyCheckpointComplete
该消息由JobManager发送给
TaskManager,用于告诉一个
task它的检查点已经得到完成确认,
task可以向第三方提交该检查点。
触发消息
位于CheckpointCoordinator类的
receiveAcknowledgeMessage方法中,当检查点ack
task完成,转化为
CompletedCheckpoint之后
if (completed != null) { final long timestamp = completed.getTimestamp(); for (ExecutionVertex ev : tasksToCommitTo) { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ExecutionAttemptID attemptId = ee.getAttemptId(); NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp); ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId()); } } statsTracker.onCompletedCheckpoint(completed); }
消息处理
TaskManager的
handleCheckpointingMessage
实现:
case message: NotifyCheckpointComplete => val taskExecutionId = message.getTaskExecutionId val checkpointId = message.getCheckpointId val timestamp = message.getTimestamp log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.") val task = runningTasks.get(taskExecutionId) if (task != null) { task.notifyCheckpointComplete(checkpointId) } else { log.debug( s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.") }
主要是触发
task的
notifyCheckpointComplete方法。
小结
这篇文章主要讲解了检查点的基于定时任务的周期性的触发机制,以及基于Akka的Actor模型的消息驱动的协同处理机制。
微信扫码关注公众号:Apache_Flink
QQ扫码关注QQ群:Apache Flink学习交流群(123414680)
相关文章推荐
- Apache-commons.BeanUtils浅析
- Windows 上 LAMP 环境 搭建 (MySQL5.7+PHP7+Apache2.4)
- Apache Bench (ab)测试出现 Failed requests
- org/apache/commons/discovery/tools/DiscoverSingleton
- 压测工具的实践
- 使用Apache CXF开发Web Service步骤
- Apache POI 解析 microsoft word 图片文字都不放过
- 怎么优化设置apache的并发数量
- 禁止外部用户访问 apache 上的 应用
- 简单使用Apache POI
- Mac OSX 搭建 Apache php mySql phpMyAdmin 开发环境
- CentOS系统如何设置APACHE和MYSQL服务开机自动运行
- Apache服务器 配置多个网站解决方案
- mac 系统升级到 OS X EI Capitan Apache 配置
- Class org.apache.struts2.json.JSONWriter can not access a member of
- SVN版本管理系统的安装 CentOS + Subversion + Apache + Jsvnadmin
- Maven:程序包org.apache.log4j不存在问题处理
- linux 安装apache 错误:configure: error: /bin/sh build/config.sub failed解决办法
- Apache:BeanUtils和PropertyUtils的区别
- Apache Curator入门实战