您的位置:首页 > 其它

简单的生产者-消费者模型

2014-12-18 17:55 302 查看
案例场景:使用 httpclient 4.3.5 抓取网页,并利用其自带的连接池(PoolingHttpClientConnectionManager)进行多线程测试。

httpclient 4.3.5 简单介绍:连接池按路由(目标主机)管理连接;对同一主机的请求,后续请求可以复用该路由上已建立的持久连接,从而减少重新建立连接的开销。

类介绍:数据结构采用阻塞队列(LinkedBlockingQueue);线程分为监控线程、生产者线程和消费者线程。

代码如下:

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;

public class AQSTest {

/**
 * Demo entry point for the producer/consumer setup.
 *
 * Starts a queue-size monitor, three producers and the crawler (consumer)
 * on one shared queue, lets them run for two minutes, and then asks the
 * first two producers to stop. The third producer, the monitor and the
 * crawler are not stopped here. If the main thread is interrupted while
 * waiting, the stack trace is printed and no producer is stopped.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    final BlockQueue sharedQueue = new BlockQueue();

    // Start reporting the queue size before any work is produced.
    final monitor queueMonitor = new monitor(sharedQueue);
    queueMonitor.start();

    final Producer first = new Producer(sharedQueue);
    final Producer second = new Producer(sharedQueue);
    final Producer third = new Producer(sharedQueue);
    first.start();
    second.start();
    third.start();

    final Crawel consumer = new Crawel(sharedQueue);
    consumer.start();

    final long runMillis = 2L * 60L * 1000L;
    try {
        Thread.sleep(runMillis);
    } catch (InterruptedException e) {
        e.printStackTrace();
        return;
    }

    first.shutdown();
    second.shutdown();
}

/**
 * Consumer-side coordinator thread.
 *
 * Builds one pooled HTTP client, starts 50 {@link GetThread} workers that
 * share this client and the URL queue, and waits for all of them to end.
 * Once every worker has terminated the client is closed so its pooled
 * connections are released.
 */
public static class Crawel extends Thread {

    private final BlockQueue queue;

    /**
     * @param _queue queue the worker threads take URLs from
     */
    public Crawel(BlockQueue _queue) {
        queue = _queue;
    }

    @Override
    public void run() {
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        // Raise the total connection limit to 200.
        cm.setMaxTotal(200);
        // Raise the default per-route connection limit to 20.
        cm.setDefaultMaxPerRoute(20);
        // Raise the limit for this specific target host to 50.
        HttpHost targetHost = new HttpHost("www.yeetrack.com", 80);
        cm.setMaxPerRoute(new HttpRoute(targetHost), 50);

        CloseableHttpClient httpClient = HttpClients.custom()
                .setConnectionManager(cm)
                .build();

        // All workers share the same client and the same queue.
        GetThread[] workers = new GetThread[50];
        for (int i = 0; i < workers.length; i++) {
            workers[i] = new GetThread(httpClient, queue);
        }

        for (GetThread worker : workers) {
            worker.start();
        }

        for (GetThread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                // Same as before: report it and move on to the next worker.
                e.printStackTrace();
            }
        }

        // An interrupted join may have left a worker running; closing the
        // shared client underneath it would break its in-flight requests.
        for (GetThread worker : workers) {
            if (worker.isAlive()) {
                return;
            }
        }

