您的位置:首页 > 运维架构 > Tomcat

Tomcat 8的Connector部分

2016-09-27 00:40 357 查看
在tomcat中,connector负责接收来自客户端的连接,并交由后续的代码进行处理。connector对象持有ProtocolHandler对象;ProtocolHandler对象持有AbstractEndpoint对象。AbstractEndpoint负责创建服务器套接字,并绑定到监听端口;同时还创建accepter线程来接收客户端的连接以及poller线程来处理连接中的读写请求。其结构如图1所示。



图1 Connector

Connector的入口在其构造函数。

public Connector() {
this(null);
}

public Connector(String protocol) {
// 设置协议
setProtocol(protocol);
ProtocolHandler p = null;
try {
// 反射生成ProtocolHandler实例
Class<?> clazz = Class.forName(protocolHandlerClassName);
p = (ProtocolHandler) clazz.newInstance();
} catch (Exception e) {
log.error(sm.getString(
"coyoteConnector.protocolHandlerInstantiationFailed"), e);
} finally {
this.protocolHandler = p;
}

if (!Globals.STRICT_SERVLET_COMPLIANCE) {
URIEncoding = "UTF-8";
URIEncodingLower = URIEncoding.toLowerCase(Locale.ENGLISH);
}
}

public void setProtocol(String protocol) {

if (AprLifecycleListener.isAprAvailable()) {
if ("HTTP/1.1".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11AprProtocol");
} else if ("AJP/1.3".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.ajp.AjpAprProtocol");
} else if (protocol != null) {
setProtocolHandlerClassName(protocol);
} else {
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11AprProtocol");
}
} else {
if ("HTTP/1.1".equals(protocol)) {
// tomcat8默认配置
setProtocolHandlerClassName
("org.apache.coyote.http11.Http11NioProtocol");
} else if ("AJP/1.3".equals(protocol)) {
setProtocolHandlerClassName
("org.apache.coyote.ajp.AjpNioProtocol");
} else if (protocol != null) {
setProtocolHandlerClassName(protocol);
}
}

}


Connector的构造函数带有协议属性,该协议属性是server.xml中Connector标签的protocol的属性值。Tomcat 8中默认值为HTTP/1.1,因此在Connector的构造函数中生成的是Http11NioProtocol对象。在setProtocol()方法中可以看到,tomcat8还包括其他几个协议处理器。协议处理器中带有Apr命名的都是使用Apr库来处理http请求的。通过使用APR库,Tomcat将使用JNI的方式来读取文件以及进行网络传输,可以大大提升Tomcat对静态文件的处理性能,同时如果你使用了HTTPS方式传输的话,也可以提升SSL的处理性能。AJP/1.3协议是Http服务器和应用服务器之间数据交互的协议,比如Apache服务器或IIS服务器与tomcat服务器之间进行数据交互。

Http11NioProtocol是非阻塞模式的Http1.1协议处理器,使用java的nio包来实现非阻塞。可以看到,在tomcat 8中,默认使用的是非阻塞IO。

在创建Http11NioProtocol实例的时候,会创建NioEndpoint、Http11ConnectionHandler实例。

public Http11NioProtocol() {
// 创建nioEndPoint
endpoint=new NioEndpoint();
// 创建Http11ConnectionHandler
cHandler = new Http11ConnectionHandler(this);
((NioEndpoint) endpoint).setHandler(cHandler);
// 是指socket被关闭时逗留的时间,值为-1。
// 在这段时间内,socket会尽量把未送出去的数据给发出去。
setSoLinger(Constants.DEFAULT_CONNECTION_LINGER);
// 设置读取数据超时
setSoTimeout(Constants.DEFAULT_CONNECTION_TIMEOUT);
// 设置tcp_nodelay,
setTcpNoDelay(Constants.DEFAULT_TCP_NO_DELAY);
}


NioEndpoint是Connector中处理客户端连接的核心类,负责创建服务器套接字,并绑定到监听端口;同时还创建accepter线程来接收客户端的连接以及poller线程来处理连接中的读写请求。

