基于Java NIO 异步读取网络数据
2014-03-28 01:19
323 查看
http://blog.csdn.net/maoxiang/article/details/4370311
关于Java NIO 请参考 java 手册。
简单的来说,Java
NIO 提供了一种异步非阻塞模型,使得网络请求都可以并发执行。
服务器端采用这种模型,响应速度将大大提高,Apache,Nginx
都是这种模型。
本文介绍的是客户端如何采用这种模型来提高客户端访问网络的速度。
1. 使用范例
[java] view
plaincopy
public static void main(String[] args) throws Exception {
//初始化
NHttpClient httpClient = new NHttpClient();
httpClient.init();
//调用的url
String url = "http://www.pconline.com.cn/";
//调用的方法
httpClient.getUrl(url, new NHttpClientCallback() {
public void finished(String content) {
System.out.println("content=" + content.substring(0, 1000));
}
});
//注意这里是立即返回,可以根据需要进行处理
System.in.read();
}
2. NHttpClient 的代码
[c-sharp] view
plaincopy
/**
* 专注互联网,分享创造价值
* maoxiang@gmail.com
*/
package cn.jteam.app.taobao;
import common.util.ValidateUtil;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URL;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.protocol.BufferingHttpClientHandler;
import org.apache.http.nio.protocol.EventListener;
import org.apache.http.nio.protocol.HttpRequestExecutionHandler;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpProcessor;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;
/**
*
* 作用: 支持异步读取的httpClient
* 暂时不支持socks代理
*/
public class NHttpClient {
private final static Log log = LogFactory.getLog(NHttpClient.class);
private int timeOut = 10000; // 10秒
private String localAddress = null;
private SocketAddress localSocketAddress = null; //本地端口
private boolean useProxy = false;
private int maxConnection = 2;
private Map<String, String> defaultHeaders = new HashMap<String, String>();
private DefaultConnectingIOReactor ioReactor;
private String host;
private String proxyServerType = "http";
private String directHost = "127.0.0.1,localhost";
private String proxyServer;
private int proxyPort;
private String proxyUser;
private String proxyPassword;
private int connections = 0;
private Lock lock = new ReentrantLock();
private final Condition full = lock.newCondition();
public void addConnection() throws Exception {
lock.lock();
try {
if (connections > maxConnection) {
full.await();
}
connections++;
} finally {
lock.unlock();
}
}
public void removeConnection() {
lock.lock();
try {
if (connections <= maxConnection) {
full.signal();
}
connections--;
} finally {
lock.unlock();
}
}
public boolean isRunning() {
return connections > 0;
}
public int getConnections() {
return connections;
}
public Map<String, String> getDefaultHeaders() {
return defaultHeaders;
}
public void setDefaultHeaders(Map<String, String> defaultHeaders) {
this.defaultHeaders = defaultHeaders;
}
public String getDirectHost() {
return directHost;
}
public void setDirectHost(String directHost) {
this.directHost = directHost;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getLocalAddress() {
return localAddress;
}
public void setLocalAddress(String localAddress) {
this.localAddress = localAddress;
}
public SocketAddress getLocalSocketAddress() {
return localSocketAddress;
}
public void setLocalSocketAddress(SocketAddress localSocketAddress) {
this.localSocketAddress = localSocketAddress;
}
public int getMaxConnection() {
return maxConnection;
}
public void setMaxConnection(int maxConnection) {
this.maxConnection = maxConnection;
}
public String getProxyPassword() {
return proxyPassword;
}
public void setProxyPassword(String proxyPassword) {
this.proxyPassword = proxyPassword;
}
public int getProxyPort() {
return proxyPort;
}
public void setProxyPort(int proxyPort) {
this.proxyPort = proxyPort;
}
public String getProxyServer() {
return proxyServer;
}
public void setProxyServer(String proxyServer) {
this.proxyServer = proxyServer;
}
public String getProxyServerType() {
return proxyServerType;
}
public void setProxyServerType(String proxyServerType) {
this.proxyServerType = proxyServerType;
}
public String getProxyUser() {
return proxyUser;
}
public void setProxyUser(String proxyUser) {
this.proxyUser = proxyUser;
}
public int getTimeOut() {
return timeOut;
}
public void setTimeOut(int timeOut) {
this.timeOut = timeOut;
}
public boolean isUseProxy() {
return useProxy;
}
public void setUseProxy(boolean useProxy) {
this.useProxy = useProxy;
}
public void init() throws Exception {
HttpParams params = new BasicHttpParams();
params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000).
setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, timeOut).
setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 512 * 1024).
setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, true);
// setBooleanParameter(CoreConnectionPNames., true);
if (!ValidateUtil.isNull(localAddress)) {
localSocketAddress = InetSocketAddress.createUnresolved(localAddress, 0);
}
defaultHeaders.put("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5 GTB5");
defaultHeaders.put("Accept-Language", "zh-cn,zh;q=0.5");
defaultHeaders.put("Accept-Charset", "GB2312,utf-8;q=0.7,*;q=0.7");
defaultHeaders.put("Accept", "*/*");
/**
* 设置几个固定的http 头
*/
// defaultHeaders.put("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5 GTB5");
// defaultHeaders.put("Accept-Language", "zh-cn,zh;q=0.5");
// defaultHeaders.put("Accept-Charset", "GB2312,utf-8;q=0.7,*;q=0.7");
// defaultHeaders.put("Accept", "*/*");
ioReactor = new DefaultConnectingIOReactor(2, params);
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new RequestContent());
httpproc.addInterceptor(new RequestTargetHost());
httpproc.addInterceptor(new RequestConnControl());
httpproc.addInterceptor(new RequestUserAgent());
httpproc.addInterceptor(new RequestExpectContinue());
BufferingHttpClientHandler handler = new BufferingHttpClientHandler(
httpproc,
new MyHttpRequestExecutionHandler(),
new DefaultConnectionReuseStrategy(),
params);
handler.setEventListener(new EventLogger());
final IOEventDispatch ioEventDispatch = new DefaultClientIOEventDispatch(handler, params);
ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
public boolean handle(IOException e) {
e.printStackTrace();
log.error("IOException=" + e.getMessage());
return true;
}
public boolean handle(RuntimeException e) {
e.printStackTrace();
log.error("RuntimeException=" + e.getMessage());
return true;
}
});
Thread t = new Thread(new Runnable() {
public void run() {
try {
ioReactor.execute(ioEventDispatch);
} catch (InterruptedIOException ex) {
log.error("Interrupted." + ex.getMessage());
} catch (Exception e) {
log.error("I/O error: " + e.getMessage());
}
log.debug("shutdown");
}
});
t.start();
}
public void destroy() throws Exception {
if (ioReactor != null) {
ioReactor.shutdown();
}
}
//减少dns查询
private Map<String, InetAddress> dns = new HashMap<String, InetAddress>();
public void getUrl(String url, NHttpClientCallback callback) throws Exception {
addConnection();
if (!url.startsWith("http://")) {
url += "http://" + host;
}
URL u = new URL(url);
int port = u.getPort() < 0 ? u.getDefaultPort() : u.getPort();
String path = u.getPath();
if (ValidateUtil.isNull(path)) {
path = "/";
}
if (u.getQuery() != null) {
path += "?" + u.getQuery();
}
if (dns.get(u.getHost()) == null) {
InetAddress address = InetAddress.getByName(u.getHost());
dns.put(u.getHost(), address);
}
InetAddress address = dns.get(u.getHost());
SessionRequest sessionRequest = null;
InternalObject object = new InternalObject(path, callback);
object.setUrl(url);
if (!useProxy) {
sessionRequest = ioReactor.connect(
new InetSocketAddress(address, port),
localSocketAddress, //localhost
object,//attachment
new MySessionRequestCallback());
} else {
//TODO
SocketAddress addr = new InetSocketAddress(proxyServer, proxyPort);
sessionRequest = ioReactor.connect(
addr,
localSocketAddress, //localhost
object,//attachment
new MySessionRequestCallback());
}
/* * */
sessionRequest.waitFor();
if (sessionRequest.getException() != null) {
throw sessionRequest.getException();
}
}
private class InternalObject {
private NHttpClientCallback callback;
private String uri;
private String url;
public InternalObject(String uri, NHttpClientCallback callback) {
this.uri = uri;
this.callback = callback;
}
public NHttpClientCallback getCallback() {
return callback;
}
public void setCallback(NHttpClientCallback callback) {
this.callback = callback;
}
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
}
private class MySessionRequestCallback implements SessionRequestCallback {
public MySessionRequestCallback() {
super();
}
public void cancelled(final SessionRequest request) {
log.debug("Connect request cancelled: " + request.getRemoteAddress());
}
public void completed(final SessionRequest request) {
log.debug("Connect request completed: " + request.getRemoteAddress());
}
public void failed(final SessionRequest request) {
log.debug("Connect request failed: " + request.getRemoteAddress());
}
public void timeout(final SessionRequest request) {
log.debug("Connect request timed out: " + request.getRemoteAddress());
}
}
private class EventLogger implements EventListener {
public void connectionOpen(final NHttpConnection conn) {
log.debug("Connection open: " + conn);
}
public void connectionTimeout(final NHttpConnection conn) {
log.debug("Connection timed out: " + conn);
}
public void connectionClosed(final NHttpConnection conn) {
log.debug("Connection closed: " + conn);
}
public void fatalIOException(final IOException ex, final NHttpConnection conn) {
log.error("I/O error: " + ex.getMessage());
}
public void fatalProtocolException(final HttpException ex, final NHttpConnection conn) {
log.error("HTTP error: " + ex.getMessage());
}
}
private class MyHttpRequestExecutionHandler implements HttpRequestExecutionHandler {
private final static String REQUEST_SENT = "request-sent";
private final static String RESPONSE_RECEIVED = "response-received";
public MyHttpRequestExecutionHandler() {
super();
}
public void initalizeContext(final HttpContext context, final Object attachment) {
InternalObject internalObject = (InternalObject) attachment;
context.setAttribute("internalObject", internalObject);
}
public void finalizeContext(final HttpContext context) {
Object flag = context.getAttribute(RESPONSE_RECEIVED);
if (flag == null) {
// Signal completion of the request execution
}
}
public HttpRequest submitRequest(final HttpContext context) {
InternalObject internalObject = (InternalObject) context.getAttribute("internalObject");
Object flag = context.getAttribute(REQUEST_SENT);
if (flag == null) {
try {
// Stick some object into the context
context.setAttribute(REQUEST_SENT, Boolean.TRUE);
log.debug("Sending request to " + internalObject.getUrl());
System.out.println("Sending request to " + internalObject.getUrl());
BasicHttpRequest httpRequest = new BasicHttpRequest("GET", internalObject.getUri());
//FIXMED me
// httpRequest.addHeader("Accept-Encoding", "gzip,deflate");
Iterator<String> iteratorDefault = defaultHeaders.keySet().iterator();
while (iteratorDefault.hasNext()) {
String key = iteratorDefault.next();
httpRequest.setHeader(key, defaultHeaders.get(key));
log.debug(key + "=" + defaultHeaders.get(key));
}
return httpRequest;
} catch (Exception e) {
e.printStackTrace();
}
return null;
} else {
// No new request to submit
return null;
}
}
public void handleResponse(final HttpResponse response, final HttpContext context) {
InternalObject internalObject = (InternalObject) context.getAttribute("internalObject");
HttpEntity entity = response.getEntity();
String content = "";
try {
if (response.getStatusLine().getStatusCode() != 200) {
throw new IOException("invalid response code=" + response.getStatusLine().getStatusCode() + ",url=" + internalObject.getUrl());
}
log.debug(response.getStatusLine());
Header[] headers = response.getAllHeaders();
for (Header header : headers) {
log.debug(header.getName() + "=" + header.getValue());
}
if (entity.getContentEncoding() != null && "gzip".equals(entity.getContentEncoding().getValue())) {
//是压缩的流
GZIPInputStream inStream = new GZIPInputStream(entity.getContent());
content = IOUtils.toString(inStream);
} else {
content = IOUtils.toString(entity.getContent(), "GBK");
// content = EntityUtils.toString(entity, "GBK");
}
System.out.println("-----------------------");
System.out.println("response " + response.getStatusLine() + " of url=" + internalObject.getUrl() + ",content=" + content.length());
System.out.println("content=" + content.indexOf("page-info"));
System.out.println("-----------------------");
//System.out.println("content="+content);
internalObject.getCallback().finished(content);
log.debug("Document length: " + content.length());
} catch (Exception e) {
e.printStackTrace();
log.error("I/O error: " + e.getMessage());
} finally {
removeConnection();
}
context.setAttribute(RESPONSE_RECEIVED, Boolean.TRUE);
}
}
/**
*
* 作用:
*/
public interface NHttpClientCallback {
public void finished(String content);
}
}
3. 说明
如果应用程序的瓶颈在网络读取上,可以采用这种方式来处理。经过试验,这中速度要远远快于阻塞方式读取。
如果是要做爬虫或者是DDos攻击,这种方式都比较理想。
关于Java NIO 请参考 java 手册。
简单的来说,Java
NIO 提供了一种异步非阻塞模型,使得网络请求都可以并发执行。
服务器端采用这种模型,响应速度将大大提高,Apache,Nginx
都是这种模型。
本文介绍的是客户端如何采用这种模型来提高客户端访问网络的速度。
1. 使用范例
[java] view
plaincopy
public static void main(String[] args) throws Exception {
//初始化
NHttpClient httpClient = new NHttpClient();
httpClient.init();
//调用的url
String url = "http://www.pconline.com.cn/";
//调用的方法
httpClient.getUrl(url, new NHttpClientCallback() {
public void finished(String content) {
System.out.println("content=" + content.substring(0, 1000));
}
});
//注意这里是立即返回,可以根据需要进行处理
System.in.read();
}
2. NHttpClient 的代码
[c-sharp] view
plaincopy
/**
* 专注互联网,分享创造价值
* maoxiang@gmail.com
*/
package cn.jteam.app.taobao;
import common.util.ValidateUtil;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URL;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.GZIPInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpResponse;
import org.apache.http.impl.DefaultConnectionReuseStrategy;
import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
import org.apache.http.message.BasicHttpRequest;
import org.apache.http.nio.NHttpConnection;
import org.apache.http.nio.protocol.BufferingHttpClientHandler;
import org.apache.http.nio.protocol.EventListener;
import org.apache.http.nio.protocol.HttpRequestExecutionHandler;
import org.apache.http.nio.reactor.IOEventDispatch;
import org.apache.http.nio.reactor.IOReactorExceptionHandler;
import org.apache.http.nio.reactor.SessionRequest;
import org.apache.http.nio.reactor.SessionRequestCallback;
import org.apache.http.params.BasicHttpParams;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.BasicHttpProcessor;
import org.apache.http.protocol.HttpContext;
import org.apache.http.protocol.RequestConnControl;
import org.apache.http.protocol.RequestContent;
import org.apache.http.protocol.RequestExpectContinue;
import org.apache.http.protocol.RequestTargetHost;
import org.apache.http.protocol.RequestUserAgent;
import org.apache.http.util.EntityUtils;
/**
*
* 作用: 支持异步读取的httpClient
* 暂时不支持socks代理
*/
public class NHttpClient {
private final static Log log = LogFactory.getLog(NHttpClient.class);
private int timeOut = 10000; // 10秒
private String localAddress = null;
private SocketAddress localSocketAddress = null; //本地端口
private boolean useProxy = false;
private int maxConnection = 2;
private Map<String, String> defaultHeaders = new HashMap<String, String>();
private DefaultConnectingIOReactor ioReactor;
private String host;
private String proxyServerType = "http";
private String directHost = "127.0.0.1,localhost";
private String proxyServer;
private int proxyPort;
private String proxyUser;
private String proxyPassword;
private int connections = 0;
private Lock lock = new ReentrantLock();
private final Condition full = lock.newCondition();
public void addConnection() throws Exception {
lock.lock();
try {
if (connections > maxConnection) {
full.await();
}
connections++;
} finally {
lock.unlock();
}
}
public void removeConnection() {
lock.lock();
try {
if (connections <= maxConnection) {
full.signal();
}
connections--;
} finally {
lock.unlock();
}
}
public boolean isRunning() {
return connections > 0;
}
public int getConnections() {
return connections;
}
public Map<String, String> getDefaultHeaders() {
return defaultHeaders;
}
public void setDefaultHeaders(Map<String, String> defaultHeaders) {
this.defaultHeaders = defaultHeaders;
}
public String getDirectHost() {
return directHost;
}
public void setDirectHost(String directHost) {
this.directHost = directHost;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getLocalAddress() {
return localAddress;
}
public void setLocalAddress(String localAddress) {
this.localAddress = localAddress;
}
public SocketAddress getLocalSocketAddress() {
return localSocketAddress;
}
public void setLocalSocketAddress(SocketAddress localSocketAddress) {
this.localSocketAddress = localSocketAddress;
}
public int getMaxConnection() {
return maxConnection;
}
public void setMaxConnection(int maxConnection) {
this.maxConnection = maxConnection;
}
public String getProxyPassword() {
return proxyPassword;
}
public void setProxyPassword(String proxyPassword) {
this.proxyPassword = proxyPassword;
}
public int getProxyPort() {
return proxyPort;
}
public void setProxyPort(int proxyPort) {
this.proxyPort = proxyPort;
}
public String getProxyServer() {
return proxyServer;
}
public void setProxyServer(String proxyServer) {
this.proxyServer = proxyServer;
}
public String getProxyServerType() {
return proxyServerType;
}
public void setProxyServerType(String proxyServerType) {
this.proxyServerType = proxyServerType;
}
public String getProxyUser() {
return proxyUser;
}
public void setProxyUser(String proxyUser) {
this.proxyUser = proxyUser;
}
public int getTimeOut() {
return timeOut;
}
public void setTimeOut(int timeOut) {
this.timeOut = timeOut;
}
public boolean isUseProxy() {
return useProxy;
}
public void setUseProxy(boolean useProxy) {
this.useProxy = useProxy;
}
public void init() throws Exception {
HttpParams params = new BasicHttpParams();
params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000).
setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, timeOut).
setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 512 * 1024).
setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, true);
// setBooleanParameter(CoreConnectionPNames., true);
if (!ValidateUtil.isNull(localAddress)) {
localSocketAddress = InetSocketAddress.createUnresolved(localAddress, 0);
}
defaultHeaders.put("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5 GTB5");
defaultHeaders.put("Accept-Language", "zh-cn,zh;q=0.5");
defaultHeaders.put("Accept-Charset", "GB2312,utf-8;q=0.7,*;q=0.7");
defaultHeaders.put("Accept", "*/*");
/**
* 设置几个固定的http 头
*/
// defaultHeaders.put("User-Agent", "Mozilla/5.0 (Windows; U; Windows NT 5.1; zh-CN; rv:1.9.1) Gecko/20090624 Firefox/3.5 GTB5");
// defaultHeaders.put("Accept-Language", "zh-cn,zh;q=0.5");
// defaultHeaders.put("Accept-Charset", "GB2312,utf-8;q=0.7,*;q=0.7");
// defaultHeaders.put("Accept", "*/*");
ioReactor = new DefaultConnectingIOReactor(2, params);
BasicHttpProcessor httpproc = new BasicHttpProcessor();
httpproc.addInterceptor(new RequestContent());
httpproc.addInterceptor(new RequestTargetHost());
httpproc.addInterceptor(new RequestConnControl());
httpproc.addInterceptor(new RequestUserAgent());
httpproc.addInterceptor(new RequestExpectContinue());
BufferingHttpClientHandler handler = new BufferingHttpClientHandler(
httpproc,
new MyHttpRequestExecutionHandler(),
new DefaultConnectionReuseStrategy(),
params);
handler.setEventListener(new EventLogger());
final IOEventDispatch ioEventDispatch = new DefaultClientIOEventDispatch(handler, params);
ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
public boolean handle(IOException e) {
e.printStackTrace();
log.error("IOException=" + e.getMessage());
return true;
}
public boolean handle(RuntimeException e) {
e.printStackTrace();
log.error("RuntimeException=" + e.getMessage());
return true;
}
});
Thread t = new Thread(new Runnable() {
public void run() {
try {
ioReactor.execute(ioEventDispatch);
} catch (InterruptedIOException ex) {
log.error("Interrupted." + ex.getMessage());
} catch (Exception e) {
log.error("I/O error: " + e.getMessage());
}
log.debug("shutdown");
}
});
t.start();
}
public void destroy() throws Exception {
if (ioReactor != null) {
ioReactor.shutdown();
}
}
//减少dns查询
private Map<String, InetAddress> dns = new HashMap<String, InetAddress>();
public void getUrl(String url, NHttpClientCallback callback) throws Exception {
addConnection();
if (!url.startsWith("http://")) {
url += "http://" + host;
}
URL u = new URL(url);
int port = u.getPort() < 0 ? u.getDefaultPort() : u.getPort();
String path = u.getPath();
if (ValidateUtil.isNull(path)) {
path = "/";
}
if (u.getQuery() != null) {
path += "?" + u.getQuery();
}
if (dns.get(u.getHost()) == null) {
InetAddress address = InetAddress.getByName(u.getHost());
dns.put(u.getHost(), address);
}
InetAddress address = dns.get(u.getHost());
SessionRequest sessionRequest = null;
InternalObject object = new InternalObject(path, callback);
object.setUrl(url);
if (!useProxy) {
sessionRequest = ioReactor.connect(
new InetSocketAddress(address, port),
localSocketAddress, //localhost
object,//attachment
new MySessionRequestCallback());
} else {
//TODO
SocketAddress addr = new InetSocketAddress(proxyServer, proxyPort);
sessionRequest = ioReactor.connect(
addr,
localSocketAddress, //localhost
object,//attachment
new MySessionRequestCallback());
}
/* * */
sessionRequest.waitFor();
if (sessionRequest.getException() != null) {
throw sessionRequest.getException();
}
}
private class InternalObject {
private NHttpClientCallback callback;
private String uri;
private String url;
public InternalObject(String uri, NHttpClientCallback callback) {
this.uri = uri;
this.callback = callback;
}
public NHttpClientCallback getCallback() {
return callback;
}
public void setCallback(NHttpClientCallback callback) {
this.callback = callback;
}
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
}
private class MySessionRequestCallback implements SessionRequestCallback {
public MySessionRequestCallback() {
super();
}
public void cancelled(final SessionRequest request) {
log.debug("Connect request cancelled: " + request.getRemoteAddress());
}
public void completed(final SessionRequest request) {
log.debug("Connect request completed: " + request.getRemoteAddress());
}
public void failed(final SessionRequest request) {
log.debug("Connect request failed: " + request.getRemoteAddress());
}
public void timeout(final SessionRequest request) {
log.debug("Connect request timed out: " + request.getRemoteAddress());
}
}
private class EventLogger implements EventListener {
public void connectionOpen(final NHttpConnection conn) {
log.debug("Connection open: " + conn);
}
public void connectionTimeout(final NHttpConnection conn) {
log.debug("Connection timed out: " + conn);
}
public void connectionClosed(final NHttpConnection conn) {
log.debug("Connection closed: " + conn);
}
public void fatalIOException(final IOException ex, final NHttpConnection conn) {
log.error("I/O error: " + ex.getMessage());
}
public void fatalProtocolException(final HttpException ex, final NHttpConnection conn) {
log.error("HTTP error: " + ex.getMessage());
}
}
private class MyHttpRequestExecutionHandler implements HttpRequestExecutionHandler {
private final static String REQUEST_SENT = "request-sent";
private final static String RESPONSE_RECEIVED = "response-received";
public MyHttpRequestExecutionHandler() {
super();
}
public void initalizeContext(final HttpContext context, final Object attachment) {
InternalObject internalObject = (InternalObject) attachment;
context.setAttribute("internalObject", internalObject);
}
public void finalizeContext(final HttpContext context) {
Object flag = context.getAttribute(RESPONSE_RECEIVED);
if (flag == null) {
// Signal completion of the request execution
}
}
public HttpRequest submitRequest(final HttpContext context) {
InternalObject internalObject = (InternalObject) context.getAttribute("internalObject");
Object flag = context.getAttribute(REQUEST_SENT);
if (flag == null) {
try {
// Stick some object into the context
context.setAttribute(REQUEST_SENT, Boolean.TRUE);
log.debug("Sending request to " + internalObject.getUrl());
System.out.println("Sending request to " + internalObject.getUrl());
BasicHttpRequest httpRequest = new BasicHttpRequest("GET", internalObject.getUri());
//FIXMED me
// httpRequest.addHeader("Accept-Encoding", "gzip,deflate");
Iterator<String> iteratorDefault = defaultHeaders.keySet().iterator();
while (iteratorDefault.hasNext()) {
String key = iteratorDefault.next();
httpRequest.setHeader(key, defaultHeaders.get(key));
log.debug(key + "=" + defaultHeaders.get(key));
}
return httpRequest;
} catch (Exception e) {
e.printStackTrace();
}
return null;
} else {
// No new request to submit
return null;
}
}
public void handleResponse(final HttpResponse response, final HttpContext context) {
InternalObject internalObject = (InternalObject) context.getAttribute("internalObject");
HttpEntity entity = response.getEntity();
String content = "";
try {
if (response.getStatusLine().getStatusCode() != 200) {
throw new IOException("invalid response code=" + response.getStatusLine().getStatusCode() + ",url=" + internalObject.getUrl());
}
log.debug(response.getStatusLine());
Header[] headers = response.getAllHeaders();
for (Header header : headers) {
log.debug(header.getName() + "=" + header.getValue());
}
if (entity.getContentEncoding() != null && "gzip".equals(entity.getContentEncoding().getValue())) {
//是压缩的流
GZIPInputStream inStream = new GZIPInputStream(entity.getContent());
content = IOUtils.toString(inStream);
} else {
content = IOUtils.toString(entity.getContent(), "GBK");
// content = EntityUtils.toString(entity, "GBK");
}
System.out.println("-----------------------");
System.out.println("response " + response.getStatusLine() + " of url=" + internalObject.getUrl() + ",content=" + content.length());
System.out.println("content=" + content.indexOf("page-info"));
System.out.println("-----------------------");
//System.out.println("content="+content);
internalObject.getCallback().finished(content);
log.debug("Document length: " + content.length());
} catch (Exception e) {
e.printStackTrace();
log.error("I/O error: " + e.getMessage());
} finally {
removeConnection();
}
context.setAttribute(RESPONSE_RECEIVED, Boolean.TRUE);
}
}
/**
*
* 作用:
*/
public interface NHttpClientCallback {
public void finished(String content);
}
}
3. 说明
如果应用程序的瓶颈在网络读取上,可以采用这种方式来处理。经过试验,这中速度要远远快于阻塞方式读取。
如果是要做爬虫或者是DDos攻击,这种方式都比较理想。
相关文章推荐
- 基于Java NIO 异步读取网络数据
- Java NIO 异步读取网络数据
- ANDROID笔记:基于handle的异步请求网络数据的一种方法
- 异步操作读取网络的json数据(有道翻译实例)
- 开发一个基于React Native的简易demo--读取网络数据并展示
- Android中基于HTTP的通信技术(2)使用Http的Post方式读取网络数据
- Java基础知识强化之网络编程笔记16:Android网络通信之 使用Http的Get方式读取网络数据(基于HTTP通信技术)
- Android中基于HTTP的通信技术(1)使用Http的Get方式读取网络数据
- Java基础知识强化之网络编程笔记17:Android网络通信之 使用Http的Post方式读取网络数据(基于HTTP通信技术)
- Java基础知识强化之网络编程笔记18:Android网络通信之 使用HttpClient的Post / Get 方式读取网络数据(基于HTTP通信技术)
- Android使用ImageLoader异步加载网络图片(一)读取单张图片
- 异步加载网络数据,自定义进度条显示
- 一个大数据方案:基于Nutch+Hadoop+Hbase+ElasticSearch的网络爬虫及搜索引擎
- iOS 信号量解决-网络异步请求的数据同步返回问题
- 基于VC++开发InlineHook网络数据发送接收函数
- Swift 2.0 异步网络请求从网页获取json,并转化为NSArray,遍历得到其中数据.
- 从网络读取数据
- 使用AsyncTask异步加载类进行访问网络数据json的理解和用法
- Swift - 异步获取网络数据封装类
- 分享非常有用的Java程序(关键代码)(八)---Java InputStream读取网络响应Response数据的方法!(重要)