zookeeper(15)源码分析-服务器(2)
2019-12-21 16:38
1481 查看
LearnerZooKeeperServer是所有Follower和Observer的父类,在LearnerZooKeeperServer里有2个重要的属性:
//提交请求处理器
protected CommitProcessor commitProcessor;
//同步处理器
protected SyncRequestProcessor syncProcessor;
FollowerZooKeeperServer和ObserverZooKeeperServer都继承了LearnerZooKeeperServer服务器。
1、FollowerZooKeeperServer
1.1、类属性
//待同步的请求 ConcurrentLinkedQueue<Request> pendingSyncs; //待处理的事务请求 LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
1.2、核心函数
1.2.1、setupRequestProcessors
构建请求处理链,FollowerZooKeeperServer的请求处理链是:
FollowerRequestProcessor -> CommitProcessor ->FinalRequestProcessor
@Override protected void setupRequestProcessors() { //最后的处理器 RequestProcessor finalProcessor = new FinalRequestProcessor(this); //第二个处理器 commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); //第一个请求处理器FollowerRequestProcessor firstProcessor = new FollowerRequestProcessor(this, commitProcessor); ((FollowerRequestProcessor) firstProcessor).start(); syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower())); syncProcessor.start(); }
1.2.2、logRequest
该函数将请求进行记录(放入到对应的队列中),等待处理。
public void logRequest(TxnHeader hdr, Record txn) { Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); //zxid不等于0,说明此服务器已经处理过请求 if ((request.zxid & 0xffffffffL) != 0) { // 将该请求放入pendingTxns中,等待事务处理 pendingTxns.add(request); } // 使用SyncRequestProcessor处理请求(其会将请求放在队列中,异步进行处理) syncProcessor.proce***equest(request); }
1.2.3、commit
函数会提交zxid对应的请求(pendingTxns的队首元素),其首先会判断队首请求对应的zxid是否为传入的zxid,然后再进行移除和提交(放在committedRequests队列中)。
public void commit(long zxid) { // 没有还在等待处理的事务 if (pendingTxns.size() == 0) { LOG.warn("Committing " + Long.toHexString(zxid) + " without seeing txn"); return; } // 队首元素的zxid long firstElementZxid = pendingTxns.element().zxid; // 如果队首元素的zxid不等于需要提交的zxid,则退出程序 if (firstElementZxid != zxid) { LOG.error("Committing zxid 0x" + Long.toHexString(zxid) + " but next pending txn 0x" + Long.toHexString(firstElementZxid)); System.exit(12); } // 从待处理事务请求队列中移除队首请求 Request request = pendingTxns.remove(); // 提交该请求 commitProcessor.commit(request); }
2、ObserverZooKeeperServer
2.1、类属性
// 同步处理器是否可用,系统参数控制 private boolean syncRequestProcessorEnabled = this.self.getSyncEnabled(); // 待同步请求队列 ConcurrentLinkedQueue<Request> pendingSyncs = new ConcurrentLinkedQueue<Request>();
2.2、核心方法
2.2.1、setupRequestProcessors
构建请求处理链,ObserverZooKeeperServer的请求处理链是:ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor,可能会存在SyncRequestProcessor。
@Override protected void setupRequestProcessors() { // We might consider changing the processor behaviour of // Observers to, for example, remove the disk sync requirements. // Currently, they behave almost exactly the same as followers. RequestProcessor finalProcessor = new FinalRequestProcessor(this); commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener()); commitProcessor.start(); firstProcessor = new ObserverRequestProcessor(this, commitProcessor); ((ObserverRequestProcessor) firstProcessor).start(); /* * Observer should write to disk, so that the it won't request * too old txn from the leader which may lead to getting an entire * snapshot. * * However, this may degrade performance as it has to write to disk * and do periodic snapshot which may double the memory requirements */ //是否使用同步处理器,看系统参数配置,会影响性能 if (syncRequestProcessorEnabled) { syncProcessor = new SyncRequestProcessor(this, null); syncProcessor.start(); } }
2.2.2、commitRequest
同步处理器可用,则使用同步处理器进行处理(放入同步处理器的queuedRequests队列中),然后提交请求(放入提交请求处理器的committedRequests队列中)
public void commitRequest(Request request) { if (syncRequestProcessorEnabled) { // Write to txnlog and take periodic snapshot //写事务日志,并定期快照 syncProcessor.proce***equest(request); } commitProcessor.commit(request); }
相关文章推荐
- 【Zookeeper】源码分析之服务器(一)
- 【Zookeeper】源码分析之服务器(二)之ZooKeeperServer
- 【Zookeeper】源码分析之服务器(二)
- 【Zookeeper】源码分析之服务器(五)之ObserverZooKeeperServer
- 【Zookeeper】源码分析之服务器(三)
- 【Zookeeper】源码分析之服务器(四)之FollowerZooKeeperServer
- zookeeper(14)源码分析-服务器(1)
- 【Zookeeper】源码分析之服务器(三)之LeaderZooKeeperServer
- 【Zookeeper】源码分析之服务器(一)
- 【Zookeeper】源码分析之服务器(三)
- 【Zookeeper】源码分析之服务器(三)
- 谷歌浏览器的源码分析(15)
- zookeeper3.3.3源码分析(二)FastLeader选举算法
- zookeeper源码分析-谭志强-专题视频课程
- Mangos源码分析(4):服务器结构探讨之继续世界服
- leaf开源服务器第四节-分析源码实现模拟TCP客户端
- Mangos源码分析(4):服务器结构探讨之继续世界服
- 高并发服务器架构笔记(3)——muduo_base 源码分析
- Zookeeper源码分析(7)- SyncRequestProcessor
- Zookeeper源码分析之六 Leader/Follower初始化