
Tomcat Source Code Analysis: How the Connector Parses Requests

2017-06-01 23:10
Let's start with Tomcat's overall architecture.

[Figure: Tomcat overall architecture diagram]

The two most important components are the Connector and the Container. The Connector listens for client requests and wraps each one in the uniform Request and Response objects that the Container consumes.

Tomcat's connectors come in two broad flavors:

HTTP

The HTTP connector is configured and enabled by default, and it offers the lowest latency and the best overall performance.
For clustering, an HTTP load balancer with support for sticky web sessions must be installed in front, so that the same user is always routed to the same Tomcat instance. Tomcat supports mod_proxy (on Apache HTTP Server 2.x, including 2.2) as the load balancer. Note that the performance of HTTP proxying is usually lower than that of AJP, so AJP is generally the better choice for clustering.

AJP

On a single server, fronting Tomcat with a native web server usually performs noticeably worse than a standalone Tomcat with the default HTTP connector, even when a large part of the web application consists of static files. If integration with a native web server is needed for other reasons, however, an AJP connector is more efficient than proxied HTTP. From Tomcat's perspective, AJP clustering is the most efficient option; it is otherwise functionally equivalent to HTTP clustering.

Tomcat also provides parallel implementations depending on whether the APR native library is available; broadly, there are four:

HTTP/1.1

org.apache.coyote.http11.Http11AprProtocol   (with APR)
org.apache.coyote.http11.Http11NioProtocol   (without APR, Java NIO)

AJP/1.3

org.apache.coyote.ajp.AjpAprProtocol   (with APR)
org.apache.coyote.ajp.AjpNioProtocol   (without APR, Java NIO)
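
For reference, the Connector chooses among these classes based on the protocol attribute in server.xml plus a check for the APR native library. The sketch below is modeled on org.apache.catalina.connector.Connector#setProtocol as it behaves in Tomcat 8.0 (earlier versions default to the blocking Http11Protocol and AjpProtocol instead), so treat it as an approximation rather than the verbatim source:

// Modeled on org.apache.catalina.connector.Connector#setProtocol (Tomcat 8.0).
// With the APR library loaded, the APR variants win; otherwise NIO is used.
void setProtocol(String protocol) {
    if (AprLifecycleListener.isAprAvailable()) {
        if ("HTTP/1.1".equals(protocol)) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11AprProtocol");
        } else if ("AJP/1.3".equals(protocol)) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpAprProtocol");
        }
    } else {
        if ("HTTP/1.1".equals(protocol)) {
            setProtocolHandlerClassName("org.apache.coyote.http11.Http11NioProtocol");
        } else if ("AJP/1.3".equals(protocol)) {
            setProtocolHandlerClassName("org.apache.coyote.ajp.AjpNioProtocol");
        }
    }
}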

Next, let's analyze org.apache.coyote.http11.Http11NioProtocol in detail.

It has two important members:

1. NioEndpoint

2. Http11ConnectionHandler

NioEndpoint listens for and accepts connections, while Http11ConnectionHandler recognizes and processes the requests that arrive on them.
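
Before reading the real code, here is a minimal, self-contained sketch of the same accept / poll / process split. This is not Tomcat code and every name in it is invented for illustration, but its three roles map directly onto Acceptor, Poller, and SocketProcessor:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MiniEndpoint {

    private final ServerSocketChannel serverSock;
    private final Selector selector;
    // Tomcat queues PollerEvents the same way, so that registration always
    // happens on the poller thread and never races with select().
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    public MiniEndpoint(int port) throws IOException {
        serverSock = ServerSocketChannel.open();      // blocking by default
        serverSock.bind(new InetSocketAddress(port));
        selector = Selector.open();
    }

    // Acceptor role: block in accept(), queue the channel, wake the poller.
    public void acceptLoop() throws IOException {
        while (true) {
            SocketChannel socket = serverSock.accept();
            socket.configureBlocking(false);
            pending.add(socket);
            selector.wakeup();
        }
    }

    // Poller role: register queued channels, select, dispatch ready keys.
    public void pollLoop() throws IOException {
        while (true) {
            SocketChannel socket;
            while ((socket = pending.poll()) != null) {
                socket.register(selector, SelectionKey.OP_READ);
            }
            selector.select();
            for (SelectionKey key : selector.selectedKeys()) {
                key.interestOps(0);                   // disarm while a worker owns it
                SocketChannel ch = (SocketChannel) key.channel();
                workers.execute(() -> process(ch));   // SocketProcessor role
            }
            selector.selectedKeys().clear();
        }
    }

    // SocketProcessor role: in Tomcat this is where Http11ConnectionHandler
    // parses and services the request; here we only read and discard.
    private void process(SocketChannel ch) {
        try {
            ByteBuffer buf = ByteBuffer.allocate(8192);
            if (ch.read(buf) < 0) {
                ch.close();                           // peer closed the connection
            }
        } catch (IOException e) {
            // a real endpoint would cancel the key and recycle buffers here
        }
    }
}

NioEndpoint layers object caches (processorCache, nioChannels), a connection limiter, and the TLS handshake on top of exactly this skeleton.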

First, look at NioEndpoint's inner class Acceptor, which listens for new connections:

protected class Acceptor extends AbstractEndpoint.Acceptor {

    @Override
    public void run() {

        int errorDelay = 0;

        // Loop until we receive a shutdown command
        while (running) {

            // Loop if endpoint is paused
            while (paused && running) {
                state = AcceptorState.PAUSED;
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            if (!running) {
                break;
            }
            state = AcceptorState.RUNNING;

            try {
                //if we have reached max connections, wait
                countUpOrAwaitConnection();

                SocketChannel socket = null;
                try {
                    // Accept the next incoming connection from the server
                    // socket
                    socket = serverSock.accept();
                } catch (IOException ioe) {
                    //we didn't get a socket
                    countDownConnection();
                    // Introduce delay if necessary
                    errorDelay = handleExceptionWithDelay(errorDelay);
                    // re-throw
                    throw ioe;
                }
                // Successful accept, reset the error delay
                errorDelay = 0;

                // setSocketOptions() will add channel to the poller
                // if successful
                if (running && !paused) {
                    if (!setSocketOptions(socket)) {
                        countDownConnection();
                        closeSocket(socket);
                    }
                } else {
                    countDownConnection();
                    closeSocket(socket);
                }
            } catch (SocketTimeoutException sx) {
                // Ignore: Normal condition
            } catch (IOException x) {
                if (running) {
                    log.error(sm.getString("endpoint.accept.fail"), x);
                }
            } catch (OutOfMemoryError oom) {
                try {
                    oomParachuteData = null;
                    releaseCaches();
                    log.error("", oom);
                } catch (Throwable oomt) {
                    try {
                        try {
                            System.err.println(oomParachuteMsg);
                            oomt.printStackTrace();
                        } catch (Throwable letsHopeWeDontGetHere) {
                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                        }
                    } catch (Throwable letsHopeWeDontGetHere) {
                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                    }
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                log.error(sm.getString("endpoint.accept.fail"), t);
            }
        }
        state = AcceptorState.ENDED;
    }
}

The Acceptor obtains each new connection from java.nio.channels.ServerSocketChannel#accept and hands the channel over to a Poller, whose Selector then watches it; a condensed sketch of that hand-off follows.
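
The hand-off happens in setSocketOptions(). The sketch below is condensed from Tomcat 8.0's NioEndpoint (the NioChannel/buffer caches and error handling are omitted), so read it as an abridged outline rather than the verbatim source:

// Condensed from NioEndpoint#setSocketOptions and NioEndpoint.Poller#register
// (Tomcat 8.0); caches and error handling are omitted.
protected boolean setSocketOptions(SocketChannel socket) throws IOException {
    socket.configureBlocking(false);                  // the Poller selects on it
    socketProperties.setProperties(socket.socket());  // TCP options from server.xml
    NioChannel channel = new NioChannel(socket, bufferHandler);
    getPoller0().register(channel);                   // round-robins over the pollers
    return true;
}

// register() wraps the channel in a KeyAttachment, arms OP_READ, then queues
// a PollerEvent and wakes the Selector; the actual SelectionKey registration
// is performed on the poller thread when the event is drained.
public void register(final NioChannel socket) {
    socket.setPoller(this);
    KeyAttachment ka = new KeyAttachment(socket);
    ka.interestOps(SelectionKey.OP_READ);
    addEvent(new PollerEvent(socket, ka, OP_REGISTER));
}

Once the Selector reports the channel ready, the Poller calls processSocket() to dispatch the actual work, preferably onto the executor's worker threads: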

protected boolean processSocket(KeyAttachment attachment, SocketStatus status, boolean dispatch) {
    try {
        if (attachment == null) {
            return false;
        }
        attachment.setCometNotify(false); //will get reset upon next reg
        SocketProcessor sc = processorCache.pop();
        if ( sc == null ) sc = new SocketProcessor(attachment, status);
        else sc.reset(attachment, status);
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        log.warn(sm.getString("endpoint.executor.fail", attachment.getSocket()), ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        log.error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}

org.apache.tomcat.util.net.NioEndpoint.KeyAttachment extends org.apache.tomcat.util.net.SocketWrapper and carries the per-connection state that the Poller and the worker threads share.
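Its most important additional fields look roughly like this (abridged from Tomcat 8.0; the exact members vary across versions):

// Abridged field list of NioEndpoint.KeyAttachment (Tomcat 8.0).
public static class KeyAttachment extends SocketWrapper<NioChannel> {
    private Poller poller;                             // the Poller whose Selector owns the key
    private int interestOps = 0;                       // OP_READ / OP_WRITE currently armed
    private CountDownLatch readLatch = null;           // lets worker threads block on reads
    private CountDownLatch writeLatch = null;          // lets worker threads block on writes
    private volatile SendfileData sendfileData = null; // state of an in-progress sendfile
    private volatile long writeTimeout = -1;           // write timeout for upgraded/async IO
    // ... plus Comet flags and the usual accessors, omitted here
}
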
Next, look at SocketProcessor:

/**
 * This class is the equivalent of the Worker, but will simply use in an
 * external Executor thread pool.
 */
protected class SocketProcessor implements Runnable {

    private KeyAttachment ka = null;
    private SocketStatus status = null;

    public SocketProcessor(KeyAttachment ka, SocketStatus status) {
        reset(ka, status);
    }

    public void reset(KeyAttachment ka, SocketStatus status) {
        this.ka = ka;
        this.status = status;
    }

    @Override
    public void run() {
        NioChannel socket = ka.getSocket();
        SelectionKey key = socket.getIOChannel().keyFor(
                socket.getPoller().getSelector());

        // Upgraded connections need to allow multiple threads to access the
        // connection at the same time to enable blocking IO to be used when
        // NIO has been configured
        if (ka.isUpgraded() && SocketStatus.OPEN_WRITE == status) {
            synchronized (ka.getWriteThreadLock()) {
                doRun(key, ka);
            }
        } else {
            synchronized (socket) {
                doRun(key, ka);
            }
        }
    }

    private void doRun(SelectionKey key, KeyAttachment ka) {
        NioChannel socket = ka.getSocket();

        try {
            int handshake = -1;

            try {
                if (key != null) {
                    // For STOP there is no point trying to handshake as the
                    // Poller has been stopped.
                    if (socket.isHandshakeComplete() ||
                            status == SocketStatus.STOP) {
                        handshake = 0;
                    } else {
                        handshake = socket.handshake(
                                key.isReadable(), key.isWritable());
                        // The handshake process reads/writes from/to the
                        // socket. status may therefore be OPEN_WRITE once
                        // the handshake completes. However, the handshake
                        // happens when the socket is opened so the status
                        // must always be OPEN_READ after it completes. It
                        // is OK to always set this as it is only used if
                        // the handshake completes.
                        status = SocketStatus.OPEN_READ;
                    }
                }
            } catch (IOException x) {
                handshake = -1;
                if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x);
            } catch (CancelledKeyException ckx) {
                handshake = -1;
            }
            if (handshake == 0) {
                SocketState state = SocketState.OPEN;
                // Process the request from this socket
                if (status == null) {
                    state = handler.process(ka, SocketStatus.OPEN_READ);
                } else {
                    state = handler.process(ka, status);
                }
                if (state == SocketState.CLOSED) {
                    // Close socket and pool
                    try {
                        ka.setComet(false);
                        if (socket.getPoller().cancelledKey(key, SocketStatus.ERROR) != null) {
                            // SocketWrapper (attachment) was removed from the
                            // key - recycle both. This can only happen once
                            // per attempted closure so it is used to determine
                            // whether or not to return socket and ka to
                            // their respective caches. We do NOT want to do
                            // this more than once - see BZ 57340.
                            if (running && !paused) {
                                nioChannels.push(socket);
                            }
                            socket = null;
                        }
                        ka = null;
                    } catch (Exception x) {
                        log.error("",x);
                    }
                }
            } else if (handshake == -1 ) {
                if (key != null) {
                    socket.getPoller().cancelledKey(key, SocketStatus.DISCONNECT);
                }
                if (running && !paused) {
                    nioChannels.push(socket);
                }
                socket = null;
                ka = null;
            } else {
                ka.getPoller().add(socket,handshake);
            }
        } catch (CancelledKeyException cx) {
            if (socket != null) {
                socket.getPoller().cancelledKey(key, null);
            }
        } catch (OutOfMemoryError oom) {
            try {
                oomParachuteData = null;
                log.error("", oom);
                if (socket != null) {
                    socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
                }
                releaseCaches();
            } catch (Throwable oomt) {
                try {
                    System.err.println(oomParachuteMsg);
                    oomt.printStackTrace();
                } catch (Throwable letsHopeWeDontGetHere){
                    ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
                }
            }
        } catch (VirtualMachineError vme) {
            ExceptionUtils.handleThrowable(vme);
        } catch (Throwable t) {
            log.error("", t);
            if (socket != null) {
                socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
            }
        } finally {
            socket = null;
            status = null;
            //return to cache
            if (running && !paused) {
                processorCache.push(this);
            }
        }
    }
}

Notice this fragment inside doRun():
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (status == null) {
    state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
    state = handler.process(ka, status);
}

Next, look at Http11ConnectionHandler and at a very long process() method (defined in its parent class, AbstractProtocol.AbstractConnectionHandler):

public SocketState process(SocketWrapper<S> wrapper,
        SocketStatus status) {
    if (wrapper == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    S socket = wrapper.getSocket();
    if (socket == null) {
        // Nothing to do. Socket has been closed.
        return SocketState.CLOSED;
    }

    Processor<S> processor = connections.get(socket);
    if (status == SocketStatus.DISCONNECT && processor == null) {
        // Nothing to do. Endpoint requested a close and there is no
        // longer a processor associated with this socket.
        return SocketState.CLOSED;
    }

    wrapper.setAsync(false);
    ContainerThreadMarker.set();

    try {
        if (processor == null) {
            processor = recycledProcessors.pop();
        }
        if (processor == null) {
            processor = createProcessor();
        }

        initSsl(wrapper, processor);

        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (status == SocketStatus.CLOSE_NOW) {
                processor.errorDispatch();
                state = SocketState.CLOSED;
            } else if (dispatches != null) {
                // Associate the processor with the connection as
                // these calls may result in a nested call to process()
                connections.put(socket, processor);
                DispatchType nextDispatch = dispatches.next();
                if (processor.isUpgrade()) {
                    state = processor.upgradeDispatch(
                            nextDispatch.getSocketStatus());
                } else {
                    state = processor.asyncDispatch(
                            nextDispatch.getSocketStatus());
                }
            } else if (status == SocketStatus.DISCONNECT &&
                    !processor.isComet()) {
                // Do nothing here, just wait for it to get recycled
                // Don't do this for Comet we need to generate an end
                // event (see BZ 54022)
            } else if (processor.isAsync()) {
                state = processor.asyncDispatch(status);
            } else if (state == SocketState.ASYNC_END) {
                state = processor.asyncDispatch(status);
                if (state == SocketState.OPEN) {
                    // There may be pipe-lined data to read. If the data
                    // isn't processed now, execution will exit this
                    // loop and call release() which will recycle the
                    // processor (and input buffer) deleting any
                    // pipe-lined data. To avoid this, process it now.
                    state = processor.process(wrapper);
                }
            } else if (processor.isComet()) {
                state = processor.event(status);
            } else if (processor.isUpgrade()) {
                state = processor.upgradeDispatch(status);
            } else if (status == SocketStatus.OPEN_WRITE) {
                // Extra write event likely after async, ignore
                state = SocketState.LONG;
            } else {
                state = processor.process(wrapper);
            }

            if (state != SocketState.CLOSED && processor.isAsync()) {
                state = processor.asyncPostProcess();
            }

            if (state == SocketState.UPGRADING) {
                // Get the HTTP upgrade handler
                HttpUpgradeHandler httpUpgradeHandler =
                        processor.getHttpUpgradeHandler();
                // Retrieve leftover input
                ByteBuffer leftoverInput = processor.getLeftoverInput();
                // Release the Http11 processor to be re-used
                release(wrapper, processor, false, false);
                // Create the upgrade processor
                processor = createUpgradeProcessor(
                        wrapper, leftoverInput, httpUpgradeHandler);
                // Mark the connection as upgraded
                wrapper.setUpgraded(true);
                // Associate with the processor with the connection
                connections.put(socket, processor);
                // Initialise the upgrade handler (which may trigger
                // some IO using the new protocol which is why the lines
                // above are necessary)
                // This cast should be safe. If it fails the error
                // handling for the surrounding try/catch will deal with
                // it.
                httpUpgradeHandler.init((WebConnection) processor);
            }
            if (getLog().isDebugEnabled()) {
                getLog().debug("Socket: [" + wrapper +
                        "], Status in: [" + status +
                        "], State out: [" + state + "]");
            }
            if (dispatches == null || !dispatches.hasNext()) {
                // Only returns non-null iterator if there are
                // dispatches to process.
                dispatches = wrapper.getIteratorAndClearDispatches();
            }
        } while (state == SocketState.ASYNC_END ||
                state == SocketState.UPGRADING ||
                dispatches != null && state != SocketState.CLOSED);

        if (state == SocketState.LONG) {
            // In the middle of processing a request/response. Keep the
            // socket associated with the processor. Exact requirements
            // depend on type of long poll
            connections.put(socket, processor);
            longPoll(wrapper, processor);
        } else if (state == SocketState.OPEN) {
            // In keep-alive but between requests. OK to recycle
            // processor. Continue to poll for the next request.
            connections.remove(socket);
            release(wrapper, processor, false, true);
        } else if (state == SocketState.SENDFILE) {
            // Sendfile in progress. If it fails, the socket will be
            // closed. If it works, the socket will be re-added to the
            // poller
            connections.remove(socket);
            release(wrapper, processor, false, false);
        } else if (state == SocketState.UPGRADED) {
            // Don't add sockets back to the poller if this was a
            // non-blocking write otherwise the poller may trigger
            // multiple read events which may lead to thread starvation
            // in the connector. The write() method will add this socket
            // to the poller if necessary.
            if (status != SocketStatus.OPEN_WRITE) {
                longPoll(wrapper, processor);
            }
        } else {
            // Connection closed. OK to recycle the processor. Upgrade
            // processors are not recycled.
            connections.remove(socket);
            if (processor.isUpgrade()) {
                processor.getHttpUpgradeHandler().destroy();
            } else {
                release(wrapper, processor, true, false);
            }
        }
        return state;
    } catch(java.net.SocketException e) {
        // SocketExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.socketexception.debug"), e);
    } catch (java.io.IOException e) {
        // IOExceptions are normal
        getLog().debug(sm.getString(
                "abstractConnectionHandler.ioexception.debug"), e);
    }
    // Future developers: if you discover any other
    // rare-but-nonfatal exceptions, catch them here, and log as
    // above.
    catch (Throwable e) {
        ExceptionUtils.handleThrowable(e);
        // any other exception or error is odd. Here we log it
        // with "ERROR" level, so it will show up even on
        // less-than-verbose logs.
        getLog().error(
                sm.getString("abstractConnectionHandler.error"), e);
    } finally {
        ContainerThreadMarker.clear();
    }

    // Make sure socket/processor is removed from the list of current
    // connections
    connections.remove(socket);
    // Don't try to add upgrade processors back into the pool
    if (processor != null && !processor.isUpgrade()) {
        release(wrapper, processor, true, false);
    }
    return SocketState.CLOSED;
}

From here the request is handed to org.apache.coyote.http11.Http11NioProcessor; the processing loop itself lives in org.apache.coyote.http11.AbstractHttp11Processor#process:

public SocketState process(SocketWrapper<S> socketWrapper)
        throws IOException {
    RequestInfo rp = request.getRequestProcessor();
    rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);

    // Setting up the I/O
    setSocketWrapper(socketWrapper);
    getInputBuffer().init(socketWrapper, endpoint);
    getOutputBuffer().init(socketWrapper, endpoint);

    // Flags
    keepAlive = true;
    comet = false;
    openSocket = false;
    sendfileInProgress = false;
    readComplete = true;
    if (endpoint.getUsePolling()) {
        keptAlive = false;
    } else {
        keptAlive = socketWrapper.isKeptAlive();
    }

    if (disableKeepAlive()) {
        socketWrapper.setKeepAliveLeft(0);
    }

    while (!getErrorState().isError() && keepAlive && !comet && !isAsync() &&
            httpUpgradeHandler == null && !endpoint.isPaused()) {

        // Parsing the request header
        try {
            setRequestLineReadTimeout();

            if (!getInputBuffer().parseRequestLine(keptAlive)) {
                if (handleIncompleteRequestLineRead()) {
                    break;
                }
            }

            if (endpoint.isPaused()) {
                // 503 - Service unavailable
                response.setStatus(503);
                setErrorState(ErrorState.CLOSE_CLEAN, null);
            } else {
                keptAlive = true;
                // Set this every time in case limit has been changed via JMX
                request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount());
                // Currently only NIO will ever return false here
                if (!getInputBuffer().parseHeaders()) {
                    // We've read part of the request, don't recycle it
                    // instead associate it with the socket
                    openSocket = true;
                    readComplete = false;
                    break;
                }
                if (!disableUploadTimeout) {
                    setSocketTimeout(connectionUploadTimeout);
                }
            }
        } catch (IOException e) {
            if (getLog().isDebugEnabled()) {
                getLog().debug(
                        sm.getString("http11processor.header.parse"), e);
            }
            setErrorState(ErrorState.CLOSE_NOW, e);
            break;
        } catch (Throwable t) {
            ExceptionUtils.handleThrowable(t);
            UserDataHelper.Mode logMode = userDataHelper.getNextMode();
            if (logMode != null) {
                String message = sm.getString(
                        "http11processor.header.parse");
                switch (logMode) {
                    case INFO_THEN_DEBUG:
                        message += sm.getString(
                                "http11processor.fallToDebug");
                        //$FALL-THROUGH$
                    case INFO:
                        getLog().info(message);
                        break;
                    case DEBUG:
                        getLog().debug(message);
                }
            }
            // 400 - Bad Request
            response.setStatus(400);
            setErrorState(ErrorState.CLOSE_CLEAN, t);
            getAdapter().log(request, response, 0);
        }

        if (!getErrorState().isError()) {
            // Setting up filters, and parse some request headers
            rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE);
            try {
                prepareRequest();
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                if (getLog().isDebugEnabled()) {
                    getLog().debug(sm.getString(
                            "http11processor.request.prepare"), t);
                }
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        if (maxKeepAliveRequests == 1) {
            keepAlive = false;
        } else if (maxKeepAliveRequests > 0 &&
                socketWrapper.decrementKeepAlive() <= 0) {
            keepAlive = false;
        }

        // Process the request in the adapter
        if (!getErrorState().isError()) {
            try {
                rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
                getAdapter().service(request, response);
                // Handle when the response was committed before a serious
                // error occurred. Throwing a ServletException should both
                // set the status to 500 and set the errorException.
                // If we fail here, then the response is likely already
                // committed, so we can't try and set headers.
                if (keepAlive && !getErrorState().isError() && (
                        response.getErrorException() != null ||
                        (!isAsync() &&
                        statusDropsConnection(response.getStatus())))) {
                    setErrorState(ErrorState.CLOSE_CLEAN, null);
                }
                setCometTimeouts(socketWrapper);
            } catch (InterruptedIOException e) {
                setErrorState(ErrorState.CLOSE_NOW, e);
            } catch (HeadersTooLargeException e) {
                // The response should not have been committed but check it
                // anyway to be safe
                if (response.isCommitted()) {
                    setErrorState(ErrorState.CLOSE_NOW, e);
                } else {
                    response.reset();
                    response.setStatus(500);
                    setErrorState(ErrorState.CLOSE_CLEAN, e);
                    response.setHeader("Connection", "close"); // TODO: Remove
                }
            } catch (Throwable t) {
                ExceptionUtils.handleThrowable(t);
                getLog().error(sm.getString(
                        "http11processor.request.process"), t);
                // 500 - Internal Server Error
                response.setStatus(500);
                setErrorState(ErrorState.CLOSE_CLEAN, t);
                getAdapter().log(request, response, 0);
            }
        }

        // Finish the handling of the request
        rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT);

        if (!isAsync() && !comet) {
            if (getErrorState().isError()) {
                // If we know we are closing the connection, don't drain
                // input. This way uploading a 100GB file doesn't tie up the
                // thread if the servlet has rejected it.
                getInputBuffer().setSwallowInput(false);
            } else {
                // Need to check this again here in case the response was
                // committed before the error that requires the connection
                // to be closed occurred.
                checkExpectationAndResponseStatus();
            }
            endRequest();
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT);

        // If there was an error, make sure the request is counted as
        // and error, and update the statistics counter
        if (getErrorState().isError()) {
            response.setStatus(500);
        }
        request.updateCounters();

        if (!isAsync() && !comet || getErrorState().isError()) {
            if (getErrorState().isIoAllowed()) {
                getInputBuffer().nextRequest();
                getOutputBuffer().nextRequest();
            }
        }

        if (!disableUploadTimeout) {
            if (endpoint.getSoTimeout() > 0) {
                setSocketTimeout(endpoint.getSoTimeout());
            } else {
                setSocketTimeout(0);
            }
        }

        rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE);

        if (breakKeepAliveLoop(socketWrapper)) {
            break;
        }
    }

    rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);

    if (getErrorState().isError() || endpoint.isPaused()) {
        return SocketState.CLOSED;
    } else if (isAsync() || comet) {
        return SocketState.LONG;
    } else if (isUpgrade()) {
        return SocketState.UPGRADING;
    } else {
        if (sendfileInProgress) {
            return SocketState.SENDFILE;
        } else {
            if (openSocket) {
                if (readComplete) {
                    return SocketState.OPEN;
                } else {
                    return SocketState.LONG;
                }
            } else {
                return SocketState.CLOSED;
            }
        }
    }
}

Through all of this processing, the corresponding org.apache.coyote.Request and org.apache.coyote.Response objects are populated and passed onward.
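
Where do they go from here? The getAdapter().service(request, response) call in the loop above is the bridge into the Container side: org.apache.catalina.connector.CoyoteAdapter wraps the coyote objects in catalina connector Request/Response objects, maps the URI to a Host, Context, and Wrapper, and invokes the container pipeline. The sketch below is heavily abridged from CoyoteAdapter#service (async handling and error paths are omitted), so treat it as an outline rather than the full source:

// Heavily abridged from org.apache.catalina.connector.CoyoteAdapter#service.
public void service(org.apache.coyote.Request req, org.apache.coyote.Response res)
        throws Exception {
    // The connector-level wrappers are cached as notes on the coyote objects.
    Request request = (Request) req.getNote(ADAPTER_NOTES);
    Response response = (Response) res.getNote(ADAPTER_NOTES);
    if (request == null) {
        request = connector.createRequest();
        request.setCoyoteRequest(req);
        response = connector.createResponse();
        response.setCoyoteResponse(res);
        request.setResponse(response);
        response.setRequest(request);
        req.setNote(ADAPTER_NOTES, request);
        res.setNote(ADAPTER_NOTES, response);
    }
    // URI decoding, session lookup, and mapping to Host/Context/Wrapper.
    if (postParseRequest(req, request, res, response)) {
        // Hand off to the Engine -> Host -> Context -> Wrapper valve chain.
        connector.getService().getContainer().getPipeline()
                .getFirst().invoke(request, response);
    }
    response.finishResponse();
    req.action(ActionCode.POST_REQUEST, null);
    request.recycle();
    response.recycle();
}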