HttpCore Component Example Programs in Java (HttpComponents: HttpCore Examples)
2012-08-13 08:33
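Part One: Basic Concepts
HttpCore is a set of components that implement the most fundamental aspects of the HTTP protocol, yet are sufficient to develop full-featured client-side and server-side HTTP services with a minimal footprint.
HttpCore has the following scope and goals (@Author: 南磊):
1. HttpCore scope
■A consistent API for building client-side, proxy, and server-side HTTP services
■A consistent API for building both synchronous and asynchronous HTTP services
■A set of low-level components based on blocking (classic) and non-blocking (NIO) I/O models
2. HttpCore goals
■Implementation of the most fundamental HTTP transport aspects
■A balance between good performance and the clarity and expressiveness of the API
■A small (predictable) memory footprint
■A self-contained library (no external dependencies beyond the JRE)
3. What HttpCore is NOT
■A replacement for HttpClient
■A replacement for a Servlet container, or a competitor to the Servlet API
Part Two: Source Code Walkthrough
1. Basic HTTP GET program (synchronous, blocking I/O model)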
This example demonstrates how to execute a series of synchronous (blocking) HTTP GET requests.

/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.http.examples;

import java.net.Socket;

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpClientConnection;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestExecutor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;

/**
 * Elemental example for executing multiple GET requests sequentially.
 * <p>
 * Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
 * It is NOT intended to demonstrate the most efficient way of building an HTTP client.
 */
public class ElementalHttpGet {

    public static void main(String[] args) throws Exception {

        HttpParams params = new SyncBasicHttpParams();
        HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
        HttpProtocolParams.setContentCharset(params, "UTF-8");
        HttpProtocolParams.setUserAgent(params, "HttpComponents/1.1");
        HttpProtocolParams.setUseExpectContinue(params, true);

        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Required protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                // Recommended protocol interceptors
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});

        HttpRequestExecutor httpexecutor = new HttpRequestExecutor();

        HttpContext context = new BasicHttpContext(null);
        HttpHost host = new HttpHost("localhost", 8080);

        DefaultHttpClientConnection conn = new DefaultHttpClientConnection();
        ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();

        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
        context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, host);

        try {
            String[] targets = {
                    "/",
                    "/servlets-examples/servlet/RequestInfoExample",
                    "/somewhere%20in%20pampa"};

            for (int i = 0; i < targets.length; i++) {
                // (Re)open the connection if the previous exchange closed it
                if (!conn.isOpen()) {
                    Socket socket = new Socket(host.getHostName(), host.getPort());
                    conn.bind(socket, params);
                }
                BasicHttpRequest request = new BasicHttpRequest("GET", targets[i]);
                System.out.println(">> Request URI: " + request.getRequestLine().getUri());

                request.setParams(params);
                httpexecutor.preProcess(request, httpproc, context);
                HttpResponse response = httpexecutor.execute(request, conn, context);
                response.setParams(params);
                httpexecutor.postProcess(response, httpproc, context);

                System.out.println("<< Response: " + response.getStatusLine());
                System.out.println(EntityUtils.toString(response.getEntity()));
                System.out.println("==============");
                // Close the connection unless the response allows it to be reused
                if (!connStrategy.keepAlive(response, context)) {
                    conn.close();
                } else {
                    System.out.println("Connection kept alive...");
                }
            }
        } finally {
            conn.close();
        }
    }
}
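For readers who want to see roughly what the blocking GET program puts on the wire, the sketch below builds a comparable request head with JDK classes only. RawGetSketch and its two methods are hypothetical helpers written for this article, not part of HttpCore, and the exact headers HttpCore emits may differ from this approximation.

```java
/** Hypothetical helper (not part of HttpCore): approximates the request head of a blocking GET. */
final class RawGetSketch {

    /** Builds a minimal HTTP/1.1 GET request head for the given target. */
    static String buildRequest(String host, int port, String path, String userAgent) {
        StringBuilder sb = new StringBuilder();
        sb.append("GET ").append(path).append(" HTTP/1.1\r\n");
        // Host is mandatory in HTTP/1.1 (roughly the job of RequestTargetHost)
        sb.append("Host: ").append(host).append(':').append(port).append("\r\n");
        // Ask for a persistent connection (roughly the job of RequestConnControl)
        sb.append("Connection: Keep-Alive\r\n");
        // Identify the client (roughly the job of RequestUserAgent)
        sb.append("User-Agent: ").append(userAgent).append("\r\n");
        // An empty line terminates the request head
        sb.append("\r\n");
        return sb.toString();
    }

    /** Extracts the numeric status code from a status line such as "HTTP/1.1 200 OK". */
    static int statusCode(String statusLine) {
        String[] parts = statusLine.trim().split(" ", 3);
        if (parts.length < 2 || !parts[0].startsWith("HTTP/")) {
            throw new IllegalArgumentException("Not a status line: " + statusLine);
        }
        return Integer.parseInt(parts[1]);
    }
}
```

Calling RawGetSketch.buildRequest("localhost", 8080, "/", "HttpComponents/1.1") gives a string that could be written to a plain java.net.Socket; HttpCore performs the equivalent steps through its interceptor chain and connection objects.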
2. Basic HTTP POST program (synchronous, blocking I/O model)
This example demonstrates how to execute a series of synchronous (blocking) HTTP POST requests that enclose entity content of various types: a string, a byte array, an arbitrary inputstream.
package org.apache.http.examples;

import java.io.ByteArrayInputStream;
import java.net.Socket;

import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpVersion;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpClientConnection;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.params.HttpParams;
import org.apache.http.params.HttpProtocolParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestExecutor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;

/**
 * Elemental example for executing multiple POST requests sequentially.
 * <p>
 * Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
 * It is NOT intended to demonstrate the most efficient way of building an HTTP client.
 */
public class ElementalHttpPost {

    public static void main(String[] args) throws Exception {

        HttpParams params = new SyncBasicHttpParams();
        HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
        HttpProtocolParams.setContentCharset(params, "UTF-8");
        HttpProtocolParams.setUserAgent(params, "Test/1.1");
        HttpProtocolParams.setUseExpectContinue(params, true);

        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Required protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                // Recommended protocol interceptors
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});

        HttpRequestExecutor httpexecutor = new HttpRequestExecutor();

        HttpContext context = new BasicHttpContext(null);
        HttpHost host = new HttpHost("localhost", 8080);

        DefaultHttpClientConnection conn = new DefaultHttpClientConnection();
        ConnectionReuseStrategy connStrategy = new DefaultConnectionReuseStrategy();

        context.setAttribute(ExecutionContext.HTTP_CONNECTION, conn);
        context.setAttribute(ExecutionContext.HTTP_TARGET_HOST, host);

        try {
            // Three request bodies: a string, a byte array, and a stream of unknown length
            HttpEntity[] requestBodies = {
                    new StringEntity(
                            "This is the first test request", "UTF-8"),
                    new ByteArrayEntity(
                            "This is the second test request".getBytes("UTF-8")),
                    new InputStreamEntity(
                            new ByteArrayInputStream(
                                    "This is the third test request (will be chunked)"
                                    .getBytes("UTF-8")), -1)
            };

            for (int i = 0; i < requestBodies.length; i++) {
                // (Re)open the connection if the previous exchange closed it
                if (!conn.isOpen()) {
                    Socket socket = new Socket(host.getHostName(), host.getPort());
                    conn.bind(socket, params);
                }
                BasicHttpEntityEnclosingRequest request = new BasicHttpEntityEnclosingRequest("POST",
                        "/servlets-examples/servlet/RequestInfoExample");
                request.setEntity(requestBodies[i]);
                System.out.println(">> Request URI: " + request.getRequestLine().getUri());

                request.setParams(params);
                httpexecutor.preProcess(request, httpproc, context);
                HttpResponse response = httpexecutor.execute(request, conn, context);
                response.setParams(params);
                httpexecutor.postProcess(response, httpproc, context);

                System.out.println("<< Response: " + response.getStatusLine());
                System.out.println(EntityUtils.toString(response.getEntity()));
                System.out.println("==============");
                // Close the connection unless the response allows it to be reused
                if (!connStrategy.keepAlive(response, context)) {
                    conn.close();
                } else {
                    System.out.println("Connection kept alive...");
                }
            }
        } finally {
            conn.close();
        }
    }
}
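The third request body is created with a length of -1, so its size is unknown in advance and it is sent with chunked transfer coding, as the "(will be chunked)" text hints. As a rough illustration of that framing, here is a JDK-only sketch; ChunkedSketch is a hypothetical helper written for this article and is not how HttpCore implements its encoder.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper (not part of HttpCore): frames a body using HTTP/1.1 chunked coding. */
final class ChunkedSketch {

    /** Splits the body into chunks of at most chunkSize bytes and adds the chunk framing. */
    static byte[] encode(byte[] body, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int off = 0; off < body.length; off += chunkSize) {
            int len = Math.min(chunkSize, body.length - off);
            // Each chunk: size in hexadecimal, CRLF, the data, CRLF
            writeAscii(out, Integer.toHexString(len) + "\r\n");
            out.write(body, off, len);
            writeAscii(out, "\r\n");
        }
        // A zero-sized chunk followed by an empty line ends the body
        writeAscii(out, "0\r\n\r\n");
        return out.toByteArray();
    }

    private static void writeAscii(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.US_ASCII);
        out.write(bytes, 0, bytes.length);
    }
}
```

The receiver reads chunk after chunk until it meets the zero-sized one, which is why the sender does not need a Content-Length header.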
3. Basic HTTP server program (synchronous, blocking I/O model)
This is an example of an HTTP/1.1 file server based on a synchronous (blocking) I/O model.
package org.apache.http.examples;

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.URLDecoder;
import java.nio.charset.Charset;
import java.util.Locale;

import org.apache.http.ConnectionClosedException;
import org.apache.http.HttpEntity;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpServerConnection;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.FileEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.DefaultHttpResponseFactory;
import org.apache.http.impl.DefaultHttpServerConnection;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.HttpRequestHandler;
import org.apache.http.protocol.HttpRequestHandlerRegistry;
import org.apache.http.protocol.HttpService;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
import org.apache.http.util.EntityUtils;

/**
 * Basic, yet fully functional and spec compliant, HTTP/1.1 file server.
 * <p>
 * Please note the purpose of this application is to demonstrate the usage of HttpCore APIs.
 * It is NOT intended to demonstrate the most efficient way of building an HTTP file server.
 */
public class ElementalHttpServer {

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Please specify document root directory");
            System.exit(1);
        }
        Thread t = new RequestListenerThread(8080, args[0]);
        t.setDaemon(false);
        t.start();
    }

    static class HttpFileHandler implements HttpRequestHandler {

        private final String docRoot;

        public HttpFileHandler(final String docRoot) {
            super();
            this.docRoot = docRoot;
        }

        public void handle(
                final HttpRequest request,
                final HttpResponse response,
                final HttpContext context) throws HttpException, IOException {

            String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
            if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
                throw new MethodNotSupportedException(method + " method not supported");
            }
            String target = request.getRequestLine().getUri();

            // Consume the request body, if any, and report its size
            if (request instanceof HttpEntityEnclosingRequest) {
                HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity();
                byte[] entityContent = EntityUtils.toByteArray(entity);
                System.out.println("Incoming entity content (bytes): " + entityContent.length);
            }

            final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
            if (!file.exists()) {
                response.setStatusCode(HttpStatus.SC_NOT_FOUND);
                StringEntity entity = new StringEntity(
                        "<html><body><h1>File " + file.getPath() +
                        " not found</h1></body></html>",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("File " + file.getPath() + " not found");
            } else if (!file.canRead() || file.isDirectory()) {
                response.setStatusCode(HttpStatus.SC_FORBIDDEN);
                StringEntity entity = new StringEntity(
                        "<html><body><h1>Access denied</h1></body></html>",
                        ContentType.create("text/html", "UTF-8"));
                response.setEntity(entity);
                System.out.println("Cannot read file " + file.getPath());
            } else {
                response.setStatusCode(HttpStatus.SC_OK);
                FileEntity body = new FileEntity(file, ContentType.create("text/html", (Charset) null));
                response.setEntity(body);
                System.out.println("Serving file " + file.getPath());
            }
        }
    }

    static class RequestListenerThread extends Thread {

        private final ServerSocket serversocket;
        private final HttpParams params;
        private final HttpService httpService;

        public RequestListenerThread(int port, final String docroot) throws IOException {
            this.serversocket = new ServerSocket(port);
            this.params = new SyncBasicHttpParams();
            this.params
                .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
                .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
                .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, false)
                .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
                .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpComponents/1.1");

            // Set up the HTTP protocol processor
            HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
                    new ResponseDate(),
                    new ResponseServer(),
                    new ResponseContent(),
                    new ResponseConnControl()
            });

            // Set up request handlers
            HttpRequestHandlerRegistry registry = new HttpRequestHandlerRegistry();
            registry.register("*", new HttpFileHandler(docroot));

            // Set up the HTTP service
            this.httpService = new HttpService(
                    httpproc,
                    new DefaultConnectionReuseStrategy(),
                    new DefaultHttpResponseFactory(),
                    registry,
                    this.params);
        }

        @Override
        public void run() {
            System.out.println("Listening on port " + this.serversocket.getLocalPort());
            while (!Thread.interrupted()) {
                try {
                    // Set up HTTP connection
                    Socket socket = this.serversocket.accept();
                    DefaultHttpServerConnection conn = new DefaultHttpServerConnection();
                    System.out.println("Incoming connection from " + socket.getInetAddress());
                    conn.bind(socket, this.params);

                    // Start worker thread (one thread per connection)
                    Thread t = new WorkerThread(this.httpService, conn);
                    t.setDaemon(true);
                    t.start();
                } catch (InterruptedIOException ex) {
                    break;
                } catch (IOException e) {
                    System.err.println("I/O error initialising connection thread: "
                            + e.getMessage());
                    break;
                }
            }
        }
    }

    static class WorkerThread extends Thread {

        private final HttpService httpservice;
        private final HttpServerConnection conn;

        public WorkerThread(
                final HttpService httpservice,
                final HttpServerConnection conn) {
            super();
            this.httpservice = httpservice;
            this.conn = conn;
        }

        @Override
        public void run() {
            System.out.println("New connection thread");
            HttpContext context = new BasicHttpContext(null);
            try {
                // Serve requests on this connection until it is closed
                while (!Thread.interrupted() && this.conn.isOpen()) {
                    this.httpservice.handleRequest(this.conn, context);
                }
            } catch (ConnectionClosedException ex) {
                System.err.println("Client closed connection");
            } catch (IOException ex) {
                System.err.println("I/O error: " + ex.getMessage());
            } catch (HttpException ex) {
                System.err.println("Unrecoverable HTTP protocol violation: " + ex.getMessage());
            } finally {
                try {
                    this.conn.shutdown();
                } catch (IOException ignore) {}
            }
        }
    }
}
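The file handler above maps the decoded request URI straight onto the document root, which keeps the example short but means a target containing ".." segments could resolve outside that directory. The following JDK-only sketch shows one possible way to guard the mapping. DocRootSketch is a hypothetical helper and an assumption of this article, not part of the Apache example or of HttpCore.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.file.Path;

/** Hypothetical helper (not part of HttpCore): maps a request URI to a path inside docRoot. */
final class DocRootSketch {

    /** Returns the resolved path, or null when the target would escape docRoot. */
    static Path resolve(Path docRoot, String requestUri) {
        // Drop the query string, which is not part of the file name
        int q = requestUri.indexOf('?');
        String rawPath = q >= 0 ? requestUri.substring(0, q) : requestUri;

        String decoded;
        try {
            decoded = URLDecoder.decode(rawPath, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a compliant JVM
            throw new IllegalStateException(e);
        }

        // Strip leading slashes so the target is resolved relative to docRoot
        int start = 0;
        while (start < decoded.length() && decoded.charAt(start) == '/') {
            start++;
        }

        Path root = docRoot.toAbsolutePath().normalize();
        // normalize() collapses "." and ".." segments before the containment check
        Path candidate = root.resolve(decoded.substring(start)).normalize();
        return candidate.startsWith(root) ? candidate : null;
    }
}
```

A handler could answer 403 or 404 when resolve returns null and serve the file otherwise. Symbolic links are not followed by this check, so a stricter server might compare real paths instead.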
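4. Asynchronous HTTP GET program (non-blocking I/O model: NIO framework)
This example demonstrates how HttpCore NIO can be used to execute multiple HTTP requests asynchronously using only one I/O thread.
Notes:
I. I/O reactor
HttpCore NIO is based on the Reactor pattern as described by Doug Lea. The purpose of an I/O reactor is to react to I/O events and to dispatch event notifications to individual I/O sessions. The main idea of the pattern is to break away from the one-thread-per-connection model imposed by classic blocking I/O. The IOReactor interface represents an abstract object that implements the Reactor pattern. Internally, IOReactor implementations encapsulate the functionality of the NIO java.nio.channels.Selector.
I/O reactors usually employ a small number of dispatch threads (often as few as one) to dispatch I/O event notifications to a much larger number (often several thousand) of I/O sessions or connections. It is generally recommended to have one dispatch thread per CPU core.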
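To make the reactor idea concrete, here is a minimal sketch of a single dispatch thread serving connections through java.nio.channels.Selector. MiniReactor is a hypothetical class written for this article with JDK classes only; it simply echoes bytes back and does not reflect how HttpCore's IOReactor implementations are structured internally.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/** Hypothetical single-threaded echo reactor built on java.nio (not HttpCore's IOReactor). */
final class MiniReactor implements Runnable, AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;

    MiniReactor() throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // ephemeral port
        server.configureBlocking(false);
        server.register(selector, SelectionKey.OP_ACCEPT);
    }

    int port() {
        return server.socket().getLocalPort();
    }

    /** The dispatch loop: one thread waits for readiness events on all registered channels. */
    @Override
    public void run() {
        ByteBuffer buf = ByteBuffer.allocate(1024);
        try {
            while (selector.isOpen()) {
                selector.select(); // blocks until at least one channel is ready
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    dispatch(key, buf);
                }
            }
        } catch (IOException | RuntimeException e) {
            // Selector closed (possibly mid-iteration) or failed: leave the loop
        }
    }

    private void dispatch(SelectionKey key, ByteBuffer buf) {
        try {
            if (key.isAcceptable()) {
                SocketChannel ch = server.accept();
                if (ch != null) {
                    ch.configureBlocking(false);
                    ch.register(selector, SelectionKey.OP_READ);
                }
            } else if (key.isReadable()) {
                SocketChannel ch = (SocketChannel) key.channel();
                buf.clear();
                if (ch.read(buf) < 0) { // peer closed the connection
                    ch.close();
                    return;
                }
                buf.flip();
                while (buf.hasRemaining()) { // simplification: spins if the socket is not writable
                    ch.write(buf);
                }
            }
        } catch (IOException e) {
            key.cancel();
            try { key.channel().close(); } catch (IOException ignore) { }
        }
    }

    @Override
    public void close() throws IOException {
        selector.close(); // also wakes up a thread blocked in select()
        server.close();
    }

    /** Starts a reactor, sends msg over a blocking socket, and returns the echoed reply. */
    static String echoOnce(String msg) {
        byte[] out = msg.getBytes(StandardCharsets.UTF_8);
        try (MiniReactor reactor = new MiniReactor()) {
            Thread dispatcher = new Thread(reactor, "dispatch");
            dispatcher.setDaemon(true);
            dispatcher.start();
            try (Socket s = new Socket(InetAddress.getLoopbackAddress(), reactor.port())) {
                s.setSoTimeout(5000);
                s.getOutputStream().write(out);
                s.getOutputStream().flush();
                byte[] in = new byte[out.length];
                InputStream is = s.getInputStream();
                int read = 0;
                while (read < in.length) {
                    int n = is.read(in, read, in.length - read);
                    if (n < 0) break;
                    read += n;
                }
                return new String(in, 0, read, StandardCharsets.UTF_8);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

MiniReactor.echoOnce("ping") exercises the loop end to end. The same dispatch thread could serve many connections at once, which is the point of the pattern; a production reactor would register for write readiness instead of spinning on write.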
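II. I/O event dispatchers
IOReactor implementations use the IOEventDispatch interface to notify clients of events pending for a particular session. All methods of IOEventDispatch are executed on a dispatch thread of the I/O reactor. It is therefore important that processing inside the event methods does not block the dispatch thread for long; while it is blocked, the I/O reactor cannot react to other events.
Generic I/O events defined by the IOEventDispatch interface:
connected: triggered when a new session has been created.
inputReady: triggered when the session has pending input.
outputReady: triggered when the session is ready for output.
timeout: triggered when the session has timed out.
disconnected: triggered when the session has been terminated.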
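The five events can be pictured as a small listener contract. The sketch below uses a hypothetical SessionEvents interface that only mirrors the event names listed above; it is not the real IOEventDispatch signature, and the String session is a stand-in for a real session object.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical listener mirroring the five generic events; not HttpCore's IOEventDispatch. */
interface SessionEvents<S> {
    void connected(S session);
    void inputReady(S session);
    void outputReady(S session);
    void timeout(S session);
    void disconnected(S session);
}

/** Records each notification; every method returns quickly, as dispatch-thread code should. */
final class RecordingEvents implements SessionEvents<String> {
    final List<String> log = new ArrayList<>();

    public void connected(String s)    { log.add("connected:" + s); }
    public void inputReady(String s)   { log.add("inputReady:" + s); }
    public void outputReady(String s)  { log.add("outputReady:" + s); }
    public void timeout(String s)      { log.add("timeout:" + s); }
    public void disconnected(String s) { log.add("disconnected:" + s); }
}

final class EventOrderDemo {
    /** Simulates the notifications for one request/response exchange on session "s1". */
    static List<String> typicalExchange() {
        RecordingEvents events = new RecordingEvents();
        events.connected("s1");    // session created
        events.inputReady("s1");   // data arrived and can be read
        events.outputReady("s1");  // reply can be written
        events.disconnected("s1"); // session terminated
        return events.log;
    }
}
```

Because all callbacks run on the dispatch thread, slow work such as disk access or remote calls is usually handed to a separate worker pool rather than done inside these methods.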
package org.apache.http.examples.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;

import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.pool.BasicNIOConnPool;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.nio.protocol.BasicAsyncRequestProducer;
import org.apache.http.nio.protocol.BasicAsyncResponseConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;

/**
 * Asynchronous HTTP/1.1 client.
 */
public class NHttpClient {

    public static void main(String[] args) throws Exception {
        // HTTP parameters for the client
        HttpParams params = new SyncBasicHttpParams();
        params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 3000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
            .setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");
        // Create HTTP protocol processing chain
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpRequestInterceptor[] {
                // Use standard client-side protocol interceptors
                new RequestContent(),
                new RequestTargetHost(),
                new RequestConnControl(),
                new RequestUserAgent(),
                new RequestExpectContinue()});
        // Create client-side HTTP protocol handler
        HttpAsyncRequestExecutor protocolHandler = new HttpAsyncRequestExecutor();
        // Create client-side I/O event dispatch
        final IOEventDispatch ioEventDispatch = new DefaultHttpClientIODispatch(protocolHandler, params);
        // Create client-side I/O reactor
        IOReactorConfig config = new IOReactorConfig();
        config.setIoThreadCount(1);
        final ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(config);
        // Create HTTP connection pool
        BasicNIOConnPool pool = new BasicNIOConnPool(ioReactor, params);
        // Limit total number of connections to just two
        pool.setDefaultMaxPerRoute(2);
        pool.setMaxTotal(2);
        // Run the I/O reactor in a separate thread
        Thread t = new Thread(new Runnable() {

            public void run() {
                try {
                    // Ready to go!
                    ioReactor.execute(ioEventDispatch);
                } catch (InterruptedIOException ex) {
                    System.err.println("Interrupted");
                } catch (IOException e) {
                    System.err.println("I/O error: " + e.getMessage());
                }
                System.out.println("Shutdown");
            }

        });
        // Start the client thread
        t.start();
        // Create HTTP requester
        HttpAsyncRequester requester = new HttpAsyncRequester(
                httpproc, new DefaultConnectionReuseStrategy(), params);
        // Execute HTTP GETs to the following hosts
        HttpHost[] targets = new HttpHost[] {
                new HttpHost("www.apache.org", 80, "http"),
                new HttpHost("www.verisign.com", 443, "https"),
                new HttpHost("www.google.com", 80, "http")
        };
        // The latch lets the main thread wait until every request has finished
        final CountDownLatch latch = new CountDownLatch(targets.length);
        for (final HttpHost target: targets) {
            BasicHttpRequest request = new BasicHttpRequest("GET", "/");
            requester.execute(
                    new BasicAsyncRequestProducer(target, request),
                    new BasicAsyncResponseConsumer(),
                    pool,
                    new BasicHttpContext(),
                    // Handle HTTP response from a callback
                    new FutureCallback<HttpResponse>() {

                public void completed(final HttpResponse response) {
                    latch.countDown();
                    System.out.println(target + "->" + response.getStatusLine());
                }

                public void failed(final Exception ex) {
                    latch.countDown();
                    System.out.println(target + "->" + ex);
                }

                public void cancelled() {
                    latch.countDown();
                    System.out.println(target + " cancelled");
                }

            });
        }
        latch.await();
        System.out.println("Shutting down I/O reactor");
        ioReactor.shutdown();
        System.out.println("Done");
    }
}
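The client above submits all requests without waiting and then blocks on a CountDownLatch that each callback decrements. The coordination pattern on its own can be sketched with JDK classes only. LatchSketch and its Callback interface are hypothetical names for this article, and a single worker thread stands in for the I/O reactor thread.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of "fire all requests, then wait on a latch"; no HttpCore involved. */
final class LatchSketch {

    /** Hypothetical callback shape, loosely modeled on a completed/failed pair. */
    interface Callback<T> {
        void completed(T result);
        void failed(Exception ex);
    }

    /** Submits one task per target and returns once every callback has fired. */
    static Map<String, String> runAll(List<String> targets) {
        ExecutorService worker = Executors.newSingleThreadExecutor(); // stands in for the I/O thread
        Map<String, String> outcomes = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(targets.size());
        try {
            for (final String target : targets) {
                final Callback<String> callback = new Callback<String>() {
                    public void completed(String result) {
                        outcomes.put(target, "completed: " + result);
                        latch.countDown();
                    }
                    public void failed(Exception ex) {
                        outcomes.put(target, "failed: " + ex.getMessage());
                        latch.countDown();
                    }
                };
                // The submitting thread never blocks here; results arrive via the callback
                worker.execute(() -> {
                    if (target.isEmpty()) {
                        callback.failed(new IllegalArgumentException("empty target"));
                    } else {
                        callback.completed(target.toUpperCase(Locale.ROOT));
                    }
                });
            }
            // Wait for all outcomes, with a timeout so a lost callback cannot hang the caller
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for callbacks");
            }
            return outcomes;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }
}
```

Every exit path of a callback counts the latch down exactly once, mirroring the completed, failed and cancelled branches in the client; missing one of them would leave the waiting thread blocked until the timeout.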
5. Asynchronous HTTP server program (non-blocking I/O model: NIO framework)
This example demonstrates the use of HttpCore NIO to build an asynchronous (non-blocking) HTTP server capable of direct channel (zero copy) data transfer.
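In the JDK, direct channel transfer is typically expressed with FileChannel.transferTo, which hands the copy to the operating system where possible instead of moving bytes through a Java buffer. The sketch below illustrates that call in isolation. ZeroCopySketch is a hypothetical helper, and whether a true zero-copy path is taken depends on the platform and on the kind of target channel.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper (not part of HttpCore): sends a file through FileChannel.transferTo. */
final class ZeroCopySketch {

    /** Transfers the whole file to the target channel and returns the number of bytes sent. */
    static long send(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ)) {
            long size = in.size();
            long position = 0;
            // transferTo may move fewer bytes than requested, so loop until done
            while (position < size) {
                long n = in.transferTo(position, size - position, target);
                if (n <= 0) {
                    break; // target accepted nothing, e.g. a full non-blocking channel
                }
                position += n;
            }
            return position;
        }
    }

    /** Demo: writes content to a temporary file and reads it back through send(). */
    static byte[] roundTrip(byte[] content) {
        try {
            Path tmp = Files.createTempFile("zerocopy", ".bin");
            try {
                Files.write(tmp, content);
                ByteArrayOutputStream sink = new ByteArrayOutputStream();
                send(tmp, Channels.newChannel(sink));
                return sink.toByteArray();
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In a server the target would be the socket channel of the connection; the in-memory sink here only keeps the demo self-contained and does not benefit from any kernel-level optimization.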
package org.apache.http.examples.nio;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLDecoder;
import java.security.KeyStore;
import java.util.Locale;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.MethodNotSupportedException;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultNHttpServerConnection;
import org.apache.http.impl.nio.DefaultNHttpServerConnectionFactory;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.SSLNHttpServerConnectionFactory;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpConnectionFactory;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NFileEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.protocol.BasicAsyncRequestConsumer;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;
/**
* HTTP/1.1 file server based on the non-blocking I/O model and capable of direct channel
* (zero copy) data transfer.
*/
public class NHttpServer {

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.err.println("Please specify document root directory");
            System.exit(1);
        }
        // Document root directory
        File docRoot = new File(args[0]);
        int port = 8080;
        if (args.length >= 2) {
            port = Integer.parseInt(args[1]);
        }
        // HTTP parameters for the server
        HttpParams params = new SyncBasicHttpParams();
        params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "HttpTest/1.1");
        // Create HTTP protocol processing chain
        HttpProcessor httpproc = new ImmutableHttpProcessor(new HttpResponseInterceptor[] {
                // Use standard server-side protocol interceptors
                new ResponseDate(),
                new ResponseServer(),
                new ResponseContent(),
                new ResponseConnControl()
        });
        // Create request handler registry
        HttpAsyncRequestHandlerRegistry registry = new HttpAsyncRequestHandlerRegistry();
        // Register the default handler for all URIs
        registry.register("*", new HttpFileHandler(docRoot));
        // Create server-side HTTP protocol handler
        HttpAsyncService protocolHandler = new HttpAsyncService(
                httpproc, new DefaultConnectionReuseStrategy(), registry, params) {

            @Override
            public void connected(final NHttpServerConnection conn) {
                System.out.println(conn + ": connection open");
                super.connected(conn);
            }

            @Override
            public void closed(final NHttpServerConnection conn) {
                System.out.println(conn + ": connection closed");
                super.closed(conn);
            }

        };
        // Create HTTP connection factory
        NHttpConnectionFactory<DefaultNHttpServerConnection> connFactory;
        if (port == 8443) {
            // Initialize SSL context
            ClassLoader cl = NHttpServer.class.getClassLoader();
            URL url = cl.getResource("my.keystore");
            if (url == null) {
                System.out.println("Keystore not found");
                System.exit(1);
            }
            KeyStore keystore = KeyStore.getInstance("jks");
            keystore.load(url.openStream(), "secret".toCharArray());
            KeyManagerFactory kmfactory = KeyManagerFactory.getInstance(
                    KeyManagerFactory.getDefaultAlgorithm());
            kmfactory.init(keystore, "secret".toCharArray());
            KeyManager[] keymanagers = kmfactory.getKeyManagers();
            SSLContext sslcontext = SSLContext.getInstance("TLS");
            sslcontext.init(keymanagers, null, null);
            connFactory = new SSLNHttpServerConnectionFactory(sslcontext, null, params);
        } else {
            connFactory = new DefaultNHttpServerConnectionFactory(params);
        }
        // Create server-side I/O event dispatch
        IOEventDispatch ioEventDispatch = new DefaultHttpServerIODispatch(protocolHandler, connFactory);
        // Create server-side I/O reactor
        ListeningIOReactor ioReactor = new DefaultListeningIOReactor();
        try {
            // Listen on the given port
            ioReactor.listen(new InetSocketAddress(port));
            // Ready to go!
            ioReactor.execute(ioEventDispatch);
        } catch (InterruptedIOException ex) {
            System.err.println("Interrupted");
        } catch (IOException e) {
            System.err.println("I/O error: " + e.getMessage());
        }
        System.out.println("Shutdown");
    }
static class HttpFileHandler implements HttpAsyncRequestHandler<HttpRequest> {
private final File docRoot;
public HttpFileHandler(final File docRoot) {
super();
this.docRoot = docRoot;
}
public HttpAsyncRequestConsumer<HttpRequest> processRequest(
final HttpRequest request,
final HttpContext context) {
// Buffer request content in memory for simplicity
return new BasicAsyncRequestConsumer();
}
public void handle(
final HttpRequest request,
final HttpAsyncExchange httpexchange,
final HttpContext context) throws HttpException, IOException {
HttpResponse response = httpexchange.getResponse();
handleInternal(request, response, context);
httpexchange.submitResponse(new BasicAsyncResponseProducer(response));
}
private void handleInternal(
final HttpRequest request,
final HttpResponse response,
final HttpContext context) throws HttpException, IOException {
String method = request.getRequestLine().getMethod().toUpperCase(Locale.ENGLISH);
if (!method.equals("GET") && !method.equals("HEAD") && !method.equals("POST")) {
throw new MethodNotSupportedException(method + " method not supported");
}
String target = request.getRequestLine().getUri();
final File file = new File(this.docRoot, URLDecoder.decode(target, "UTF-8"));
if (!file.exists()) {
response.setStatusCode(HttpStatus.SC_NOT_FOUND);
NStringEntity entity = new NStringEntity(
"<html><body><h1>File " + file.getPath() +
" not found</h1></body></html>",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("File " + file.getPath() + " not found");
} else if (!file.canRead() || file.isDirectory()) {
response.setStatusCode(HttpStatus.SC_FORBIDDEN);
NStringEntity entity = new NStringEntity(
"<html><body><h1>Access denied</h1></body></html>",
ContentType.create("text/html", "UTF-8"));
response.setEntity(entity);
System.out.println("Cannot read file " + file.getPath());
} else {
NHttpConnection conn = (NHttpConnection) context.getAttribute(
ExecutionContext.HTTP_CONNECTION);
response.setStatusCode(HttpStatus.SC_OK);
NFileEntity body = new NFileEntity(file, ContentType.create("text/html"));
response.setEntity(body);
System.out.println(conn + ": serving file " + file.getPath());
}
}
}
}
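The branching in handleInternal above (missing file, unreadable file or directory, readable file) can be tried out without any HttpCore dependency. The following is only a minimal sketch using the JDK alone; the class name FileStatusSketch and the methods statusFor and demo are hypothetical names introduced for illustration and are not part of the Apache example.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.file.Files;

// Hypothetical helper, not part of HttpCore: it only mirrors the
// status-code decision made in HttpFileHandler.handleInternal.
public class FileStatusSketch {

    // Same order of checks as the example: missing -> 404,
    // unreadable or directory -> 403, otherwise 200.
    static int statusFor(File docRoot, String target) {
        String decoded;
        try {
            // The request URI is percent-decoded before it is mapped to a file
            decoded = URLDecoder.decode(target, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e);
        }
        File file = new File(docRoot, decoded);
        if (!file.exists()) {
            return 404;
        }
        if (!file.canRead() || file.isDirectory()) {
            return 403;
        }
        return 200;
    }

    // Builds a throwaway document root and returns the status for an
    // existing file, a missing file and a directory, in that order.
    static int[] demo() {
        try {
            File root = Files.createTempDirectory("docroot").toFile();
            File page = new File(root, "hello world.html");
            Files.write(page.toPath(), "<html></html>".getBytes("UTF-8"));
            File sub = new File(root, "sub");
            sub.mkdir();
            int[] result = {
                statusFor(root, "/hello%20world.html"),
                statusFor(root, "/missing.html"),
                statusFor(root, "/sub")
            };
            // Clean up the temporary files again
            page.delete();
            sub.delete();
            root.delete();
            return result;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
    }
}
```

Like the original handler, this sketch performs no check that the decoded path stays inside the document root, so a target containing "../" is resolved as-is; a real server would probably want to compare canonical paths before serving a file.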
6. Asynchronous HTTP reverse proxy (based on the non-blocking I/O model: NIO framework)
This example demonstrates how HttpCore NIO can be used to build an asynchronous, fully streaming reverse HTTP proxy.
/*
 * ====================================================================
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation. For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */
package org.apache.http.examples.nio;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.http.ConnectionReuseStrategy;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpException;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.HttpStatus;
import org.apache.http.HttpVersion;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.EnglishReasonPhraseCatalog;
import org.apache.http.impl.nio.DefaultHttpClientIODispatch;
import org.apache.http.impl.nio.DefaultHttpServerIODispatch;
import org.apache.http.impl.nio.pool.BasicNIOConnPool;
import org.apache.http.impl.nio.pool.BasicNIOPoolEntry;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHttpEntityEnclosingRequest;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.message.BasicHttpResponse;
import org.apache.http.nio.ContentDecoder;
import org.apache.http.nio.ContentEncoder;
import org.apache.http.nio.IOControl;
import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.NHttpServerConnection;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.nio.pool.NIOConnFactory;
import org.apache.http.nio.protocol.BasicAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncRequestExecutor;
import org.apache.http.nio.protocol.HttpAsyncRequestConsumer;
import org.apache.http.nio.protocol.HttpAsyncRequester;
import org.apache.http.nio.protocol.HttpAsyncRequestHandler;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerRegistry;
import org.apache.http.nio.protocol.HttpAsyncRequestHandlerResolver;
import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.apache.http.nio.protocol.HttpAsyncResponseProducer;
import org.apache.http.nio.protocol.HttpAsyncExchange;
import org.apache.http.nio.protocol.HttpAsyncService;
import org.apache.http.nio.reactor.ConnectingIOReactor;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.ListeningIOReactor;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.CoreProtocolPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.params.SyncBasicHttpParams;
import org.apache.http.pool.PoolStats;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.HttpProcessor;
import org.apache.http.protocol.ImmutableHttpProcessor;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.protocol.ResponseConnControl;
import org.apache.http.protocol.ResponseContent;
import org.apache.http.protocol.ResponseDate;
import org.apache.http.protocol.ResponseServer;

/**
 * Asynchronous, fully streaming HTTP/1.1 reverse proxy.
 */
public class NHttpReverseProxy {

    public static void main(String[] args) throws Exception {
        if (args.length < 1) {
            System.out.println("Usage: NHttpReverseProxy <hostname> [port]");
            System.exit(1);
        }
        URI uri = new URI(args[0]);
        int port = 8080;
        if (args.length > 1) {
            port = Integer.parseInt(args[1]);
        }
        // Target host
        HttpHost targetHost = new HttpHost(
                uri.getHost(),
                uri.getPort() > 0 ? uri.getPort() : 80,
                uri.getScheme() != null ? uri.getScheme() : "http");
        System.out.println("Reverse proxy to " + targetHost);
        HttpParams params = new SyncBasicHttpParams();
        params
            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 30000)
            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
            .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true)
            .setParameter(CoreProtocolPNames.ORIGIN_SERVER, "Test/1.1")
            .setParameter(CoreProtocolPNames.USER_AGENT, "Test/1.1");
        IOReactorConfig config = new IOReactorConfig();
        config.setIoThreadCount(1);
        final ConnectingIOReactor connectingIOReactor = new DefaultConnectingIOReactor(config);
        final ListeningIOReactor listeningIOReactor = new DefaultListeningIOReactor(config);
        // Set up HTTP protocol processor for incoming connections
        HttpProcessor inhttpproc = new ImmutableHttpProcessor(
                new HttpResponseInterceptor[] {
                        new ResponseDate(),
                        new ResponseServer(),
                        new ResponseContent(),
                        new ResponseConnControl()
                });
        // Set up HTTP protocol processor for outgoing connections
        HttpProcessor outhttpproc = new ImmutableHttpProcessor(
                new HttpRequestInterceptor[] {
                        new RequestContent(),
                        new RequestTargetHost(),
                        new RequestConnControl(),
                        new RequestUserAgent(),
                        new RequestExpectContinue()
                });
        ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler();
        HttpAsyncRequester executor = new HttpAsyncRequester(
                outhttpproc, new ProxyOutgoingConnectionReuseStrategy(), params);
        ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, params);
        connPool.setMaxTotal(100);
        connPool.setDefaultMaxPerRoute(20);
        HttpAsyncRequestHandlerRegistry handlerRegistry = new HttpAsyncRequestHandlerRegistry();
        handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool));
        ProxyServiceHandler serviceHandler = new ProxyServiceHandler(
                inhttpproc, new ProxyIncomingConnectionReuseStrategy(), handlerRegistry, params);
        final IOEventDispatch connectingEventDispatch = new DefaultHttpClientIODispatch(
                clientHandler, params);
        final IOEventDispatch listeningEventDispatch = new DefaultHttpServerIODispatch(
                serviceHandler, params);
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    connectingIOReactor.execute(connectingEventDispatch);
                } catch (InterruptedIOException ex) {
                    System.err.println("Interrupted");
                } catch (IOException ex) {
                    ex.printStackTrace();
                } finally {
                    try {
                        listeningIOReactor.shutdown();
                    } catch (IOException ex2) {
                        ex2.printStackTrace();
                    }
                }
            }
        });
        t.start();
        try {
            listeningIOReactor.listen(new InetSocketAddress(port));
            listeningIOReactor.execute(listeningEventDispatch);
        } catch (InterruptedIOException ex) {
            System.err.println("Interrupted");
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            try {
                connectingIOReactor.shutdown();
            } catch (IOException ex2) {
                ex2.printStackTrace();
            }
        }
    }

    static class ProxyHttpExchange {
        private final ByteBuffer inBuffer;
        private final ByteBuffer outBuffer;
        private volatile String id;
        private volatile HttpHost target;
        private volatile HttpAsyncExchange responseTrigger;
        private volatile IOControl originIOControl;
        private volatile IOControl clientIOControl;
        private volatile HttpRequest request;
        private volatile boolean requestReceived;
        private volatile HttpResponse response;
        private volatile boolean responseReceived;
        private volatile Exception ex;

        public ProxyHttpExchange() {
            super();
            this.inBuffer = ByteBuffer.allocateDirect(10240);
            this.outBuffer = ByteBuffer.allocateDirect(10240);
        }

        public ByteBuffer getInBuffer() {
            return this.inBuffer;
        }

        public ByteBuffer getOutBuffer() {
            return this.outBuffer;
        }

        public String getId() {
            return this.id;
        }

        public void setId(final String id) {
            this.id = id;
        }

        public HttpHost getTarget() {
            return this.target;
        }

        public void setTarget(final HttpHost target) {
            this.target = target;
        }

        public HttpRequest getRequest() {
            return this.request;
        }

        public void setRequest(final HttpRequest request) {
            this.request = request;
        }

        public HttpResponse getResponse() {
            return this.response;
        }

        public void setResponse(final HttpResponse response) {
            this.response = response;
        }

        public HttpAsyncExchange getResponseTrigger() {
            return this.responseTrigger;
        }

        public void setResponseTrigger(final HttpAsyncExchange responseTrigger) {
            this.responseTrigger = responseTrigger;
        }

        public IOControl getClientIOControl() {
            return this.clientIOControl;
        }

        public void setClientIOControl(final IOControl clientIOControl) {
            this.clientIOControl = clientIOControl;
        }

        public IOControl getOriginIOControl() {
            return this.originIOControl;
        }

        public void setOriginIOControl(final IOControl originIOControl) {
            this.originIOControl = originIOControl;
        }

        public boolean isRequestReceived() {
            return this.requestReceived;
        }

        public void setRequestReceived() {
            this.requestReceived = true;
        }

        public boolean isResponseReceived() {
            return this.responseReceived;
        }

        public void setResponseReceived() {
            this.responseReceived = true;
        }

        public Exception getException() {
            return this.ex;
        }

        public void setException(final Exception ex) {
            this.ex = ex;
        }

        public void reset() {
            this.inBuffer.clear();
            this.outBuffer.clear();
            this.target = null;
            this.id = null;
            this.responseTrigger = null;
            this.clientIOControl = null;
            this.originIOControl = null;
            this.request = null;
            this.requestReceived = false;
            this.response = null;
            this.responseReceived = false;
            this.ex = null;
        }
    }

    static class ProxyRequestHandler implements HttpAsyncRequestHandler<ProxyHttpExchange> {
        private final HttpHost target;
        private final HttpAsyncRequester executor;
        private final BasicNIOConnPool connPool;
        private final AtomicLong counter;

        public ProxyRequestHandler(
                final HttpHost target,
                final HttpAsyncRequester executor,
                final BasicNIOConnPool connPool) {
            super();
            this.target = target;
            this.executor = executor;
            this.connPool = connPool;
            this.counter = new AtomicLong(1);
        }

        public HttpAsyncRequestConsumer<ProxyHttpExchange> processRequest(
                final HttpRequest request,
                final HttpContext context) {
            ProxyHttpExchange httpExchange = (ProxyHttpExchange) context.getAttribute("http-exchange");
            if (httpExchange == null) {
                httpExchange = new ProxyHttpExchange();
                context.setAttribute("http-exchange", httpExchange);
            }
            synchronized (httpExchange) {
                httpExchange.reset();
                String id = String.format("%08X", this.counter.getAndIncrement());
                httpExchange.setId(id);
                httpExchange.setTarget(this.target);
                return new ProxyRequestConsumer(httpExchange, this.executor, this.connPool);
            }
        }

        public void handle(
                final ProxyHttpExchange httpExchange,
                final HttpAsyncExchange responseTrigger,
                final HttpContext context) throws HttpException, IOException {
            synchronized (httpExchange) {
                Exception ex = httpExchange.getException();
                if (ex != null) {
                    System.out.println("[client<-proxy] " + httpExchange.getId() + " " + ex);
                    int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
                    HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
                            EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
                    String message = ex.getMessage();
                    if (message == null) {
                        message = "Unexpected error";
                    }
                    response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT));
                    responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
                    System.out.println("[client<-proxy] " + httpExchange.getId() + " error response triggered");
                }
                HttpResponse response = httpExchange.getResponse();
                if (response != null) {
                    responseTrigger.submitResponse(new ProxyResponseProducer(httpExchange));
                    System.out.println("[client<-proxy] " + httpExchange.getId() + " response triggered");
                }
                // No response yet.
                httpExchange.setResponseTrigger(responseTrigger);
            }
        }
    }

    static class ProxyRequestConsumer implements HttpAsyncRequestConsumer<ProxyHttpExchange> {
        private final ProxyHttpExchange httpExchange;
        private final HttpAsyncRequester executor;
        private final BasicNIOConnPool connPool;
        private volatile boolean completed;

        public ProxyRequestConsumer(
                final ProxyHttpExchange httpExchange,
                final HttpAsyncRequester executor,
                final BasicNIOConnPool connPool) {
            super();
            this.httpExchange = httpExchange;
            this.executor = executor;
            this.connPool = connPool;
        }

        public void close() throws IOException {
        }

        public void requestReceived(final HttpRequest request) {
            synchronized (this.httpExchange) {
                System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + request.getRequestLine());
                this.httpExchange.setRequest(request);
                this.executor.execute(
                        new ProxyRequestProducer(this.httpExchange),
                        new ProxyResponseConsumer(this.httpExchange),
                        this.connPool);
            }
        }

        public void consumeContent(
                final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
            synchronized (this.httpExchange) {
                this.httpExchange.setClientIOControl(ioctrl);
                // Receive data from the client
                ByteBuffer buf = this.httpExchange.getInBuffer();
                int n = decoder.read(buf);
                System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read");
                if (decoder.isCompleted()) {
                    System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read");
                }
                // If the buffer is full, suspend client input until there is free
                // space in the buffer
                if (!buf.hasRemaining()) {
                    ioctrl.suspendInput();
                    System.out.println("[client->proxy] " + this.httpExchange.getId() + " suspend client input");
                }
                // If there is some content in the input buffer make sure origin
                // output is active
                if (buf.position() > 0) {
                    if (this.httpExchange.getOriginIOControl() != null) {
                        this.httpExchange.getOriginIOControl().requestOutput();
                        System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output");
                    }
                }
            }
        }

        public void requestCompleted(final HttpContext context) {
            synchronized (this.httpExchange) {
                this.completed = true;
                System.out.println("[client->proxy] " + this.httpExchange.getId() + " request completed");
                this.httpExchange.setRequestReceived();
                if (this.httpExchange.getOriginIOControl() != null) {
                    this.httpExchange.getOriginIOControl().requestOutput();
                    System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output");
                }
            }
        }

        public Exception getException() {
            return null;
        }

        public ProxyHttpExchange getResult() {
            return this.httpExchange;
        }

        public boolean isDone() {
            return this.completed;
        }

        public void failed(final Exception ex) {
            System.out.println("[client->proxy] " + ex.toString());
        }
    }

    static class ProxyRequestProducer implements HttpAsyncRequestProducer {
        private final ProxyHttpExchange httpExchange;

        public ProxyRequestProducer(final ProxyHttpExchange httpExchange) {
            super();
            this.httpExchange = httpExchange;
        }

        public void close() throws IOException {
        }

        public HttpHost getTarget() {
            synchronized (this.httpExchange) {
                return this.httpExchange.getTarget();
            }
        }

        public HttpRequest generateRequest() {
            synchronized (this.httpExchange) {
                HttpRequest request = this.httpExchange.getRequest();
                System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + request.getRequestLine());
                // Rewrite request!!!!
                if (request instanceof HttpEntityEnclosingRequest) {
                    BasicHttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest(
                            request.getRequestLine());
                    r.setEntity(((HttpEntityEnclosingRequest) request).getEntity());
                    return r;
                } else {
                    return new BasicHttpRequest(request.getRequestLine());
                }
            }
        }

        public void produceContent(
                final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
            synchronized (this.httpExchange) {
                this.httpExchange.setOriginIOControl(ioctrl);
                // Send data to the origin server
                ByteBuffer buf = this.httpExchange.getInBuffer();
                buf.flip();
                int n = encoder.write(buf);
                buf.compact();
                System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written");
                // If there is space in the buffer and the message has not been
                // transferred, make sure the client is sending more data
                if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) {
                    if (this.httpExchange.getClientIOControl() != null) {
                        this.httpExchange.getClientIOControl().requestInput();
                        System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request client input");
                    }
                }
                if (buf.position() == 0) {
                    if (this.httpExchange.isRequestReceived()) {
                        encoder.complete();
                        System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written");
                    } else {
                        // Input buffer is empty. Wait until the client fills up
                        // the buffer
                        ioctrl.suspendOutput();
                        System.out.println("[proxy->origin] " + this.httpExchange.getId() + " suspend origin output");
                    }
                }
            }
        }

        public void requestCompleted(final HttpContext context) {
            synchronized (this.httpExchange) {
                System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request completed");
            }
        }

        public boolean isRepeatable() {
            return false;
        }

        public void resetRequest() {
        }

        public void failed(final Exception ex) {
            System.out.println("[proxy->origin] " + ex.toString());
        }
    }

    static class ProxyResponseConsumer implements HttpAsyncResponseConsumer<ProxyHttpExchange> {
        private final ProxyHttpExchange httpExchange;
        private volatile boolean completed;

        public ProxyResponseConsumer(final ProxyHttpExchange httpExchange) {
            super();
            this.httpExchange = httpExchange;
        }

        public void close() throws IOException {
        }

        public void responseReceived(final HttpResponse response) {
            synchronized (this.httpExchange) {
                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + response.getStatusLine());
                this.httpExchange.setResponse(response);
                HttpAsyncExchange responseTrigger = this.httpExchange.getResponseTrigger();
                if (responseTrigger != null && !responseTrigger.isCompleted()) {
                    System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response triggered");
                    responseTrigger.submitResponse(new ProxyResponseProducer(this.httpExchange));
                }
            }
        }

        public void consumeContent(
                final ContentDecoder decoder, final IOControl ioctrl) throws IOException {
            synchronized (this.httpExchange) {
                this.httpExchange.setOriginIOControl(ioctrl);
                // Receive data from the origin
                ByteBuffer buf = this.httpExchange.getOutBuffer();
                int n = decoder.read(buf);
                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read");
                if (decoder.isCompleted()) {
                    System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read");
                }
                // If the buffer is full, suspend origin input until there is free
                // space in the buffer
                if (!buf.hasRemaining()) {
                    ioctrl.suspendInput();
                    System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input");
                }
                // If there is some content in the input buffer make sure client
                // output is active
                if (buf.position() > 0) {
                    if (this.httpExchange.getClientIOControl() != null) {
                        this.httpExchange.getClientIOControl().requestOutput();
                        System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output");
                    }
                }
            }
        }

        public void responseCompleted(final HttpContext context) {
            synchronized (this.httpExchange) {
                if (this.completed) {
                    return;
                }
                this.completed = true;
                System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " response completed");
                this.httpExchange.setResponseReceived();
                if (this.httpExchange.getClientIOControl() != null) {
                    this.httpExchange.getClientIOControl().requestOutput();
                    System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output");
                }
            }
        }

        public void failed(final Exception ex) {
            synchronized (this.httpExchange) {
                if (this.completed) {
                    return;
                }
                this.completed = true;
                this.httpExchange.setException(ex);
                HttpAsyncExchange responseTrigger = this.httpExchange.getResponseTrigger();
                if (responseTrigger != null && !responseTrigger.isCompleted()) {
                    System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + ex);
                    int status = HttpStatus.SC_INTERNAL_SERVER_ERROR;
                    HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status,
                            EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US));
                    String message = ex.getMessage();
                    if (message == null) {
                        message = "Unexpected error";
                    }
                    response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT));
                    responseTrigger.submitResponse(new BasicAsyncResponseProducer(response));
                }
            }
        }

        public boolean cancel() {
            synchronized (this.httpExchange) {
                if (this.completed) {
                    return false;
                }
                failed(new InterruptedIOException("Cancelled"));
                return true;
            }
        }

        public ProxyHttpExchange getResult() {
            return this.httpExchange;
        }

        public Exception getException() {
            return null;
        }

        public boolean isDone() {
            return this.completed;
        }
    }

    static class ProxyResponseProducer implements HttpAsyncResponseProducer {
        private final ProxyHttpExchange httpExchange;

        public ProxyResponseProducer(final ProxyHttpExchange httpExchange) {
            super();
            this.httpExchange = httpExchange;
        }

        public void close() throws IOException {
            this.httpExchange.reset();
        }

        public HttpResponse generateResponse() {
            synchronized (this.httpExchange) {
                HttpResponse response = this.httpExchange.getResponse();
                System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + response.getStatusLine());
                // Rewrite response!!!!
                BasicHttpResponse r = new BasicHttpResponse(response.getStatusLine());
                r.setEntity(response.getEntity());
                return r;
            }
        }

        public void produceContent(
                final ContentEncoder encoder, final IOControl ioctrl) throws IOException {
            synchronized (this.httpExchange) {
                this.httpExchange.setClientIOControl(ioctrl);
                // Send data to the client
                ByteBuffer buf = this.httpExchange.getOutBuffer();
                buf.flip();
                int n = encoder.write(buf);
                buf.compact();
                System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written");
                // If there is space in the buffer and the message has not been
                // transferred, make sure the origin is sending more data
                if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) {
                    if (this.httpExchange.getOriginIOControl() != null) {
                        this.httpExchange.getOriginIOControl().requestInput();
                        System.out.println("[client<-proxy] " + this.httpExchange.getId() + " request origin input");
                    }
                }
                if (buf.position() == 0) {
                    if (this.httpExchange.isResponseReceived()) {
                        encoder.complete();
                        System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written");
                    } else {
                        // Input buffer is empty. Wait until the origin fills up
                        // the buffer
                        ioctrl.suspendOutput();
                        System.out.println("[client<-proxy] " + this.httpExchange.getId() + " suspend client output");
                    }
                }
            }
        }

        public void responseCompleted(final HttpContext context) {
            synchronized (this.httpExchange) {
                System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response completed");
            }
        }

        public void failed(final Exception ex) {
            System.out.println("[client<-proxy] " + ex.toString());
        }
    }

    static class ProxyIncomingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
        @Override
        public boolean keepAlive(final HttpResponse response, final HttpContext context) {
            NHttpConnection conn = (NHttpConnection) context.getAttribute(
                    ExecutionContext.HTTP_CONNECTION);
            boolean keepAlive = super.keepAlive(response, context);
            if (keepAlive) {
                System.out.println("[client->proxy] connection kept alive " + conn);
            }
            return keepAlive;
        }
    }

    static class ProxyOutgoingConnectionReuseStrategy extends DefaultConnectionReuseStrategy {
        @Override
        public boolean keepAlive(final HttpResponse response, final HttpContext context) {
            NHttpConnection conn = (NHttpConnection) context.getAttribute(
                    ExecutionContext.HTTP_CONNECTION);
            boolean keepAlive = super.keepAlive(response, context);
            if (keepAlive) {
                System.out.println("[proxy->origin] connection kept alive " + conn);
            }
            return keepAlive;
        }
    }

    static class ProxyServiceHandler extends HttpAsyncService {
        public ProxyServiceHandler(
                final HttpProcessor httpProcessor,
                final ConnectionReuseStrategy reuseStrategy,
                final HttpAsyncRequestHandlerResolver handlerResolver,
                final HttpParams params) {
            super(httpProcessor, reuseStrategy, handlerResolver, params);
        }

        @Override
        protected void log(final Exception ex) {
            ex.printStackTrace();
        }

        @Override
        public void connected(final NHttpServerConnection conn) {
            System.out.println("[client->proxy] connection open " + conn);
            super.connected(conn);
        }

        @Override
        public void closed(final NHttpServerConnection conn) {
            System.out.println("[client->proxy] connection closed " + conn);
            super.closed(conn);
        }
    }

    static class ProxyClientProtocolHandler extends HttpAsyncRequestExecutor {
        public ProxyClientProtocolHandler() {
            super();
        }

        @Override
        protected void log(final Exception ex) {
            ex.printStackTrace();
        }

        @Override
        public void connected(final NHttpClientConnection conn,
                final Object attachment) throws IOException, HttpException {
            System.out.println("[proxy->origin] connection open " + conn);
            super.connected(conn, attachment);
        }

        @Override
        public void closed(final NHttpClientConnection conn) {
            System.out.println("[proxy->origin] connection closed " + conn);
            super.closed(conn);
        }
    }

    static class ProxyConnPool extends BasicNIOConnPool {
        public ProxyConnPool(final ConnectingIOReactor ioreactor, final HttpParams params) {
            super(ioreactor, params);
        }

        public ProxyConnPool(
                final ConnectingIOReactor ioreactor,
                final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory,
                final HttpParams params) {
            super(ioreactor, connFactory, params);
        }

        @Override
        public void release(final BasicNIOPoolEntry entry, boolean reusable) {
            System.out.println("[proxy->origin] connection released " + entry.getConnection());
            super.release(entry, reusable);
            StringBuilder buf = new StringBuilder();
            PoolStats totals = getTotalStats();
            buf.append("[total kept alive: ").append(totals.getAvailable()).append("; ");
            buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable());
            buf.append(" of ").append(totals.getMax()).append("]");
            System.out.println("[proxy->origin] " + buf.toString());
        }
    }
}
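The proxy streams content through a fixed-size ByteBuffer: the consumer side reads into the buffer and suspends input when it is full, while the producer side calls flip(), writes what it can, and then compact() so that unwritten bytes move to the front and the buffer is back in write mode. The following is a rough, single-threaded sketch of that pattern using only java.nio; the class RelayBufferSketch and its methods fill, drain and relay are hypothetical stand-ins for decoder.read(buf), encoder.write(buf) and the suspendInput()/requestInput() calls, not HttpCore APIs.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of the buffer hand-off used by the proxy;
// no HttpCore classes are involved.
public class RelayBufferSketch {

    // Stand-in for decoder.read(buf): copies as many bytes as fit
    // from src into buf, which is in write mode.
    static int fill(ByteBuffer buf, ByteBuffer src) {
        int n = Math.min(buf.remaining(), src.remaining());
        for (int i = 0; i < n; i++) {
            buf.put(src.get());
        }
        return n;
    }

    // Stand-in for encoder.write(buf): flip to read mode, write at most
    // maxBytes, then compact so leftover bytes move to the front.
    static int drain(ByteBuffer buf, StringBuilder out, int maxBytes) {
        buf.flip();
        int n = Math.min(buf.remaining(), maxBytes);
        for (int i = 0; i < n; i++) {
            out.append((char) buf.get());
        }
        buf.compact();
        return n;
    }

    // Relays an ASCII message through a small buffer, pausing the
    // input side while the buffer is full.
    static String relay(String message, int bufferSize, int writeChunk) {
        ByteBuffer src = ByteBuffer.wrap(message.getBytes(StandardCharsets.US_ASCII));
        ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        StringBuilder out = new StringBuilder();
        boolean inputSuspended = false;
        while (src.hasRemaining() || buf.position() > 0) {
            if (!inputSuspended) {
                fill(buf, src);
                if (!buf.hasRemaining()) {
                    inputSuspended = true;   // cf. ioctrl.suspendInput()
                }
            }
            drain(buf, out, writeChunk);
            if (buf.hasRemaining()) {
                inputSuspended = false;      // cf. requestInput()
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(relay("hello, reverse proxy", 8, 3));
    }
}
```

In the real proxy the two sides run as separate I/O callbacks and coordinate by synchronizing on the shared ProxyHttpExchange; the loop here only imitates that alternation, and it assumes bufferSize and writeChunk are both positive.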
Copyright information for this post:
Tag: All source code in this post is taken from The Apache™ Software Foundation (http://hc.apache.org/httpcomponents-core-ga/examples.html)