Connector方法实现Lifecycle接口,当Connector的拥有者调用init()及start()方法的时候,会分别执行Connector的initInternal()以及startInternal()方法。Connector的initInternal()方法会最终调用endpoint的init()方法;startInternal()方法则调用endpoint的start()方法。

endpoint的init()方法在其父类AbstractEndpoint里实现,并最终调用延迟到NioEndpoint中实现的bind()方法。

public final void init() throws Exception {
// 校验setUseCipherSuitesOrder方法是否存在
testServerCipherSuitesOrderSupport();
if (bindOnInit) {

bind();
// 设置状态
bindState = BindState.BOUND_ON_INIT;
}
}

/**
* Initialize the endpoint.
*/
@Override
public void bind() throws Exception {
// 打开serverSocketChannel
serverSock = ServerSocketChannel.open();
// 设置socket属性
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort()));
// 绑定监听端口
serverSock.socket().bind(addr,getBacklog());
// 设为阻塞模式
serverSock.configureBlocking(true); //mimic APR behavior
// 设置超时
serverSock.socket().setSoTimeout(getSocketProperties().getSoTimeout());

// Initialize thread count defaults for acceptor, poller
if (acceptorThreadCount == 0) {
// FIXME: Doesn't seem to work that well with multiple accept threads
acceptorThreadCount = 1;
}
if (pollerThreadCount <= 0) {
//minimum one poller thread
pollerThreadCount = 1;
}
stopLatch = new CountDownLatch(pollerThreadCount);

// Initialize SSL if needed
if (isSSLEnabled()) {
SSLUtil sslUtil = handler.getSslImplementation().getSSLUtil(this);

sslContext = sslUtil.createSSLContext();
sslContext.init(wrap(sslUtil.getKeyManagers()),
sslUtil.getTrustManagers(), null);

SSLSessionContext sessionContext =
sslContext.getServerSessionContext();
if (sessionContext != null) {
sslUtil.configureSessionContext(sessionContext);
}
// Determine which cipher suites and protocols to enable
enabledCiphers = sslUtil.getEnableableCiphers(sslContext);
enabledProtocols = sslUtil.getEnableableProtocols(sslContext);
}

if (oomParachute>0) reclaimParachute(true);
// 打开阻塞模式的selector
selectorPool.open();
}


在bind()方法中,首先打开serverSocketChannel,并绑定到监听端口,此处将其该channel设置为阻塞模式。对于SSL部分,此处略过不讲。在最后的 selectorPool.open()执行语句中,会先获得共享的selector,并且创建线程在该selector上检测事件。这里有一点疑问地方,就是这里创建的selector有什么用?

经过上述过程,endpoint初始化完成,并且开启serverSocketChannel并监听端口。接着看下endpoint的start()方法。和init()方法一样,endpoint的start()方法是在AbstractEndPoint中实现的,并调用推迟到NioEndPoint中的startInternal()方法。

public final void start() throws Exception {
if (bindState == BindState.UNBOUND) {
bind();
bindState = BindState.BOUND_ON_START;
}
startInternal();
}

/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {

if (!running) {
running = true;
paused = false;

// 创建缓存容器
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());

// 创建线程池
if ( getExecutor() == null ) {
createExecutor();
}

// 初始化计数器Latch
initializeConnectionLatch();

// 创建Poller线程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}

// 创建Acceptor线程
startAcceptorThreads();
}
}


在startInternal()方法中,最重要的是创建Poller和Acceptor线程。Acceptor线程处理serverSocketChannel的请求接收事件;Poller处理serverSocketChannel的读写事件。此时可以预想到,Acceptor线程专门负责接收客户端连接socketChannel,然后将socketChannel交给Poller线程读写。在实际中,Poller线程将socketChannel再次封装之后又开启另一个线程进行实际的数据处理。这样设计的目的是避免当某一个请求出现阻塞的时候,影响到整个服务器的接收、处理能力。

