
Apache HttpAsyncClient Source Code Walkthrough (How Asynchronous I/O Is Implemented)

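2017-02-22 00:00
Summary: The previous two posts covered the basics of Apache HttpComponents. This post looks at HttpAsyncClient (which is built on HttpCore) in detail. HttpCore also contains HTTP server material, but most of it is skipped here.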

###Preface
Make sure you are already familiar with Java NIO and the Reactor pattern. If not, see the previous two posts, "Apache HttpClient Basic" and "Apache HttpCore Basic", because HttpAsyncClient is essentially Java NIO + the Reactor pattern.

###IO Reactor



IOReactor interface represents an abstract object implementing the Reactor pattern.
Its main method is:

/**
 * Starts the reactor and initiates the dispatch of I/O event notifications
 * to the given {@link IOEventDispatch}. (In short: it dispatches I/O events.)
 *
 * @param eventDispatch the I/O event dispatch.
 * @throws IOException in case of an I/O error.
 */
void execute(IOEventDispatch eventDispatch) throws IOException;

IOEventDispatch

All methods of the IOEventDispatch are executed on a dispatch thread of the I/O reactor. IOEventDispatch defines several I/O event types:

connected: Triggered when a new session has been created.

inputReady: Triggered when the session has pending input.

outputReady: Triggered when the session is ready for output.

timeout: Triggered when the session has timed out.

disconnected: Triggered when the session has been terminated.

As you can see, every one of these events is triggered by an IOSession.

IOSession

The IOSession interface represents a sequence of logically related data exchanges between two end points. IOSession encapsulates functionality of NIO java.nio.channels.SelectionKey and java.nio.channels.SocketChannel. The channel associated with the IOSession can be used to read data from and write data to the session.

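Recall how Java NIO works: a Channel is registered with a Selector, the SelectionKey is watched to see whether an event has occurred, and each kind of event is handled differently (for example, by reading data from the Channel). IOSession wraps exactly this process, so data can be read from and written to the IOSession directly.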

I/O sessions are not bound to an execution thread, therefore one cannot use the context of the thread to store a session's state. All details about a particular session must be stored within the session itself.
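The Selector / SelectionKey mechanics that IOSession wraps can be sketched with the JDK alone. This is a minimal illustration, not HttpCore code: it uses a `java.nio.channels.Pipe` instead of a socket so that it needs no network, and `SelectorSessionSketch` and `SessionState` are hypothetical names. The state object is attached to the key, which mirrors the point above that session state must live with the session rather than with a thread.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class SelectorSessionSketch {

    /** Hypothetical per-session state; it travels with the key, not with a thread. */
    static final class SessionState {
        final ByteArrayOutputStream received = new ByteArrayOutputStream();
    }

    /** Writes the message into a pipe and reads it back through a Selector loop. */
    static String roundTrip(String message) {
        try {
            Pipe pipe = Pipe.open();
            try (Selector selector = Selector.open();
                 Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {

                // Register the readable end and attach the session state to its key.
                source.configureBlocking(false);
                source.register(selector, SelectionKey.OP_READ, new SessionState());

                // Produce some input, then close the writing end so the loop can finish.
                sink.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));
                sink.close();

                SessionState finished = null;
                ByteBuffer buffer = ByteBuffer.allocate(64);
                while (finished == null) {
                    selector.select(); // blocks until at least one key is ready
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (!key.isReadable()) {
                            continue;
                        }
                        // Roughly the "inputReady" event: read whatever is available.
                        SessionState state = (SessionState) key.attachment();
                        ReadableByteChannel channel = (ReadableByteChannel) key.channel();
                        buffer.clear();
                        int n = channel.read(buffer);
                        if (n < 0) {
                            // End of stream, roughly the "disconnected" event.
                            key.cancel();
                            finished = state;
                        } else {
                            state.received.write(buffer.array(), 0, n);
                        }
                    }
                    selector.selectedKeys().clear();
                }
                return new String(finished.received.toByteArray(), StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello reactor"));
    }
}
```

The loop never consults thread-local data: everything it needs comes from `key.attachment()`, so any dispatch thread could have run it.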

ListeningIOReactor

ListeningIOReactor represents an I/O reactor capable of listening for incoming connections on one or several ports. It is the server-side I/O reactor.

ConnectingIOReactor

ConnectingIOReactor represents an I/O reactor capable of establishing connections with remote hosts.
It is the client-side I/O reactor.

/**
 * Requests a connection to a remote host.
 * <p>
 * Opening a connection to a remote host usually tends to be a time
 * consuming process and may take a while to complete. One can monitor and
 * control the process of session initialization by means of the
 * {@link SessionRequest} interface.
 *
 * @param remoteAddress the socket address of the remote host.
 * @param localAddress the local socket address. Can be {@code null},
 *    in which case the default local address and a random port will be used.
 * @param attachment the attachment object. Can be {@code null}.
 * @param callback interface. Can be {@code null}.
 * @return session request object.
 */
SessionRequest connect(
        SocketAddress remoteAddress,
        SocketAddress localAddress,
        Object attachment,
        SessionRequestCallback callback);

Opening a connection to a remote host usually tends to be a time consuming process and may take a while to complete. One can monitor and control the process of session initialization by means of the SessionRequest interface.
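In other words, because establishing a connection is slow, connect() does not block: it returns a SessionRequest, and the caller uses that object to monitor and control the connection attempt.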

/**
 * Returns {@link IOSession} instance created as a result of this request
 * or {@code null} if the request is still pending.
 *
 * @return I/O session or {@code null} if the request is still pending.
 */
IOSession getSession();

After the possible timeouts and exceptions have been handled, we end up with our IOSession.
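The "pending request" idea can be illustrated without the library. The class below is a hypothetical stand-in written for this post, not HttpCore's SessionRequest: only `getSession()` returning `null` while pending comes from the Javadoc quoted above, and the other method names are assumptions chosen for the sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a pending session request; not the HttpCore class. */
final class PendingRequestSketch<S> {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile S session;
    private volatile Exception failure;

    /** Called by the reactor side once the connection has been established. */
    void completed(S newSession) {
        session = newSession;
        done.countDown();
    }

    /** Called by the reactor side when the attempt failed or timed out. */
    void failed(Exception cause) {
        failure = cause;
        done.countDown();
    }

    /** Returns the session, or null while the request is pending or if it failed. */
    S getSession() {
        return session;
    }

    /** Returns the failure cause, or null if there is none (yet). */
    Exception getException() {
        return failure;
    }

    boolean isCompleted() {
        return done.getCount() == 0;
    }

    /** Blocks until the request completes; returns false on timeout or interruption. */
    boolean waitFor(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        PendingRequestSketch<String> request = new PendingRequestSketch<>();
        System.out.println("pending, session = " + request.getSession());

        // Simulates the reactor thread finishing the connect a little later.
        new Thread(() -> request.completed("session-1")).start();

        if (request.waitFor(1, TimeUnit.SECONDS)) {
            System.out.println("completed, session = " + request.getSession());
        }
    }
}
```

The caller is free to do other work between issuing the request and calling `waitFor`, which is the whole point of returning a handle instead of blocking inside connect().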

AbstractMultiworkerIOReactor

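Note that the listening and connecting reactor implementations (DefaultListeningIOReactor and DefaultConnectingIOReactor) both extend AbstractMultiworkerIOReactor. What does this abstract class do?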

Generic implementation of IOReactor that can run multiple BaseIOReactor instance in separate worker threads and distribute newly created I/O session equally across those I/O reactors for a more optimal resource utilization and a better I/O performance. Usually it is recommended to have one worker I/O reactor per physical CPU core. In other words, on a multi-core machine it runs several worker I/O reactors, each on its own thread. These workers are the threads that actually dispatch events, and the recommendation is one worker thread per core.
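To make "distribute newly created I/O sessions equally" concrete, here is a minimal sketch. Round-robin assignment is an assumption chosen for illustration, and `RoundRobinWorkersSketch` and `Worker` are hypothetical names, not HttpCore classes.

```java
import java.util.ArrayList;
import java.util.List;

final class RoundRobinWorkersSketch {

    /** Hypothetical worker: it only records which sessions were handed to it. */
    static final class Worker {
        final List<String> sessions = new ArrayList<>();
    }

    private final Worker[] workers;
    private int next;

    RoundRobinWorkersSketch(int workerCount) {
        if (workerCount <= 0) {
            throw new IllegalArgumentException("workerCount must be positive");
        }
        workers = new Worker[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Worker();
        }
    }

    /** Hands a new session to the next worker in turn and returns that worker's index. */
    synchronized int assign(String session) {
        int index = next;
        next = (next + 1) % workers.length;
        workers[index].sessions.add(session);
        return index;
    }

    /** Number of sessions currently assigned to the given worker. */
    synchronized int sessionCount(int workerIndex) {
        return workers[workerIndex].sessions.size();
    }

    public static void main(String[] args) {
        // One worker per available core, following the recommendation quoted above.
        int cores = Runtime.getRuntime().availableProcessors();
        RoundRobinWorkersSketch reactor = new RoundRobinWorkersSketch(cores);
        for (int i = 0; i < 10; i++) {
            System.out.println("session-" + i + " -> worker " + reactor.assign("session-" + i));
        }
    }
}
```

Because a session stays with the worker it was assigned to, all of its events are handled by a single thread, which is what allows the connection objects themselves to remain non-thread-safe.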

IOReactorConfig

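With the above understood, the part that matters most in real projects is configuring the I/O reactor, which is done through IOReactorConfig. In practice the configuration is supplied through the client builder (HttpAsyncClientBuilder), and once the concepts are clear the settings are not hard to choose.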
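A configuration fragment might look like the sketch below. It assumes HttpAsyncClient 4.1.x with HttpCore NIO 4.4.x on the classpath. The timeout and pool values are arbitrary placeholders, not recommendations, and `ReactorConfigSketch` is a hypothetical class name.

```java
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.impl.nio.reactor.IOReactorConfig;

final class ReactorConfigSketch {
    public static void main(String[] args) throws Exception {
        // I/O reactor settings: worker thread count and socket-level timeouts.
        IOReactorConfig ioConfig = IOReactorConfig.custom()
                .setIoThreadCount(Runtime.getRuntime().availableProcessors())
                .setConnectTimeout(3000) // milliseconds, placeholder value
                .setSoTimeout(5000)      // milliseconds, placeholder value
                .build();

        // The builder receives the reactor config together with the pool limits.
        try (CloseableHttpAsyncClient client = HttpAsyncClients.custom()
                .setDefaultIOReactorConfig(ioConfig)
                .setMaxConnTotal(100)    // placeholder value
                .setMaxConnPerRoute(20)  // placeholder value
                .build()) {
            client.start(); // starts the reactor thread; requests could be executed here
        }
    }
}
```

`setIoThreadCount` corresponds to the number of worker I/O reactors discussed in the previous section.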

###NHttpConnection

Effectively non-blocking HTTP connections are wrappers around IOSession with HTTP specific functionality. Non-blocking HTTP connections are stateful and not thread-safe. Input / output operations on non-blocking HTTP connections should be restricted to the dispatch events triggered by the I/O event dispatch thread.
In other words, NHttpConnection is a wrapper around IOSession that is used to read and write HttpRequest and HttpResponse messages.



###NHttpClientConnection

Abstract non-blocking client-side HTTP connection interface. It can be used to submit HTTP requests and asynchronously receive HTTP responses.
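So how does it asynchronously receive HTTP responses? The interface documentation notes that it defines no method for matching requests with responses:

"... but at same time implies that this is the job of the protocol handler to match logically related request and the response messages."
This means that an NHttpClientConnection carries both request data and response data, so tying a response back to its request is the protocol handler's job.

How, then, does the protocol handler deal with that association?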

IOControl

Connection input/output control interface. It can be used to control interest in I/O event notifications for non-blocking HTTP connections.
Implementations of this interface are expected to be threading safe. Therefore it can be used to request / suspend I/O event notifications from any thread of execution.

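Because an NHttpConnection is not thread-safe, it should only be modified from within the events dispatched to it, never by arbitrary threads at the same time. IOControl is the thread-safe part that other threads are allowed to touch.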

I/O operations with the underlying channel of the session are not a problem as they are guaranteed to be non-blocking.
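A thread-safe control handle of this kind can be sketched on top of a plain SelectionKey. This is an illustration only, not the HttpCore implementation: `OutputControlSketch` is a hypothetical class, although the method names `requestOutput` and `suspendOutput` are borrowed from the IOControl interface. The demo uses a Pipe so that it runs without a network.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

final class OutputControlSketch {
    private final SelectionKey key;

    OutputControlSketch(SelectionKey key) {
        this.key = key;
    }

    /** Re-enables write-readiness notifications; intended to be callable from any thread. */
    synchronized void requestOutput() {
        key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
        key.selector().wakeup(); // lets a blocked select() pick up the new interest set
    }

    /** Disables write-readiness notifications, e.g. while no content is available. */
    synchronized void suspendOutput() {
        key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
        key.selector().wakeup();
    }

    /** Returns the number of ready keys while output is requested, then while suspended. */
    static int[] demo() {
        try (Selector selector = Selector.open()) {
            Pipe pipe = Pipe.open();
            try (Pipe.SinkChannel sink = pipe.sink();
                 Pipe.SourceChannel source = pipe.source()) {
                sink.configureBlocking(false);
                // Start with an empty interest set: no events are reported yet.
                SelectionKey key = sink.register(selector, 0);
                OutputControlSketch control = new OutputControlSketch(key);

                control.requestOutput();
                int whileRequested = selector.selectNow();
                selector.selectedKeys().clear();

                control.suspendOutput();
                int whileSuspended = selector.selectNow();
                return new int[] {whileRequested, whileSuspended};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ready = demo();
        System.out.println("ready keys while requested: " + ready[0]);
        System.out.println("ready keys while suspended: " + ready[1]);
    }
}
```

Only the interest set is shared between threads. The channel itself is still read and written solely by the dispatch thread when the corresponding event fires.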

NHttpClientEventHandler

convert generic I/O events triggered by an I/O reactor to HTTP protocol specific events

outputReady:  Triggered when the underlying channel is ready for writing a next portion of the request entity through the corresponding content encoder. If the content producer is unable to generate the outgoing content, output event notifications can be temporarily suspended using IOControl interface (super interface of NHttpClientConnection). Please note that the NHttpClientConnection and ContentEncoder objects are not thread-safe and should only be used within the context of this method call. The IOControl object can be shared and used on other thread to resume output event notifications when more content is made available.

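This is one of those events, and its description also shows what the IOControl interface is for: it mediates between the HTTP connection's events and the underlying channel (the I/O reactor's events). The typical case where this is needed is content production.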

####Asynchronous HTTP request executor

This section answers the question raised above about non-blocking HTTP protocol handlers: how does the handler process the request and the asynchronous response?

#####HttpAsyncRequestExecutor

HttpAsyncRequestExecutor is a fully asynchronous client side HTTP protocol handler based on the NIO (non-blocking) I/O model. HttpAsyncRequestExecutor translates individual events fired through the NHttpClientEventHandler interface into logically related HTTP message exchanges.

The following code shows how a non-blocking HTTP request is executed.

HttpProcessor httpproc = HttpProcessorBuilder.create()
        .add(new RequestContent())
        .add(new RequestTargetHost())
        .add(new RequestConnControl())
        .add(new RequestUserAgent("MyAgent-HTTP/1.1"))
        .add(new RequestExpectContinue(true))
        .build();
// HttpAsyncRequester supports single HTTP request / response exchanges only
HttpAsyncRequester requester = new HttpAsyncRequester(httpproc);
NHttpClientConnection conn = <...>
// Initiates asynchronous HTTP request execution.
Future<HttpResponse> future = requester.execute(
        // request producer: callback that generates the outgoing request
        new BasicAsyncRequestProducer(new HttpHost("localhost"), new BasicHttpRequest("GET", "/")),
        // response consumer: callback that processes the incoming response
        new BasicAsyncResponseConsumer(),
        conn);
// Blocks the calling thread until the response has been received
HttpResponse response = future.get();

Two things need explaining here:

HttpAsyncRequestProducer facilitates the process of asynchronous generation of HTTP requests. It is a callback interface whose methods get invoked to generate an HTTP request message and to stream message content to a non-blocking client side HTTP connection.

HttpAsyncResponseConsumer facilitates the process of asynchronous processing of HTTP responses. It is a callback interface whose methods get invoked to process an HTTP response message and to stream message content from a non-blocking client side HTTP connection.

These two callbacks are invoked in response to different events.

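Finally, NHttpConnection instances are managed by a pool, so to use one you simply lease it from the pool. (This also shows up in configuration, for example in the maximum number of connections kept per route.)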

To wrap up, here is a diagram of the overall picture (not fully accurate, only an approximate model).



The diagram above is a guess based on the source code and the Reactor model. Below are the results of verifying it with a debugger and thread dumps:

Step 1: the main thread (that is, the caller's thread, here the thread named main) leases an NHttpConnection from the pool, AbstractNIOConnPool.

"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor.connect(DefaultConnectingIOReactor.java:233)
at org.apache.http.nio.pool.AbstractNIOConnPool.processPendingRequest(AbstractNIOConnPool.java:434)
at org.apache.http.nio.pool.AbstractNIOConnPool.lease(AbstractNIOConnPool.java:276)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.requestConnection(PoolingNHttpClientConnectionManager.java:266)
at org.apache.http.impl.nio.client.AbstractClientExchangeHandler.requestConnection(AbstractClientExchangeHandler.java:363)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.start(DefaultClientExchangeHandlerImpl.java:125)
at org.apache.http.impl.nio.client.InternalHttpAsyncClient.execute(InternalHttpAsyncClient.java:141)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClient.execute(CloseableHttpAsyncClient.java:75)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClient.execute(CloseableHttpAsyncClient.java:108)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClient.execute(CloseableHttpAsyncClient.java:92)
at org.tiger.httpclient.ApacheAsyncHttpClientTest.main(ApacheAsyncHttpClientTest.java:36)

Step 2: a thread created by the default thread factory, pool-1-thread-1, starts the IOReactor. The IOReactor starts its worker (sub) I/O reactors, and event dispatching begins.

"pool-1-thread-1@1272" prio=5 tid=0xd nid=NA runnable
java.lang.Thread.State: RUNNABLE
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:308)
- locked <0x504> (a java.lang.Object)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:192)
at org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64)
at java.lang.Thread.run(Thread.java:745)

The code that creates the pool-1-thread-1 thread is:

public CloseableHttpAsyncClientBase(
        final NHttpClientConnectionManager connmgr,
        final ThreadFactory threadFactory,
        final NHttpClientEventHandler handler) {
    super();
    this.connmgr = connmgr;
    // Constructor: this runs when the client instance is created,
    // so every client instance creates one such reactor thread.
    if (threadFactory != null && handler != null) {
        this.reactorThread = threadFactory.newThread(new Runnable() {

            @Override
            public void run() {
                try {
                    final IOEventDispatch ioEventDispatch = new InternalIODispatch(handler);
                    connmgr.execute(ioEventDispatch);
                } catch (final Exception ex) {
                    log.error("I/O reactor terminated abnormally", ex);
                } finally {
                    status.set(Status.STOPPED);
                }
            }

        });
    } else {
        this.reactorThread = null;
    }
    this.status = new AtomicReference<Status>(Status.INACTIVE);
}

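Step 3: the I/O reactor dispatches I/O events and writes the result into the response. The default implementation buffers the response in memory, which is something to watch out for when using the async client. Once the events have been handled, the reactor goes back to select and resumes when new events arrive.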

"I/O dispatcher 2@1303" prio=5 tid=0xf nid=NA runnable
java.lang.Thread.State: RUNNABLE
at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
- locked <0x631> (a sun.nio.ch.EPollSelectorImpl)
- locked <0x632> (a java.util.Collections$UnmodifiableSet)
- locked <0x633> (a sun.nio.ch.Util$2)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)  // waiting in select
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:255)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:588)
at java.lang.Thread.run(Thread.java:745)

###Conclusion

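That covers the main parts of HttpAsyncClient. It involves quite a few design patterns and pays a lot of attention to thread safety, but the core of it is the I/O reactor pattern. Actually using HttpAsyncClient is simple: build a client with HttpAsyncClientBuilder and it is ready to use.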