Hadoop源码分析13: IPC流程(8) Server的wait、notify
2014-05-28 08:47
381 查看
1.Server的wait、notify
public abstractclass Server {
public synchronized void join()throws
InterruptedException {
while
(running){
wait();
}
}
public synchronized void stop(){
LOG.info("Stopping
server on" + port);
running
=false;
if (handlers
!=null) {
for
(int i = 0; i< handlerCount; i++) {
if
(handlers[i]!= null) {
handlers[i].interrupt();
}
}
}
listener.interrupt();
listener.doStop();
responder.interrupt();
notifyAll();
}
}
2. ServerListenerReader的wait、notify
publicclass ServerListenerReader implements Runnable{
public void run(){
synchronized (this) {
while(serverListener.server.running) {
SelectionKey key = null;
try {
readSelector.select();
while (adding) {
this.wait(1000);
}
Iterator<SelectionKey> iter =readSelector.selectedKeys().iterator();
while (iter.hasNext()){
key =iter.next();
iter.remove();
if(key.isValid()) {
if (key.isReadable()) {
serverListener.doRead(key);
}
}
key =null;
}
} catch (InterruptedException e) {
if(serverListener.server.running) { //
unexpected -- log it
}
} catch (IOException ex) {
}
}
}
}
public synchronized void finishAdd(){
adding = false;
this.notify();
}
}
3. ServerResponder的wait、notify
publicclass ServerResponder extends Thread {
public void run(){
Server.SERVER.set(server);
long lastPurgeTime = 0; // last check for old calls.
while (server.running){
try{
waitPending();
// If a channel is being registered,wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey>iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key =iter.next();
iter.remove();
try {
if(key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e){
}
}
long now =System.currentTimeMillis();
if (now< lastPurgeTime +PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
//
// If there were some calls that have not beensent out for a
// long time, discard them.
//
ArrayList<ServerCall>calls;
// get the list of channels from list ofkeys.
synchronized (writeSelector.keys()) {
calls = newArrayList<ServerCall>(writeSelector.keys().size());
iter =writeSelector.keys().iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
ServerCallcall = (ServerCall)key.attachment();
if (call!= null && key.channel() == call.connection.channel){
calls.add(call);
}
}
}
for(ServerCall call : calls) {
try {
doPurge(call, now);
} catch (IOException e){
}
}
} catch(OutOfMemoryError e) {
//
// we can run out of memory if we have too manythreads
// log the event and sleep for a minute andgive
// some thread(s) a chance to finish
//
try { Thread.sleep(60000); }catch (Exception ie) {}
} catch(Exception e) {
}
}
}
private synchronized void waitPending()throws
InterruptedException {
while (pending>0) {
wait();
}
}
private synchronized void decPending(){
// call done enqueueing.
pending--;
notify();
}
}
public abstractclass Server {
public synchronized void join()throws
InterruptedException {
while
(running){
wait();
}
}
public synchronized void stop(){
LOG.info("Stopping
server on" + port);
running
=false;
if (handlers
!=null) {
for
(int i = 0; i< handlerCount; i++) {
if
(handlers[i]!= null) {
handlers[i].interrupt();
}
}
}
listener.interrupt();
listener.doStop();
responder.interrupt();
notifyAll();
}
}
2. ServerListenerReader的wait、notify
publicclass ServerListenerReader implements Runnable{
public void run(){
synchronized (this) {
while(serverListener.server.running) {
SelectionKey key = null;
try {
readSelector.select();
while (adding) {
this.wait(1000);
}
Iterator<SelectionKey> iter =readSelector.selectedKeys().iterator();
while (iter.hasNext()){
key =iter.next();
iter.remove();
if(key.isValid()) {
if (key.isReadable()) {
serverListener.doRead(key);
}
}
key =null;
}
} catch (InterruptedException e) {
if(serverListener.server.running) { //
unexpected -- log it
}
} catch (IOException ex) {
}
}
}
}
public synchronized void finishAdd(){
adding = false;
this.notify();
}
}
3. ServerResponder的wait、notify
publicclass ServerResponder extends Thread {
public void run(){
Server.SERVER.set(server);
long lastPurgeTime = 0; // last check for old calls.
while (server.running){
try{
waitPending();
// If a channel is being registered,wait.
writeSelector.select(PURGE_INTERVAL);
Iterator<SelectionKey>iter = writeSelector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key =iter.next();
iter.remove();
try {
if(key.isValid() && key.isWritable()) {
doAsyncWrite(key);
}
} catch (IOException e){
}
}
long now =System.currentTimeMillis();
if (now< lastPurgeTime +PURGE_INTERVAL) {
continue;
}
lastPurgeTime = now;
//
// If there were some calls that have not beensent out for a
// long time, discard them.
//
ArrayList<ServerCall>calls;
// get the list of channels from list ofkeys.
synchronized (writeSelector.keys()) {
calls = newArrayList<ServerCall>(writeSelector.keys().size());
iter =writeSelector.keys().iterator();
while (iter.hasNext()){
SelectionKey key = iter.next();
ServerCallcall = (ServerCall)key.attachment();
if (call!= null && key.channel() == call.connection.channel){
calls.add(call);
}
}
}
for(ServerCall call : calls) {
try {
doPurge(call, now);
} catch (IOException e){
}
}
} catch(OutOfMemoryError e) {
//
// we can run out of memory if we have too manythreads
// log the event and sleep for a minute andgive
// some thread(s) a chance to finish
//
try { Thread.sleep(60000); }catch (Exception ie) {}
} catch(Exception e) {
}
}
}
private synchronized void waitPending()throws
InterruptedException {
while (pending>0) {
wait();
}
}
private synchronized void decPending(){
// call done enqueueing.
pending--;
notify();
}
}
相关文章推荐
- Hadoop源码分析9:IPC流程(4) Client 的 wait() 和 notify()
- Hadoop源码分析11: IPC流程(6)volatile
- Hadoop源码分析之IPC中Server端的初始化与启动
- Hadoop源码分析10: IPC流程(5) Atomic
- Hadoop源码分析7: IPC流程(1) 主要类
- Hadoop源码分析12: IPC流程(7)容器
- Hadoop源码分析7: IPC流程(2) 流程
- Hadoop源码分析16: IPC流程(11) 整体流程
- Hadoop源码分析14: IPC流程(9) SelectionKey
- Hadoop源码分析8: IPC流程(3)客户端的clients、connections、calls复用
- [hadoop源码阅读][6]-org.apache.hadoop.ipc-ipc.server
- Hadoop源码流程分析4-Task节点执行任务
- Hadoop源码分析之IPC机制
- Hadoop源码 – ipc.Server
- Hadoop 中 IPC 的源码分析
- Hadoop 中 IPC 的源码分析
- Java NIO框架Netty教程(四) – ServerBootStrap启动流程源码分析
- Hadoop源码分析笔记(八):HDFS主要流程
- Hbase 源码分析6 -- Regionserver上的 Get 全流程
- Hadoop 中 IPC 的源码分析