
The Raft Protocol Implementation in Baidu's BFS (1)

2018-01-22 17:23
I won't go over how Raft itself works here; this post only walks through the code of the implementation in Baidu's BFS.

To support different replication protocols, BFS hides synchronization behind an abstract interface (Sync). There are two implementations: a traditional master-slave one (with snapshots, similar to a MySQL dump plus GTID-based replication), and the Raft one.
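For context, here is a minimal sketch of what that Sync abstraction plausibly looks like, reconstructed from the methods RaftImpl exposes below; the actual header in BFS may differ in details:

// Hypothetical sketch of the Sync interface, inferred from RaftImpl below; not copied from the BFS source.
class Sync {
public:
    virtual ~Sync() {}
    virtual void Init(SyncCallbacks callbacks) = 0;                          // install the log-apply / snapshot callbacks
    virtual bool IsLeader(std::string* leader_addr = NULL) = 0;              // leadership query
    virtual bool Log(const std::string& entry, int timeout_ms = 10000) = 0;  // blocking append
    virtual void Log(const std::string& entry, std::function<void (bool)> callback) = 0; // async append
    virtual void SwitchToLeader() = 0;
    virtual std::string GetStatus() = 0;
    virtual google::protobuf::Service* GetService() = 0;                     // RPC service exposed by the implementation
};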

The Raft side implements this interface with a thin wrapper, RaftImpl:

class RaftImpl : public Sync {
public:
    RaftImpl();
    ~RaftImpl();
    void Init(SyncCallbacks callbacks);               // install the callbacks: how a committed entry is applied, how snapshots are produced
    bool IsLeader(std::string* leader_addr = NULL);   // whether this node is the leader; also returns the actual leader's address
    bool Log(const std::string& entry, int timeout_ms = 10000);              // blocking log entry point, waits up to timeout_ms
    void Log(const std::string& entry, std::function<void (bool)> callback); // async log entry point, callback fires on success
    void SwitchToLeader() {}
    std::string GetStatus();                          // returns the node's state: leader/follower
public:
    google::protobuf::Service* GetService();          // expose the RPC service
private:
    RaftNodeImpl* raft_node_;                         // the real implementation
};
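A rough usage sketch from the caller's side (the SerializeNamespaceOp helper and the exact SyncCallbacks wiring are illustrative, not taken from the BFS source):

// Illustrative only: how a state machine might drive the RaftImpl API.
RaftImpl* sync = new RaftImpl();
SyncCallbacks callbacks;                      // assumed to bundle the log-apply / snapshot callbacks
sync->Init(callbacks);                        // register how entries are applied and how snapshots are produced
std::string entry = SerializeNamespaceOp();   // hypothetical helper producing a serialized operation
if (sync->IsLeader()) {
    // Async form: the callback fires once the entry is committed on a majority.
    sync->Log(entry, [](bool ok) {
        if (!ok) { /* retry or report failure */ }
    });
    // Or the blocking form, which waits up to 10 seconds by default:
    // bool ok = sync->Log(entry);
}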

Now look at the real implementation; the code already carries some annotations:
enum NodeState {
    kFollower = 0,
    kCandidate = 1,
    kLeader = 2,
};

class RaftNodeImpl : public RaftNode {
public:
    RaftNodeImpl(const std::string& raft_nodes, int node_index,
                 int election_timeout, const std::string& db_path);  // node list, election timeout, log storage directory
    ~RaftNodeImpl();
    void Vote(::google::protobuf::RpcController* controller,
              const ::baidu::bfs::VoteRequest* request,
              ::baidu::bfs::VoteResponse* response,
              ::google::protobuf::Closure* done);            // Vote RPC handler: a candidate asks this node for its vote
    void AppendEntries(::google::protobuf::RpcController* controller,
                       const ::baidu::bfs::AppendEntriesRequest* request,
                       ::baidu::bfs::AppendEntriesResponse* response,
                       ::google::protobuf::Closure* done);   // AppendEntries RPC handler: the leader replicates log entries (and heartbeats) to this node
public:
    bool GetLeader(std::string* leader);                     // get the leader's address; returns whether this node is the leader
    void AppendLog(const std::string& log, std::function<void (bool)> callback); // implements the abstract interface, as described above
    bool AppendLog(const std::string& log, int timeout_ms = 10000);              // same, blocking form
    void Init(std::function<void (const std::string& log)> callback,             // same, install callbacks
              std::function<void (int32_t, std::string*)> snapshot_callback);
private:
    bool StoreContext(const std::string& context, int64_t value);            // persist meta information
    bool StoreContext(const std::string& context, const std::string& value); // same

    std::string Index2Logkey(int64_t index);      // convert a log index to its storage key
    void LoadStorage(const std::string& db_path); // recover persisted meta information
    bool CancelElection();                        // cancel the pending election task
    void ResetElection();                         // reset the election timer
    void ReplicateLogForNode(uint32_t id);        // leader: replicate log to one follower
    void ReplicateLogWorker(uint32_t id);         // per-follower replication thread function
    void Election();                              // start an election
    bool CheckTerm(int64_t term);                 // compare a term against the local term
    void ElectionCallback(const VoteRequest* request,
                          VoteResponse* response,
                          bool failed,
                          int error,
                          const std::string& node_addr);     // callback for the Vote RPC
    bool StoreLog(int64_t term, int64_t index, const std::string& log, LogType type = kUserLog); // persist a log entry
    void ApplyLog();                              // apply committed log entries

    std::string LoadVoteFor();                    // the functions below are state-related and deprecated
    void SetVeteFor(const std::string& votefor);
    int64_t LoadCurrentTerm();
    void SetCurrentTerm(int64_t);
    void SetLastApplied(int64_t index);
    int64_t GetLastApplied(int64_t index);
private:
    std::vector<std::string> nodes_;
    std::string self_;

    int64_t current_term_;      /// current term
    std::string voted_for_;     /// who we voted for in the current term
    LogDB* log_db_;             /// persistent log storage
    int64_t log_index_;         /// index of the last log entry
    int64_t log_term_;          /// term of the last log entry

    int64_t commit_index_;      /// index of the last committed entry
    int64_t last_applied_;      /// index of the last entry applied to the state machine
    bool applying_;             /// an apply pass is in progress

    bool node_stop_;            // node is stopping; used as a stop flag by the replication threads
    struct FollowerContext {    // per-follower replication state
        int64_t next_index;     // index of the next entry to send
        int64_t match_index;    // highest index known to be replicated on this follower
        common::ThreadPool worker;
        common::CondVar condition;
        FollowerContext(Mutex* mu) : next_index(0), match_index(0), worker(1), condition(mu) {}
    };
    std::vector<FollowerContext*> follower_context_;

    Mutex mu_;
    common::ThreadPool*  thread_pool_;
    RpcClient*   rpc_client_;
    std::set<std::string> voted_;   /// who voted for me
    std::string leader_;
    int64_t election_taskid_;       // id of the delayed election task, so it can be cancelled
    int32_t election_timeout_;      // election timeout

    std::function<void (const std::string& log)> log_callback_;
    std::map<int64_t, std::function<void (bool)> > callback_map_;  // callback for each entry, fired once the entry is committed
    NodeState node_state_;
};

First, the constructor:
/* The constructor body is not quoted here; it basically initializes the node list, recovers persisted state
   (LoadStorage) and resets the election timer (ResetElection).
   LoadStorage restores four variables: last_applied_, current_term_, voted_for_ and log_index_. Then, for every
   node other than itself, it creates a FollowerContext with next_index = log_index_ + 1 and starts a worker
   thread running the following function: */
void RaftNodeImpl::ReplicateLogWorker(uint32_t id) {
    FollowerContext* follower = follower_context_[id];
    while (true) {
        MutexLock lock(&mu_);
        while (node_state_ != kLeader && !node_stop_) {  // not the leader and not stopping: wait for the leader's signal
            follower->condition.Wait();
        }
        if (node_stop_) {   // woken up because the node is shutting down: exit the loop
            return;
        }
        int64_t s = common::timer::get_micros();   // record the start time
        ReplicateLogForNode(id);                   // the actual replication (doubles as heartbeat)
        int64_t d = election_timeout_ / 2 - (common::timer::get_micros() - s) / 1000;
        if (d > 0) {  // d = half the election timeout minus the time replication took; if there is time left, sleep on the
                      // condition unless the leader signals early (e.g. a new entry arrived or the leader's state changed).
                      // The net effect is to throttle heartbeats to roughly one per election_timeout_/2, which is still well
                      // within the follower's election timeout.
            follower->condition.TimeWait(d);
        }
    }
}
/* On a non-leader node this function blocks in the wait forever. On the leader it periodically calls
   ReplicateLogForNode: if there are new entries (match_index < log_index_) it replicates them, otherwise the empty
   request serves as a heartbeat (sent roughly every election_timeout_ / 2). */


void RaftNodeImpl::ResetElection() {
    mu_.AssertHeld();
    if (election_taskid_ != -1) {
        CancelElection();
    }
    election_taskid_ =
        thread_pool_->DelayTask(election_timeout_ + rand() % election_timeout_,
                                std::bind(&RaftNodeImpl::Election, this));
    //LOG(INFO, "Reset election %ld", election_taskid_);
}
/* Simple enough: cancel the pending delayed task in the thread pool and schedule a new one. As in standard Raft,
   the delay is a random value in [T, 2T) where T is election_timeout_; if the task is never cancelled (no heartbeat
   arrives in time), it fires and starts a new election. */

Next comes the longest function, the leader's log replication logic:

void RaftNodeImpl::ReplicateLogForNode(uint32_t id) {
    mu_.AssertHeld();

    FollowerContext* follower = follower_context_[id];
    int64_t next_index = follower->next_index;    // index of the next entry to send to this follower
    int64_t match_index = follower->match_index;  // highest index already matched on this follower

    mu_.Unlock();
    int64_t max_index = 0;
    int64_t max_term = -1;
    std::unique_ptr<AppendEntriesRequest> request(new AppendEntriesRequest);
    std::unique_ptr<AppendEntriesResponse> response(new AppendEntriesResponse);
    request->set_term(current_term_);
    request->set_leader(self_);
    request->set_leader_commit(commit_index_);    // build the request
    LOG(INFO, "M %ld N %ld I %ld", match_index, next_index, log_index_);
    if (match_index < log_index_) {   // the follower's match point is behind our last log index: there are new entries to send
        assert(match_index <= next_index);
        int64_t prev_index = 0;
        int64_t prev_term = 0;
        std::string prev_log;
        StatusCode s = log_db_->Read(next_index - 1, &prev_log);
        if (s == kOK) {   // fetch the index and term of the entry right before next_index
            LogEntry prev_entry;
            bool ret = prev_entry.ParseFromString(prev_log);
            assert(ret);
            prev_index = prev_entry.index();
            prev_term = prev_entry.term();
        }
        request->set_prev_log_index(prev_index);
        request->set_prev_log_term(prev_term);
        for (int64_t i = next_index; i <= log_index_; i++) {   // pack the new entries into the request
            std::string log;
            s = log_db_->Read(i, &log);
            if (s != kOK) {
                LOG(FATAL, "Data lost: %ld", i);
                break;
            }
            LOG(INFO, "Add %ld to request", i);
            LogEntry* entry = request->add_entries();
            bool ret = entry->ParseFromString(log);
            assert(ret);
            max_index = entry->index();   // remember the index and term of the last packed entry; used below to update follower state
            max_term = entry->term();
            if (request->ByteSize() >= 1024*1024) {   // cap the request size; resending a huge failed request would be painful
                break;
            }
        }
    }
    RaftNode_Stub* node;
    rpc_client_->GetStub(nodes_[id], &node);   // send the entries to the follower with a blocking RPC and wait for the result;
                                               // the follower handles it in AppendEntries. First look at how the leader handles the response.
    bool ret = rpc_client_->SendRequest(node, &RaftNode_Stub::AppendEntries,
                                        request.get(), response.get(), 1, 1);
    LOG(INFO, "Replicate %d entrys to %s return %d",
        request->entries_size(), nodes_[id].c_str(), ret);
    delete node;

    mu_.Lock();
    if (ret) {
        int64_t term = response->term();
        if (CheckTerm(term)) {   // the term is acceptable (term <= current_term_)
            if (response->success()) {   // the follower's log matched (an empty request also counts)
                if (max_index && max_term == current_term_) {   // we actually sent entries and they belong to the current term
                    follower->match_index = max_index;          // advance the follower's match/next state
                    follower->next_index = max_index + 1;
                    LOG(INFO, "Replicate to %s success match %ld next %ld",
                        nodes_[id].c_str(), max_index, max_index + 1);
                    std::vector<int64_t> match_index;   // after the entries are accepted, collect every node's match_index
                    for (uint32_t i = 0; i < nodes_.size(); i++) {
                        if (nodes_[i] == self_) {
                            match_index.push_back(1LL<<60);   // the leader itself always matches; treat it as the maximum
                        } else {
                            match_index.push_back(follower_context_[i]->match_index);
                        }
                    }
                    std::sort(match_index.begin(), match_index.end());
                    int mid_pos = (nodes_.size() - 1) / 2;   // after sorting, this position is the match_index held by a majority
                    int64_t commit_index = match_index[mid_pos];
                    if (commit_index > commit_index_) {   // the commit index advanced, commit locally
                        LOG(INFO, "Update commit_index from %ld to %ld",
                            commit_index_, commit_index);
                        commit_index_ = commit_index;
                        while (last_applied_ < commit_index) {   // apply the newly committed entries one by one
                            last_applied_ ++;
                            LOG(INFO, "[Raft] Apply %ld to leader", last_applied_);
                            std::map<int64_t, std::function<void (bool)> >::iterator cb_it =
                                callback_map_.find(last_applied_);   // the callback registered for this entry
                            if (cb_it != callback_map_.end()) {
                                std::function<void (bool)> callback = cb_it->second;
                                callback_map_.erase(cb_it);
                                mu_.Unlock();
                                LOG(INFO, "[Raft] AppendLog callback %ld", last_applied_);
                                callback(true);   // run the callback without holding the lock
                                mu_.Lock();
                            } else {
                                LOG(INFO, "[Raft] no callback for %ld", last_applied_);
                            }
                        }
                        if (last_applied_ == commit_index) {   // persist last_applied only after everything is applied
                            StoreContext("last_applied", last_applied_);
                        }
                    }
                }   // end of the entries-sent branch; if no entries were sent it was just a heartbeat and nothing else happens
            } else {   // the logs did not match: step next_index back by one and retry on the next round
                       // (Q: if the follower is far behind, isn't backing off one index at a time inefficient?)
                if (follower->next_index > follower->match_index
                        && follower->next_index > 1) {
                    --follower->next_index;
                }
            }
        }
    }
}
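To make the commit-index computation above concrete, here is a small standalone sketch of the same median trick (not BFS code): the leader's own slot is set to a huge value because it always has every entry, and after an ascending sort the element at position (N-1)/2 is the highest index that a majority of nodes has replicated.

#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

// Standalone illustration of the majority match_index logic used in ReplicateLogForNode.
int64_t MajorityMatchIndex(std::vector<int64_t> match_index) {   // one slot per node
    std::sort(match_index.begin(), match_index.end());
    int mid_pos = (match_index.size() - 1) / 2;
    return match_index[mid_pos];
}

int main() {
    // 5-node cluster: the leader (1<<60 stands for "has everything") plus followers
    // matched up to index 7, 5, 9 and 2 respectively.
    std::vector<int64_t> m = {1LL << 60, 7, 5, 9, 2};
    // Sorted: 2 5 7 9 (1<<60); (5-1)/2 = 2, so the result is 7.
    // Index 7 exists on the leader and on the followers at 7 and 9: 3 of 5 nodes, a majority.
    printf("commit index = %ld\n", (long)MajorityMatchIndex(m));
    return 0;
}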

And here is the follower's side of log replication:

void RaftNodeImpl::AppendEntries(::google::protobuf::RpcController* controller,
                                 const ::baidu::bfs::AppendEntriesRequest* request,
                                 ::baidu::bfs::AppendEntriesResponse* response,
                                 ::google::protobuf::Closure* done) {
    MutexLock lock(&mu_);
    int64_t term = request->term();   // the leader's term
    if (term < current_term_) {   // the sender's term is smaller than mine: either a delayed request arriving after I already
                                  // became leader, or a partitioned old leader that is no longer the real leader
        LOG(INFO, "AppendEntries old term %ld / %ld", term, current_term_);
        response->set_success(false);
        done->Run();   // done->Run() completes the RPC and sends the response back
        return;
    }

    CheckTerm(term);   // compare terms; may step down to follower
    if (term == current_term_ && node_state_ == kCandidate) {   // the sender is already replicating with an equal term while
                                                                // I am still a candidate: it must have won the election
        node_state_ = kFollower;
    }
    leader_ = request->leader();
    ResetElection();   // a log entry (or heartbeat) arrived, reset the election timer
    int64_t prev_log_term = request->prev_log_term();
    int64_t prev_log_index = request->prev_log_index();   // index of the last entry of the previous replication round
    if (prev_log_index > 0) {   // check prev term: verify log continuity
        std::string log;
        StatusCode s = log_db_->Read(prev_log_index, &log);
        LogEntry entry;
        if (s == kOK && !entry.ParseFromString(log)) {
            LOG(FATAL, "Paser logdb value fail:%ld", prev_log_index);
        }
        if (s == kNsNotFound || entry.term() != prev_log_term) {   // logs do not line up; reject so the leader backs off next_index
            LOG(INFO, "[Raft] Last index %ld term %ld / %ld mismatch",
                prev_log_index, prev_log_term, entry.term());
            response->set_success(false);
            done->Run();
            return;
        }
        LOG(INFO, "[Raft] Last index %ld term %ld match", prev_log_index, prev_log_term);
    }

    /// log match...
    int64_t leader_commit = request->leader_commit();
    int entry_count = request->entries_size();
    if (entry_count > 0) {   // there are entries to process
        LOG(INFO, "AppendEntries from %s %ld \"%s\" %ld",
            request->leader().c_str(), term,
            request->entries(0).log_data().c_str(), leader_commit);
        for (int i = 0; i < entry_count; i++) {
            int64_t index = request->entries(i).index();
            if (log_index_ >= index) {   // a duplicate entry we already have
                LOG(INFO, "[raft] Ignore duplicate entry %ld, last log_index: %ld",
                    index, log_index_);
                continue;
            } else if (log_index_ + 1 != index && log_index_ != -1) {   // indexes are expected to be contiguous
                                                                        // (Q: what happens if they are not?)
                LOG(FATAL, "[Raft] Wrong index %ld in AppendEntries, last log_index: %ld",
                    index, log_index_);
            }
            bool ret = StoreLog(current_term_, index,
                                request->entries(i).log_data());   // persist the entry
            log_index_ = index;   // for a follower, log_index_ is the index of the last accepted entry
            if (!ret) {
                response->set_success(false);
                done->Run();
                return;
            }
        }
    }

    response->set_term(current_term_);
    response->set_success(true);
    done->Run();
    // Everything replicated successfully; now advance the local commit_index_
    if (leader_commit > commit_index_) {
        commit_index_ = leader_commit;
    }
    if (commit_index_ > last_applied_) {   // new entries can be applied; do it asynchronously
        thread_pool_->AddTask(std::bind(&RaftNodeImpl::ApplyLog, this));
    }
}

Next, applying the log. The code is straightforward: it reads the entries in [last_applied_+1, commit_index_], invokes the callback for each, then updates and persists last_applied_. One thing worth noting: only entries whose log type is kUserLog are applied; there is also a kRaftCmd type, which is explained later.
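The body of ApplyLog is not quoted in this post; a minimal sketch matching the description above might look like the following (the entry.type() accessor and the exact use of the applying_ guard are assumptions, not verified against the BFS source):

// Sketch only -- reconstructed from the description above, not copied from BFS.
void RaftNodeImpl::ApplyLog() {
    MutexLock lock(&mu_);
    if (applying_) {    // another apply pass is already running
        return;
    }
    applying_ = true;
    while (last_applied_ < commit_index_) {
        int64_t index = last_applied_ + 1;
        std::string log;
        StatusCode s = log_db_->Read(index, &log);
        if (s != kOK) {
            LOG(FATAL, "Apply read %ld fail", index);
        }
        LogEntry entry;
        bool ret = entry.ParseFromString(log);
        assert(ret);
        if (entry.type() == kUserLog) {       // kRaftCmd entries (e.g. the leader's empty no-op) are skipped
            mu_.Unlock();
            log_callback_(entry.log_data());  // hand the entry to the state machine
            mu_.Lock();
        }
        last_applied_ = index;
    }
    StoreContext("last_applied", last_applied_);
    applying_ = false;
}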
That is roughly the whole log replication flow. As mentioned above, whenever no replication (heartbeat) RPC arrives in time, a randomly delayed task fires and starts an election. The election body:

void RaftNodeImpl::Election() {
    MutexLock lock(&mu_);
    if (node_state_ == kLeader) {   // already the leader, abort this round
                                    // (Q: triggered once more because other nodes responded slowly?)
        election_taskid_ = -1;
        return;
    }

    voted_.clear();   // start counting votes from scratch
    current_term_ ++;
    node_state_ = kCandidate;   // become a candidate
    voted_for_ = self_;         // vote for myself first (voted_for_ records who we voted for; one vote per term, never twice)
    if (!StoreContext("current_term", current_term_) || !StoreContext("voted_for", voted_for_)) {
        LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
    }
    voted_.insert(self_);   // voted_ records who voted for me; used for the majority check
    LOG(INFO, "Start Election: CT%ld LI%ld LT%ld", current_term_, log_index_, log_term_);
    for (uint32_t i = 0; i < nodes_.size(); i++) {   // ask every node other than myself to vote for me
        if (nodes_[i] == self_) {
            continue;
        }
        LOG(INFO, "Send VoteRequest to %s", nodes_[i].c_str());
        VoteRequest* request = new VoteRequest;
        request->set_term(current_term_);
        request->set_candidate(self_);
        request->set_last_log_index(log_index_);
        request->set_last_log_term(log_term_);   // send my last log's index and term plus the election term
                                                 // (Q: log_term_ does not seem to be used anywhere; it stays 0?)
        VoteResponse* response = new VoteResponse;
        RaftNode_Stub* raft_node;
        rpc_client_->GetStub(nodes_[i], &raft_node);
        std::function<void (const VoteRequest*, VoteResponse*, bool, int)> callback
                = std::bind(&RaftNodeImpl::ElectionCallback, this,
                            std::placeholders::_1, std::placeholders::_2,
                            std::placeholders::_3, std::placeholders::_4, nodes_[i]);
        rpc_client_->AsyncRequest(raft_node, &RaftNode_Stub::Vote, request, response, callback, 60, 1);
        // Unlike log replication, this uses an async RPC: with several nodes there is no point blocking per node,
        // and there is no promise/future fan-in waiting for the other N-1 responses either. That is why the
        // function checks at the top whether we already became leader.
        delete raft_node;
    }
    election_taskid_ =
        thread_pool_->DelayTask(election_timeout_ + rand() % election_timeout_,
                                std::bind(&RaftNodeImpl::Election, this));   // schedule the next election round
}

Now the vote handler on the receiving side:

void RaftNodeImpl::Vote(::google::protobuf::RpcController* controller,
                        const ::baidu::bfs::VoteRequest* request,
                        ::baidu::bfs::VoteResponse* response,
                        ::google::protobuf::Closure* done) {
    int64_t term = request->term();
    const std::string& candidate = request->candidate();
    int64_t last_log_index = request->last_log_index();
    int64_t last_log_term = request->last_log_term();
    LOG(INFO, "Recv vote request: %s %ld %ld / (%s %ld %ld)",
        candidate.c_str(), term, last_log_term,
        voted_for_.c_str(), current_term_, log_term_);
    MutexLock lock(&mu_);
    CheckTerm(term);
    if (term >= current_term_   // grant the vote only if: 1. the candidate's term >= my current term
        && (voted_for_ == "" || voted_for_ == candidate)   // 2. I have not voted yet, or already voted for this candidate
        && (last_log_term > log_term_ ||                   // 3. the candidate's last log term is larger, or the terms are equal
            (last_log_term == log_term_ && last_log_index >= log_index_))) {   //    and its last log index is at least as large
        voted_for_ = candidate;   // remember who we voted for in this term
        if (!StoreContext("current_term", current_term_) || !StoreContext("voted_for", voted_for_)) {
            LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
        } else {
            LOG(INFO, "Granted %s %ld %ld", candidate.c_str(), term, last_log_index);
        }
        response->set_vote_granted(true);
        response->set_term(term);
        done->Run();
        return;
    }
    response->set_vote_granted(false);   // otherwise reject the vote
    response->set_term(current_term_);
    done->Run();
}

Finally, handling the vote results:

void RaftNodeImpl::ElectionCallback(const VoteRequest* request,
                                    VoteResponse* response,
                                    bool failed,
                                    int error,
                                    const std::string& node_addr) {
    std::unique_ptr<const VoteRequest> req(request);
    std::unique_ptr<VoteResponse> res(response);
    if (failed) {
        return;
    }

    int64_t term = response->term();
    bool granted = response->vote_granted();
    assert(term >= request->term() && (term == request->term() || !granted));
    // What we expect from the response: 1. if the vote was granted, the response term equals the request term;
    // 2. if it was not, either the responder's term is larger, or the terms are equal but its log is newer.

    LOG(INFO, "ElectionCallback %s by %s %ld / %ld",
        granted ? "granted" : "reject",
        node_addr.c_str(), term, current_term_);

    MutexLock lock(&mu_);
    CheckTerm(term);
    if (term != current_term_ || !granted || node_state_ == kLeader) {   // this is an async callback, so re-check all the state
        return;
    }

    voted_.insert(node_addr);   // someone voted for me; add them to the voter set (callbacks from other terms were filtered above)
    if (voted_.size() >= (nodes_.size() / 2) + 1) {   // got a majority of the votes
        leader_ = self_;
        node_state_ = kLeader;
        CancelElection();   // cancel the election timer
        LOG(INFO, "Change state to Leader, term %ld index %ld commit %ld applied %ld",
            current_term_, log_index_, commit_index_, last_applied_);
        // Immediately append an empty kRaftCmd entry. ApplyLog skips it, but the step matters: it cements the new
        // leader's legitimacy. Two examples:
        // [1] Nodes A, B, C where A and C are partitioned from each other but both can reach B. Suppose A won by
        //     bumping its term plus a faster timeout, all logs are identical and there are no user entries yet.
        //     Without this entry, C times out, starts a new election and could also win on term plus a faster
        //     timeout, so A and C keep trading the leadership. With this entry, B's log is brought in sync with
        //     A's, so B refuses to vote for C, which reduces such flapping (though if C's vote request arrives
        //     first it can still happen).
        // [2] Nodes A, B, C, D, E. (a) A is leader with log_index = 1 replicated to B; A and B crash. (b) C is
        //     elected, accepts one user entry at log_index = 1 (with a different term), then C crashes. (c) A
        //     recovers and replicates to D; the entry now exists on a majority and could be committed; A and B
        //     crash again. (d) C is elected and overwrites the log_index = 1 entry on A and B, which must never
        //     happen to a committed entry. With this step, at (c) A first appends an entry at log_index = 2 in its
        //     new term, and that entry must be committed before the earlier log 1 can be considered committed.
        StoreLog(current_term_, ++log_index_, "", kRaftCmd);
        for (uint32_t i = 0; i < follower_context_.size(); i++) {   // reset every follower's replication state
            if (nodes_[i] != self_) {
                follower_context_[i]->match_index = 0;
                follower_context_[i]->next_index = log_index_ + 1;
                follower_context_[i]->condition.Signal();   // wake the per-follower replication threads (previously blocked in
                                                            // condition.Wait()) to start replicating / heartbeating
            }
        }
    }
}

That covers the overall flow. The implementation itself is not complicated; the interesting part is checking its details and correctness against Raft.