TIKV副本一致性检查机制源码分析
2017-08-31 11:53
316 查看
背景
TIKV使用raft协议来实现副本同步,任何时刻写入一个key-val键值对,都会基于raft协议复制到不同机器的三个副本上,raft协议本身能保证副本同步的强一致性,但是任何系统都可能存在bug,如果由于程序bug导致出现副本不一致,我们需要有一个机制能够检测出来,同时这个一致性检测功能不应该影响系统的正常运转。以下内容需要对raft协议以及TIKV的整体架构有基本的了解,可以参考这篇文章:http://www.infoq.com/cn/articles/building-flexible-storage-system-based-on-raft基本思想
集群中每个TIKV进程都运行有一个检测线程,检测线程周期性的从所有的本地副本中选出一个距离上一次检测时间最老的Leader副本,写一条命令字为AdminCmdType::ComputeHash的raft log,接下来:Leader和Follow在on_apply这条log时候时候做rocksdb的snapshot,这样可以保证leader和follow是在同一log位置做的snapshot,同时会使用on_apply这条log时候的raft log index作为id用以标识这一轮校验。
然后基于这个snapshot异步的计算checksum,并保存到内存中的Peer对象中。
异步计算完成以后,如果是Leader,那么会再次写一条命令字为AdminCmdType::VerifyHash的命令,内容为leader计算出来的checksum以及id。
Follow收到AdminCmdType::VerifyHash的命令以后,如果从log中解析出来的id和本地保存的id相同,会根据解析出来的checksum和自己本地保存的checksum计算比较,如果checksum不相同,说明副本不一致。
源码分析
一致性检测定时器on_consistency_check_tickfn on_consistency_check_tick(&mut self, event_loop: &mut EventLoop<Self>) { // 检测过程会涉及到扫描rocksdb,为了对系统的正常读写提供影响,只有等上一次的checksum计算完成,才会发起下一个region的副本校验 if self.consistency_check_worker.is_busy() { self.register_consistency_check_tick(event_loop); return; } // 选出一个距离上一次校验时间最老的region let (mut candidate_id, mut candidate_check_time) = (0, Instant::now()); for (®ion_id, peer) in &mut self.region_peers { if !peer.is_leader() { continue; } if peer.consistency_state.last_check_time < candidate_check_time { candidate_id = region_id; candidate_check_time = peer.consistency_state.last_check_time; } } // 如果存在,则写一条命令字为AdminCmdType::ComputeHash的raft log if candidate_id != 0 { let peer = &self.region_peers[&candidate_id]; info!("{} scheduling consistent check", peer.tag); let msg = Msg::new_raft_cmd(new_compute_hash_request(candidate_id, peer.peer.clone()), Box::new(|_| {})); if let Err(e) = self.sendch.send(msg) { error!("{} failed to schedule consistent check: {:?}", peer.tag, e); } } // 重新注册定时器 self.register_consistency_check_tick(event_loop); }
这条log被commit后,leader和follow都会被触发on_ready_compute_hash函数
fn on_ready_compute_hash(&mut self, region: metapb::Region, index: u64, snap: EngineSnapshot) { let region_id = region.get_id(); self.region_peers.get_mut(®ion_id).unwrap().consistency_state.last_check_time = Instant::now(); // 触发异步checksum计算,必须用异步是因为不能阻塞RAFT线程 let task = ConsistencyCheckTask::compute_hash(region, index, snap); info!("[region {}] schedule {}", region_id, task); if let Err(e) = self.consistency_check_worker.schedule(task) { error!("[region {}] schedule failed: {:?}", region_id, e); } }
checksum异步计算完成后,会回调fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Msg)函数,在这里会调用on_hash_computed,传入的参数为checksum计算结果
fn notify(&mut self, event_loop: &mut EventLoop<Self>, msg: Msg) { match msg { Msg::RaftMessage(data) => { if let Err(e) = self.on_raft_message(data) { error!("{} handle raft message err: {:?}", self.tag, e); } } Msg::RaftCmd { send_time, request, callback } => { self.raft_metrics .propose .request_wait_time .observe(duration_to_sec(send_time.elapsed()) as f64); self.propose_raft_command(request, callback) } Msg::SnapshotStats => self.store_heartbeat_pd(), // 调用on_hash_computed异步的计算checksum Msg::ComputeHashResult { region_id, index, hash } => { self.on_hash_computed(region_id, index, hash); } } }
在on_hash_computed会把计算出来的checksum信息保存起来,如果是leader那么会发送一条命令字为AdminCmdType::VerifyHash的raft log, log内容为计算出来的checksum值
fn on_hash_computed(&mut self, region_id: u64, index: u64, hash: Vec<u8>) { let (state, peer) = match self.region_peers.get_mut(®ion_id) { None => { warn!("[region {}] receive stale hash at index {}", region_id, index); return; } Some(p) => (&mut p.consistency_state, &p.peer), }; // 会把计算出来的checksum以及index(raft log的index)信息保存起来 // 注意在这里也可能会做一次checksum校验,后面会来说明这个问题 if !verify_and_store_hash(region_id, state, index, hash) { return; } // 接着会发送一条命令字为AdminCmdType::VerifyHash的raft log, log内容为计算出来的checksum和index值 // 这里需要加一个判断,只有leader才需要发送AdminCmdType::VerifyHash,尽管follow发送也不会成功,但是加判断代码更容易理解 let msg = Msg::new_raft_cmd(new_verify_hash_request(region_id, peer.clone(), state), Box::new(|_| {})); if let Err(e) = self.sendch.send(msg) { error!("[region {}] failed to schedule verify command for index {}: {:?}", region_id, index, e); } }
follow在on_apply时候接收到命令字为AdminCmdType::VerifyHash的rafg log时候会触发on_ready_verify_hash,然后这里会调用verify_and_store_hash做checksum校验
fn on_ready_verify_hash(&mut self, region_id: u64, expected_index: u64, expected_hash: Vec<u8>) { let state = match self.region_peers.get_mut(®ion_id) { None => { warn!("[region {}] receive stale hash at index {}", region_id, expected_index); return; } Some(p) => &mut p.consistency_state, }; // 在这个函数会触发校验逻辑 verify_and_store_hash(region_id, state, expected_index, expected_hash); }
verify_and_store_hash
// 注意这个函数同时候被 on_hash_computed和on_ready_verify_hash调用 // 也就是说存在两个需要做checksum校验的地方 // 在on_ready_verify_hash做checksum校验容易理解,这是正常的流程 // 1.leader和follow计算完checksum后,follow保存index和checksum到本地, // 接着leader发送命令字为AdminCmdType::VerifyHash的raft log // 2.follow收到这个命令后,接续出log中的checksum和index, // 如果解析出来的index和本地保存的index相同,那么开始校验checksum // 什么情况下在on_hash_computed会做checksum校验了? // 1.如果leader先于follow计算出checksum,并发送AdminCmdType::VerifyHash给follow // 2.follow收到这个命令后,发现index比本地的大,那么直接保存log中的checksum和index到本地 // 3.当follow的checksum计算完成后,再用计算出来的结果,和本地保存的checksum做校验
fn verify_and_store_hash(region_id: u64, state: &mut ConsistencyState, expected_index: u64, expected_hash: Vec<u8>) -> bool { if expected_index < state.index { REGION_HASH_COUNTER_VEC.with_label_values(&["verify", "miss"]).inc(); warn!("[region {}] has scheduled a new hash: {} > {}, skip.", region_id, state.index, expected_index); return false; } // 这里的传入的index为上次compuate_hash命令时候的index,只有index相同,才做region一致性校验 if state.index == expected_index { if state.hash != expected_hash { // 检测到副本不一致了! panic!("[region {}] hash at {} not correct, want {}, got {}!!!", region_id, state.index, escape(&expected_hash), escape(&state.hash)); } REGION_HASH_COUNTER_VEC.with_label_values(&["verify", "matched"]).inc(); state.hash = vec![]; return false; } if state.index != INVALID_INDEX && !state.hash.is_empty() { // Maybe computing is too slow or computed result is dropped due to channel full. // If computing is too slow, miss count will be increased twice. REGION_HASH_COUNTER_VEC.with_label_values(&["verify", "miss"]).inc(); warn!("[region {}] hash belongs to index {}, but we want {}, skip.", region_id, state.index, expected_index); } state.index = expected_index; state.hash = expected_hash; true }
问题
目前TIKV的实现,只能发现副本不一致,但是发现不一致后,如果更快速的定位问题了?在这一点上CockroachDB做的更进一步,在发现不一致后,又做了第二次校验,这次校验针对leader和follow的snapshot进行逐key的比较,并将diff上报到中心节点。on_consistency_check_tick的调度策略比较简陋,目前应该是在24小时不中断的做副本校验,优点是能更及时的发现问题,缺点是可能对系统的正常运行造成影响,更合理的策略是否应该是在半夜流量低峰期触发校验?
在发送AdminCmdType::VerifyHash之前,需要加一个判断,只有Leader才需要发送,这样代码更容易理解,尽管follow发送也会失败,这一点我已经提issue,周末有空会发pr修复。
相关文章推荐
- 4(phonegap源码分析)通道模块的事件订阅机制(channel)
- android的消息处理机制(图+源码分析)——Looper,Handler,Message
- quartz集群调度机制调研及源码分析---转载
- cache源码分析一 存储机制分析
- ceph源码分析--monitor的lease机制
- Ruby的GC机制源码分析(4)
- Android触摸屏事件派发机制详解与源码分析一(View篇)
- Android触摸屏事件派发机制详解与源码分析三(Activity篇)
- Fragment运行机制源码分析(二)
- 【Cocos2d-x 3.x】 事件处理机制源码分析
- Android AdapterView 源码分析以及其相关回收机制的分析 推荐
- Android 消息机制源码分析
- netty源码分析(二)Netty对Executor的实现机制源码分析
- SOFA 源码分析 — 扩展机制
- DSS源码分析--对RTSP请求的状态机处理机制
- 消息中间件 activeMQ的源码分析 之 TCP通讯机制
- Android消息机制源码分析
- Linux下Libpcap源码分析和包过滤机制 (2)
- 从源码的角度分析Android消息处理机制
- [转] android的消息处理机制(图+源码分析)——Looper,Handler,Message