副本集RAFT协议的实现(1):heartbeat
2017-11-04 10:30
976 查看
在mongodb的副本集中, 使用了raft协议进行选主,主从切换等来保证副本集的高可用。副本集节点之间通过心跳, 来探测和通知节点的状态以及状态的变化因该采取的处理措施。本文重点介绍一下这两个方面的实现。
heatbeat的调用过程如下:
针对不同的状况, heartbeat的处理方式也是不同的, 具体的是现在_handleHeartbeatResponse里面, 针对不同的response, 产生不同的HeartbeatResponseAction , 然后, 再根据action来处理本地的操作。
HeartbeatResponseAction::NoAction
这种情况下, 副本集的结垢没有发生变换, 主要是缓存的节点状态与实际的状态不一致时的同步操作, 通过如下的2个步骤来实现:
(1)PostMemberStateUpdateAction postUpdateAction = _updateMemberStateFromTopologyCoordinator_inlock();
(2)_performPostMemberStateUpdateAction(postUpdateAction)
_updateMemberStateFromTopologyCoordinator_inlock具体如下:
缓存的MemberState和实际的是一样的, 如果是单节点副本集, postUpdateAction 设为 kActionStartSingleNodeElection, 否则, 看ActionNode;
缓存的节点是Primary, 或者实际的状态的removed 或rollback, 将_replicationWaiterList变为step down的状态, 否则就是kActionFollowerModeStateChange;
如果从secondary变为其他的状态, 本地节点不可读;
如果从非Primary变味secondary, 本地节点可以读;
如果节点从不可读变味可读, 创建snapshot;
如果新的状态是rollback, 删除之前的所有的snapshot;
_performPostMemberStateUpdateAction实现的具体如下:
kActionNode, 什么都不做;
kActionFollowerModeStateChange:
清空之前的sync target, 后面节点的操作需要重新选择target;
kActionCloseAllConnection:
关闭该节点的所有client的连接;
kActionWinElection:
将当前的term当做electionID, 调用ProcessWinElection;
通知Applier停止抓取oplog;
更新节点的状态,并且获取下一个PostMemberStateUpdateAction, 执行下一个action;
scheduleElectionWithNotification任务;
kActionStartSingleNodeElection:
直接调用_startElectSelfV1;
HeartbeatResponseAction::Reconfig
有两种情况会需要reconfig action:
通过command reconfig;
heartbeat响应的过程中, 发现副本集的config version比本地的大, 说明本地的config 已经旧了, 需要更新;
通过_scheduleHeartbeatReconfig, 来通知各个节点来更新config 设定。
HeartbeatResponseAction::StartElection
这个看起来只有protocol version是0 才会用到, 暂不考虑;
HeartbeatResponseAction::StepDownSelf
调用 _stepDownStart, 发出step down任务, 然后通过topology 更新;
HeartbeatResponseAction::StepDownRemotePrimary
调用_requestRemotePrimaryStepdown, 产生replSetStepDown 调用的request, 调用远端的节点;
HeartbeatResponseAction::PriorityTakeover
通过schedule 调用_startElectSelfIfEligibleV1, 最终_startElectSelfV1 完成新的选主过程。
下一节, 我们仔细讨论一下副本集的选主, 以及副本集的状态的变化过程,他们的基础还是heartbeat。
节点之间的hearbeat的发送
当一个副本集启动以后, 就需要进行heartbeat, 或者当副本集的状态发生了改变需要(主从切换, 某个接点掉线等), 都需要取消就得heartbeat, 并且开始一轮新的heartbeat。这个借口在_restartHeartbeats_inlock, heartbeat是周期性的执行的, 每个2秒发送一次, 超时时间是10秒, 每当某个接点超时, 就会把该接点的状态变为down, 如果此节点为primary, 就会stepdown。heatbeat的调用过程如下:
void ReplicationCoordinatorImpl::_restartHeartbeats_inlock( const ReplicationExecutor::CallbackArgs& cbData) { _cancelHeartbeats_inlock(); _startHeartbeats_inlock(cbData); } // 给每一个接点发送heartbeat 任务 void ReplicationCoordinatorImpl::_startHeartbeats_inlock( const ReplicationExecutor::CallbackArgs& cbData) { const Date_t now = _replExecutor.now(); for (int i = 0; i < _rsConfig.getNumMembers(); ++i) { _scheduleHeartbeatToTarget(_rsConfig.getMemberAt(i).getHostAndPort(), i, now); } if (isV1ElectionProtocol()) { for (auto&& slaveInfo : _slaveInfo) { slaveInfo.lastUpdate = _replExecutor.now(); slaveInfo.down = false; } // 计算下一次heartbeat的开始时间 _scheduleNextLivenessUpdate_inlock(cbData); } } void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,const HostAndPort& target, int targetIndex) { // 给其他接点的command, 拼装成一个request const RemoteCommandRequest request( target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), timeout); const ReplicationExecutor::RemoteCommandCallbackFn callback =stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse, this, stdx::placeholders::_1, targetIndex); // 最终, 通过scheduleRemoteCommand来讲任务放进远端的执 // 行队列里, 并且最终执行; _replExecutor.scheduleRemoteCommand(request, callback); }
节点之间heartbeat的响应
上面讲的是heartbeat的发送过程, 接收端是如何处理收到的相应哪? 针对不同的相应会采取什么样的处理哪?针对不同的状况, heartbeat的处理方式也是不同的, 具体的是现在_handleHeartbeatResponse里面, 针对不同的response, 产生不同的HeartbeatResponseAction , 然后, 再根据action来处理本地的操作。
void ReplicationCoordinatorImpl::_handleHeartbeatResponse( const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) { .. // 判断是否要产生Priority overaction HeartbeatResponseAction action = _topCoord->processHeartbeatResponse( now, networkTime, target, hbStatusResponse, lastApplied); // _handleHeartbeatResponseAction 该函数最终处理各种状况 _handleHeartbeatResponseAction(action, hbStatusResponse); } void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction( const HeartbeatResponseAction& action, const StatusWith<ReplSetHeartbeatResponse>& responseStatus) { switch (action.getAction()) { case HeartbeatResponseAction::NoAction: // Update the cached member state if different than the current topology member state if (_memberState != _topCoord->getMemberState()) { const PostMemberStateUpdateAction postUpdateAction = _updateMemberStateFromTopologyCoordinator_inlock(); _performPostMemberStateUpdateAction(postUpdateAction); } break; case HeartbeatResponseAction::Reconfig: _scheduleHeartbeatReconfig(responseStatus.getValue().getConfig()); break; case HeartbeatResponseAction::StartElection: _startElectSelf(); break; case HeartbeatResponseAction::StepDownSelf: invariant(action.getPrimaryConfigIndex() == _selfIndex); log() << "Stepping down from primary in response to heartbeat"; _topCoord->prepareForStepDown(); // Don't need to wait for stepdown to finish. _stepDownStart(); break; case HeartbeatResponseAction::StepDownRemotePrimary: { invariant(action.getPrimaryConfigIndex() != _selfIndex); _requestRemotePrimaryStepdown( _rsConfig.getMemberAt(action.getPrimaryConfigIndex()).getHostAndPort()); break; } case HeartbeatResponseAction::PriorityTakeover: { stdx::unique_lock<stdx::mutex> lk(_mutex); if (!_priorityTakeoverCbh.isValid()) { _priorityTakeoverWhen = _replExecutor.now() + _rsConfig.getPriorityTakeoverDelay(_selfIndex); log() << "Scheduling priority takeover at " << _priorityTakeoverWhen; _priorityTakeoverCbh = _scheduleWorkAt( _priorityTakeoverWhen, stdx::bind( &ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, this, true)); } break; } } }
HeartbeatResponseAction::NoAction
这种情况下, 副本集的结垢没有发生变换, 主要是缓存的节点状态与实际的状态不一致时的同步操作, 通过如下的2个步骤来实现:
(1)PostMemberStateUpdateAction postUpdateAction = _updateMemberStateFromTopologyCoordinator_inlock();
(2)_performPostMemberStateUpdateAction(postUpdateAction)
_updateMemberStateFromTopologyCoordinator_inlock具体如下:
缓存的MemberState和实际的是一样的, 如果是单节点副本集, postUpdateAction 设为 kActionStartSingleNodeElection, 否则, 看ActionNode;
缓存的节点是Primary, 或者实际的状态的removed 或rollback, 将_replicationWaiterList变为step down的状态, 否则就是kActionFollowerModeStateChange;
如果从secondary变为其他的状态, 本地节点不可读;
如果从非Primary变味secondary, 本地节点可以读;
如果节点从不可读变味可读, 创建snapshot;
如果新的状态是rollback, 删除之前的所有的snapshot;
_performPostMemberStateUpdateAction实现的具体如下:
kActionNode, 什么都不做;
kActionFollowerModeStateChange:
清空之前的sync target, 后面节点的操作需要重新选择target;
kActionCloseAllConnection:
关闭该节点的所有client的连接;
kActionWinElection:
将当前的term当做electionID, 调用ProcessWinElection;
通知Applier停止抓取oplog;
更新节点的状态,并且获取下一个PostMemberStateUpdateAction, 执行下一个action;
scheduleElectionWithNotification任务;
kActionStartSingleNodeElection:
直接调用_startElectSelfV1;
HeartbeatResponseAction::Reconfig
有两种情况会需要reconfig action:
通过command reconfig;
heartbeat响应的过程中, 发现副本集的config version比本地的大, 说明本地的config 已经旧了, 需要更新;
通过_scheduleHeartbeatReconfig, 来通知各个节点来更新config 设定。
HeartbeatResponseAction::StartElection
这个看起来只有protocol version是0 才会用到, 暂不考虑;
HeartbeatResponseAction::StepDownSelf
调用 _stepDownStart, 发出step down任务, 然后通过topology 更新;
HeartbeatResponseAction::StepDownRemotePrimary
调用_requestRemotePrimaryStepdown, 产生replSetStepDown 调用的request, 调用远端的节点;
HeartbeatResponseAction::PriorityTakeover
通过schedule 调用_startElectSelfIfEligibleV1, 最终_startElectSelfV1 完成新的选主过程。
下一节, 我们仔细讨论一下副本集的选主, 以及副本集的状态的变化过程,他们的基础还是heartbeat。
相关文章推荐
- 副本集RAFT协议的实现(2):选主,主从切换
- Kafka源码深度解析-序列7 -Consumer -coordinator协议与heartbeat实现原理
- redis集群实现(五) sentinel的架构与raft协议
- 百度的bfs中raft协议实现(2)
- 百度的bfs中raft协议实现(1)
- Kafka源码深度解析-序列7 -Consumer -coordinator协议与heartbeat实现原理
- raft协议的go语言实现
- Kafka源码分析-序列7 -Consumer -coordinator协议与heartbeat实现原理
- 分布式多副本一致性协议:raft
- Raft一致性协议实现源码
- 以raft协议为基础的模拟zoomkeep选举唯一主机实现
- redis集群实现(五) sentinel的架构与raft协议
- Socket 之 TCP 协议通信-c#实现
- FPGA作为从机与STM32进行SPI协议通信---Verilog实现 [转]
- TCP协议可靠性数据传输实现原理分析
- 配合secureCRT实现自动化控制简单协议的交互
- HLS协议实现
- Linux网络编程5——使用UDP协议实现群聊
- 怎么实现https协议?
- OpenWrt安装迅雷成功,实现BT emule等多协议下载