您的位置:首页 > 运维架构 > Tomcat

[Tomcat6.0源码]请求的处理一Socket

2012-09-25 15:45 363 查看
《组件的启动》中JIoEndpoint的start会启动Acceptor.run来监听请求(Acceptor是JIoEndpoint的内部类):

Socket socket = serverSocketFactory.acceptSocket(serverSocket);
serverSocketFactory.initSocket(socket);
// Hand this socket off to an appropriate processor
if (!processSocket(socket)) {
// Close socket right away
try {
socket.close();
} catch (IOException e) {
// Ignore
}
}

这个serverSocketFactory,serverSocket都来自JIoendpoint.init()中:

serverSocketFactory = ServerSocketFactory.getDefault();
serverSocket = serverSocketFactory.createSocket(port, backlog, address);

getDefault返回的是DefaultServerSocketFactory实例,DefaultServerSocketFactory.acceptSocket():

public Socket acceptSocket(ServerSocket socket)
throws IOException {
return socket.accept();
}

这里只是对ServerSocket的accpet()方法做一个简单的封装,Acceptor线程处于阻塞状态,当serverSocket监听到请求时,才会执行以后的代码。

serverSocketFactory.initSocket(socket);这个initSocket真的是没干什么,连一行代码都没有。

访问http://localhost:8080 接收到请求后JIoEndpoint.processSocket(socket)处理socket:

protected boolean processSocket(Socket socket) {
try {
if (executor == null) {
getWorkerThread().assign(socket);
} else {
executor.execute(new SocketProcessor(socket));
}
} catch (Throwable t) {
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}

线程池没配置executor为null,所以会执行getWorkerThread().assign(socket);

JIoEndpoint.getWorkerThread():

protected Worker getWorkerThread() {
// Allocate a new worker thread
synchronized (workers) {
Worker workerThread;
while ((workerThread = createWorkerThread()) == null) {
try {
workers.wait();
} catch (InterruptedException e) {
// Ignore
}
}
return workerThread;
}
}

JIoEndpoint.createWorkerThread():

protected Worker createWorkerThread() {

synchronized (workers) {
if (workers.size() > 0) {
curThreadsBusy++;
return workers.pop();
}
if ((maxThreads > 0) && (curThreads < maxThreads)) {
curThreadsBusy++;
if (curThreadsBusy == maxThreads) {
log.info(sm.getString("endpoint.info.maxThreads",
Integer.toString(maxThreads), address,
Integer.toString(port)));
}
return (newWorkerThread());
} else {
if (maxThreads < 0) {
curThreadsBusy++;
return (newWorkerThread());
} else {
return (null);
}
}
}

}

如果没有多余的空闲worker,且worker数量没达到最大,就new一个,JIoEndpoint.newWorkerThread():

protected Worker newWorkerThread() {

Worker workerThread = new Worker();
workerThread.start();
return (workerThread);

}

Worker.run():

public void run() {

// Process requests until we receive a shutdown signal
while (running) {

// Wait for the next socket to be assigned
Socket socket = await();
if (socket == null)
continue;

// Process the request from this socket
if (!setSocketOptions(socket) || !handler.process(socket)) {
// Close socket
try {
socket.close();
} catch (IOException e) {
}
}

// Finish up this request
socket = null;
recycleWorkerThread(this);

}

}

Worker.await():

private synchronized Socket await() {

// Wait for the Connector to provide a new Socket
while (!available) {
try {
wait();
} catch (InterruptedException e) {
}
}

// Notify the Connector that we have received this Socket
Socket socket = this.socket;
available = false;
notifyAll();

return (socket);

}

available初始值为false,至此,getWorkerThread()方法已经执行完,线程处于等待状态,连while循环的停了。回到getWorkerThread().assign(socket):

synchronized void assign(Socket socket) {

// Wait for the Processor to get the previous Socket
while (available) {
try {
wait();
} catch (InterruptedException e) {
}
}

// Store the newly available Socket and notify our thread
this.socket = socket;
available = true;
notifyAll();

}

available改成true;notifyAll()唤醒等待的线程,Worker.await()返回socket。Worker.run()接收到这个返回的socket,就会往下执行。

捋一捋:

1.JIoEndpoint创建一个Acceptor线程来监听请求。

2.Acceptor监听到请求后,就到Worker池中取一个对象来(Worker用栈来维护)。Worker是多线程的,创建的时候处理socket的run方法就被等待。

3.给Worker对象分配socket,唤醒run()方法做进一步处理。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: