
Implementing the RAFT Protocol in Replica Sets (1): heartbeat

2017-11-04 10:30
In a MongoDB replica set, the RAFT protocol is used for primary election, primary/secondary switchover, and related tasks that keep the replica set highly available. Nodes in a replica set use heartbeats to detect and report each other's state, and to decide what should be done when that state changes. This article focuses on the implementation of these two aspects.

Sending heartbeats between nodes

Once a replica set starts, it must begin sending heartbeats; and whenever the replica set's state changes (a primary/secondary switchover, a node going offline, and so on), the old heartbeats must be cancelled and a new round started. The entry point for this is _restartHeartbeats_inlock. Heartbeats are sent periodically, once every 2 seconds, with a timeout of 10 seconds. Whenever a node times out, its state is set to down, and if the node in question is the primary, it steps down.
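
To make the timing concrete, here is a minimal standalone sketch of that loop (it is not MongoDB source; Member, sendHeartbeat and the demo values are made up for illustration): send a heartbeat to every member every 2 seconds, and mark a member down once nothing has been heard from it for 10 seconds.

#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

using Clock = std::chrono::steady_clock;

struct Member {
    std::string host;
    Clock::time_point lastResponse;
    bool down;
};

// Placeholder for the real heartbeat RPC; in this demo every node simply answers.
bool sendHeartbeat(const Member&) { return true; }

int main() {
    const auto kHeartbeatInterval = std::chrono::seconds(2);
    const auto kHeartbeatTimeout = std::chrono::seconds(10);

    std::vector<Member> members = {
        {"node1:27017", Clock::now(), false},
        {"node2:27017", Clock::now(), false},
    };

    for (int round = 0; round < 3; ++round) {  // a few rounds, for demonstration only
        for (auto& m : members) {
            if (sendHeartbeat(m))
                m.lastResponse = Clock::now();
            if (!m.down && Clock::now() - m.lastResponse > kHeartbeatTimeout) {
                m.down = true;  // the real code would also step down if this node were primary
                std::cout << m.host << " marked down\n";
            }
        }
        std::this_thread::sleep_for(kHeartbeatInterval);
    }
}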

The heartbeat call path looks like this:

void ReplicationCoordinatorImpl::_restartHeartbeats_inlock(
    const ReplicationExecutor::CallbackArgs& cbData) {
    _cancelHeartbeats_inlock();
    _startHeartbeats_inlock(cbData);
}

// Schedule a heartbeat task for every member.
void ReplicationCoordinatorImpl::_startHeartbeats_inlock(
    const ReplicationExecutor::CallbackArgs& cbData) {
    const Date_t now = _replExecutor.now();
    for (int i = 0; i < _rsConfig.getNumMembers(); ++i) {
        _scheduleHeartbeatToTarget(_rsConfig.getMemberAt(i).getHostAndPort(), i, now);
    }
    if (isV1ElectionProtocol()) {
        for (auto&& slaveInfo : _slaveInfo) {
            slaveInfo.lastUpdate = _replExecutor.now();
            slaveInfo.down = false;
        }

        // Compute the start time of the next heartbeat (liveness) check.
        _scheduleNextLivenessUpdate_inlock(cbData);
    }
}

void ReplicationCoordinatorImpl::_doMemberHeartbeat(ReplicationExecutor::CallbackArgs cbData,
                                                    const HostAndPort& target,
                                                    int targetIndex) {
    // Assemble the command to send to the other member into a request.
    const RemoteCommandRequest request(
        target, "admin", heartbeatObj, BSON(rpc::kReplSetMetadataFieldName << 1), timeout);
    const ReplicationExecutor::RemoteCommandCallbackFn callback =
        stdx::bind(&ReplicationCoordinatorImpl::_handleHeartbeatResponse,
                   this,
                   stdx::placeholders::_1,
                   targetIndex);

    // Finally, scheduleRemoteCommand puts the task on the remote-command
    // execution queue, where it will eventually be run.
    _replExecutor.scheduleRemoteCommand(request, callback);
}


Handling heartbeat responses between nodes

The above covers how heartbeats are sent. How does a node process the responses it receives, and what actions are taken for different kinds of response?

Heartbeat responses are handled differently depending on the situation. The logic lives in _handleHeartbeatResponse: for each kind of response a HeartbeatResponseAction is produced, and the node then carries out the corresponding local operation based on that action.

void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
    const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) {
    ..
    // Decide which action the response calls for (e.g. whether a priority takeover is needed).
    HeartbeatResponseAction action = _topCoord->processHeartbeatResponse(
        now, networkTime, target, hbStatusResponse, lastApplied);
    // _handleHeartbeatResponseAction is the function that ultimately handles each case.
    _handleHeartbeatResponseAction(action, hbStatusResponse);
}

void ReplicationCoordinatorImpl::_handleHeartbeatResponseAction(
    const HeartbeatResponseAction& action,
    const StatusWith<ReplSetHeartbeatResponse>& responseStatus) {
    switch (action.getAction()) {
        case HeartbeatResponseAction::NoAction:
            // Update the cached member state if different than the current topology member state
            if (_memberState != _topCoord->getMemberState()) {
                const PostMemberStateUpdateAction postUpdateAction =
                    _updateMemberStateFromTopologyCoordinator_inlock();
                _performPostMemberStateUpdateAction(postUpdateAction);
            }
            break;
        case HeartbeatResponseAction::Reconfig:
            _scheduleHeartbeatReconfig(responseStatus.getValue().getConfig());
            break;
        case HeartbeatResponseAction::StartElection:
            _startElectSelf();
            break;
        case HeartbeatResponseAction::StepDownSelf:
            invariant(action.getPrimaryConfigIndex() == _selfIndex);
            log() << "Stepping down from primary in response to heartbeat";
            _topCoord->prepareForStepDown();
            // Don't need to wait for stepdown to finish.
            _stepDownStart();
            break;
        case HeartbeatResponseAction::StepDownRemotePrimary: {
            invariant(action.getPrimaryConfigIndex() != _selfIndex);
            _requestRemotePrimaryStepdown(
                _rsConfig.getMemberAt(action.getPrimaryConfigIndex()).getHostAndPort());
            break;
        }
        case HeartbeatResponseAction::PriorityTakeover: {
            stdx::unique_lock<stdx::mutex> lk(_mutex);
            if (!_priorityTakeoverCbh.isValid()) {
                _priorityTakeoverWhen =
                    _replExecutor.now() + _rsConfig.getPriorityTakeoverDelay(_selfIndex);
                log() << "Scheduling priority takeover at " << _priorityTakeoverWhen;
                _priorityTakeoverCbh = _scheduleWorkAt(
                    _priorityTakeoverWhen,
                    stdx::bind(
                        &ReplicationCoordinatorImpl::_startElectSelfIfEligibleV1, this, true));
            }
            break;
        }
    }
}


HeartbeatResponseAction::NoAction

In this case the structure of the replica set has not changed; the work is mainly to bring the cached member state back in sync with the actual state when they differ, which is done in two steps:

(1) PostMemberStateUpdateAction postUpdateAction = _updateMemberStateFromTopologyCoordinator_inlock();

(2) _performPostMemberStateUpdateAction(postUpdateAction)

_updateMemberStateFromTopologyCoordinator_inlock works as follows (a simplified sketch follows the list):

If the cached MemberState is the same as the actual one: for a single-node replica set, postUpdateAction is set to kActionStartSingleNodeElection; otherwise it is kActionNone;

If the cached state is Primary, or the actual state is removed or rollback, the waiters on _replicationWaiterList are put into the stepped-down state; otherwise the action is kActionFollowerModeStateChange;

If the state changes from secondary to anything else, the local node can no longer serve reads;

If the state changes from a non-Primary state to secondary, the local node becomes readable;

If the node goes from unreadable to readable, a snapshot is created;

If the new state is rollback, all previous snapshots are dropped;
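
The rules above can be condensed into a small standalone sketch (not the actual MongoDB code; chooseAction and its parameters are invented for illustration, the real function also handles readability and snapshots, and mapping the step-down case to kActionCloseAllConnections is an assumption):

#include <iostream>

enum class MemberState { PRIMARY, SECONDARY, ROLLBACK, REMOVED, STARTUP };

enum class PostMemberStateUpdateAction {
    kActionNone,
    kActionFollowerModeStateChange,
    kActionCloseAllConnections,
    kActionStartSingleNodeElection,
};

// Decide what to do after comparing the cached member state with the actual one.
PostMemberStateUpdateAction chooseAction(MemberState cached,
                                         MemberState actual,
                                         bool singleNodeReplicaSet) {
    if (cached == actual) {
        // Nothing changed; a single-node replica set may still elect itself right away.
        return singleNodeReplicaSet
            ? PostMemberStateUpdateAction::kActionStartSingleNodeElection
            : PostMemberStateUpdateAction::kActionNone;
    }
    if (cached == MemberState::PRIMARY || actual == MemberState::REMOVED ||
        actual == MemberState::ROLLBACK) {
        // Stepping down or leaving the set: waiters are signalled and connections dropped.
        return PostMemberStateUpdateAction::kActionCloseAllConnections;
    }
    return PostMemberStateUpdateAction::kActionFollowerModeStateChange;
}

int main() {
    bool steppedDown = chooseAction(MemberState::PRIMARY, MemberState::SECONDARY, false) ==
        PostMemberStateUpdateAction::kActionCloseAllConnections;
    std::cout << steppedDown << "\n";  // prints 1
}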

_performPostMemberStateUpdateAction handles each action as follows (again, a simplified dispatch sketch follows the list):

kActionNone: do nothing;

kActionFollowerModeStateChange:

Clear the current sync target; subsequent operations on this node will have to choose a new sync source;

kActionCloseAllConnections:

Close all client connections on this node;

kActionWinElection:

Use the current term as the electionId and call processWinElection;

Tell the applier to stop fetching oplog entries;

Update the member state, obtain the next PostMemberStateUpdateAction, and execute it;

Schedule the election-win notification (scheduleElectionWithNotification) task;

kActionStartSingleNodeElection:

Call _startElectSelfV1 directly;
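
As a rough illustration, the dispatch described above could look like the following standalone sketch (not MongoDB source; the stub functions only stand in for the real operations):

#include <iostream>

enum class PostMemberStateUpdateAction {
    kActionNone,
    kActionFollowerModeStateChange,
    kActionCloseAllConnections,
    kActionWinElection,
    kActionStartSingleNodeElection,
};

void clearSyncSource() { std::cout << "clear sync source\n"; }
void closeAllClientConnections() { std::cout << "close client connections\n"; }
void processWinElection() { std::cout << "record electionId, stop the applier\n"; }
void startElectSelf() { std::cout << "start election\n"; }

void performPostMemberStateUpdateAction(PostMemberStateUpdateAction action) {
    switch (action) {
        case PostMemberStateUpdateAction::kActionNone:
            break;  // nothing to do
        case PostMemberStateUpdateAction::kActionFollowerModeStateChange:
            clearSyncSource();  // a new sync source will be chosen later
            break;
        case PostMemberStateUpdateAction::kActionCloseAllConnections:
            closeAllClientConnections();
            break;
        case PostMemberStateUpdateAction::kActionWinElection:
            processWinElection();  // the real code also updates state and runs the next action
            break;
        case PostMemberStateUpdateAction::kActionStartSingleNodeElection:
            startElectSelf();  // _startElectSelfV1 in the real code
            break;
    }
}

int main() {
    performPostMemberStateUpdateAction(PostMemberStateUpdateAction::kActionWinElection);
}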

HeartbeatResponseAction::Reconfig

There are two situations that produce a Reconfig action:

A reconfig was requested via the reconfig command;

While processing a heartbeat response, the node finds that the replica set's config version is larger than its local one, which means the local config is stale and needs to be updated;

In both cases _scheduleHeartbeatReconfig is what lets each node pick up and apply the new config settings (a small sketch of the version check follows).
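
The heartbeat-driven case boils down to a version comparison; a tiny standalone sketch (not MongoDB source; needsHeartbeatReconfig is invented for illustration):

#include <iostream>

struct ReplSetConfig {
    long long version = 1;
};

bool needsHeartbeatReconfig(const ReplSetConfig& localConfig, long long remoteVersion) {
    // A larger version number means our copy of the config is stale.
    return remoteVersion > localConfig.version;
}

int main() {
    ReplSetConfig local;  // version 1
    std::cout << needsHeartbeatReconfig(local, 2) << "\n";  // 1: schedule a reconfig
    std::cout << needsHeartbeatReconfig(local, 1) << "\n";  // 0: nothing to do
}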

HeartbeatResponseAction::StartElection

This one appears to be used only when the protocol version is 0, so it is not covered here;

HeartbeatResponseAction::StepDownSelf

Call _stepDownStart to issue the step-down task, then update the state through the topology coordinator;

HeartbeatResponseAction::StepDownRemotePrimary

Call _requestRemotePrimaryStepdown, which builds a replSetStepDown request and sends it to the remote node;

HeartbeatResponseAction::PriorityTakeover

Schedule a call to _startElectSelfIfEligibleV1, which ultimately runs _startElectSelfV1 to carry out a new election.
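
A minimal standalone sketch of the "schedule an election later" idea (not MongoDB source; scheduleWorkAt and the 2-second delay are invented here, while the real delay comes from _rsConfig.getPriorityTakeoverDelay as shown in the code above):

#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

void scheduleWorkAt(std::chrono::seconds delay, std::function<void()> work) {
    std::thread([delay, work] {
        std::this_thread::sleep_for(delay);
        work();
    }).detach();
}

int main() {
    scheduleWorkAt(std::chrono::seconds(2), [] {
        // _startElectSelfIfEligibleV1 would run here in the real code.
        std::cout << "starting election for priority takeover\n";
    });
    std::this_thread::sleep_for(std::chrono::seconds(3));  // keep the process alive for the demo
}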

In the next article we will take a closer look at replica set elections and at how the replica set's state changes; both are built on top of heartbeats.