        try {
            httpClient.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

/**
 * Worker thread: repeatedly takes a URL from the queue, issues an HTTP GET
 * for it through the shared client and reads the response body.
 *
 * The loop ends when {@link #shutdown()} has been called or when the queue
 * hands back {@code null}, which the queue does when the wait for an
 * element was interrupted.
 */
public static class GetThread extends Thread {

    private final CloseableHttpClient httpClient;
    private final HttpContext context;
    private final BlockQueue queue;
    private volatile boolean stop = false;

    /**
     * @param httpClient client shared by all workers
     * @param _queue     queue to take URLs from
     */
    public GetThread(CloseableHttpClient httpClient, BlockQueue _queue) {
        this.httpClient = httpClient;
        this.context = HttpClientContext.create();
        this.queue = _queue;
    }

    /**
     * Asks this worker to stop. The thread is also interrupted so that a
     * worker blocked waiting on an empty queue notices the request.
     */
    public void shutdown() {
        stop = true;
        interrupt();
    }

    @Override
    public void run() {
        while (!stop) {
            String url = queue.take();
            if (url == null) {
                // No element was obtained because the wait was interrupted;
                // there is nothing to fetch, so end the worker.
                break;
            }
            fetch(url);
        }
    }

    /** Fetches one URL and reads its body; failures are reported, not thrown. */
    private void fetch(String url) {
        try {
            HttpGet httpget = new HttpGet(url);
            CloseableHttpResponse response = httpClient.execute(httpget, context);
            try {
                HttpEntity entity = response.getEntity();
                // A response may carry no entity; there is nothing to read then.
                if (entity != null) {
                    // Reading the whole body also drains the content stream.
                    EntityUtils.toString(entity);
                    System.out.println("成功获取源码");
                }
            } finally {
                response.close();
            }
        } catch (ClientProtocolException ex) {
            System.err.println("Protocol error fetching " + url + ": " + ex);
        } catch (IOException ex) {
            System.err.println("I/O error fetching " + url + ": " + ex);
        }
    }

}

/**
 * Monitoring thread: prints the current queue size every 500 ms until
 * {@link #shutdown()} is called or the thread is interrupted.
 */
public static class monitor extends Thread {

    private final BlockQueue queue;
    private volatile boolean shutdown = false;

    /**
     * @param _queue queue whose size is reported
     */
    public monitor(BlockQueue _queue) {
        queue = _queue;
    }

    @Override
    public void run() {
        while (!shutdown) {
            System.out.println("Queue Size: " + queue.size());
            try {
                sleep(500);
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop; keep the flag set
                // so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks the monitor loop to end; takes effect at the next loop check. */
    public void shutdown() {
        // The flag is volatile, so a plain write is visible to the loop.
        shutdown = true;
    }
}

public static class Const {

private static String[] urls = {"http://www.baidu.com","http://www.bing.com","http://www.hao123.com","http://www.163.com"
,"http://www.csdn.net"};

private static Lock lock = new ReentrantLock() ;
public static String get(){
lock.lock() ;
int i = new Random().nextInt()%urls.length;
i = Math.abs(i);
String str = urls[i] ;
lock.unlock() ;
return str ;
}
}

/**
 * Producer thread: puts one randomly chosen URL on the queue every 500 ms
 * until {@link #shutdown()} is called or the thread is interrupted.
 */
public static class Producer extends Thread {

    private volatile boolean stop = false;
    private final BlockQueue queue;

    /**
     * @param _queue queue the URLs are put on
     */
    public Producer(BlockQueue _queue) {
        queue = _queue;
    }

    @Override
    public void run() {
        while (!stop) {
            queue.put(Const.get());
            try {
                sleep(500);
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop. Leave the loop
                // before the next put() and keep the interrupt flag set for
                // code further up the stack.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Asks the producer loop to end; takes effect at the next loop check. */
    public void shutdown() {
        // The flag is volatile, so a plain write is visible to the loop.
        stop = true;
    }

}

/**
 * Thin wrapper around a bounded {@link LinkedBlockingQueue} of URLs
 * (capacity 1000) whose methods do not throw {@link InterruptedException}.
 *
 * When a blocking call is interrupted, the thread's interrupt status is
 * restored so callers can still detect the interruption.
 */
public static class BlockQueue {
    public LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(1000);

    /**
     * Adds a URL, blocking while the queue is full.
     *
     * If the calling thread is interrupted, the URL is not added, the queue
     * is cleared and the interrupt status is restored.
     *
     * @param url the URL to enqueue
     */
    public void put(String url) {
        try {
            queue.put(url);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // NOTE(review): clearing discards every pending URL, including
            // those queued by other producers - kept from the original
            // behaviour; confirm this is intended.
            queue.clear();
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Removes and returns the head of the queue, blocking while it is empty.
     *
     * @return the next URL, or {@code null} if the calling thread was
     *         interrupted (its interrupt status is restored in that case)
     */
    public String take() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * @return the number of URLs currently in the queue
     */
    public int size() {
        return queue.size();
    }
}

/**
 * Housekeeping thread for a connection manager.
 *
 * Until {@link #shutdown()} is called, it repeatedly waits up to five
 * seconds (or until notified) and then asks the connection manager to
 * close expired connections and connections idle for 30 seconds.
 * An interrupt ends the thread.
 */
public static class IdleConnectionMonitorThread extends Thread {

    private final HttpClientConnectionManager connMgr;
    private volatile boolean shutdown;

    /**
     * @param connMgr connection manager to clean up periodically
     */
    public IdleConnectionMonitorThread(HttpClientConnectionManager connMgr) {
        super();
        this.connMgr = connMgr;
    }

    @Override
    public void run() {
        try {
            for (;;) {
                if (shutdown) {
                    return;
                }
                sweepOnce();
            }
        } catch (InterruptedException ex) {
            // terminate
        }
    }

    /**
     * One wait-then-clean cycle, run while holding this object's monitor
     * so that {@link #shutdown()} can cut the wait short via notifyAll().
     */
    private synchronized void sweepOnce() throws InterruptedException {
        wait(5000);
        // Close connections that have expired.
        connMgr.closeExpiredConnections();
        // Optionally, close connections that have been idle for 30 seconds.
        connMgr.closeIdleConnections(30, TimeUnit.SECONDS);
    }

    /** Requests termination and wakes the thread if it is currently waiting. */
    public void shutdown() {
        shutdown = true;
        synchronized (this) {
            notifyAll();
        }
    }

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: