
HttpCore Component Example Programs in Java (HttpComponents: HttpCore Examples)

2012-08-13 08:33

Part I. Basic Concepts

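      HttpCore is a set of components that implement the most fundamental aspects of the HTTP protocol, yet are sufficient to develop full-featured client-side and server-side HTTP services with a minimal footprint.
      HttpCore has the following scope and goals (@Author: 南磊):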

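1. HttpCore scope

  ■ A consistent API for building client, proxy, and server-side HTTP services
  ■ A consistent API for building both synchronous and asynchronous HTTP services
  ■ A set of low-level components based on the blocking (classic) and non-blocking (NIO) I/O models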

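2. HttpCore goals

  ■ Implementation of the most fundamental HTTP transport aspects
  ■ A balance between good performance and the clarity and expressiveness of the API
  ■ A small (predictable) memory footprint
  ■ A self-contained library (no external dependencies beyond the JRE)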

3. What HttpCore is NOT

  ■ A replacement for HttpClient
  ■ A replacement for a Servlet container or a competitor to the Servlet API

Part II. Source Code Walkthrough

1. Basic HTTP GET program (synchronous, blocking I/O model)

This example demonstrates how to execute a series of synchronous (blocking) HTTP GET requests.
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation.  For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/

package org.apache.http.examples;

import java.net.Socket;

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpClientConnection;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestExecutor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;

/**
* Elemental example for executing multiple GET requests sequentially.
* <p>
* Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
* It is NOT intended to demonstrate the most efficient way of building an HTTP client.
*/
public class ElementalHttpGet {

    public static void main(String[] args) throws Exception {

        HttpParams params = new SyncBasicHttpParams();
        HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
        HttpProtocolParams.setContentCharset(params, "UTF-8");
        HttpProtocolParams.setUserAgent(params, "HttpComponents/1.1");
        HttpProtocolParams.setUseExpectContinue(params, true);

        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Required protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                // Recommended protocol interceptors
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});

        HttpRequestExecutor httpexecutor = new HttpRequestExecutor();

        HttpContext context = new BasicHttpContext(null);
        HttpHost host = new HttpHost("localhost", 8080);

        DefaultHttpClientConnection conn = new DefaultHttpClientConnection();
        ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();

        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
        context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, host);

        try {

            String[] targets = {
                    "/",
                    "/servlets-examples/servlet/RequestInfoExample",
                    "/somewhere%20in%20pampa"};

            for (int i = 0; i < targets.length; i++) {
                // (Re)open the connection if the previous response closed it
                if (!conn.isOpen()) {
                    Socket socket = new Socket(host.getHostName(), host.getPort());
                    conn.bind(socket, params);
                }
                BasicHttpRequest request = new BasicHttpRequest("GET", targets[i]);
                System.out.println(">> Request URI: " + request.getRequestLine().getUri());

                request.setParams(params);
                httpexecutor.preProcess(request, httpproc, context);
                HttpResponse response = httpexecutor.execute(request, conn, context);
                response.setParams(params);
                httpexecutor.postProcess(response, httpproc, context);

                System.out.println("<< Response: " + response.getStatusLine());
                System.out.println(EntityUtils.toString(response.getEntity()));
                System.out.println("==============");
                if (!connStrategy.keepAlive(response, context)) {
                    conn.close();
                } else {
                    System.out.println("Connection kept alive...");
                }
            }
        } finally {
            conn.close();
        }
    }

}
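
To make the role of the protocol interceptors in the GET example more concrete, the sketch below assembles by hand roughly the request text that the example would put on the wire for the first target. It uses only the JDK. The class name RawGetRequest and its build method are hypothetical, and the header set is an assumption based on the usual behaviour of the interceptors (RequestTargetHost adds Host, RequestConnControl adds Connection: Keep-Alive, RequestUserAgent adds User-Agent), so treat it as an approximation rather than a capture of real output.

```java
final class RawGetRequest {

    // Hypothetical helper: assembles an HTTP/1.1 GET request as plain text.
    // Header names and order are assumed, not captured from HttpCore.
    static String build(String host, int port, String uri, String userAgent) {
        StringBuilder sb = new StringBuilder();
        sb.append("GET ").append(uri).append(" HTTP/1.1\r\n");
        sb.append("Host: ").append(host).append(':').append(port).append("\r\n");
        sb.append("Connection: Keep-Alive\r\n");
        sb.append("User-Agent: ").append(userAgent).append("\r\n");
        // An empty line terminates the header section; a GET carries no body
        sb.append("\r\n");
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(build("localhost", 8080, "/", "HttpComponents/1.1"));
    }
}
```

RequestContent and RequestExpectContinue are left out here because they should only matter for requests that enclose an entity, as in the POST example.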



2. Basic HTTP POST program (synchronous, blocking I/O model)

This example demonstrates how to execute a series of synchronous (blocking) HTTP POST requests that enclose entity content of various types: a string, a byte array, and an arbitrary input stream.


package org.apache.http.examples;

import java.io.ByteArrayInputStream;
import java.net.Socket;

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpClientConnection;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestExecutor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;

/**
* Elemental example for executing multiple POST requests sequentially.
* <p>
* Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
* It is NOT intended to demonstrate the most efficient way of building an HTTP client.
*/
public class ElementalHttpPost {

    public static void main(String[] args) throws Exception {

        HttpParams params = new SyncBasicHttpParams();
        HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
        HttpProtocolParams.setContentCharset(params, "UTF-8");
        HttpProtocolParams.setUserAgent(params, "Test/1.1");
        HttpProtocolParams.setUseExpectContinue(params, true);

        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Required protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                // Recommended protocol interceptors
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});

        HttpRequestExecutor httpexecutor = new HttpRequestExecutor();

        HttpContext context = new BasicHttpContext(null);

        HttpHost host = new HttpHost("localhost", 8080);

        DefaultHttpClientConnection conn = new DefaultHttpClientConnection();
        ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();

        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
        context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, host);

        try {

            // Three request bodies: a string, a byte array, and a stream of unknown length (-1)
            HttpEntity[] requestBodies = {
                    new StringEntity(
                            "This is the first test request", "UTF-8"),
                    new ByteArrayEntity(
                            "This is the second test request".getBytes("UTF-8")),
                    new InputStreamEntity(
                            new ByteArrayInputStream(
                                    "This is the third test request (will be chunked)"
                                    .getBytes("UTF-8")), -1)
            };

            for (int i = 0; i < requestBodies.length; i++) {
                if (!conn.isOpen()) {
                    Socket socket = new Socket(host.getHostName(), host.getPort());
                    conn.bind(socket, params);
                }
                BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST",
                        "/servlets-examples/servlet/RequestInfoExample");
                request.setEntity(requestBodies[i]);
                System.out.println(">> Request URI: " + request.getRequestLine().getUri());

                request.setParams(params);
                httpexecutor.preProcess(request, httpproc, context);
                HttpResponse response = httpexecutor.execute(request, conn, context);
                response.setParams(params);
                httpexecutor.postProcess(response, httpproc, context);

                System.out.println("<< Response: " + response.getStatusLine());
                System.out.println(EntityUtils.toString(response.getEntity()));
                System.out.println("==============");
                if (!connStrategy.keepAlive(response, context)) {
                    conn.close();
                } else {
                    System.out.println("Connection kept alive...");
                }
            }
        } finally {
            conn.close();
        }
    }

}
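
The third request body in the POST example is created with a length of -1, and its text notes that it "will be chunked". As a rough illustration of what chunked transfer coding looks like on the wire, here is a JDK-only sketch. The class ChunkedSketch and its encode method are hypothetical and are not part of HttpCore; the chunk size is an arbitrary parameter chosen for the demonstration.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class ChunkedSketch {

    // Hypothetical helper: frames a body as HTTP/1.1 chunks of at most chunkSize bytes.
    // chunkSize must be greater than zero.
    static byte[] encode(byte[] body, int chunkSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int off = 0; off < body.length; off += chunkSize) {
            int len = Math.min(chunkSize, body.length - off);
            // Each chunk starts with its size in hexadecimal, followed by CRLF
            byte[] head = (Integer.toHexString(len) + "\r\n").getBytes(StandardCharsets.US_ASCII);
            out.write(head, 0, head.length);
            out.write(body, off, len);
            out.write('\r');
            out.write('\n');
        }
        // A zero-sized chunk plus an empty line marks the end of the body
        byte[] last = "0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
        out.write(last, 0, last.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] framed = encode("Wikipedia".getBytes(StandardCharsets.US_ASCII), 4);
        System.out.print(new String(framed, StandardCharsets.US_ASCII));
    }
}
```

Because the sender does not need to know the total length in advance, this framing suits a body read from an arbitrary input stream.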


 

3. Basic HTTP server program (synchronous, blocking I/O model)

 
This is an example of an HTTP/1.1 file server based on a synchronous (blocking) I/O model.


package org.apache.http.examples;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.Locale;

import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpServerConnection;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpResponseFactory;
import org.apache.http.impl.DefaultHttpServerConnection;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.protocol.HttpRequestHandlerRegistry;
import org.apache.http.protocol.HttpService;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.util.EntityUtils;

/**
* Basic, yet fully functional and spec compliant, HTTP/1.1 file server.
* <p>
* Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
* It is NOT intended to demonstrate the most efficient way of building an HTTP file server.
*
*
*/
public class ElementalHttpServer {

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Please specify document root directory");
            System.exit(1);
        }
        Thread t = new RequestListenerThread(8080, args[0]);
        t.setDaemon(false);
        t.start();
    }

    static class HttpFileHandler implements HttpRequestHandler {

        private final String docRoot;

        public HttpFileHandler(final String docRoot) {
            super();
            this.docRoot = docRoot;
        }

        public void handle(
                final HttpRequest request,
                final HttpResponse response,
                final HttpContext context) throws HttpException, IOException {

            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " method not supported");
            }
            String target = request.getRequestLine().getUri();

            // Consume any request body and report its size
            if (request instanceof HttpEntityEnclosingRequest) {
                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
                byte[] entityContent = EntityUtils.toByteArray(entity);
                System.out.println("Incoming entity content (bytes): " + entityContent.length);
            }

            // Map the decoded request URI onto a file below the document root
            final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
            if (!file.exists()) {

                response.setStatusCode(HttpStatus.SC_NOT_FOUND);
                StringEntity entity = new StringEntity(
                        "<html><body><h1>File " + file.getPath() +
                        " not found</h1></body></html>",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("File " + file.getPath() + " not found");

            } else if (!file.canRead() || file.isDirectory()) {

                response.setStatusCode(HttpStatus.SC_FORBIDDEN);
                StringEntity entity = new StringEntity(
                        "<html><body><h1>Access denied</h1></body></html>",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("Cannot read file " + file.getPath());

            } else {

                response.setStatusCode(HttpStatus.SC_OK);
                FileEntity body = new FileEntity(file, ContentType.create("text/html", (Charset) null));
                response.setEntity(body);
                System.out.println("Serving file " + file.getPath());
            }
        }

    }

    static class RequestListenerThread extends Thread {

        private final ServerSocket serversocket;
        private final HttpParams params;
        private final HttpService httpService;

        public RequestListenerThread(int port, final String docroot) throws IOException {
            this.serversocket = new ServerSocket(port);
            this.params = new SyncBasicHttpParams();
            this.params
                .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
                .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
                .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
                .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
                .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpComponents/1.1");

            // Set up the HTTP protocol processor
            HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
                    new ResponseDate(),
                    new ResponseServer(),
                    new ResponseContent(),
                    new ResponseConnControl()
            });

            // Set up request handlers
            HttpRequestHandlerRegistry registry = new HttpRequestHandlerRegistry();
            registry.register("*", new HttpFileHandler(docroot));

            // Set up the HTTP service
            this.httpService = new HttpService(
                    httpproc,
                    new DefaultConnectionReuseStrategy(),
                    new DefaultHttpResponseFactory(),
                    registry,
                    this.params);
        }

        @Override
        public void run() {
            System.out.println("Listening on port " + this.serversocket.getLocalPort());
            while (!Thread.interrupted()) {
                try {
                    // Set up HTTP connection
                    Socket socket = this.serversocket.accept();
                    DefaultHttpServerConnection conn = new DefaultHttpServerConnection();
                    System.out.println("Incoming connection from " + socket.getInetAddress());
                    conn.bind(socket, this.params);

                    // Start worker thread (one thread per connection)
                    Thread t = new WorkerThread(this.httpService, conn);
                    t.setDaemon(true);
                    t.start();
                } catch (InterruptedIOException ex) {
                    break;
                } catch (IOException e) {
                    System.err.println("I/O error initialising connection thread: "
                            + e.getMessage());
                    break;
                }
            }
        }
    }

    static class WorkerThread extends Thread {

        private final HttpService httpservice;
        private final HttpServerConnection conn;

        public WorkerThread(
                final HttpService httpservice,
                final HttpServerConnection conn) {
            super();
            this.httpservice = httpservice;
            this.conn = conn;
        }

        @Override
        public void run() {
            System.out.println("New connection thread");
            HttpContext context = new BasicHttpContext(null);
            try {
                // Serve requests on this connection until it is closed
                while (!Thread.interrupted() && this.conn.isOpen()) {
                    this.httpservice.handleRequest(this.conn, context);
                }
            } catch (ConnectionClosedException ex) {
                System.err.println("Client closed connection");
            } catch (IOException ex) {
                System.err.println("I/O error: " + ex.getMessage());
            } catch (HttpException ex) {
                System.err.println("Unrecoverable HTTP protocol violation: " + ex.getMessage());
            } finally {
                try {
                    this.conn.shutdown();
                } catch (IOException ignore) {}
            }
        }

    }

}
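
The file handler in the server example maps the decoded request URI directly onto a file below the document root. A request target containing ".." segments could therefore point outside that directory. The following JDK-only sketch shows one possible guard. It is an addition for illustration, not part of the original example: the class DocRootGuard and its resolve method are hypothetical, and the check does not follow symbolic links or strip query strings.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class DocRootGuard {

    // Hypothetical helper: resolves an already URL-decoded request target below docRoot.
    // Returns null when the normalized result would fall outside docRoot.
    static Path resolve(Path docRoot, String decodedTarget) {
        Path root = docRoot.toAbsolutePath().normalize();
        // Drop leading slashes so the target is treated as relative to the root
        String relative = decodedTarget.replaceFirst("^/+", "");
        Path candidate = root.resolve(relative).normalize();
        return candidate.startsWith(root) ? candidate : null;
    }

    public static void main(String[] args) {
        Path root = Paths.get("docroot");
        System.out.println(resolve(root, "/index.html"));
        System.out.println(resolve(root, "/../secret.txt"));
    }
}
```

A handler could answer with 403 (or 404) whenever such a helper returns null, before it ever touches the file system.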


 

4. Asynchronous HTTP GET program (non-blocking I/O model: NIO framework)

 
This example demonstrates how HttpCore NIO can be used to execute multiple HTTP requests asynchronously using only one I/O thread.

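Notes:
I. I/O reactor

      HttpCore NIO is based on the Reactor pattern as described by Doug Lea. The purpose of an I/O reactor is to react to I/O events and to dispatch event notifications to individual I/O sessions. The main idea of the I/O reactor pattern is to break away from the one-thread-per-connection model imposed by the classic blocking I/O model. The IOReactor interface represents an abstract object that implements the Reactor pattern. Internally, IOReactor implementations encapsulate the functionality of the NIO java.nio.channels.Selector.

      I/O reactors usually employ a small number of dispatch threads (often as few as one) to dispatch I/O event notifications to a much greater number (often several thousand) of I/O sessions or connections. It is generally recommended to have one dispatch thread per CPU core.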
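
As a rough illustration of the reactor idea, the following sketch uses java.nio.channels.Selector directly, without HttpCore: a single dispatch thread accepts connections and echoes back whatever each one sends. The class MiniReactor and its methods are hypothetical names made up for this example, and the code makes no claim about how IOReactor is implemented internally.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

final class MiniReactor implements Runnable, AutoCloseable {

    private final Selector selector;
    private final ServerSocketChannel server;

    MiniReactor() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        // Port 0 lets the OS pick a free port on the loopback interface
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() throws IOException {
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    // The dispatch loop: one thread waits for events on all registered channels
    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (selector.isOpen()) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        SocketChannel ch = server.accept();
                        if (ch != null) {
                            ch.configureBlocking(false);
                            ch.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        echo((SocketChannel) key.channel(), buf);
                    }
                }
            }
        } catch (IOException | ClosedSelectorException ex) {
            // Selector closed or failed: the loop ends
        }
    }

    // Reads what is available and writes it straight back. A production reactor
    // would register OP_WRITE instead of looping on write().
    private static void echo(SocketChannel ch, ByteBuffer buf) {
        try {
            buf.clear();
            if (ch.read(buf) < 0) {
                ch.close();
                return;
            }
            buf.flip();
            while (buf.hasRemaining()) {
                ch.write(buf);
            }
        } catch (IOException ex) {
            try { ch.close(); } catch (IOException ignore) { }
        }
    }

    @Override
    public void close() throws IOException {
        selector.close();
        server.close();
    }
}
```

To try it, start the reactor on one thread (new Thread(reactor).start()) and connect several plain sockets to reactor.port(); all of them are served by that single thread.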

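II. I/O event dispatchers

      IOReactor implementations use the IOEventDispatch interface to notify clients of events pending for a particular session. All methods of IOEventDispatch are executed on a dispatch thread of the I/O reactor. It is therefore important that processing in the event methods does not block the dispatch thread for long, because while it is blocked the I/O reactor cannot react to other events.

      Generic I/O events defined by the IOEventDispatch interface:

connected: triggered when a new session has been created.
inputReady: triggered when the session has pending input.
outputReady: triggered when the session is ready for output.
timeout: triggered when the session has timed out.
disconnected: triggered when the session has been terminated.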
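
The rule that event methods must not block the dispatch thread can be sketched as follows. The SessionEvents interface below is a hypothetical stand-in that only mirrors the five event names listed above; it is not the real IOEventDispatch interface, and sessions are represented by plain strings. The idea shown is that slow work is handed to a worker pool so that the event method itself returns quickly.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DispatchSketch {

    // Hypothetical stand-in for the five generic events (not the HttpCore interface)
    interface SessionEvents {
        void connected(String session);
        void inputReady(String session);
        void outputReady(String session);
        void timeout(String session);
        void disconnected(String session);
    }

    static final class OffloadingEvents implements SessionEvents {
        final List<String> log = new CopyOnWriteArrayList<>();
        private final ExecutorService workers;

        OffloadingEvents(ExecutorService workers) {
            this.workers = workers;
        }

        public void connected(String s) { log.add("connected " + s); }

        public void inputReady(String s) {
            log.add("inputReady " + s);
            // Hand slow processing to a worker so the dispatch thread is free again at once
            workers.submit(() -> {
                slowWork();
                log.add("processed " + s);
            });
        }

        public void outputReady(String s) { log.add("outputReady " + s); }

        public void timeout(String s) { log.add("timeout " + s); }

        public void disconnected(String s) { log.add("disconnected " + s); }

        private static void slowWork() {
            try {
                Thread.sleep(50); // simulated long-running task
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(2);
        OffloadingEvents events = new OffloadingEvents(workers);
        events.connected("s1");
        events.inputReady("s1");
        events.disconnected("s1");
        workers.shutdown();
        workers.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(events.log);
    }
}
```

Note that the "processed" entry may be logged after "disconnected", which is exactly the kind of ordering an asynchronous handler has to tolerate.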
package org.apache.http.examples.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;

import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.pool.BasicNIOConnPool;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;

/**
* Asynchronous HTTP/1.1 client.
*/
public class NHttpClient {

    public static void main(String[] args) throws Exception {
        // HTTP parameters for the client
        HttpParams params = new SyncBasicHttpParams();
        params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
            .setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");
        // Create HTTP protocol processing chain
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Use standard client-side protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});
        // Create client-side HTTP protocol handler
        HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
        // Create client-side I/O event dispatch
        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params);
        // Create client-side I/O reactor
        IOReactorConfig config = new IOReactorConfig();
        config.setIoThreadCount(1);
        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);
        // Create HTTP connection pool
        BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, params);
        // Limit total number of connections to just two
        pool.setDefaultMaxPerRoute(2);
        pool.setMaxTotal(2);
        // Run the I/O reactor in a separate thread
        Thread t = new Thread(new Runnable() {

            public void run() {
                try {
                    // Ready to go!
                    ioReactor.execute(ioEventDispatch);
                } catch (InterruptedIOException ex) {
                    System.err.println("Interrupted");
                } catch (IOException e) {
                    System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }

        });
        // Start the client thread
        t.start();
        // Create HTTP requester
        HttpAsyncRequester requester = new HttpAsyncRequester(
                httpproc, new DefaultConnectionReuseStrategy(), params);
        // Execute HTTP GETs to the following hosts
        HttpHost[] targets = new HttpHost[] {
                new HttpHost("www.apache.org", 80, "http"),
                new HttpHost("www.verisign.com", 443, "https"),
                new HttpHost("www.google.com", 80, "http")
        };
        // One latch count per request; every terminal callback counts down once
        final CountDownLatch latch = new CountDownLatch(targets.length);
        for (final HttpHost target: targets) {
            BasicHttpRequest request = new BasicHttpRequest("GET", "/");
            requester.execute(
                    new BasicAsyncRequestProducer(target, request),
                    new BasicAsyncResponseConsumer(),
                    pool,
                    new BasicHttpContext(),
                    // Handle HTTP response from a callback
                    new FutureCallback<HttpResponse>() {

                public void completed(final HttpResponse response) {
                    latch.countDown();
                    System.out.println(target + "->" + response.getStatusLine());
                }

                public void failed(final Exception ex) {
                    latch.countDown();
                    System.out.println(target + "->" + ex);
                }

                public void cancelled() {
                    latch.countDown();
                    System.out.println(target + " cancelled");
                }

            });
        }
        // Block until all requests have completed, failed, or been cancelled
        latch.await();
        System.out.println("Shutting down I/O reactor");
        ioReactor.shutdown();
        System.out.println("Done");
    }

}
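
The asynchronous client relies on a CountDownLatch so that the main thread waits for all callbacks before it shuts the I/O reactor down. That coordination pattern can be sketched with the JDK alone. In the sketch below the class LatchSketch, the runAll method, and the single-thread executor that stands in for the I/O thread are all hypothetical; no HTTP traffic is involved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class LatchSketch {

    // Hypothetical helper: runs one asynchronous task per target on a single
    // background thread and returns only after every task has finished.
    static Map<String, String> runAll(String[] targets) throws InterruptedException {
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(targets.length);
        Map<String, String> results = new ConcurrentHashMap<>();
        for (String target : targets) {
            ioThread.execute(() -> {
                try {
                    results.put(target, "done");
                } finally {
                    // Count down on every outcome, or await() would never return
                    latch.countDown();
                }
            });
        }
        latch.await();
        ioThread.shutdown();
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runAll(new String[] {"a", "b", "c"}).size());
    }
}
```

The client example follows the same rule by calling latch.countDown() in completed, failed, and cancelled alike.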


 

5. Asynchronous HTTP server program (non-blocking I/O model: NIO framework)

 

This example demonstrates the use of HttpCore NIO to build an asynchronous (non-blocking) HTTP server capable of direct channel (zero copy) data transfer.
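
For readers unfamiliar with direct channel transfer, the sketch below shows the underlying JDK mechanism, FileChannel.transferTo, which hands file content to a target channel without first reading it into an application buffer. The class ZeroCopySketch and its send method are hypothetical and are not taken from HttpCore. Whether the transfer is truly zero copy depends on the operating system and on the type of the target channel.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopySketch {

    // Hypothetical helper: sends a whole file to the target channel and
    // returns the number of bytes actually transferred.
    static long send(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = in.size();
            long pos = 0;
            while (pos < size) {
                // transferTo may move fewer bytes than requested, so loop
                long n = in.transferTo(pos, size - pos, target);
                if (n <= 0) {
                    // Target accepted nothing; a non-blocking caller would
                    // wait for write readiness instead of giving up
                    break;
                }
                pos += n;
            }
            return pos;
        }
    }
}
```

A caller could pass a SocketChannel as the target; any other WritableByteChannel also works, just without the zero-copy benefit.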

package org.apache.http.examples.nio;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.util.Locale;

import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;

import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpConnectionFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NFileEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;

/**
* HTTP/1.1 file server based on the non-blocking I/O model and capable of direct channel
* (zero copy) data transfer.
*/
public class NHttpServer {

public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Please specify document root directory");
System.exit(1);
}
// Document root directory
File docRoot = new File(args[0]);
int port = 8080;
if (args.length >= 2) {
port = Integer.parseInt(args[1]);
}
// HTTP parameters for the server
HttpParams params = new SyncBasicHttpParams();
params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
// Create HTTP protocol processing chain
HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
// Use standard server-side protocol interceptors
new ResponseDate(),
new ResponseServer(),
new ResponseContent(),
new ResponseConnControl()
});
// Create request handler registry
HttpAsyncRequestHandlerRegistry reqistry = new HttpAsyncRequestHandlerRegistry();
// Register the default handler for all URIs
reqistry.register("*", new HttpFileHandler(docRoot));
// Create server-side HTTP protocol handler
HttpAsyncService protocolHandler = new HttpAsyncService(
httpproc, new DefaultConnectionReuseStrategy(), reqistry, params) {

@Override
public void connected(final NHttpServerConnection conn) {
System.out.println(conn + ": connection open");
super.connected(conn);
}

@Override
public void closed(final NHttpServerConnection conn) {
System.out.println(conn + ": connection closed");
super.closed(conn);
}

};
// Create HTTP connection factory
NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory;
if (port == 8443) {
// Initialize SSL context
ClassLoader cl = NHttpServer.class.getClassLoader();
URL url = cl.getResource("my.keystore");
if (url == null) {
System.out.println("Keystore not found");
System.exit(1);
}
KeyStore keystore = KeyStore.getInstance("jks");
keystore.load(url.openStream(), "secret".toCharArray());
KeyManagerFactory kmfactory = KeyManagerFactory.getInstance(
KeyManagerFactory.getDefaultAlgorithm());
kmfactory.init(keystore, "secret".toCharArray());
KeyManager[] keymanagers = kmfactory.getKeyManagers();
SSLContext sslcontext = SSLContext.getInstance("TLS");
sslcontext.init(keymanagers, null, null);
connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params);
} else {
connFactory = new DefaultNHttpServerConnectionFactory(params);
}
// Create server-side I/O event dispatch
IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);
// Create server-side I/O reactor
ListeningIOReactor ioReactor = new DefaultListeningIOReactor();
try {
// Listen on the given port
ioReactor.listen(new InetSocketAddress(port));
// Ready to go!
ioReactor.execute(ioEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException e) {
System.err.println("I/O error: " + e.getMessage());
}
System.out.println("Shutdown");
}

static class HttpFileHandler implements HttpAsyncRequestHandler<HttpRequest> {

private final File docRoot;

public HttpFileHandler(final File docRoot) {
super();
this.docRoot = docRoot;
}

public HttpAsyncRequestConsumer<HttpRequest> processRequest(
final HttpRequest request,
final HttpContext context) {
// Buffer request content in memory for simplicity
return new BasicAsyncRequestConsumer();
}

public void handle(
final HttpRequest request,
final HttpAsyncExchange httpexchange,
final HttpContext context) throws HttpException, IOException {
HttpResponse response = httpexchange.getResponse();
handleInternal(request, response, context);
httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
}

private void handleInternal(
final HttpRequest request,
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {

String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
throw new MethodNotSupportedException(method + " method not supported");
}

String target = request.getRequestLine().getUri();
final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
if (!file.exists()) {

response.setStatusCode(HttpStatus.SC_NOT_FOUND);
NStringEntity entity = new NStringEntity(
"<html><body><h1>File " + file.getPath() +
" not found</h1></body></html>",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("File " + file.getPath() + " not found");

} else if (!file.canRead() || file.isDirectory()) {

response.setStatusCode(HttpStatus.SC_FORBIDDEN);
NStringEntity entity = new NStringEntity(
"<html><body><h1>Access denied</h1></body></html>",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("Cannot read file " + file.getPath());

} else {
NHttpConnection conn = (NHttpConnection) context.getAttribute(
ExecutionContext.HTTP_CONNECTION);
response.setStatusCode(HttpStatus.SC_OK);
NFileEntity body = new NFileEntity(file, ContentType.create("text/html"));
response.setEntity(body);
System.out.println(conn + ": serving file " + file.getPath());
}
}

}

}
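The HttpFileHandler above builds the file path directly from the decoded request URI. The following is a minimal sketch of that mapping using only the JDK, with two additions that are not part of the original example: the query string is stripped, and the resolved path is rejected when it falls outside the document root. The names DocRootResolver and resolve are hypothetical and chosen only for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URLDecoder;

// Hypothetical helper, not part of HttpCore or of the original example.
final class DocRootResolver {

    /**
     * Maps a request URI onto a file below docRoot. Returns null when the
     * decoded path would leave the document root (for example via "..").
     */
    static File resolve(File docRoot, String requestUri) {
        try {
            // Assumption: anything after '?' is a query string and is ignored.
            int q = requestUri.indexOf('?');
            String path = q >= 0 ? requestUri.substring(0, q) : requestUri;
            String decoded = URLDecoder.decode(path, "UTF-8");

            // Canonical paths have "." and ".." segments resolved.
            File root = docRoot.getCanonicalFile();
            File candidate = new File(root, decoded).getCanonicalFile();

            String rootPath = root.getPath();
            String prefix = rootPath.endsWith(File.separator)
                    ? rootPath : rootPath + File.separator;
            String candidatePath = candidate.getPath();
            boolean inside = candidatePath.equals(rootPath)
                    || candidatePath.startsWith(prefix);
            return inside ? candidate : null;
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        File root = new File(System.getProperty("java.io.tmpdir"), "docroot-sketch");
        System.out.println(resolve(root, "/index.html"));
        System.out.println(resolve(root, "/../secret.txt"));
    }
}
```

A handler could call such a helper before the file.exists() check and answer with SC_FORBIDDEN when it returns null. Whether that fits depends on how the document root is laid out, so treat it as one possible hardening step rather than the example's own behaviour.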


 

6. Asynchronous HTTP reverse proxy (based on the non-blocking I/O model: the NIO framework)

This example demonstrates how HttpCore NIO can be used to build an asynchronous, fully streaming reverse HTTP proxy.
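The proxy's main method parses its first argument with java.net.URI and falls back to port 80 and scheme "http" when the URI does not supply them. The sketch below reproduces only that defaulting logic with the JDK, so the effect of different arguments can be checked in isolation. The class name TargetUriSketch and the method describeTarget are hypothetical.

```java
import java.net.URI;

// Hypothetical helper that mirrors the defaulting in NHttpReverseProxy.main.
final class TargetUriSketch {

    /** Returns "scheme://host:port" using the same fallbacks as the example. */
    static String describeTarget(String arg) {
        URI uri = URI.create(arg);
        String host = uri.getHost();                         // null when arg has no authority part
        int port = uri.getPort() > 0 ? uri.getPort() : 80;   // getPort() is -1 when absent
        String scheme = uri.getScheme() != null ? uri.getScheme() : "http";
        return scheme + "://" + host + ":" + port;
    }

    public static void main(String[] args) {
        System.out.println(describeTarget("http://localhost:8080"));
        System.out.println(describeTarget("http://example.org"));
        System.out.println(describeTarget("localhost"));
    }
}
```

Two consequences follow from how java.net.URI parses its input. A bare host name such as localhost is parsed as a relative path, so getHost() returns null and the argument has to be a full URI such as http://localhost:8080. An https URI without an explicit port still gets port 80 under this defaulting.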

/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation.  For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.http.examples.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.EnglishReasonPhraseCatalog;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.pool.BasicNIOConnPool;
import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.pool.NIOConnFactory;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerResolver;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.nio.protocol.HttpAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.pool.PoolStats;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;

/**
* Asynchronous, fully streaming HTTP/1.1 reverse proxy.
*/
public class NHttpReverseProxy {

public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.out.println("Usage: NHttpReverseProxy <target URI> [port]");
System.exit(1);
}
URI uri = new URI(args[0]);
int port = 8080;
if (args.length > 1) {
port = Integer.parseInt(args[1]);
}

// Target host
HttpHost targetHost = new HttpHost(
uri.getHost(),
uri.getPort() > 0 ? uri.getPort() : 80,
uri.getScheme() != null ? uri.getScheme() : "http");

System.out.println("Reverse proxy to " + targetHost);

HttpParams params = new SyncBasicHttpParams();
params
.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 30000)
.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
.setParameter(CoreProtocolPNames.ORIGIN_SERVER, "Test/1.1")
.setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");

IOReactorConfig config = new IOReactorConfig();
config.setIoThreadCount(1);
final ConnectingIOReactor connectingIOReactor = new DefaultConnectingIOReactor(config);
final ListeningIOReactor listeningIOReactor = new DefaultListeningIOReactor(config);

// Set up HTTP protocol processor for incoming connections
HttpProcessor inhttpproc = new ImmutableHttpProcessor(
new HttpResponseInterceptor[] {
new ResponseDate(),
new ResponseServer(),
new ResponseContent(),
new ResponseConnControl()
});

// Set up HTTP protocol processor for outgoing connections
HttpProcessor outhttpproc = new ImmutableHttpProcessor(
new HttpRequestInterceptor[] {
new RequestContent(),
new RequestTargetHost(),
new RequestConnControl(),
new RequestUserAgent(),
new RequestExpectContinue()
});

ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler();
HttpAsyncRequester executor = new HttpAsyncRequester(
outhttpproc, new ProxyOutgoingConnectionReuseStrategy(), params);

ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, params);
connPool.setMaxTotal(100);
connPool.setDefaultMaxPerRoute(20);

HttpAsyncRequestHandlerRegistry handlerRegistry = new HttpAsyncRequestHandlerRegistry();
handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool));

ProxyServiceHandler serviceHandler = new ProxyServiceHandler(
inhttpproc, new ProxyIncomingConnectionReuseStrategy(), handlerRegistry, params);

final IOEventDispatch connectingEventDispatch = new DefaultHttpClientIODispatch(
clientHandler, params);

final IOEventDispatch listeningEventDispatch = new DefaultHttpServerIODispatch(
serviceHandler, params);

Thread t = new Thread(new Runnable() {

public void run() {
try {
connectingIOReactor.execute(connectingEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException ex) {
ex.printStackTrace();
} finally {
try {
listeningIOReactor.shutdown();
} catch (IOException ex2) {
ex2.printStackTrace();
}
}
}

});
t.start();
try {
listeningIOReactor.listen(new InetSocketAddress(port));
listeningIOReactor.execute(listeningEventDispatch);
} catch (InterruptedIOException ex) {
System.err.println("Interrupted");
} catch (IOException ex) {
ex.printStackTrace();
} finally {
try {
connectingIOReactor.shutdown();
} catch (IOException ex2) {
ex2.printStackTrace();
}
}
}

static class ProxyHttpExchange {

private final ByteBuffer inBuffer;
private final ByteBuffer outBuffer;

private volatile String id;
private volatile HttpHost target;
private volatile HttpAsyncExchange responseTrigger;
private volatile IOControl originIOControl;
private volatile IOControl clientIOControl;
private volatile HttpRequest request;
private volatile boolean requestReceived;
private volatile HttpResponse response;
private volatile boolean responseReceived;
private volatile Exception ex;

public ProxyHttpExchange() {
super();
this.inBuffer = ByteBuffer.allocateDirect(10240);
this.outBuffer = ByteBuffer.allocateDirect(10240);
}

public ByteBuffer getInBuffer() {
return this.inBuffer;
}

public ByteBuffer getOutBuffer() {
return this.outBuffer;
}

public String getId() {
return this.id;
}

public void setId(final String id) {
this.id = id;
}

public HttpHost getTarget() {
return this.target;
}

public void setTarget(final HttpHost target) {
this.target = target;
}

public HttpRequest getRequest() {
return this.request;
}

public void setRequest(final HttpRequest request) {
this.request = request;
}

public HttpResponse getResponse() {
return this.response;
}

public void setResponse(final HttpResponse response) {
this.response = response;
}

public HttpAsyncExchange getResponseTrigger() {
return this.responseTrigger;
}

public void setResponseTrigger(final HttpAsyncExchange responseTrigger) {
this.responseTrigger = responseTrigger;
}

public IOControl getClientIOControl() {
return this.clientIOControl;
}

public void setClientIOControl(final IOControl clientIOControl) {
this.clientIOControl = clientIOControl;
}

public IOControl getOriginIOControl() {
return this.originIOControl;
}

public void setOriginIOControl(final IOControl originIOControl) {
this.originIOControl = originIOControl;
}

public boolean isRequestReceived() {
return this.requestReceived;
}

public void setRequestReceived() {
this.requestReceived = true;
}

public boolean isResponseReceived() {
return this.responseReceived;
}

public void setResponseReceived() {
this.responseReceived = true;
}

public Exception getException() {
return this.ex;
}

public void setException(final Exception ex) {
this.ex = ex;
}

public void reset() {
this.inBuffer.clear();
this.outBuffer.clear();
this.target = null;
this.id = null;
this.responseTrigger = null;
this.clientIOControl = null;
this.originIOControl = null;
this.request = null;
this.requestReceived = false;
this.response = null;
this.responseReceived = false;
this.ex = null;
}

}

static class ProxyRequestHandler implements HttpAsyncRequestHandler<ProxyHttpExchange> {

private final HttpHost target;
private final HttpAsyncRequester executor;
private final BasicNIOConnPool connPool;
private final AtomicLong counter;

public ProxyRequestHandler(
final HttpHost target,
final HttpAsyncRequester executor,
final BasicNIOConnPool connPool) {
super();
this.target = target;
this.executor = executor;
this.connPool = connPool;
this.counter = new AtomicLong(1);
}

public HttpAsyncRequestConsumer<ProxyHttpExchange> processRequest(
final HttpRequest request,
final HttpContext context) {
ProxyHttpExchange httpExchange = (ProxyHttpExchange) context.getAttribute("http-exchange");
if (httpExchange == null) {
httpExchange = new ProxyHttpExchange();
context.setAttribute("http-exchange", httpExchange);
}
synchronized (httpExchange) {
httpExchange.reset();
String id = String.format("%08X", this.counter.getAndIncrement());
httpExchange.setId(id);
httpExchange.setTarget(this.target);
return new ProxyRequestConsumer(httpExchange, this.executor, this.connPool);
}
}

public void handle(
final ProxyHttpExchange httpExchange,
final HttpAsyncExchange responseTrigger,
final HttpContext context) throws HttpException, IOException {
synchronized (httpExchange) {
Exception ex = httpExchange.getException();
if (ex != null) {
System.out.println("[client<-proxy] " + httpExchange.getId() + " " + ex);
int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
String message = ex.getMessage();
if (message == null) {
message = "Unexpected error";
}
response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT));
responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
System.out.println("[client<-proxy] " + httpExchange.getId() + " error response triggered");
// A response has been submitted; do not fall through and submit another one
return;
}
HttpResponse response = httpExchange.getResponse();
if (response != null) {
responseTrigger.submitResponse(new ProxyResponseProducer(httpExchange));
System.out.println("[client<-proxy] " + httpExchange.getId() + " response triggered");
return;
}
// No response yet. Keep the trigger so the response consumer can submit later.
httpExchange.setResponseTrigger(responseTrigger);
}
}

}

static class ProxyRequestConsumer implements HttpAsyncRequestConsumer<ProxyHttpExchange> {

private final ProxyHttpExchange httpExchange;
private final HttpAsyncRequester executor;
private final BasicNIOConnPool connPool;

private volatile boolean completed;

public ProxyRequestConsumer(
final ProxyHttpExchange httpExchange,
final HttpAsyncRequester executor,
final BasicNIOConnPool connPool) {
super();
this.httpExchange = httpExchange;
this.executor = executor;
this.connPool = connPool;
}

public void close() throws IOException {
}

public void requestReceived(final HttpRequest request) {
synchronized (this.httpExchange) {
System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + request.getRequestLine());
this.httpExchange.setRequest(request);
this.executor.execute(
new ProxyRequestProducer(this.httpExchange),
new ProxyResponseConsumer(this.httpExchange),
this.connPool);
}
}

public void consumeContent(
final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
synchronized (this.httpExchange) {
this.httpExchange.setClientIOControl(ioctrl);
// Receive data from the client
ByteBuffer buf = this.httpExchange.getInBuffer();
int n = decoder.read(buf);
System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read");
if (decoder.isCompleted()) {
System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read");
}
// If the buffer is full, suspend client input until there is free
// space in the buffer
if (!buf.hasRemaining()) {
ioctrl.suspendInput();
System.out.println("[client->proxy] " + this.httpExchange.getId() + " suspend client input");
}
// If there is some content in the input buffer make sure origin
// output is active
if (buf.position() > 0) {
if (this.httpExchange.getOriginIOControl() != null) {
this.httpExchange.getOriginIOControl().requestOutput();
System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output");
}
}
}
}

public void requestCompleted(final HttpContext context) {
synchronized (this.httpExchange) {
this.completed = true;
System.out.println("[client->proxy] " + this.httpExchange.getId() + " request completed");
this.httpExchange.setRequestReceived();
if (this.httpExchange.getOriginIOControl() != null) {
this.httpExchange.getOriginIOControl().requestOutput();
System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output");
}
}
}

public Exception getException() {
return null;
}

public ProxyHttpExchange getResult() {
return this.httpExchange;
}

public boolean isDone() {
return this.completed;
}

public void failed(final Exception ex) {
System.out.println("[client->proxy] " + ex.toString());
}

}

static class ProxyRequestProducer implements HttpAsyncRequestProducer {

private final ProxyHttpExchange httpExchange;

public ProxyRequestProducer(final ProxyHttpExchange httpExchange) {
super();
this.httpExchange = httpExchange;
}

public void close() throws IOException {
}

public HttpHost getTarget() {
synchronized (this.httpExchange) {
return this.httpExchange.getTarget();
}
}

public HttpRequest generateRequest() {
synchronized (this.httpExchange) {
HttpRequest request = this.httpExchange.getRequest();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + request.getRequestLine());
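// Rewrite the request: only the request line (and entity) is copied;
// client headers are dropped and the outgoing protocol interceptors add new ones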
if (request instanceof HttpEntityEnclosingRequest) {
BasicHttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest(
request.getRequestLine());
r.setEntity(((HttpEntityEnclosingRequest) request).getEntity());
return r;
} else {
return new BasicHttpRequest(request.getRequestLine());
}
}
}

public void produceContent(
final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
synchronized (this.httpExchange) {
this.httpExchange.setOriginIOControl(ioctrl);
// Send data to the origin server
ByteBuffer buf = this.httpExchange.getInBuffer();
buf.flip();
int n = encoder.write(buf);
buf.compact();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written");
// If there is space in the buffer and the message has not been
// transferred, make sure the client is sending more data
if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) {
if (this.httpExchange.getClientIOControl() != null) {
this.httpExchange.getClientIOControl().requestInput();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request client input");
}
}
if (buf.position() == 0) {
if (this.httpExchange.isRequestReceived()) {
encoder.complete();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written");
} else {
// Input buffer is empty. Wait until the client fills up
// the buffer
ioctrl.suspendOutput();
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " suspend origin output");
}
}
}
}

public void requestCompleted(final HttpContext context) {
synchronized (this.httpExchange) {
System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request completed");
}
}

public boolean isRepeatable() {
return false;
}

public void resetRequest() {
}

public void failed(final Exception ex) {
System.out.println("[proxy->origin] " + ex.toString());
}

}

static class ProxyResponseConsumer implements HttpAsyncResponseConsumer<ProxyHttpExchange> {

private final ProxyHttpExchange httpExchange;

private volatile boolean completed;

public ProxyResponseConsumer(final ProxyHttpExchange httpExchange) {
super();
this.httpExchange = httpExchange;
}

public void close() throws IOException {
}

public void responseReceived(final HttpResponse response) {
synchronized (this.httpExchange) {
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + response.getStatusLine());
this.httpExchange.setResponse(response);
HttpAsyncExchange responseTrigger = this.httpExchange.getResponseTrigger();
if (responseTrigger != null && !responseTrigger.isCompleted()) {
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response triggered");
responseTrigger.submitResponse(new ProxyResponseProducer(this.httpExchange));
}
}
}

public void consumeContent(
final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
synchronized (this.httpExchange) {
this.httpExchange.setOriginIOControl(ioctrl);
// Receive data from the origin
ByteBuffer buf = this.httpExchange.getOutBuffer();
int n = decoder.read(buf);
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read");
if (decoder.isCompleted()) {
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read");
}
// If the buffer is full, suspend origin input until there is free
// space in the buffer
if (!buf.hasRemaining()) {
ioctrl.suspendInput();
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input");
}
// If there is some content in the input buffer make sure client
// output is active
if (buf.position() > 0) {
if (this.httpExchange.getClientIOControl() != null) {
this.httpExchange.getClientIOControl().requestOutput();
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output");
}
}
}
}

public void responseCompleted(final HttpContext context) {
synchronized (this.httpExchange) {
if (this.completed) {
return;
}
this.completed = true;
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " response completed");
this.httpExchange.setResponseReceived();
if (this.httpExchange.getClientIOControl() != null) {
this.httpExchange.getClientIOControl().requestOutput();
System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output");
}
}
}

public void failed(final Exception ex) {
synchronized (this.httpExchange) {
if (this.completed) {
return;
}
this.completed = true;
this.httpExchange.setException(ex);
HttpAsyncExchange responseTrigger = this.httpExchange.getResponseTrigger();
if (responseTrigger != null && !responseTrigger.isCompleted()) {
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + ex);
int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
String message = ex.getMessage();
if (message == null) {
message = "Unexpected error";
}
response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT));
responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
}
}
}

public boolean cancel() {
synchronized (this.httpExchange) {
if (this.completed) {
return false;
}
failed(new InterruptedIOException("Cancelled"));
return true;
}
}

public ProxyHttpExchange getResult() {
return this.httpExchange;
}

public Exception getException() {
return null;
}

public boolean isDone() {
return this.completed;
}

}

static class ProxyResponseProducer implements HttpAsyncResponseProducer {

private final ProxyHttpExchange httpExchange;

public ProxyResponseProducer(final ProxyHttpExchange httpExchange) {
super();
this.httpExchange = httpExchange;
}

public void close() throws IOException {
this.httpExchange.reset();
}

public HttpResponse generateResponse() {
synchronized (this.httpExchange) {
HttpResponse response = this.httpExchange.getResponse();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + response.getStatusLine());
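// Rewrite the response: only the status line and entity are copied;
// origin headers are dropped and the incoming protocol interceptors add new ones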
BasicHttpResponse r = new BasicHttpResponse(response.getStatusLine());
r.setEntity(response.getEntity());
return r;
}
}

public void produceContent(
final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
synchronized (this.httpExchange) {
this.httpExchange.setClientIOControl(ioctrl);
// Send data to the client
ByteBuffer buf = this.httpExchange.getOutBuffer();
buf.flip();
int n = encoder.write(buf);
buf.compact();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written");
// If there is space in the buffer and the message has not been
// transferred, make sure the origin is sending more data
if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) {
if (this.httpExchange.getOriginIOControl() != null) {
this.httpExchange.getOriginIOControl().requestInput();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " request origin input");
}
}
if (buf.position() == 0) {
if (this.httpExchange.isResponseReceived()) {
encoder.complete();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written");
} else {
// Input buffer is empty. Wait until the origin fills up
// the buffer
ioctrl.suspendOutput();
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " suspend client output");
}
}
}
}

public void responseCompleted(final HttpContext context) {
synchronized (this.httpExchange) {
System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response completed");
}
}

public void failed(final Exception ex) {
System.out.println("[client<-proxy] " + ex.toString());
}

}

static class ProxyIncomingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {

@Override
public boolean keepAlive(final HttpResponse response, final HttpContext context) {
NHttpConnection conn = (NHttpConnection) context.getAttribute(
ExecutionContext.HTTP_CONNECTION);
boolean keepAlive = super.keepAlive(response, context);
if (keepAlive) {
System.out.println("[client->proxy] connection kept alive " + conn);
}
return keepAlive;
}

}

static class ProxyOutgoingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {

@Override
public boolean keepAlive(final HttpResponse response, final HttpContext context) {
NHttpConnection conn = (NHttpConnection) context.getAttribute(
ExecutionContext.HTTP_CONNECTION);
boolean keepAlive = super.keepAlive(response, context);
if (keepAlive) {
System.out.println("[proxy->origin] connection kept alive " + conn);
}
return keepAlive;
}

}

static class ProxyServiceHandler extends HttpAsyncService {

public ProxyServiceHandler(
final HttpProcessor httpProcessor,
final ConnectionReuseStrategy reuseStrategy,
final HttpAsyncRequestHandlerResolver handlerResolver,
final HttpParams params) {
super(httpProcessor, reuseStrategy, handlerResolver, params);
}

@Override
protected void log(final Exception ex) {
ex.printStackTrace();
}

@Override
public void connected(final NHttpServerConnection conn) {
System.out.println("[client->proxy] connection open " + conn);
super.connected(conn);
}

@Override
public void closed(final NHttpServerConnection conn) {
System.out.println("[client->proxy] connection closed " + conn);
super.closed(conn);
}

}

static class ProxyClientProtocolHandler extends HttpAsyncRequestExecutor {

public ProxyClientProtocolHandler() {
super();
}

@Override
protected void log(final Exception ex) {
ex.printStackTrace();
}

@Override
public void connected(final NHttpClientConnection conn,
final Object attachment) throws IOException, HttpException {
System.out.println("[proxy->origin] connection open " + conn);
super.connected(conn, attachment);
}

@Override
public void closed(final NHttpClientConnection conn) {
System.out.println("[proxy->origin] connection closed " + conn);
super.closed(conn);
}

}

static class ProxyConnPool extends BasicNIOConnPool {

public ProxyConnPool(final ConnectingIOReactor ioreactor, final HttpParams params) {
super(ioreactor, params);
}

public ProxyConnPool(
final ConnectingIOReactor ioreactor,
final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
final HttpParams params) {
super(ioreactor, connFactory, params);
}

@Override
public void release(final BasicNIOPoolEntry entry, boolean reusable) {
System.out.println("[proxy->origin] connection released " + entry.getConnection());
super.release(entry, reusable);
StringBuilder buf = new StringBuilder();
PoolStats totals = getTotalStats();
buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
buf.append(" of ").append(totals.getMax()).append("]");
System.out.println("[proxy->origin] " + buf.toString());
}

}

}
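Both produceContent methods above move data with the same three ByteBuffer steps: flip() to switch the buffer to draining, write what the peer accepts, then compact() to keep any unwritten bytes and switch back to filling. The sketch below shows that cycle with plain java.nio channels over in-memory streams. It is a simplified, single-threaded illustration and makes several assumptions: it uses blocking channels instead of HttpCore's ContentDecoder and ContentEncoder, it has no IOControl suspend/resume calls, and the names BufferRelaySketch and relay are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

// Hypothetical illustration of the fill / flip / write / compact cycle.
final class BufferRelaySketch {

    /** Copies input to a new array through a fixed-size intermediate buffer. */
    static byte[] relay(byte[] input, int bufferSize) {
        ReadableByteChannel source = Channels.newChannel(new ByteArrayInputStream(input));
        ByteArrayOutputStream collected = new ByteArrayOutputStream();
        WritableByteChannel sink = Channels.newChannel(collected);
        ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        boolean endOfInput = false;
        try {
            // Keep going until the source is exhausted and the buffer is drained.
            while (!endOfInput || buf.position() > 0) {
                // Fill side: read only while there is free space in the buffer.
                if (!endOfInput && buf.hasRemaining()) {
                    if (source.read(buf) == -1) {
                        endOfInput = true;
                    }
                }
                // Drain side: same steps as produceContent in the proxy.
                buf.flip();      // limit = bytes buffered, position = 0
                sink.write(buf); // may write fewer bytes than are buffered
                buf.compact();   // move leftovers to the front, ready to fill again
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return collected.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = "hello, reverse proxy".getBytes();
        System.out.println(new String(relay(data, 4)));
    }
}
```

In the proxy the fill and drain sides run as separate callbacks on different connections, which is why it needs suspendInput() when the buffer is full and suspendOutput() when it is empty. The loop here collapses both sides into one thread to keep the buffer handling visible.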


Copyright information for this post:

Tag: All source code in this post is taken from The Apache™ Software Foundation (http://hc.apache.org/httpcomponents-core-ga/examples.html)