按接收请求,处理请求的逻辑,我们先观察Acceptor线程。

protected class Acceptor extends AbstractEndpoint.Acceptor {

@Override
public void run() {
int errorDelay = 0;
// 一直循环直到接收停止命令
while (running) {
// 暂停命令
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}

if (!running) {
break;
}
state = AcceptorState.RUNNING;

try {
//通过同步计数器来限制连接数目
//当连接数目超过上限时,则等待
//其中同步计算器是通过继承AQS实现的
//默认的最大连接数是10000
countUpOrAwaitConnection();

SocketChannel socket = null;
try {
//接收连接,此处并不是使用selector实现
//在前面的代码中已知serverSock是阻塞模式的。
socket = serverSock.accept();
} catch (IOException ioe) {
//we didn't get a socket
countDownConnection();
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;

// setSocketOptions() will add channel to the poller
// if successful
if (running && !paused) {
// 在setSocketOptions中将接收到的socket传给poller线程进行处理
if (!setSocketOptions(socket)) {
countDownConnection();
closeSocket(socket);
}
} else {
countDownConnection();
closeSocket(socket);
}
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}


Acceptor线程会反复执行serverSock.accept()等待客户端连接的到来,等接收到一个客户端连接时,会把接收到的socket传给后续的poller线程处理,其执行过程在setSocketOptions()方法中。

/**
* Process the specified connection.
*/
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
// 设置为非阻塞模式
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);

// 从NioChannel容器中获得一个NioChannel
// NioChannel可以理解为socketChannel的代理类,提供更多的功能
NioChannel channel = nioChannels.pop();
if ( channel == null ) {
// SSL相关
if (sslContext != null) {
SSLEngine engine = createSSLEngine();
int appbufsize = engine.getSession().getApplicationBufferSize();

NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
socketProperties.getDirectBuffer());
channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
} else {
// NioBufferHandler维护了在处理过程中的读写缓存
NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
// 将socket、bufHandler封装到NioChannel中
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
if ( channel instanceof SecureNioChannel ) {
SSLEngine engine = createSSLEngine();
((SecureNioChannel)channel).reset(engine);
} else {
channel.reset();
}
}
// 将niochannel注册到poller线程中进行处理
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(t);
}
// Tell to close the socket
return false;
}
return true;
}


在setSocketOptions()方法中,将socket、niobufferhandler对象封装成niochannel对象,并调用poller线程的register()的方法注册到poller中,等待poller线程进行处理。

查看Poller线程的outline:



图2 Poller的outline

Poller线程中的主要方法如下:

Poller() : 在构造函数中创建打开selector。

destory() : 将close标志位置为true,表示要关闭poller。

addEvent(PollerEvent):内部方法调用,添加PollerEvent事件到队列中,并唤醒阻塞等待的selector。

add(NioChannel)、add(NioChannel, int):创建PollerEvent事件,并调用addEvent()方法将事件添加到队列中。

events():执行队列中的PollerEvent事件,注册读或写

register(NioChannel):外部方法调用,将NioChannel添加到队列中。

cancelledKey(SelectionKey, SocketStatus):取消注册到selector中的socket

run():线程执行方法体,负责响应socket读写事件

processKey():内部方法调用,处理接收到的读写事件和队列中的事件

Poller执行从run()开始。

@Override
public void run() {
// 循环直到destroy()方法被调用
while (true) {
try {
// 暂停处理
while (paused && (!close) ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
}

boolean hasEvents = false;
// 关闭
if (close) {
// 执行队列中的PollerEvent事件,注册读或写
events();
//
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
} else {
// 执行队列中的PollerEvent事件,注册读或写,
// hasEvents表示是否有读写事件注册
hasEvents = events();
}
try {
if ( !close ) {
// wakeupCounter > 0,表示有事件,故直接用selectNow,否则用select(selectorTimeout)以阻塞一段时间等待事件到来
if (wakeupCounter.getAndSet(-1) > 0) {
//if we are here, means we have other stuff to do
//do a non blocking select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
// 此处在判断一次close,及时响应destroy()方法
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());

Iterator<SelectionKey> iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
KeyAttachment attachment = (KeyAttachment)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
// 记录时间
attachment.access();
iterator.remove();
// 将sk和attachtment包装,交由后续线程继续处理
processKey(sk, attachment);
}
}//while

//process timeouts
timeout(keyCount,hasEvents);
if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
}
}//while

stopLatch.countDown();
}


