ZooKeeper源码分析:Log和Snapshot持久化(SyncRequestProcessor类)
2015-03-03 23:50
363 查看
事务日志的持久化是在SyncRequestProcessor类中实现,并会按照一定的规则滚动日志(关闭当前文件,创建一个新文件),以及生成新的Snapshot。在持久化过程中,使用组提交(Group Commits)来优化磁盘io 操作。组提交是指将多个Request对象的事务作为一次写附加到磁盘上。使用这种方式可以在持久化多个事务的时候,只使用一次磁盘寻道(Disk Seek)的开销。Request对象只有在其中事务同步到磁盘后,才会传递到下一个处理器。
SyncRequestProcessor被用于下面三种不同的情景中:
Leader - 同步请求到磁盘,并且转发这个请求到AckRequestProcessor。该处理器发送Ack消息给Leader自己。
Follower - 同步请求到磁盘,并转发请求到SendAckRequestProcessor。该处理器发送确认数据包给Leader。SendAckRequestProcessor是flushable, 允许我们强制将数据包推送到Leader。
Observer - 同步已经提交的请求到磁盘(从INFORM数据包中接收)。它不会发送确认数据包给Leader。所以nextProcessor是null。在observer中, 和一般的txnlog语义不同,因为它仅包含已经提交的txn。
在SyncRequestProcessor中有两个关键队列:
queuedRequest队列:存放从传入该处理器的Request对象。当调用该处理器的processRequest方法,会将Request对象放入到queuedRequest队列;
toFlush队列:存放已经附加到日志文件,但还没有Flush的Request对象。
SyncRequestProcessor的run方法循环读取queuedRequests队列中的Request对象并进行持久化。
流程图如下:
![](http://img.blog.csdn.net/20150303235002940)
如果toFlush队列为空,则调用queuedRequest队列的阻塞方法take();如果toFlush队列不为空,则调用queuedRequest队列的非阻塞方法poll()。如果poll()方法返回null,则会立即将toFlush队列中所有Request对象中事务Flush到磁盘,并将Request对象传入到下一个处理器。这样可以避免增加请求处理的延时。如果queuedRequest.poll()方法返回不为Null或者queuedRequest.take()方法返回, 则将返回的Request对象si中的事务附加到事务日志文件中,并放入toFlush队列中。如果toFlush队列大小大于1000,则将队列中所有Request对象中事务Flush到磁盘,并将Request对象传入下一个处理器。这是可以避免在有大量请求的时候增加请求处理的延时。
Request对象附加到事务日志之后,会检查日志记录数logCount是否大于(snapCount / 2 + randRoll)。如果大于则滚动日志,并启动生成新Snapshot的线程。其中randRoll是一个随机数。这个随机数的使用可以避免Zookeeper集群里的所有机器同时构建Snapshot。
SyncRequestProcessor.run方法如下:
转载请附上原博客地址:/article/7785799.html
SyncRequestProcessor被用于下面三种不同的情景中:
Leader - 同步请求到磁盘,并且转发这个请求到AckRequestProcessor。该处理器发送Ack消息给Leader自己。
Follower - 同步请求到磁盘,并转发请求到SendAckRequestProcessor。该处理器发送确认数据包给Leader。SendAckRequestProcessor是flushable, 允许我们强制将数据包推送到Leader。
Observer - 同步已经提交的请求到磁盘(从INFORM数据包中接收)。它不会发送确认数据包给Leader。所以nextProcessor是null。在observer中, 和一般的txnlog语义不同,因为它仅包含已经提交的txn。
在SyncRequestProcessor中有两个关键队列:
queuedRequest队列:存放从传入该处理器的Request对象。当调用该处理器的processRequest方法,会将Request对象放入到queuedRequest队列;
toFlush队列:存放已经附加到日志文件,但还没有Flush的Request对象。
SyncRequestProcessor的run方法循环读取queuedRequests队列中的Request对象并进行持久化。
流程图如下:
如果toFlush队列为空,则调用queuedRequest队列的阻塞方法take();如果toFlush队列不为空,则调用queuedRequest队列的非阻塞方法poll()。如果poll()方法返回null,则会立即将toFlush队列中所有Request对象中事务Flush到磁盘,并将Request对象传入到下一个处理器。这样可以避免增加请求处理的延时。如果queuedRequest.poll()方法返回不为Null或者queuedRequest.take()方法返回, 则将返回的Request对象si中的事务附加到事务日志文件中,并放入toFlush队列中。如果toFlush队列大小大于1000,则将队列中所有Request对象中事务Flush到磁盘,并将Request对象传入下一个处理器。这是可以避免在有大量请求的时候增加请求处理的延时。
Request对象附加到事务日志之后,会检查日志记录数logCount是否大于(snapCount / 2 + randRoll)。如果大于则滚动日志,并启动生成新Snapshot的线程。其中randRoll是一个随机数。这个随机数的使用可以避免Zookeeper集群里的所有机器同时构建Snapshot。
SyncRequestProcessor.run方法如下:
public void run() { try { int logCount = 0; //这个随机数randRoll的使用可以避免Zookeeper集群里的所有机器同时构建Snapshot setRandRoll(r.nextInt( snapCount/2)); while (true ) { Request si = null; //如果toFlush为空,则调用队列queuedRequests的阻塞方法take() if (toFlush .isEmpty()) { si = queuedRequests.take(); } //如果toFlush不为空,则调用队列queuedRequests的非阻塞方法poll() else { si = queuedRequests.poll(); //如果si为null, 说明queuedRequests为空,则调用flush()方法 if (si == null) { flush( toFlush); continue; } } //如果si是一个poison pill, 则退出循环 if (si == requestOfDeath ) { break; } if (si != null) { // track the number of records written to the log //将record的操作记到日志中 if (zks .getZKDatabase().append(si)) { logCount++; if (logCount > (snapCount / 2 + randRoll)) { randRoll = r .nextInt(snapCount/2); //滚动事务日志 zks.getZKDatabase().rollLog(); //构建snapshot if (snapInProcess != null && snapInProcess.isAlive()) { LOG.warn("Too busy to snap, skipping" ); } else { //生成snapshot线程 snapInProcess = new Thread("Snapshot Thread") { public void run() { try { zks.takeSnapshot(); } catch(Exception e) { LOG.warn("Unexpected exception", e); } } }; //启动snapInProcess snapInProcess.start(); } logCount = 0; } } else if (toFlush .isEmpty()) { // optimization for read heavy workloads //如果这是一个read, 并且没有pending的flushes(writes), 那么直接传递到下一个处理器 if (nextProcessor != null) { nextProcessor.processRequest(si); if (nextProcessor instanceof Flushable) { ((Flushable) nextProcessor).flush(); } } continue; } toFlush.add(si); //如果toFlush的大小大于1000, 则flush if (toFlush .size() > 1000) { flush( toFlush); } } } } catch (Throwable t) { LOG.error("Severe unrecoverable error, exiting" , t); running = false ; System. exit(11); } LOG.info("SyncRequestProcessor exited!" ); }
转载请附上原博客地址:/article/7785799.html
相关文章推荐
- 【Zookeeper】源码分析之持久化--FileTxnSnapLog
- 【Zookeeper】源码分析之持久化(一)之FileTxnLog
- 【Zookeeper】源码分析之持久化(三)之FileTxnSnapLog
- Zookeeper源码分析之持久化(三)
- 【Zookeeper】源码分析之持久化--FileSnap
- Zookeeper源码分析之持久化(一)
- 【Zookeeper】源码分析之持久化(二)之FileSnap
- Zookeeper源码分析之持久化(二)
- Mongodb源码分析--日志及持久化
- Mongodb源码分析--日志及持久化
- zookeeper3.3.3源码分析(二)FastLeader选举算法
- nginx源码分析——初始化log
- 分布式文件系统KFS源码阅读与分析(二):MetaServer元数据持久化
- android的Log工具Log源码分析
- Mysql Innodb中undo-log和MVCC多版本一致性读的实现(源码分析)
- Storm-源码分析- Storm中Zookeeper的使用
- zookeeper源码阅读分析笔记--客户端服务端通信机制以及session超时、过期处理
- CYQ.IISLogViewer 一款IIS 日志分析工具 V1.0 发布[提供源码]
- Redis源码分析:snapshot
- Mongodb源码分析--日志及持久化