您的位置:首页 > 运维架构 > Tomcat

tomcat的connector与container原理分析(一)

2016-05-20 10:11 471 查看
最近在看how tomcat works,里面提到了tomcat的两大核心组件,connector和container,下面就来分析一下二者的运行机制。

对于实现了connector接口的httpconnector,它的start方法如下:

public void start() throws LifecycleException {
if (started)
throw new LifecycleException
(sm.getString("httpConnector.alreadyStarted"));
threadName = "HttpConnector[" + port + "]";
lifecycle.fireLifecycleEvent(START_EVENT, null);
started = true;
threadStart();
while (curProcessors < minProcessors) {
if ((maxProcessors > 0) && (curProcessors >= maxProcessors))
break;
HttpProcessor processor = newProcessor();
recycle(processor);
}
}

httpconnector同时也实现了lifecycle接口,这个后面再讲,start方法中把start设置为true,并且启动threadstart(),后面的是用来生成processor的栈,recycle方法其实就是stack.push,threadstart()方法如下:

private void threadStart() {

log(sm.getString("httpConnector.starting"));
thread = new Thread(this, threadName);
thread.setDaemon(true);
thread.start();
}
也就是说,从threadstart()方法创建了新线程并且启动了,run方法如下:

public void run() {
// Loop until we receive a shutdown command
while (!stopped) {
// Accept the next incoming connection from the server socket
Socket socket = null;
try {
//                if (debug >= 3)
//                    log("run: Waiting on serverSocket.accept()");
socket = serverSocket.accept();
//                if (debug >= 3)
//                    log("run: Returned from serverSocket.accept()");
if (connectionTimeout > 0)
socket.setSoTimeout(connectionTimeout);
socket.setTcpNoDelay(tcpNoDelay);
} catch (AccessControlException ace) {
log("socket accept security exception", ace);
continue;
} catch (IOException e) {
...
}

// Hand this socket off to an appropriate processor
<strong>  HttpProcessor processor = createProcessor();</strong>
if (processor == null) {
try {
log(sm.getString("httpConnector.noProcessor"));
socket.close();
} catch (IOException e) {
;
}
continue;
}
//            if (debug >= 3)
//                log("run: Assigning socket to processor " + processor);
<strong> processor.assign(socket);</strong>

// The processor will recycle itself when it finishes

}

// Notify the threadStop() method that we have shut ourselves down
//        if (debug >= 3)
//            log("run: Notifying threadStop() that we have shut down");
synchronized (threadSync) {
threadSync.notifyAll();
}
}
run方法里会产生一个processor,并且调用assign方法去处理socket,请注意,这里的createProcessor()方法如下:

private HttpProcessor createProcessor() {

synchronized (processors) {
if (processors.size() > 0) {
// if (debug >= 2)
// log("createProcessor: Reusing existing processor");
return ((HttpProcessor) processors.pop());
}
if ((maxProcessors > 0) && (curProcessors < maxProcessors)) {
// if (debug >= 2)
// log("createProcessor: Creating new processor");
return (newProcessor());
} else {
if (maxProcessors < 0) {
// if (debug >= 2)
// log("createProcessor: Creating new processor");
return (newProcessor());
} else {
// if (debug >= 2)
// log("createProcessor: Cannot create new processor");
return (null);
}
}
}
}
这里用 synchronized 锁住了processors,使得同一时刻只有一个线程可以访问这个栈,实际上,这里主要用的是processors.pop()从栈中弹出一个processor,而如果栈中不含任何processor,那么将会调用newProcessor()产生一个新的processor,并且启动它的start方法,实例化processor时会传递request,response,定义它的专属于该connector的id等等。

下面我们看processor.assign(socket):

synchronized void assign(Socket socket) {

// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}

// Store the newly available Socket and notify our thread
this.socket = socket;
available = true;
notifyAll();

if ((debug >= 1) && (socket != null))
log(" An incoming request is being assigned");

}
这里用的是线程的wait,notify机制,对于每一个processor,使用synchronized锁住当前对象,然后进入wait()方法,等待别人将它唤醒,而它自己一旦被唤醒后,又调用notifyall方法,释放对processor的锁。而事实上在processor新产生时,available被定义为false,并不会进入循环,直接将socket传递到processor上。