在上述流程中,核心流程是循环注册队列中的事件,监听selector,并将监听到的SelectionKey事件包装,交给线程池处理;还有其他一些附属流程,比如当Poller关闭时的处理以及超时处理。其处理流程如图3所示。



图3 Poller处理流程

1、events()方法处理事件注册

public boolean events() {
boolean result = false;

PollerEvent pe = null;
// 从队列中获得PollerEvent事件
while ( (pe = events.poll()) != null ) {
result = true;
try {
// 调用PollerEvent的run()方法执行事件注册
pe.run();
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error("",x);
}
}

return result;
}

public void run() {
// 如果是注册,则把socket注册到selector中
if ( interestOps == OP_REGISTER ) {
try {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception x) {
log.error("", x);
}
} else {
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
boolean cancel = false;
if (key != null) {
final KeyAttachment att = (KeyAttachment) key.attachment();
if ( att!=null ) {
att.access();//to prevent timeout
// interestOps相或,追加事件
int ops = key.interestOps() | interestOps;
att.interestOps(ops);
key.interestOps(ops);
} else {
cancel = true;
}
} else {
cancel = true;
}
if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR);
}catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT);
}catch (Exception ignore) {}
}
}//end if
}//run


在events()方法中,通过调用PollerEvent的run()方法将socket注册到selector中。

2、processKey()

processKey()方法处理准备完毕的事件。

protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
boolean result = true;
try {
// 如果close,则取消sk
if ( close ) {
cancelledKey(sk, SocketStatus.STOP);
} else if ( sk.isValid() && attachment != null ) {
// 对于有效的socket,保证不会超时
attachment.access();//make sure we don't time out valid sockets
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
// 处理文件
processSendfile(sk,attachment, false);
} else {
if ( isWorkerAvailable() ) {
//
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// 处理可读
if (!processSocket(attachment, SocketStatus.OPEN_READ, true)) {
closeSocket = true;
}
}
// 处理可写
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketStatus.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk,SocketStatus.DISCONNECT);
}
} else {
result = false;
}
}
}
} else {
//invalid key
cancelledKey(sk, SocketStatus.ERROR);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk, SocketStatus.ERROR);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
return result;
}


在processKey()方法中,主要是处理可读、可写事件。查看processSocket(attachment, SocketStatus.OPEN_READ, true)。

protected boolean processSocket(KeyAttachment attachment, SocketStatus status, boolean dispatch) {
try {
if (attachment == null) {
return false;
}
// 将selectionKey包装为SocketProcessor
SocketProcessor sc = processorCache.pop();
if ( sc == null ) sc = new SocketProcessor(attachment, status);
else sc.reset(attachment, status);
Executor executor = getExecutor();
if (dispatch && executor != null) {
// 交给线程池处理或直接运行
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
log.warn(sm.getString("endpoint.executor.fail", attachment.getSocket()), ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}


在processSocket()方法中,将读写的selectionKey包装为SocketProcessor,并交给线程池处理。

至此,Connector中处理请求的流程已经结束。至于如何处理请求中的数据,将在后续代码中继续执行。

总结:

在Tomcat 8中,Connector负责处理客户端的连接请求,其核心实现在EndPoint类中。EndPoint中定义了Acceptor来接收客户端的socket,并将socket交给Poller线程进行读写处理。Poller线程监听到读写事件后,继续将selectionKey封装成Processor交给后续代码处理。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  tomcat