您的位置:首页 > 其它

zookeeper源码分析-client分析

2014-06-08 12:16 381 查看
最近在阅读 zookeeper的源码。此系列文章仅作为个人记录,所以可能有点乱

zookeeper client端的代码分析:



zookeeper client 与 server之间的所有通信,包括 ping, getData, getChild, event 等等,都是通过ClientCnxnSocket 的实现类ClientCnxnSocketNIO 去做的。 ClientCnxnSocketNIO的源码如下:

public class ClientCnxnSocketNIO extends ClientCnxnSocket {
private static final Logger LOG = LoggerFactory
.getLogger(ClientCnxnSocketNIO.class);

private final Selector selector = Selector.open();

private SelectionKey sockKey;

ClientCnxnSocketNIO() throws IOException {
super();
}


Client 是由两个线程处理所有的事情的。SendThread处理所有请求的发送和相应的接受,该线程只关心两个队列,outgoingQueue和pendingQueue。要被发送的请求都会被ClientCnxn以packet的形式放入到 outgoingQueue中,对于用户的请求(不是ping和权限验证的请求)再发送出去后会被SendThread放入到pendingQueue中。当SendThread 接受到response(包括events 和其他的请求结果)时,会通过eventThread放入到WaitingEvents队列中,等待eventThread处理。

ClientCnxn 部分源码如下:

public class ClientCnxn {
private static final Logger LOG = LoggerFactory.getLogger(ClientCnxn.class);

private static final String ZK_SASL_CLIENT_USERNAME =
"zookeeper.sasl.client.username";

/** This controls whether automatic watch resetting is enabled.
* Clients automatically reset watches during session reconnect, this
* option allows the client to turn off this behavior by setting
* the environment variable "zookeeper.disableAutoWatchReset" to "true" */
private static boolean disableAutoWatchReset;
static {
// this var should not be public, but otw there is no easy way
// to test
disableAutoWatchReset =
Boolean.getBoolean("zookeeper.disableAutoWatchReset");
if (LOG.isDebugEnabled()) {
LOG.debug("zookeeper.disableAutoWatchReset is "
+ disableAutoWatchReset);
}
}

static class AuthData {
AuthData(String scheme, byte data[]) {
this.scheme = scheme;
this.data = data;
}

String scheme;

byte data[];
}

private final CopyOnWriteArraySet<AuthData> authInfo = new CopyOnWriteArraySet<AuthData>();

/**
* These are the packets that have been sent and are waiting for a response.
*/
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

/**
* These are the packets that need to be sent.
*/
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();

private int connectTimeout;

/**
* The timeout in ms the client negotiated with the server. This is the
* "real" timeout, not the timeout request by the client (which may have
* been increased/decreased by the server which applies bounds to this
* value.
*/
private volatile int negotiatedSessionTimeout;

private int readTimeout;

private final int sessionTimeout;

private final ZooKeeper zooKeeper;

private final ClientWatchManager watcher;


EventThread部分源码如下:

class EventThread extends Thread {
private final LinkedBlockingQueue<Object> waitingEvents =
new LinkedBlockingQueue<Object>();

/** This is really the queued session state until the event
* thread actually processes the event and hands it to the watcher.
* But for all intents and purposes this is the state.
*/
private volatile KeeperState sessionState = KeeperState.Disconnected;

private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: