Raft protocol implementation in Baidu's BFS (1)
2018-01-22 17:23
I won't go over the Raft protocol itself here; this post only analyzes its implementation in Baidu's BFS.
To allow different replication protocols to be plugged in, BFS hides synchronization behind an abstract interface. There are two implementations: a traditional master-slave scheme (with snapshots, similar to MySQL's dump + GTID replication), and Raft.
The abstract interface looks like this:
class RaftImpl : public Sync {
public:
    RaftImpl();
    ~RaftImpl();
    void Init(SyncCallbacks callbacks);    // Initialize the callbacks: how a log entry is persisted, how snapshots are generated, etc.
    bool IsLeader(std::string* leader_addr = NULL);    // Am I the leader? Also returns the real leader's address.
    bool Log(const std::string& entry, int timeout_ms = 10000);    // Log entry point; waits up to timeout_ms.
    void Log(const std::string& entry, std::function<void (bool)> callback);    // Log entry point; invokes callback on success.
    void SwitchToLeader() {}
    std::string GetStatus();    // Returns the node's status: leader/follower.
public:
    google::protobuf::Service* GetService();    // Get the RPC service provider.
private:
    RaftNodeImpl* raft_node_;    // The actual implementation.
};
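To see how this interface is meant to be driven, here is a minimal, hypothetical caller. Nothing below is BFS code: the function name and the log payloads are my own illustration of the two Log() entry points and the IsLeader() redirect pattern.

#include <cstdio>
#include <string>

// Hypothetical caller of the Sync interface above; illustration only.
void RunWithSync(Sync* sync) {
    std::string leader;
    if (!sync->IsLeader(&leader)) {
        printf("not leader, redirect to %s\n", leader.c_str());    // Followers redirect writes to the leader.
        return;
    }
    // Blocking variant: wait up to 10s for the entry to be replicated and applied.
    if (!sync->Log("create /home/dir1", 10000)) {
        printf("replication timed out\n");
    }
    // Async variant: the callback fires once the entry is committed.
    sync->Log("create /home/dir2", [](bool ok) {
        printf("async log %s\n", ok ? "committed" : "failed");
    });
}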
Now for the implementing class; the code already carries some comments:
enum NodeState {
    kFollower = 0,
    kCandidate = 1,
    kLeader = 2,
};

class RaftNodeImpl : public RaftNode {
public:
    RaftNodeImpl(const std::string& raft_nodes, int node_index,
                 int election_timeout, const std::string& db_path);    // Node list, election timeout, log storage directory.
    ~RaftNodeImpl();
    void Vote(::google::protobuf::RpcController* controller,
              const ::baidu::bfs::VoteRequest* request,
              ::baidu::bfs::VoteResponse* response,
              ::google::protobuf::Closure* done);    // Ask other nodes to vote for me.
    void AppendEntries(::google::protobuf::RpcController* controller,
                       const ::baidu::bfs::AppendEntriesRequest* request,
                       ::baidu::bfs::AppendEntriesResponse* response,
                       ::google::protobuf::Closure* done);    // Replicate logs to other nodes (doubles as heartbeat).
public:
    bool GetLeader(std::string* leader);    // Get the leader's address; returns whether this node is the leader.
    void AppendLog(const std::string& log, std::function<void (bool)> callback);    // Implements the abstract interface described above.
    bool AppendLog(const std::string& log, int timeout_ms = 10000);    // Same as above.
    void Init(std::function<void (const std::string& log)> callback,    // Same as above.
              std::function<void (int32_t, std::string*)> snapshot_callback);
private:
    bool StoreContext(const std::string& context, int64_t value);    // Persist meta information.
    bool StoreContext(const std::string& context, const std::string& value);    // Same as above.
    std::string Index2Logkey(int64_t index);    // atoi
    void LoadStorage(const std::string& db_path);    // Initialize meta information.
    bool CancelElection();    // Abandon the pending election.
    void ReplicateLogForNode(uint32_t id);    // Leader replicates logs to one follower.
    void ReplicateLogWorker(uint32_t id);    // Thread function driving replication.
    void ResetElection();    // Reset the election timer.
    void Election();    // Start an election.
    bool CheckTerm(int64_t term);    // Compare a term against the local term.
    void ElectionCallback(const VoteRequest* request,
                          VoteResponse* response,
                          bool failed,
                          int error,
                          const std::string& node_addr);    // Election callback.
    bool StoreLog(int64_t term, int64_t index, const std::string& log,
                  LogType type = kUserLog);    // Persist a log entry.
    void ApplyLog();    // Apply committed logs.
    std::string LoadVoteFor();    // The functions below are state-persistence helpers; deprecated.
    void SetVeteFor(const std::string& votefor);
    int64_t LoadCurrentTerm();
    void SetCurrentTerm(int64_t);
    void SetLastApplied(int64_t index);
    int64_t GetLastApplied(int64_t index);
private:
    std::vector<std::string> nodes_;
    std::string self_;
    int64_t current_term_;      /// Current term.
    std::string voted_for_;     /// Who we voted for in the current term.
    LogDB* log_db_;             /// Persistent log storage.
    int64_t log_index_;         /// Index of the last log entry.
    int64_t log_term_;          /// Term of the last log entry.
    int64_t commit_index_;      /// Index of the last committed entry.
    int64_t last_applied_;      /// Index of the last entry applied to the state machine.
    bool applying_;             /// Currently applying to the state machine.
    bool node_stop_;            // Node is stopping; flag checked by the replication workers.
    struct FollowerContext {    // Per-follower replication state.
        int64_t next_index;     // Index of the next entry to send.
        int64_t match_index;    // Highest index known to be replicated.
        common::ThreadPool worker;
        common::CondVar condition;
        FollowerContext(Mutex* mu)
            : next_index(0), match_index(0), worker(1), condition(mu) {}
    };
    std::vector<FollowerContext*> follower_context_;
    Mutex mu_;
    common::ThreadPool* thread_pool_;
    RpcClient* rpc_client_;
    std::set<std::string> voted_;    /// Who voted for me.
    std::string leader_;
    int64_t election_taskid_;    // Id of the scheduled election task; used for cancellation.
    int32_t election_timeout_;   // Election timeout.
    std::function<void (const std::string& log)> log_callback_;
    std::map<int64_t, std::function<void (bool)> > callback_map_;    // Callbacks invoked once an entry is replicated.
    NodeState node_state_;
};
First, construction:
/* The constructor itself is not quoted; it basically initializes the node list,
   recovers state (LoadStorage) and resets the election timer (ResetElection).
   LoadStorage recovers four variables: last_applied_, current_term_, voted_for_ and log_index_.
   Then, for every node other than itself, it initializes a FollowerContext with
   next_index = log_index_ + 1 and starts a worker thread running this function: */
void RaftNodeImpl::ReplicateLogWorker(uint32_t id) {
    FollowerContext* follower = follower_context_[id];
    while (true) {
        MutexLock lock(&mu_);
        while (node_state_ != kLeader && !node_stop_) {    // Not the leader and not stopping: wait for the leader's signal.
            follower->condition.Wait();
        }
        if (node_stop_) {    // Woken because the node is shutting down: exit the loop.
            return;
        }
        int64_t s = common::timer::get_micros();    // Record the start time.
        ReplicateLogForNode(id);    // The actual replication (doubles as heartbeat).
        int64_t d = election_timeout_ / 2 - (common::timer::get_micros() - s) / 1000;
        if (d > 0) {    // d = half the election timeout minus the time replication took.
                        // If there is time left, sleep for the remainder unless the leader
                        // signals early (e.g. a new entry arrived, or its state changed).
                        // The net effect is to pace heartbeats: they go out roughly every
                        // election_timeout_/2, well within the followers' election timeout.
            follower->condition.TimeWait(d);
        }
    }
}
/* On a non-leader node this function blocks in Wait(). On the leader it periodically
   calls ReplicateLogForNode: if there are pending entries (match_index < log_index_)
   the call replicates them, otherwise it serves as a heartbeat (so the heartbeat
   interval is about half the election timeout). */
void RaftNodeImpl::ResetElection() {
    mu_.AssertHeld();
    if (election_taskid_ != -1) {
        CancelElection();
    }
    election_taskid_ = thread_pool_->DelayTask(
        election_timeout_ + rand() % election_timeout_,
        std::bind(&RaftNodeImpl::Election, this));
    //LOG(INFO, "Reset election %ld", election_taskid_);
}
/* Simple enough: cancel the pending timer task in the thread pool and schedule a new one.
   As in standard Raft, the delay is randomized over [t, 2t); if the task is never
   cancelled in time, a new round of election begins. */
Next comes the longest function, the leader's log replication logic:
void RaftNodeImpl::ReplicateLogForNode(uint32_t id) {
    mu_.AssertHeld();
    FollowerContext* follower = follower_context_[id];
    int64_t next_index = follower->next_index;    // Next log index to send to this follower.
    int64_t match_index = follower->match_index;  // Highest index already matched.
    mu_.Unlock();
    int64_t max_index = 0;
    int64_t max_term = -1;
    std::unique_ptr<AppendEntriesRequest> request(new AppendEntriesRequest);
    std::unique_ptr<AppendEntriesResponse> response(new AppendEntriesResponse);
    request->set_term(current_term_);
    request->set_leader(self_);
    request->set_leader_commit(commit_index_);    // Build the request.
    LOG(INFO, "M %ld N %ld I %ld", match_index, next_index, log_index_);
    if (match_index < log_index_) {    // The follower's match point is behind our last log: there are new entries.
        assert(match_index <= next_index);
        int64_t prev_index = 0;
        int64_t prev_term = 0;
        std::string prev_log;
        StatusCode s = log_db_->Read(next_index - 1, &prev_log);
        if (s == kOK) {    // Fetch the index and term of the entry preceding this batch.
            LogEntry prev_entry;
            bool ret = prev_entry.ParseFromString(prev_log);
            assert(ret);
            prev_index = prev_entry.index();
            prev_term = prev_entry.term();
        }
        request->set_prev_log_index(prev_index);
        request->set_prev_log_term(prev_term);
        for (int64_t i = next_index; i <= log_index_; i++) {    // Put the new entries into the request.
            std::string log;
            s = log_db_->Read(i, &log);
            if (s != kOK) {
                LOG(FATAL, "Data lost: %ld", i);
                break;
            }
            LOG(INFO, "Add %ld to request", i);
            LogEntry* entry = request->add_entries();
            bool ret = entry->ParseFromString(log);
            assert(ret);
            max_index = entry->index();    // Remember the last entry's index and term for the update below.
            max_term = entry->term();
            if (request->ByteSize() >= 1024*1024) {    // Cap the request size; resending a huge failed batch would be painful.
                break;
            }
        }
    }
    RaftNode_Stub* node;
    rpc_client_->GetStub(nodes_[id], &node);
    // Send the entries with a blocking RPC and wait for the result; the follower side
    // is AppendEntries, shown below. First, the leader's handling of the response.
    bool ret = rpc_client_->SendRequest(node, &RaftNode_Stub::AppendEntries,
                                        request.get(), response.get(), 1, 1);
    LOG(INFO, "Replicate %d entrys to %s return %d",
        request->entries_size(), nodes_[id].c_str(), ret);
    delete node;
    mu_.Lock();
    if (ret) {
        int64_t term = response->term();
        if (CheckTerm(term)) {    // The term is acceptable (term <= current_term_).
            if (response->success()) {    // Logs matched (an empty batch also succeeds).
                if (max_index && max_term == current_term_) {    // We sent entries and they belong to the current term.
                    follower->match_index = max_index;    // Update the follower's match/next state.
                    follower->next_index = max_index + 1;
                    LOG(INFO, "Replicate to %s success match %ld next %ld",
                        nodes_[id].c_str(), max_index, max_index + 1);
                    std::vector<int64_t> match_index;    // Collect every node's match_index.
                    for (uint32_t i = 0; i < nodes_.size(); i++) {
                        if (nodes_[i] == self_) {
                            match_index.push_back(1LL << 60);    // We always match ourselves; treat as maximal.
                        } else {
                            match_index.push_back(follower_context_[i]->match_index);
                        }
                    }
                    std::sort(match_index.begin(), match_index.end());
                    int mid_pos = (nodes_.size() - 1) / 2;    // After sorting, this position holds the majority's match_index.
                    int64_t commit_index = match_index[mid_pos];
                    if (commit_index > commit_index_) {    // The commit index advanced: commit locally.
                        LOG(INFO, "Update commit_index from %ld to %ld",
                            commit_index_, commit_index);
                        commit_index_ = commit_index;
                        while (last_applied_ < commit_index) {    // Apply the entries one by one.
                            last_applied_ ++;
                            LOG(INFO, "[Raft] Apply %ld to leader", last_applied_);
                            std::map<int64_t, std::function<void (bool)> >::iterator cb_it =
                                callback_map_.find(last_applied_);    // The entry's callback.
                            if (cb_it != callback_map_.end()) {
                                std::function<void (bool)> callback = cb_it->second;
                                callback_map_.erase(cb_it);
                                mu_.Unlock();
                                LOG(INFO, "[Raft] AppendLog callback %ld", last_applied_);
                                callback(true);    // Run the callback.
                                mu_.Lock();
                            } else {
                                LOG(INFO, "[Raft] no callback for %ld", last_applied_);
                            }
                        }
                        if (last_applied_ == commit_index) {    // Persist last_applied only after everything is applied.
                            StoreContext("last_applied", last_applied_);
                        }
                    }
                }
            }    // End of response->success(); an empty batch, i.e. a pure heartbeat, needs nothing further.
            else {    // Logs mismatched: back next_index off by one and retry next round.
                      // (Q: if the gap is large, won't this be inefficient?)
                if (follower->next_index > follower->match_index && follower->next_index > 1) {
                    --follower->next_index;
                }
            }
        }
    }
}
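On the Q in that last comment: yes, decrementing next_index by one per round means a follower that is N entries behind costs O(N) failed RPCs. The Raft paper sketches an optimization in which the rejecting follower also returns the conflicting term and the first index it holds for that term, so the leader can skip a whole term per round. Below is a sketch of what that could look like here; the conflict_term/conflict_index response fields do not exist in the BFS proto and are my assumption.

// Hypothetical fast backoff on AppendEntries rejection. Assumes the follower
// fills conflict_term/conflict_index in the response -- NOT in the BFS proto.
void FastBackoff(FollowerContext* follower,
                 const AppendEntriesResponse* response, LogDB* log_db) {
    int64_t conflict_term = response->conflict_term();
    int64_t conflict_index = response->conflict_index();
    // If the leader also has entries of conflict_term, resume right after its
    // last such entry; otherwise jump straight to the follower's first index
    // of the conflicting term.
    for (int64_t i = follower->next_index - 1; i >= 1; i--) {
        std::string log;
        LogEntry entry;
        if (log_db->Read(i, &log) == kOK && entry.ParseFromString(log) &&
            entry.term() == conflict_term) {
            follower->next_index = i + 1;
            return;
        }
    }
    follower->next_index = conflict_index;
}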
Here is the follower-side handling of log replication:
void RaftNodeImpl::AppendEntries(::google::protobuf::RpcController* controller,
                                 const ::baidu::bfs::AppendEntriesRequest* request,
                                 ::baidu::bfs::AppendEntriesResponse* response,
                                 ::google::protobuf::Closure* done) {
    MutexLock lock(&mu_);
    int64_t term = request->term();    // The leader's term.
    if (term < current_term_) {    // The sender's term is below ours? Either a delayed request arriving after we were elected, or a partition left a stale leader behind.
        LOG(INFO, "AppendEntries old term %ld / %ld", term, current_term_);
        response->set_success(false);
        done->Run();    // Completes the protobuf RPC closure, i.e. sends the response back.
        return;
    }
    CheckTerm(term);    // Compare terms; may step down to follower.
    if (term == current_term_ && node_state_ == kCandidate) {    // The sender is already replicating at our term while we are still a candidate: it has won the election.
        node_state_ = kFollower;
    }
    leader_ = request->leader();
    ResetElection();    // An entry (or heartbeat) arrived: reset the election timer.
    int64_t prev_log_term = request->prev_log_term();
    int64_t prev_log_index = request->prev_log_index();    // Last log index of the previous batch.
    if (prev_log_index > 0) {    // Check prev term, i.e. log continuity.
        std::string log;
        StatusCode s = log_db_->Read(prev_log_index, &log);
        LogEntry entry;
        if (s == kOK && !entry.ParseFromString(log)) {
            LOG(FATAL, "Paser logdb value fail:%ld", prev_log_index);
        }
        if (s == kNsNotFound || entry.term() != prev_log_term) {    // Logs are not contiguous; make the leader adjust next_index.
            LOG(INFO, "[Raft] Last index %ld term %ld / %ld mismatch",
                prev_log_index, prev_log_term, entry.term());
            response->set_success(false);
            done->Run();
            return;
        }
        LOG(INFO, "[Raft] Last index %ld term %ld match", prev_log_index, prev_log_term);
    }
    /// log match...
    int64_t leader_commit = request->leader_commit();
    int entry_count = request->entries_size();
    if (entry_count > 0) {    // There are entries to process.
        LOG(INFO, "AppendEntries from %s %ld \"%s\" %ld",
            request->leader().c_str(), term,
            request->entries(0).log_data().c_str(), leader_commit);
        for (int i = 0; i < entry_count; i++) {
            int64_t index = request->entries(i).index();
            if (log_index_ >= index) {    // Duplicate entry.
                LOG(INFO, "[raft] Ignore duplicate entry %ld, last log_index: %ld",
                    index, log_index_);
                continue;
            } else if (log_index_ + 1 != index && log_index_ != -1) {    // Indexes are expected to be contiguous. (Q: what if they are not? Here LOG(FATAL) aborts.)
                LOG(FATAL, "[Raft] Wrong index %ld in AppendEntries, last log_index: %ld",
                    index, log_index_);
            }
            bool ret = StoreLog(current_term_, index, request->entries(i).log_data());    // Persist the entry.
            log_index_ = index;    // For a follower, log_index_ is the index of the last accepted entry.
            if (!ret) {
                response->set_success(false);
                done->Run();
                return;
            }
        }
    }
    response->set_term(current_term_);
    response->set_success(true);
    done->Run();
    // Everything replicated successfully; advance the local commit_index_.
    if (leader_commit > commit_index_) {
        commit_index_ = leader_commit;
    }
    if (commit_index_ > last_applied_) {    // New entries can be applied; do it asynchronously.
        thread_pool_->AddTask(std::bind(&RaftNodeImpl::ApplyLog, this));
    }
}
Next, applying logs. The code is simple: it reads the entries in [last_applied_+1, commit_index_], invokes the callback for each one, and persists the updated last_applied_. One detail worth noting: only entries whose log_type is kUserLog are applied. There is a second type, kRaftCmd, which is explained further below.
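The ApplyLog body is not quoted in this post; the following is a sketch reconstructed purely from the description above (the applying_ guard and the exact reads are my reading of the class layout, not the verbatim BFS source).

// Reconstructed from the description above, not the verbatim BFS source.
void RaftNodeImpl::ApplyLog() {
    MutexLock lock(&mu_);
    if (applying_) {    // Another task is already applying; avoid reentrancy.
        return;
    }
    applying_ = true;
    while (last_applied_ < commit_index_) {    // Apply [last_applied_+1, commit_index_] in order.
        std::string log;
        if (log_db_->Read(last_applied_ + 1, &log) != kOK) {
            LOG(FATAL, "Read log %ld fail", last_applied_ + 1);
        }
        LogEntry entry;
        bool ret = entry.ParseFromString(log);
        assert(ret);
        if (entry.type() == kUserLog) {    // Only user entries reach the state machine; kRaftCmd is skipped.
            mu_.Unlock();
            log_callback_(entry.log_data());    // The callback registered via Init().
            mu_.Lock();
        }
        last_applied_ ++;
    }
    StoreContext("last_applied", last_applied_);    // Persist the new apply point.
    applying_ = false;
}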
That is roughly the whole log replication flow. As mentioned above, whenever no log replication (heartbeat) RPC arrives in time, a delay_task with a randomized timeout fires and starts an election. The election body:
void RaftNodeImpl::Election() {
    MutexLock lock(&mu_);
    if (node_state_ == kLeader) {    // Already the leader: cancel this run. (Q: triggered once more because peers responded slowly?)
        election_taskid_ = -1;
        return;
    }
    voted_.clear();    // Reset the vote tally.
    current_term_ ++;
    node_state_ = kCandidate;    // Become a candidate.
    voted_for_ = self_;    // Vote for ourselves first (records who we voted for; one vote per term, never repeated).
    if (!StoreContext("current_term", current_term_) ||
        !StoreContext("voted_for", voted_for_)) {
        LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
    }
    voted_.insert(self_);    // Who voted for me; used for the majority check.
    LOG(INFO, "Start Election: CT%ld LI%ld LT%ld", current_term_, log_index_, log_term_);
    for (uint32_t i = 0; i < nodes_.size(); i++) {    // Ask every node other than ourselves to vote for us.
        if (nodes_[i] == self_) {
            continue;
        }
        LOG(INFO, "Send VoteRequest to %s", nodes_[i].c_str());
        VoteRequest* request = new VoteRequest;
        request->set_term(current_term_);
        request->set_candidate(self_);
        request->set_last_log_index(log_index_);
        request->set_last_log_term(log_term_);    // Ship our last log's term and index plus the election term. (Q: log_term_ never seems to be maintained; it looks like it stays 0?)
        VoteResponse* response = new VoteResponse;
        RaftNode_Stub* raft_node;
        rpc_client_->GetStub(nodes_[i], &raft_node);
        std::function<void (const VoteRequest*, VoteResponse*, bool, int)> callback =
            std::bind(&RaftNodeImpl::ElectionCallback, this,
                      std::placeholders::_1, std::placeholders::_2,
                      std::placeholders::_3, std::placeholders::_4, nodes_[i]);
        rpc_client_->AsyncRequest(raft_node, &RaftNode_Stub::Vote,
                                  request, response, callback, 60, 1);
        // Unlike log replication, this uses async RPCs: with several peers there is
        // no point blocking, and there is no promise/future-style wait for the other
        // N-1 responses either. That is why the callback must recheck whether we are
        // already the leader.
        delete raft_node;
    }
    election_taskid_ = thread_pool_->DelayTask(
        election_timeout_ + rand() % election_timeout_,
        std::bind(&RaftNodeImpl::Election, this));    // Schedule the next election round.
}
The response to a vote request:
void RaftNodeImpl::Vote(::google::protobuf::RpcController* controller,
                        const ::baidu::bfs::VoteRequest* request,
                        ::baidu::bfs::VoteResponse* response,
                        ::google::protobuf::Closure* done) {
    int64_t term = request->term();
    const std::string& candidate = request->candidate();
    int64_t last_log_index = request->last_log_index();
    int64_t last_log_term = request->last_log_term();
    LOG(INFO, "Recv vote request: %s %ld %ld / (%s %ld %ld)",
        candidate.c_str(), term, last_log_term,
        voted_for_.c_str(), current_term_, log_term_);
    MutexLock lock(&mu_);
    CheckTerm(term);
    if (term >= current_term_    // Grant the vote iff: 1. the candidate's term >= our current term,
        && (voted_for_ == "" || voted_for_ == candidate)    // 2. we have not voted this term, or we already voted for this candidate,
        && (last_log_term > log_term_ ||    // 3. the candidate's last log term is larger, or terms are equal
            (last_log_term == log_term_ && last_log_index >= log_index_))) {    //    and its last log index is at least ours.
        voted_for_ = candidate;    // Record our vote for this term.
        if (!StoreContext("current_term", current_term_) ||
            !StoreContext("voted_for", voted_for_)) {
            LOG(FATAL, "Store term & vote_for fail %s %ld", voted_for_.c_str(), current_term_);
        } else {
            LOG(INFO, "Granted %s %ld %ld", candidate.c_str(), term, last_log_index);
        }
        response->set_vote_granted(true);
        response->set_term(term);
        done->Run();
        return;
    }
    response->set_vote_granted(false);    // Otherwise, refuse the vote.
    response->set_term(current_term_);
    done->Run();
}
Finally, handling the vote results:
void RaftNodeImpl::ElectionCallback(const VoteRequest* request,
                                    VoteResponse* response,
                                    bool failed,
                                    int error,
                                    const std::string& node_addr) {
    std::unique_ptr<const VoteRequest> req(request);
    std::unique_ptr<VoteResponse> res(response);
    if (failed) {
        return;
    }
    int64_t term = response->term();
    bool granted = response->vote_granted();
    assert(term >= request->term() && (term == request->term() || !granted));
    // Expected outcomes: 1. if granted, the response term equals the request term;
    // 2. if rejected, the peer's term is larger, or terms are equal but its log is newer.
    LOG(INFO, "ElectionCallback %s by %s %ld / %ld",
        granted ? "granted" : "reject", node_addr.c_str(), term, current_term_);
    MutexLock lock(&mu_);
    CheckTerm(term);
    if (term != current_term_ || !granted || node_state_ == kLeader) {    // Async callback: every piece of state must be rechecked.
        return;
    }
    voted_.insert(node_addr);    // Record the vote (callbacks from stale terms were filtered out above).
    if (voted_.size() >= (nodes_.size() / 2) + 1) {    // Majority reached.
        leader_ = self_;
        node_state_ = kLeader;
        CancelElection();    // Cancel the election timer.
        LOG(INFO, "Change state to Leader, term %ld index %ld commit %ld applied %ld",
            current_term_, log_index_, commit_index_, last_applied_);
        StoreLog(current_term_, ++log_index_, "", kRaftCmd);
        // Immediately append an empty kRaftCmd entry. ApplyLog skips this type, but the
        // step is important: it fundamentally cements the node's legitimacy as leader.
        // Two examples:
        // [1] Three nodes A, B, C; A and C are partitioned from each other, but both can
        //     reach B. Suppose A won by bumping its term with a faster timeout, everyone's
        //     log_index is the same, and there are no user entries yet. Without this step,
        //     C times out, calls a new election, and may also win via term + faster timeout,
        //     so A and C keep alternating as leader. With it, B's log gets synced from A,
        //     so B will not vote for C, which reduces the flapping (of course, if C's vote
        //     request arrives before the sync, it can still happen).
        // [2] Five nodes A-E. (a) A is leader, log_index=1, replicated to B; A and B crash.
        //     (b) C is elected and accepts one user entry, also at log_index=1 (different
        //         term); C crashes.
        //     (c) A recovers and replicates to D; the entry now sits on a majority and
        //         could be committed; A and B crash again.
        //     (d) C is elected and overwrites the log_index=1 entry on A and B -- which
        //         must never happen to a committed entry. With this step, at (c) A first
        //         appends a no-op at log_index=2 in its new term, and only by committing
        //         that entry can the earlier log 1 be committed.
        for (uint32_t i = 0; i < follower_context_.size(); i++) {    // Reset follower state.
            if (nodes_[i] != self_) {
                follower_context_[i]->match_index = 0;
                follower_context_[i]->next_index = log_index_ + 1;
                follower_context_[i]->condition.Signal();    // Kick off replication (heartbeat) to each follower, previously blocked in condition.Wait().
            }
        }
    }
}
That is the overall flow. The implementation is not hard to follow; the real work lies in checking its details and correctness against Raft.