下面我们看processor的run方法:

public void run() {

// Process requests until we receive a shutdown signal
while (!stopped) {

// Wait for the next socket to be assigned
Socket socket = await();
if (socket == null)
continue;

// Process the request from this socket
try {
process(socket);
} catch (Throwable t) {
log("process.invoke", t);
}

// Finish up this request
connector.recycle(this);

}

// Tell threadStop() we have shut ourselves down successfully
synchronized (threadSync) {
threadSync.notifyAll();
}

}

processor对象在被创建时会启动start方法,之后会在某个时刻启动run方法开启线程,stop初始被设置为false,进入循环后,调用await方法,await方法将会阻塞,直到上述的assign方法将其唤醒,之后返回一个socket:

private synchronized Socket await() {

// Wait for the Connector to provide a new Socket
while (!available) {
try {
wait();
} catch (InterruptedException e) {
}
}

// Notify the Connector that we have received this Socket
Socket socket = this.socket;
available = false;
notifyAll();

if ((debug >= 1) && (socket != null))
log("  The incoming request has been awaited");

return (socket);
}
回到run方法中,await返回一个socket后,调用process方法处理,处理完成之后利用recycle将此processor重新push入栈。

事实上,这里面我有两点困惑,一是await的notifyall方法,书上说是为了防止在available为true的时候,有一个socket进入,这样就会在assign的方法中进入wait()方法阻塞,然而我并没有想到什么样的情况下会出现这个场景,因为available初始为false,而run方法调用await之后又会把它变成false,而这个processor的assign和await又是同步方法,也不会出现多个线程之间调用出现的混乱;第二点是await方法新建socket并返回,为何不直接返回this.socket,这里书上的解释是当前socket被完全处理之前,实例的socket变量可以赋给下一个前来的socket,但是问题是当前的socket变量处理完之前,当前的processor也不会被压入栈,这样新来的socket也不会被这个processor处理。


关于connector和processor就到此为止了,基本的原理就是connector会拥有一个自己的线程,然后每进来一个socket,就会从processor的线程池中调用一个,进行处理。下面我们看processor的方法。

private void process(Socket socket) {
boolean ok = true;
boolean finishResponse = true;
SocketInputStream input = null;
OutputStream output = null;

// Construct and initialize the objects we will need
try {
input = new SocketInputStream(socket.getInputStream(),
connector.getBufferSize());
} catch (Exception e) {
log("process.create", e);
ok = false;
}

keepAlive = true;

while (!stopped && ok && keepAlive) {

finishResponse = true;

try {
request.setStream(input);
request.setResponse(response);
output = socket.getOutputStream();
response.setStream(output);
response.setRequest(request);
((HttpServletResponse) response.getResponse()).setHeader
("Server", SERVER_INFO);
} catch (Exception e) {
log("process.create", e);
ok = false;
}

// Parse the incoming request
try {
if (ok) {

parseConnection(socket);
parseRequest(input, output);
if (!request.getRequest().getProtocol()
.startsWith("HTTP/0"))
parseHeaders(input);
if (http11) {
// Sending a request acknowledge back to the client if
// requested.
ackRequest(output);
// If the protocol is HTTP/1.1, chunking is allowed.
if (connector.isChunkingAllowed())
response.setAllowChunking(true);
}

}
} catch (EOFException e) {
...
}

// Ask our Container to process this request
try {
((HttpServletResponse) response).setHeader
("Date", FastHttpDateFormat.getCurrentDate());
if (ok) {
<strong>connector.getContainer().invoke(request, response);</strong>
}
} catch (ServletException e) {
...
}

// Finish up the handling of the request
if (finishResponse) {
...
}

// We have to check if the connection closure has been requested
// by the application or the response stream (in case of HTTP/1.0
// and keep-alive).
if ( "close".equals(response.getHeader("Connection")) ) {
keepAlive = false;
}

// End of request processing
status = Constants.PROCESSOR_IDLE;

// Recycling the request and the response objects
request.recycle();
response.recycle();

}

try {
shutdownInput(input);
socket.close();
} catch (IOException e) {
;
} catch (Throwable e) {
log("process.invoke", e);
}
socket = null;
}


解析出request和response之后,就是利用container调用invoke方法进行处理。下一篇再说
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: