
OkHttp Deep Dive: HTTP Request/Response and Socket Connection Reuse


How OkHttp relates to Socket

OkHttp is a standalone HTTP networking framework: its lower layers do not rely on an existing HttpClient or HttpURLConnection. Instead it implements sending HTTP requests and receiving responses itself, directly on top of Socket. Socket programming is a classic stumbling block for Java newcomers, so below I try to work out exactly how OkHttp implements the HTTP protocol's request sending and response handling on top of Java's Socket.

Note: OkHttp's I/O makes heavy use of Okio, another framework from Square; it is worth getting a basic feel for that library first.

In essence, every HTTP networking framework does the same thing: send an HTTP request and obtain its response.
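
To make the "HTTP on top of Socket" idea concrete before we dive into OkHttp's code, here is a minimal, self-contained sketch (plain JDK, not OkHttp) that sends an HTTP/1.1 GET over a raw java.net.Socket and prints the raw response; the host and path are placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;

public class RawHttpGet {
  public static void main(String[] args) throws Exception {
    try (Socket socket = new Socket("example.com", 80)) {
      Writer out = new OutputStreamWriter(socket.getOutputStream(), "UTF-8");
      // Request line and headers, each ended by CRLF, then one blank line.
      out.write("GET / HTTP/1.1\r\n");
      out.write("Host: example.com\r\n");
      out.write("Connection: close\r\n");
      out.write("\r\n");
      out.flush();

      // Print the raw status line, headers and body of the response.
      BufferedReader in = new BufferedReader(
          new InputStreamReader(socket.getInputStream(), "UTF-8"));
      for (String line; (line = in.readLine()) != null; ) {
        System.out.println(line);
      }
    }
  }
}

Everything OkHttp does on the wire is a more careful, more efficient version of exactly this.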

Before drilling into the details, it helps to lay out the overall flow of how OkHttp sends a request and receives its response.



HttpCodec: the abstract interface for HTTP operations

Since the previous article already covered OkHttp's overall flow, this time we focus only on the HTTP part. In my view the best starting point is HttpCodec; according to its Javadoc it "Encodes HTTP requests and decodes HTTP responses".

In other words, it encodes an HTTP request and decodes the response to that request.

Note that it has two implementations, Http1Codec and Http2Codec. Since HTTP/2 is not yet widely deployed, we will concentrate on HTTP/1.1, that is, Http1Codec.

/** Encodes HTTP requests and decodes HTTP responses. */
public interface HttpCodec {
/**
* The timeout to use while discarding a stream of input data. Since this is used for connection
* reuse, this timeout should be significantly less than the time it takes to establish a new
* connection.
*/
int DISCARD_STREAM_TIMEOUT_MILLIS = 100;

/** Returns an output stream where the request body can be streamed. */
Sink createRequestBody(Request request, long contentLength);

/** This should update the HTTP engine's sentRequestMillis field. */
void writeRequestHeaders(Request request) throws IOException;

/** Flush the request to the underlying socket. */
void finishRequest() throws IOException;

/** Read and return response headers. */
Response.Builder readResponseHeaders() throws IOException;

/** Returns a stream that reads the response body. */
ResponseBody openResponseBody(Response response) throws IOException;

/**
* Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
* That may happen later by the connection pool thread.
*/
void cancel();
}


As the code shows, the HttpCodec interface declares the basic methods responsible for encoding an HTTP request and decoding its response:

void writeRequestHeaders(Request request) throws IOException;

Writes the request headers.

Sink createRequestBody(Request request, long contentLength);

Creates a Sink (an output stream) to which the request body can be streamed.

void finishRequest() throws IOException;

Flushes the request data to the underlying socket.

Response.Builder readResponseHeaders() throws IOException;

Reads the response headers.

ResponseBody openResponseBody(Response response) throws IOException;

Opens a stream for reading the response body.

void cancel();

Cancels the stream.

Http1Codec

The official Javadoc actually describes this class very clearly.

A rough translation:

Http1Codec represents a socket connection that can be used to send HTTP/1.1 messages. The class strictly enforces the following lifecycle (how strictly? Http1Codec keeps a state field, and every lifecycle method first checks that the current state is legal):

1. Write the request headers.

2. Open a Sink for writing the request body.

3. Flush the request, then close the Sink.

4. Read the response headers.

5. Open a Source for reading the response body.

6. Close the Source once reading is done.

These six steps correspond exactly to the six abstract methods of HttpCodec.
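
Put together, one HTTP/1.1 exchange through this interface looks roughly like the sketch below. This is an illustrative compression of the lifecycle, not real OkHttp code; the actual calls are spread across interceptors such as CallServerInterceptor (shown later), and the okhttp3/okio types are assumed to be imported:

// Sketch only: the six lifecycle steps in one place, error handling omitted.
static Response exchange(HttpCodec httpCodec, Request request) throws IOException {
  httpCodec.writeRequestHeaders(request);                      // 1. write request headers
  if (request.body() != null) {
    Sink rawSink = httpCodec.createRequestBody(request, request.body().contentLength());
    BufferedSink sink = Okio.buffer(rawSink);                  // 2. open a sink for the body
    request.body().writeTo(sink);
    sink.close();                                              // 3. flush and close the sink
  }
  httpCodec.finishRequest();                                   // flush the request to the socket
  Response response = httpCodec.readResponseHeaders()          // 4. read the response headers
      .request(request)
      .build();
  return response.newBuilder()
      .body(httpCodec.openResponseBody(response))              // 5. open a source for the body
      .build();                                                // 6. the caller closes the body when done
}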

A complete HTTP exchange splits into two halves, sending the request and receiving the response.

The Request consists of:

Request Line

Request Headers

Request Body

The Response consists of:

Status Line

Response Headers

Response Body

Sending the Request Line and Request Headers

/**
* Prepares the HTTP headers and sends them to the server.
*
* <p>For streaming requests with a body, headers must be prepared <strong>before</strong> the
* output stream has been written to. Otherwise the body would need to be buffered!
*
* <p>For non-streaming requests with a body, headers must be prepared <strong>after</strong> the
* output stream has been written to and closed. This ensures that the {@code Content-Length}
* header field receives the proper value.
*/
@Override public void writeRequestHeaders(Request request) throws IOException {
String requestLine = RequestLine.get(
request, streamAllocation.connection().route().proxy().type());
writeRequest(request.headers(), requestLine);
}

/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}


The code and comments are clear enough on their own; here I mainly add some background on HTTP itself to make them easier to follow.

Here is an HTTP/1.1 request I captured with Fiddler:

GET https://www.fiddler2.com/UpdateCheck.aspx?isBeta=False HTTP/1.1
User-Agent: Fiddler/4.6.20171.9220 (.NET 4.6.2; WinNT 10.0.16299.0; zh-CN; 8xAMD64; Auto Update; Full Instance; Extensions: APITesting, AutoSaveExt, EventLog, Geoedge, HostsFile, RulesTab2, SAZClipboardFactory, SimpleFilter, Timeline)
Pragma: no-cache
Host: www.fiddler2.com
Accept-Language: zh-CN
Referer: http://fiddler2.com/client/4.6.20171.9220
Accept-Encoding: gzip, deflate
Connection: close


The first line is the request line:

GET https://www.fiddler2.com/UpdateCheck.aspx?isBeta=False HTTP/1.1
request method          request URL                                                               HTTP protocol version


It is just a single line of text: we take the relevant pieces out of the Request instance and concatenate them into one string. The RequestLine class does exactly that for us:

public final class RequestLine {

/**
* Returns the request status line, like "GET / HTTP/1.1". This is exposed to the application by
* {@link HttpURLConnection#getHeaderFields}, so it needs to be set even if the transport is
* HTTP/2.
*/
public static String get(Request request, Proxy.Type proxyType) {
StringBuilder result = new StringBuilder();
result.append(request.method());
result.append(' ');

if (includeAuthorityInRequestLine(request, proxyType)) {
result.append(request.url());
} else {
result.append(requestPath(request.url()));
}

result.append(" HTTP/1.1");
return result.toString();
}
}
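
To make the two branches concrete (hypothetical values): for a direct connection, or for an HTTPS request that is tunneled through a proxy, only the path and query go into the request line; a plain-HTTP request sent through an HTTP proxy carries the absolute URL instead, because the proxy needs the authority to know where to forward it:

GET /search?q=okhttp HTTP/1.1                       (direct connection)
GET http://example.com/search?q=okhttp HTTP/1.1     (through a plain HTTP proxy)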


Sending the request headers

The request headers follow the request line, and each one takes the form

key: value

User-Agent: Fiddler/4.6.20171.9220 (.NET 4.6.2; WinNT 10.0.16299.0; zh-CN; 8xAMD64; Auto Update; Full Instance; Extensions: APITesting, AutoSaveExt, EventLog, Geoedge, HostsFile, RulesTab2, SAZClipboardFactory, SimpleFilter, Timeline)
Pragma: no-cache
Host: www.fiddler2.com
Accept-Language: zh-CN
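
On the OkHttp side these name/value pairs come from the Headers object attached to the Request. Application code sets them through Request.Builder, for example (URL and values are placeholders, okhttp3 imports assumed):

Request request = new Request.Builder()
    .url("https://example.com/update")          // placeholder URL
    .header("Accept-Language", "zh-CN")         // header() replaces any existing value of this name
    .addHeader("Pragma", "no-cache")            // addHeader() appends, so a name may repeat
    .build();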


The concrete code is Http1Codec's writeRequest():

/** Returns bytes of a request header for sending on an HTTP transport. */
public void writeRequest(Headers headers, String requestLine) throws IOException {
if (state != STATE_IDLE) throw new IllegalStateException("state: " + state);
sink.writeUtf8(requestLine).writeUtf8("\r\n");
for (int i = 0, size = headers.size(); i < size; i++) {
sink.writeUtf8(headers.name(i))
.writeUtf8(": ")
.writeUtf8(headers.value(i))
.writeUtf8("\r\n");
}
sink.writeUtf8("\r\n");
state = STATE_OPEN_REQUEST_BODY;
}


A few points are worth noting here.

Headers related to the Request Body

For example:

"Content-Length: xxx"

"Content-Type: xxx"

These normally exist only when a Request actually carries a body, so the information lives inside RequestBody:

public abstract class RequestBody {
/** Returns the Content-Type header for this body. */
public abstract MediaType contentType();

/**
* Returns the number of bytes that will be written to {@code out} in a call to {@link #writeTo},
* or -1 if that count is unknown.
*/
public long contentLength() throws IOException {
return -1;
}
}


Note that RequestBody is an abstract class, which is entirely natural: each concrete kind of request body supplies its own implementation.
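
For example, application code usually goes through the static factories instead of subclassing RequestBody by hand; a hand-written subclass is mainly useful for streaming a body of unknown length. A sketch (media type and payload are placeholders, okhttp3/okio imports assumed):

// Fixed-length body: contentLength() is known, so a Content-Length header will be sent.
RequestBody fixed = RequestBody.create(
    MediaType.parse("application/json; charset=utf-8"),
    "{\"hello\":\"world\"}");

// Streaming body of unknown length: contentLength() stays -1,
// so BridgeInterceptor (below) will switch to Transfer-Encoding: chunked.
RequestBody streamed = new RequestBody() {
  @Override public MediaType contentType() {
    return MediaType.parse("application/octet-stream");
  }
  @Override public void writeTo(BufferedSink sink) throws IOException {
    for (int i = 0; i < 3; i++) {
      sink.writeUtf8("chunk " + i + "\n");      // written straight into the request sink
    }
  }
};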



BridgeInterceptor handles the special Request headers

Cookie

Content-Type Content-Length Transfer-Encoding

Accept-Encoding

/**
* Bridges from application code to network code. First it builds a network request from a user
* request. Then it proceeds to call the network. Finally it builds a user response from the network
* response.
*/
public final class BridgeInterceptor implements Interceptor {
private final CookieJar cookieJar;

public BridgeInterceptor(CookieJar cookieJar) {
this.cookieJar = cookieJar;
}

@Override public Response intercept(Chain chain) throws IOException {
Request userRequest = chain.request();
Request.Builder requestBuilder = userRequest.newBuilder();

RequestBody body = userRequest.body();
if (body != null) {
//These are handled only when the RequestBody is non-null:
//Content-Type
//Content-Length
//Transfer-Encoding
MediaType contentType = body.contentType();
if (contentType != null) {
requestBuilder.header("Content-Type", contentType.toString());
}

long contentLength = body.contentLength();
if (contentLength != -1) {
requestBuilder.header("Content-Length", Long.toString(contentLength));
requestBuilder.removeHeader("Transfer-Encoding");
} else {
requestBuilder.header("Transfer-Encoding", "chunked");
requestBuilder.removeHeader("Content-Length");
}
//As the code above shows, Content-Length and Transfer-Encoding are mutually exclusive and never sent together
}

//Host is new in HTTP/1.1; if none is specified, it defaults to the host of the request URL
if (userRequest.header("Host") == null) {
requestBuilder.header("Host", hostHeader(userRequest.url(), false));
}

//Connection is new in HTTP/1.1; if none is specified, it defaults to "Keep-Alive"
if (userRequest.header("Connection") == null) {
requestBuilder.header("Connection", "Keep-Alive");
}

// If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
// the transfer stream.
boolean transparentGzip = false;
if (userRequest.header("Accept-Encoding") == null) {
transparentGzip = true;
requestBuilder.header("Accept-Encoding", "gzip");
}

List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
if (!cookies.isEmpty()) {
requestBuilder.header("Cookie", cookieHeader(cookies));
}

if (userRequest.header("User-Agent") == null) {
requestBuilder.header("User-Agent", Version.userAgent());
}

Response networkResponse = chain.proceed(requestBuilder.build());

HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());

Response.Builder responseBuilder = networkResponse.newBuilder()
.request(userRequest);

if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}

return responseBuilder.build();
}

/** Returns a 'Cookie' HTTP request header with all cookies, like {@code a=b; c=d}. */
private String cookieHeader(List<Cookie> cookies) {
StringBuilder cookieHeader = new StringBuilder();
for (int i = 0, size = cookies.size(); i < size; i++) {
if (i > 0) {
cookieHeader.append("; ");
}
Cookie cookie = cookies.get(i);
cookieHeader.append(cookie.name()).append('=').append(cookie.value());
}
return cookieHeader.toString();
}
}
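
A practical consequence of the transparentGzip flag above: OkHttp only decompresses the response for you when it added "Accept-Encoding: gzip" itself. If the application sets that header explicitly, it must gunzip the body on its own, roughly like this (a sketch using Okio's GzipSource; response is assumed to be an okhttp3.Response):

ResponseBody body = response.body();
String text;
if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
  // OkHttp left the body compressed because we asked for gzip ourselves.
  BufferedSource gunzipped = Okio.buffer(new GzipSource(body.source()));
  text = gunzipped.readUtf8();
} else {
  text = body.string();
}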


Sending the request body: HttpCodec.createRequestBody()

@Override public Sink createRequestBody(Request request, long contentLength) {
if ("chunked".equalsIgnoreCase(request.header("Transfer-Encoding"))) {
// Stream a request body of unknown length.
return newChunkedSink();
}

if (contentLength != -1) {
// Stream a request body of a known length.
return newFixedLengthSink(contentLength);
}

throw new IllegalStateException(
"Cannot stream a request body without chunked encoding or a known content length!");
}


As you can see, there are two kinds of RequestBody to handle:

known length: newFixedLengthSink(contentLength)

unknown length: newChunkedSink()
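
On the wire the two differ in framing: a fixed-length sink simply counts the bytes it writes against the promised Content-Length, while a chunked sink prefixes every write with its size in hex. A minimal sketch of the chunk framing itself (this shows the HTTP/1.1 wire format, not OkHttp's actual ChunkedSink; BufferedSink and ByteString are Okio types):

// One chunk in HTTP/1.1 chunked framing: <size in hex> CRLF <data> CRLF.
// The body ends with a zero-sized chunk: "0\r\n\r\n".
static void writeChunk(BufferedSink sink, ByteString data) throws IOException {
  sink.writeHexadecimalUnsignedLong(data.size());   // chunk size, in hexadecimal
  sink.writeUtf8("\r\n");
  sink.write(data);                                 // chunk payload
  sink.writeUtf8("\r\n");
}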

This method is called from CallServerInterceptor:

Request request = chain.request();

long sentRequestMillis = System.currentTimeMillis();
httpCodec.writeRequestHeaders(request);

if (HttpMethod.permitsRequestBody(request.method()) && request.body() != null) {
Sink requestBodyOut = httpCodec.createRequestBody(request, request.body().contentLength());
BufferedSink bufferedRequestBody = Okio.buffer(requestBodyOut);
request.body().writeTo(bufferedRequestBody);
bufferedRequestBody.close();
}

httpCodec.finishRequest();


Reading the HTTP response and its Response Body

public final class CallServerInterceptor implements Interceptor {

@Override public Response intercept(Chain chain) throws IOException {

// ... code above omitted ...

Response response = httpCodec.readResponseHeaders()
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();

int code = response.code();
if (forWebSocket && code == 101) {
// Connection is upgrading, but we need to ensure interceptors see a non-null response body.
response = response.newBuilder()
.body(Util.EMPTY_RESPONSE)
.build();
} else {
response = response.newBuilder()
.body(httpCodec.openResponseBody(response))
.build();
}

if ("close".equalsIgnoreCase(response.request().header("Connection"))
|| "close".equalsIgnoreCase(response.header("Connection"))) {
streamAllocation.noNewStreams();
}

if ((code == 204 || code == 205) && response.body().contentLength() > 0) {
throw new ProtocolException(
"HTTP " + code + " had non-zero Content-Length: " + response.body().contentLength());
}

return response;
}
}


As you can see, handling the Response is fairly straightforward: read the status line and the response headers, then open a stream for the ResponseBody.
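
For reference, the raw bytes that readResponseHeaders() and openResponseBody() are consuming at this point are the mirror image of the request captured earlier: a status line, header lines, a blank line, then the body. An illustration with made-up values:

HTTP/1.1 200 OK
Content-Type: text/html; charset=utf-8
Content-Length: 5
Connection: keep-alive

hello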

How OkHttp reuses and cleans up Socket connections

In HTTP/1.0, every HTTP request goes through: TCP handshake -> HTTP request -> HTTP response -> connection close (TCP teardown).

This causes a real performance problem: both the TCP handshake and the teardown cost response time and server resources.

Take a page with 100 images, all hosted on the same HTTP server: connecting 100 separate times is obviously very inefficient.

HTTP/1.1 enables Keep-Alive by default. That is, when an HTTP request finishes, the underlying TCP connection is not closed as long as the configured keep-alive time has not elapsed, and the next request to the same server reuses that connection.

At the same time, once certain limits are exceeded, say the keep-alive time has run out or there are too many idle connections, a background thread has to keep checking for such connections and cleaning them up.
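
These are exactly the two tuning knobs of OkHttp's ConnectionPool (maximum idle connections and keep-alive duration), which we will look at below. If the defaults do not fit, an application can supply its own pool when building the client; the numbers here are only examples:

// Example values only: keep up to 10 idle connections, evict after 2 minutes of inactivity.
ConnectionPool pool = new ConnectionPool(10, 2, TimeUnit.MINUTES);
OkHttpClient client = new OkHttpClient.Builder()
    .connectionPool(pool)
    .build();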

First, let's get clear on the basic classes involved:

RealConnection

RealConnection is a thin wrapper around a Socket; it represents one TCP connection.

ConnectionPool

The connection pool that caches RealConnection instances.

StreamAllocation

StreamAllocation creates the HttpCodec that actually executes a request.

Inside HttpCodec, what really handles the connection is a RealConnection. The StreamAllocation looks for a RealConnection in the ConnectionPool: if a suitable one is found it is reused, otherwise a new RealConnection is created, and the HttpCodec is then built on top of that RealConnection.
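
Stripped of route selection, retries and the locking the real code performs, that lookup reduces to roughly the sketch below. It is a simplification built only from the ConnectionPool methods quoted later (get and put); the RealConnection constructor and connect call are assumptions about the internal API, not exact OkHttp code:

// Simplified: reuse a pooled connection for this Address, otherwise create and pool a new one.
RealConnection connection = connectionPool.get(address, streamAllocation);
if (connection == null) {
  connection = new RealConnection(connectionPool, route);  // assumed constructor; wraps a new Socket
  streamAllocation.acquire(connection);                    // register this stream on the connection
  connectionPool.put(connection);
  // connection.connect(...) then performs the TCP (and, for HTTPS, TLS) handshake.
}
// The HttpCodec for the request is created on top of this RealConnection.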

ConnectionPool: the connection pool

From the above we can draw a conclusion: we do not want to create and destroy a connection for every single HTTP request; we want requests to the same address to share and reuse connections.

So, much like the Java object pools we are used to, OkHttp has a ConnectionPool that manages all of its connections.

According to the official Javadoc:

ConnectionPool manages the reuse of HTTP and HTTP/2 connections in order to reduce network latency. HTTP requests that share the same Address may share a Connection, and ConnectionPool implements the policy for which connections to keep open for future reuse.

public final class ConnectionPool {
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));

/** The maximum number of idle connections for each address. */
private final int maxIdleConnections;
private final long keepAliveDurationNs;

//The Runnable run by the cleanup thread: perform one cleanup pass, then wait for the returned interval, and loop forever.
private final Runnable cleanupRunnable = new Runnable() {
@Override public void run() {
while (true) {
long waitNanos = cleanup(System.nanoTime());
if (waitNanos == -1) return;
if (waitNanos > 0) {
long waitMillis = waitNanos / 1000000L;
waitNanos -= (waitMillis * 1000000L);
synchronized (ConnectionPool.this) {
try {
ConnectionPool.this.wait(waitMillis, (int) waitNanos);
} catch (InterruptedException ignored) {
}
}
}
}
}
};

//All current connections are held in a deque.
private final Deque<RealConnection> connections = new ArrayDeque<>();
final RouteDatabase routeDatabase = new RouteDatabase();
//This flag ensures that at most one cleanup task runs at a time.
boolean cleanupRunning;

/** With the default OkHttpClient settings the pool allows at most 5 idle connections, kept for at most 5 minutes,
* because too many idle connections, or connections left unused for too long, both waste resources.
* Create a new connection pool with tuning parameters appropriate for a single-user application.
* The tuning parameters in this pool are subject to change in future OkHttp releases. Currently
* this pool holds up to 5 idle connections which will be evicted after 5 minutes of inactivity.
*/
public ConnectionPool() {
this(5, 5, TimeUnit.MINUTES);
}

public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) {
this.maxIdleConnections = maxIdleConnections;
this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration);

// Put a floor on the keep alive duration, otherwise cleanup will spin loop.
if (keepAliveDuration <= 0) {
throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration);
}
}

//Returns the number of idle connections. "Idle" means nothing references the connection, i.e. its allocations list is empty.
/** Returns the number of idle connections in the pool. */
public synchronized int idleConnectionCount() {
int total = 0;
for (RealConnection connection : connections) {
if (connection.allocations.isEmpty()) total++;
}
return total;
}

/**
* Returns the total number of connections.
* Returns total number of connections in the pool. Note that prior to OkHttp 2.7 this included
* only idle connections and HTTP/2 connections. Since OkHttp 2.7 this includes all connections,
* both active and inactive. Use {@link #idleConnectionCount()} to count connections not currently
* in use.
*/
public synchronized int connectionCount() {
return connections.size();
}

/**
* Finds a reusable Connection in the pool: same address, allocation count below the per-connection limit, and not flagged noNewStreams.
*Returns a recycled connection to {@code address}, or null if no such connection exists.
**/
RealConnection get(Address address, StreamAllocation streamAllocation) {
assert (Thread.holdsLock(this));
for (RealConnection connection : connections) {
if (connection.allocations.size() < connection.allocationLimit
&& address.equals(connection.route().address)
&& !connection.noNewStreams) {
streamAllocation.acquire(connection);
return connection;
}
}
return null;
}

/**
* Adding a Connection to the pool also starts the cleanup Runnable if it is not already running.
**/
void put(RealConnection connection) {
assert (Thread.holdsLock(this));
if (!cleanupRunning) {
cleanupRunning = true;
executor.execute(cleanupRunnable);
}
connections.add(connection);
}

/** Notifies the pool that this connection has become idle. In some cases the connection is removed immediately;
* otherwise the cleanup thread is woken up.
* Notify this pool that {@code connection} has become idle. Returns true if the connection has
* been removed from the pool and should be closed.
*/
boolean connectionBecameIdle(RealConnection connection) {
assert (Thread.holdsLock(this));
if (connection.noNewStreams || maxIdleConnections == 0) {
connections.remove(connection);
return true;
} else {
notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
return false;
}
}

/** Close and remove all idle connections in the pool. */
public void evictAll() {
List<RealConnection> evictedConnections = new ArrayList<>();
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();
if (connection.allocations.isEmpty()) {
connection.noNewStreams = true;
evictedConnections.add(connection);
i.remove();
}
}
}

for (RealConnection connection : evictedConnections) {
closeQuietly(connection.socket());
}
}

/** Evicts the connection that has been idle the longest.
* Performs maintenance on this pool, evicting the connection that has been idle the longest if
* either it has exceeded the keep alive limit or the idle connections limit.
*
* <p>Returns the duration in nanos to sleep until the next scheduled call to this method. Returns
* -1 if no further cleanups are required.
*/
long cleanup(long now) {
int inUseConnectionCount = 0;
int idleConnectionCount = 0;
RealConnection longestIdleConnection = null;
long longestIdleDurationNs = Long.MIN_VALUE;

// Find either a connection to evict, or the time that the next eviction is due.
synchronized (this) {
for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
RealConnection connection = i.next();

// If the connection is in use, keep searching.
if (pruneAndGetAllocationCount(connection, now) > 0) {
inUseConnectionCount++;
continue;
}

idleConnectionCount++;

// If the connection is ready to be evicted, we're done.
long idleDurationNs = now - connection.idleAtNanos;
if (idleDurationNs > longestIdleDurationNs) {
longestIdleDurationNs = idleDurationNs;
longestIdleConnection = connection;
}
}

if (longestIdleDurationNs >= this.keepAliveDurationNs
|| idleConnectionCount > this.maxIdleConnections) {
// We've found a connection to evict. Remove it from the list, then close it below (outside
// of the synchronized block).
connections.remove(longestIdleConnection);
} else if (idleConnectionCount > 0) {
// A connection will be ready to evict soon.
return keepAliveDurationNs - longestIdleDurationNs;
} else if (inUseConnectionCount > 0) {
// All connections are in use. It'll be at least the keep alive duration 'til we run again.
return keepAliveDurationNs;
} else {
// No connections, idle or in use.
cleanupRunning = false;
return -1;
}
}

closeQuietly(longestIdleConnection.socket());

// Cleanup again immediately.
return 0;
}

/**
* Prunes any leaked allocations and then returns the number of remaining live allocations on
* {@code connection}. Allocations are leaked if the connection is tracking them but the
* application code has abandoned them. Leak detection is imprecise and relies on garbage
* collection.
*/
private int pruneAndGetAllocationCount(RealConnection connection, long now) {
List<Reference<StreamAllocation>> references = connection.allocations;
for (int i = 0; i < references.size(); ) {
Reference<StreamAllocation> reference = references.get(i);

if (reference.get() != null) {
i++;
continue;
}

// We've discovered a leaked allocation. This is an application bug.
StreamAllocation.StreamAllocationReference streamAllocRef =
(StreamAllocation.StreamAllocationReference) reference;
String message = "A connection to " + connection.route().address().url()
+ " was leaked. Did you forget to close a response body?";
Platform.get().logCloseableLeak(message, streamAllocRef.callStackTrace);

references.remove(i);
connection.noNewStreams = true;

// If this was the last allocation, the connection is eligible for immediate eviction.
if (references.isEmpty()) {
connection.idleAtNanos = now - keepAliveDurationNs;
return 0;
}
}

return references.size();
}
}


Recap

ConnectionPool is really not that complicated: once you see that it exists to manage and reuse idle connections, OkHttp's socket-reuse and automatic-cleanup mechanism is easy to follow. For example, if the longest-idle connection has been idle for 3 minutes and the keep-alive limit is 5 minutes, cleanup() simply returns the remaining 2 minutes (in nanoseconds) and the cleanup thread sleeps until then.
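
For completeness, the pool is reachable from the client, so an application can inspect or drain it directly; client here is assumed to be an OkHttpClient instance:

ConnectionPool pool = client.connectionPool();
System.out.println("idle = " + pool.idleConnectionCount()
    + ", total = " + pool.connectionCount());
pool.evictAll();   // close every idle connection right away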

Conclusion

OkHttp is, in the end, just a framework for making HTTP requests. The more you learn about HTTP itself and read it alongside the framework's implementation, the more you will get out of it.