您的位置:首页 > 大数据 > 人工智能

Hadoop源码分析13: IPC流程(8) Server的wait、notify

2014-05-28 08:47 381 查看
1.Server的wait、notify

public abstractclass Server {

public synchronized void join()throws
InterruptedException {
while
(running){
wait();
}
}

public synchronized void stop(){
LOG.info("Stopping
server on" + port);
running
=false;
if (handlers
!=null) {
for
(int i = 0; i< handlerCount; i++) {
if
(handlers[i]!= null) {
handlers[i].interrupt();
}
}
}
listener.interrupt();
listener.doStop();
responder.interrupt();
notifyAll();
}

}

2. ServerListenerReader的wait、notify

publicclass ServerListenerReader implements Runnable{

public void run(){
synchronized (this) {
while(serverListener.server.running) {
SelectionKey key = null;
try {
readSelector.select();
while (adding) {
this.wait(1000);
}

Iterator<SelectionKey> iter =readSelector.selectedKeys().iterator();
while (iter.hasNext()){
key =iter.next();
iter.remove();
if(key.isValid()) {
if (key.isReadable()) {
serverListener.doRead(key);
}
}
key =null;
}
} catch (InterruptedException e) {
if(serverListener.server.running) { //
unexpected -- log it
}
} catch (IOException ex) {
}
}
}
}

public synchronized void finishAdd(){
adding = false;
this.notify();

}
}

3. ServerResponder的wait、notify

publicclass ServerResponder extends Thread {

public void run(){
Server.SERVER.set(server);
long lastPurgeTime = 0; // last check for old calls.

while (server.running){
try{
waitPending();
// If a channel is being registered,wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey>iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key =iter.next();
iter.remove();
try {
if(key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e){
}
}
long now =System.currentTimeMillis();
if (now< lastPurgeTime +PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
//
// If there were some calls that have not beensent out for a
// long time, discard them.
//
ArrayList<ServerCall>calls;

// get the list of channels from list ofkeys.
synchronized (writeSelector.keys()) {
calls = newArrayList<ServerCall>(writeSelector.keys().size());
iter =writeSelector.keys().iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
ServerCallcall = (ServerCall)key.attachment();
if (call!= null && key.channel() == call.connection.channel){
calls.add(call);
}
}
}

for(ServerCall call : calls) {
try {
doPurge(call, now);
} catch (IOException e){
}
}
} catch(OutOfMemoryError e) {
//
// we can run out of memory if we have too manythreads
// log the event and sleep for a minute andgive
// some thread(s) a chance to finish
//
try { Thread.sleep(60000); }catch (Exception ie) {}
} catch(Exception e) {
}
}
}

private synchronized void waitPending()throws
InterruptedException {
while (pending>0) {
wait();
}
}
private synchronized void decPending(){
// call done enqueueing.
pending--;
notify();
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: