您的位置:首页 > 运维架构 > Tomcat

Tomcat7中NIO处理分析(二)

2015-11-15 15:50 465 查看
  • 6.PollerEvent处理流程

Poller处理的核心是启动执行事件队列中的PollerEvent,接着从selector中遍历已经就绪的key,一旦发生了感兴趣的事件,则交由processSocket方法处理。PollerEvent的作用是向socket注册或更新感兴趣的事件:

/**
*
* PollerEvent, cacheable object for poller events to avoid GC
*/
public static class PollerEvent implements Runnable {

// 每个PollerEvent都会保存NioChannel的引用
protected NioChannel socket;
protected int interestOps;
protected KeyAttachment key;
public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
reset(ch, k, intOps);
}

public void reset(NioChannel ch, KeyAttachment k, int intOps) {
socket = ch;
interestOps = intOps;
key = k;
}

public void reset() {
reset(null, null, 0);
}

@Override
public void run() {
//socket第一次注册到selector中,完成对socket读事件的注册
if ( interestOps == OP_REGISTER ) {
try {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception x) {
log.error("", x);
}
} else {
// socket之前已经注册到了selector中,更新socket所感兴趣的事件
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
boolean cancel = false;
if (key != null) {
final KeyAttachment att = (KeyAttachment) key.attachment();
if ( att!=null ) {
//handle callback flag
if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
att.setCometNotify(true);
} else {
att.setCometNotify(false);
}
interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
// 刷新事件的最后访问时间,防止事件超时
att.access();//to prevent timeout
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
att.interestOps(ops);
key.interestOps(ops);
} else {
cancel = true;
}
} else {
cancel = true;
}
if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
}catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);
}catch (Exception ignore) {}
}
}//end if
}//run

@Override
public String toString() {
return super.toString()+"[intOps="+this.interestOps+"]";
}
}

 

 

  • 7.将socket交给Worker执行

在第5步的Poller处理流程的分析中看到它的run方法最后会调用processKey()处理selector检测到的通道事件,而在这个方法最后会调用processSocket来调用具体的通道处理逻辑,看下processSocket方法的实现:

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
if (attachment == null) {
return false;
}
attachment.setCometNotify(false); //will get reset upon next reg
// 从SocketProcessor的缓存队列中取出一个来处理socket
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket,status);
else sc.reset(socket,status);
// 将有事件发生的socket交给Worker处理
if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
log.warn("Socket processing request was rejected for:"+socket,rx);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}

Poller通过NioEndpoint的协调,将发生事件的socket交给工作者线程Worker来进一步处理。整个事件框架的工作就到此结束,下面就是Worker的处理。

 

 

  • 8.从socket中处理请求

在Tomcat6版本的NIO处理实现中有一个Worker类,在Tomcat7中把它去掉了,但工作者的职责还在,只是交由了上面看到的SocketProcessor这个类来担当,看下这个类的实现代码:

// ---------------------------------------------- SocketProcessor Inner Class
// 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor implements Runnable {

// 每个SocketProcessor保存一个NioChannel的引用
protected NioChannel socket = null;
protected SocketStatus status = null;

public SocketProcessor(NioChannel socket, SocketStatus status) {
reset(socket,status);
}

public void reset(NioChannel socket, SocketStatus status) {
this.socket = socket;
this.status = status;
}

@Override
public void run() {
// 从socket中获取SelectionKey
SelectionKey key = socket.getIOChannel().keyFor(
socket.getPoller().getSelector());
KeyAttachment ka = null;

if (key != null) {
ka = (KeyAttachment)key.attachment();
}

// Upgraded connections need to allow multiple threads to access the
// connection at the same time to enable blocking IO to be used when
// NIO has been configured
if (ka != null && ka.isUpgraded() &&
SocketStatus.OPEN_WRITE == status) {
synchronized (ka.getWriteThreadLock()) {
doRun(key, ka);
}
} else {
synchronized (socket) {
doRun(key, ka);
}
}
}

private void doRun(SelectionKey key, KeyAttachment ka) {
try {
int handshake = -1;

try {
if (key != null) {
// For STOP there is no point trying to handshake as the
// Poller has been stopped.
if (socket.isHandshakeComplete() ||
status == SocketStatus.STOP) {
handshake = 0;
} else {
handshake = socket.handshake(
key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
status = SocketStatus.OPEN_READ;
}
}
}catch ( IOException x ) {
handshake = -1;
if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
}catch ( CancelledKeyException ckx ) {
handshake = -1;
}
if ( handshake == 0 ) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (status == null) {
// 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求
state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
state = handler.process(ka, status);
}
if (state == SocketState.CLOSED) {
// Close socket and pool
try {
close(ka, socket, key, SocketStatus.ERROR);
} catch ( Exception x ) {
log.error("",x);
}
}
} else if (handshake == -1 ) {
close(ka, socket, key, SocketStatus.DISCONNECT);
} else {
ka.getPoller().add(socket, handshake);
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key, null, false);
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
log.error("", oom);
if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
}
releaseCaches();
}catch ( Throwable oomt ) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
}catch ( Throwable t ) {
log.error("",t);
if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
}
} finally {
socket = null;
status = null;
//return to cache
if (running && !paused) {
processorCache.offer(this);
}
}
}

private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,
SocketStatus socketStatus) {
...
}
}

可以看到由SocketProcessor寻找合适的Handler处理器做最终socket转换处理。

 

可以用下面这幅图总结一下NioEndpoint的主要流程:


Acceptor和Poller是线程数组,Worker是一个线程池(Executor)

 

 

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: