etcd raft如何实现Linearizable Read
2017-08-04 14:52
330 查看
Linearizable Read通俗来讲,就是读请求需要读到最新的已经commit的数据,不会读到老数据。
对于使用raft协议来保证多副本强一致的系统中,读写请求都可以通过走一次raft协议来满足。然后,现实系统中,读请求通常会占很大比重,如果每次读请求都要走一次raft落盘,性能可想而知。所以优化读性能至关重要。
从raft协议可知,leader拥有最新的状态,如果读请求都走leader,那么leader可以直接返回结果给客户端。然而,在出现网络分区和时钟快慢相差比较大的情况下,这有可能会返回老的数据,即stale read,这违反了Linearizable Read。例如,leader和其他followers之间出现网络分区,其他followers已经选出了新的leader,并且新的leader已经commit了一堆数据,然而由于不同机器的时钟走的快慢不一,原来的leader可能并没有发觉自己的lease过期,仍然认为自己还是合法的leader直接给客户端返回结果,从而导致了stale
read。
Raft作者提出了一种叫做ReadIndex的方案:
当leader接收到读请求时,将当前commit index记录下来,记作read index,在返回结果给客户端之前,leader需要先确定自己到底还是不是真的leader,确定的方法就是给其他所有peers发送一次心跳,如果收到了多数派的响应,说明至少这个读请求到达这个节点时,这个节点仍然是leader,这时只需要等到commit index被apply到状态机后,即可返回结果。
处理读请求时,应用的goroutine会调用这个函数,其中rctx参数相当于读请求id,全局保证唯一。step会往recvc中塞进一个MsgReadIndex消息,而运行node入口函数
的goroutine会从recvc中拿出这个message,并进行处理:
Step(m)最终会调用到raft结构体的step(m),step是个函数指针,根据node的角色,运行stepLeader()/stepFollower()/stepCandidate()。
如果node是leader,stepLeader()主要代码片段:
首先,r.raftLog.zeroTermOnErrCompacted需要检查leader是否在当前term有过commit entry,小论文5.4节关于Safety中给出了解释,以及不这么做会有什么问题,并且给出了反例。
其次,本文讨论的ReadIndex方案对应的是ReadOnlySafe这个option分支,其中addRequest(...)会把这个读请求到达时的commit index保存起来,并且维护一些状态信息,而bcastHeartbeatWithCtx(...)准备好需要发送给peers的心跳消息MsgHeartbeat。当node收到心跳响应消息MsgHeartbeatResp时处理如下:
只保留逻辑相关代码:
首先只有ReadOnlySafe这个方案时,才会继续往下走。如果接收到了多数派的心跳响应,则会从刚才保存的信息中将对应读请求当时的commit index和请求id拿出来,填充到ReadState中,ReadState结构如下:
可以看出ReadState实际上包含了一个读请求到达node时,当前raft的状态commit index和请求id。
然后将ReadState append到raft结构体中的readStates数组中,readStates数组会被包含在Ready结构体中从readyc中pop出来供应用使用。
看看etcdserver是怎么使用的:
首先,在消费Ready的goroutine中:
这里重点是把Ready中的ReadState放入readStateC中,readStateC是一个buffer大小为1的channel
然后,在etcdserver跑linearizableReadLoop()的另外一个goroutine中:
至此,ReadIndex流程结束,总结一下,就四步:
leader check自己是否在当前term commit过entry
leader记录下当前commit index,然后leader给所有peers发心跳广播
收到多数派响应代表读请求到达时还是leader,然后等待apply index大于等于commit index
返回结果
etcd不仅实现了leader上的read only query,同时也实现了follower上的read only query,原理是一样的,只不过读请求到达follower时,commit index是需要向leader去要的,leader返回commit index给follower之前,同样,需要走上面的ReadIndex流程,因为leader同样需要check自己到底还是不是leader,代码不赘述。
对于使用raft协议来保证多副本强一致的系统中,读写请求都可以通过走一次raft协议来满足。然后,现实系统中,读请求通常会占很大比重,如果每次读请求都要走一次raft落盘,性能可想而知。所以优化读性能至关重要。
从raft协议可知,leader拥有最新的状态,如果读请求都走leader,那么leader可以直接返回结果给客户端。然而,在出现网络分区和时钟快慢相差比较大的情况下,这有可能会返回老的数据,即stale read,这违反了Linearizable Read。例如,leader和其他followers之间出现网络分区,其他followers已经选出了新的leader,并且新的leader已经commit了一堆数据,然而由于不同机器的时钟走的快慢不一,原来的leader可能并没有发觉自己的lease过期,仍然认为自己还是合法的leader直接给客户端返回结果,从而导致了stale
read。
Raft作者提出了一种叫做ReadIndex的方案:
当leader接收到读请求时,将当前commit index记录下来,记作read index,在返回结果给客户端之前,leader需要先确定自己到底还是不是真的leader,确定的方法就是给其他所有peers发送一次心跳,如果收到了多数派的响应,说明至少这个读请求到达这个节点时,这个节点仍然是leader,这时只需要等到commit index被apply到状态机后,即可返回结果。
func (n *node) ReadIndex(ctx context.Context, rctx []byte) error { return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}}) }
处理读请求时,应用的goroutine会调用这个函数,其中rctx参数相当于读请求id,全局保证唯一。step会往recvc中塞进一个MsgReadIndex消息,而运行node入口函数
func (n *node) run(r *raft)
的goroutine会从recvc中拿出这个message,并进行处理:
case m := <-n.recvc: // filter out response message from unknown From. if _, ok := r.prs[m.From]; ok || !IsResponseMsg(m.Type) { r.Step(m) // raft never returns an error }
Step(m)最终会调用到raft结构体的step(m),step是个函数指针,根据node的角色,运行stepLeader()/stepFollower()/stepCandidate()。
如果node是leader,stepLeader()主要代码片段:
case pb.MsgReadIndex: if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term { // Reject read only request when this leader has not committed any log entry at its term. return } if r.quorum() > 1 { switch r.readOnly.option { case ReadOnlySafe: r.readOnly.addRequest(r.raftLog.committed, m) r.bcastHeartbeatWithCtx(m.Entries[0].Data) case ReadOnlyLeaseBased: var ri uint64 if r.checkQuorum { ri = r.raftLog.committed } if m.From == None || m.From == r.id { // from local member r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data}) } else { r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries}) } } }
首先,r.raftLog.zeroTermOnErrCompacted需要检查leader是否在当前term有过commit entry,小论文5.4节关于Safety中给出了解释,以及不这么做会有什么问题,并且给出了反例。
其次,本文讨论的ReadIndex方案对应的是ReadOnlySafe这个option分支,其中addRequest(...)会把这个读请求到达时的commit index保存起来,并且维护一些状态信息,而bcastHeartbeatWithCtx(...)准备好需要发送给peers的心跳消息MsgHeartbeat。当node收到心跳响应消息MsgHeartbeatResp时处理如下:
只保留逻辑相关代码:
case pb.MsgHeartbeatResp: if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { return } ackCount := r.readOnly.recvAck(m) if ackCount < r.quorum() { return } rss := r.readOnly.advance(m) for _, rs := range rss { req := rs.req if req.From == None || req.From == r.id { // from local member r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data}) } else { r.send(pb.Message{To: req.From, Type: pb. 4000 MsgReadIndexResp, Index: rs.index, Entries: req.Entries}) } }
首先只有ReadOnlySafe这个方案时,才会继续往下走。如果接收到了多数派的心跳响应,则会从刚才保存的信息中将对应读请求当时的commit index和请求id拿出来,填充到ReadState中,ReadState结构如下:
type ReadState struct { Index uint64 RequestCtx []byte }
可以看出ReadState实际上包含了一个读请求到达node时,当前raft的状态commit index和请求id。
然后将ReadState append到raft结构体中的readStates数组中,readStates数组会被包含在Ready结构体中从readyc中pop出来供应用使用。
看看etcdserver是怎么使用的:
首先,在消费Ready的goroutine中:
if len(rd.ReadStates) != 0 { select { case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]: case <-time.After(internalTimeout): plog.Warningf("timed out sending read state") case <-r.stopped: return } }
这里重点是把Ready中的ReadState放入readStateC中,readStateC是一个buffer大小为1的channel
然后,在etcdserver跑linearizableReadLoop()的另外一个goroutine中:
// 执行ReadIndex,ctx是request id if err := s.r.ReadIndex(cctx, ctx); err != nil { cancel() if err == raft.ErrStopped { return } plog.Errorf("failed to get read index from raft: %v", err) nr.notify(err) continue } //等待request id对应的ReadState从readStateC中pop出来 for !timeout && !done { select { case rs = <-s.r.readStateC: done = bytes.Equal(rs.RequestCtx, ctx) if !done { // a previous request might time out. now we should ignore the response of it and // continue waiting for the response of the current requests. plog.Warningf("ignored out-of-date read index response (want %v, got %v)", rs.RequestCtx, ctx) } case <-time.After(s.Cfg.ReqTimeout()): plog.Warningf("timed out waiting for read index response") nr.notify(ErrTimeout) timeout = true case <-s.stopping: return } } if !done { continue } // 等待当前apply index大于等于commit index if ai := s.getAppliedIndex(); ai < rs.Index { select { case <-s.applyWait.Wait(rs.Index): case <-s.stopping: return } }
至此,ReadIndex流程结束,总结一下,就四步:
leader check自己是否在当前term commit过entry
leader记录下当前commit index,然后leader给所有peers发心跳广播
收到多数派响应代表读请求到达时还是leader,然后等待apply index大于等于commit index
返回结果
etcd不仅实现了leader上的read only query,同时也实现了follower上的read only query,原理是一样的,只不过读请求到达follower时,commit index是需要向leader去要的,leader返回commit index给follower之前,同样,需要走上面的ReadIndex流程,因为leader同样需要check自己到底还是不是leader,代码不赘述。
相关文章推荐
- etcd raft如何实现Linearizable Read
- etcd raft如何实现leadership transfer
- etcd raft如何实现leadership transfer
- etcd raft如何实现成员变更
- 如何在Python中实现这五类强大的概率分布
- C语言中如何清空键盘缓冲区,Linux和Windows下完美实现
- asp.net如何实现迅雷下载功能
- SPQuery如何消除重复记录(实现联动性)
- 如何实现一个malloc:【来源】博客园
- 如何在C#中实现图片缩放
- 如何实现简单的位数组(bit array)
- epoll编程,如何实现高并发服务器开发?
- PHP如何实现跨域
- VC与JavaScript交互(一) --- 如何实现
- 一个接口有多个实现类,当调用接口中的方法时,如何判定用的是哪个实现类
- 如何做实时监控?—— 参考 Spring Boot 实现(转)
- iOS应用如何实现64位的支持
- Oracle如何实现创建数据库、备份数据库及数据导出导入的一条龙操作
- TCP可靠数据传输是如何实现的,OSPF的工作原理