
A Multi-Depth Crawler Architecture in Java

2016-09-04 22:03


The code is commented throughout and fairly easy to follow. In outline: a bounded BlockingQueue holds (url, depth) work items; a fixed pool of FetchAndCrawl workers drains the queue and crawls one page at a time, while a second pool of AddQueue workers batches the URLs extracted from crawled pages, asks a server which of them have not been crawled yet, and feeds those back into the queue. Three files follow: MultiDepthCrawler (the scheduler), OnePageCrawler (single-page crawling and URL extraction), and LongTextCrawlInfo (the url-plus-depth model).

package edu.fzu.ir.crawl;

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import edu.fzu.ir.model.LongTextCrawlInfo;
/**
 * Multi-depth crawler.
 * Starts from a given list of resources to crawl,
 * crawling each one according to its url and depth.
 * @author asoar
 *
 */
public class MultiDepthCrawler {

    public static void main(String[] args) {
        List<LongTextCrawlInfo> crawlInfoList = new LinkedList<LongTextCrawlInfo>();
        for (int i = 0; i < 10; ++i) {
            String url = "url" + i;
            int depth = 2;
            crawlInfoList.add(new LongTextCrawlInfo(url, depth));
        }
        MultiDepthCrawler mdc = new MultiDepthCrawler();
        mdc.start(crawlInfoList);

        System.out.println(new Date());
    }

    /**
     * The server is only queried once queryCrawlInfoList grows beyond this length.
     */
    private static final int queryListMaxLength = 500;

    /**
     * Queue of resources (url and depth) waiting to be crawled.
     */
    private BlockingQueue<LongTextCrawlInfo> crawlInfoQueue;

    /**
     * Thread pool that performs the crawling: workers keep taking items off the queue and crawl them.
     */
    private ExecutorService threadPool;

    /**
     * Size of threadPool.
     */
    private int threadPoolSize;

    /**
     * Adds URLs extracted during crawling to queryCrawlInfoList.
     * If the list then exceeds queryListMaxLength, asks the server which of those
     * URLs have not been crawled yet and puts the uncrawled ones on the queue.
     */
    private ExecutorService addThreadPool;

    /**
     * Size of addThreadPool.
     */
    private int addThreadPoolSize;

    /**
     * List of crawl worker tasks, used to check whether each worker is currently crawling.
     */
    private List<FetchAndCrawl> fetchAndCrawlList;

    /**
     * Futures of the queue-feeding tasks, used to detect whether they have finished.
     */
    private List<Future<Object>> futureList;

    /**
     * URLs extracted from crawled pages accumulate here. When the list grows beyond
     * queryListMaxLength, the server is asked which of them are uncrawled; those are
     * put on the queue and this list is cleared.
     */
    private List<LongTextCrawlInfo> queryCrawlInfoList;

    // AtomicInteger instead of Integer: synchronizing on an Integer field is broken,
    // because count++ autoboxes to a new object and the lock changes identity.
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Constructor: initializes all fields.
     */
    public MultiDepthCrawler() {
        init();
    }

    /**
     * Initializes all fields.
     */
    private void init() {
        threadPoolSize = 8;
        addThreadPoolSize = 8;
        crawlInfoQueue = new LinkedBlockingQueue<LongTextCrawlInfo>(20);
        threadPool = Executors.newFixedThreadPool(threadPoolSize);
        addThreadPool = Executors.newFixedThreadPool(addThreadPoolSize);
        fetchAndCrawlList = new LinkedList<FetchAndCrawl>();
        futureList = Collections.synchronizedList(new LinkedList<Future<Object>>());
        queryCrawlInfoList = Collections.synchronizedList(new LinkedList<LongTextCrawlInfo>());
        count.set(0);
    }

    /**
     * Starts multi-depth crawling.
     * @param crawlInfoList the initial list of resources to crawl
     */
    public void start(List<LongTextCrawlInfo> crawlInfoList) {

        // put the initial resources on the queue
        for (LongTextCrawlInfo crawlInfo : crawlInfoList) {
            try {
                crawlInfoQueue.put(crawlInfo);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // create threadPoolSize crawl workers and start them
        for (int i = 0; i < threadPoolSize; ++i) {
            fetchAndCrawlList.add(new FetchAndCrawl());
        }
        for (FetchAndCrawl fetchAndCrawl : fetchAndCrawlList) {
            threadPool.submit(fetchAndCrawl);
        }

        /**
         * Loop until crawling has finished.
         * Termination requires all of the following at once:
         *   - no AddQueue task is still running
         *   - no FetchAndCrawl worker is crawling (isRunning is false for all)
         *   - the queue is empty
         *   - queryCrawlInfoList is empty
         */
        boolean isBreak = false;
        while (!isBreak) {
            isBreak = true;
            synchronized (crawlInfoQueue) {
                for (Future<Object> future : futureList) {
                    if (!future.isDone()) {
                        System.out.println("not done");
                        isBreak = false;
                        break;
                    }
                }
                if (isBreak) {
                    System.out.println("done");
                    for (FetchAndCrawl fetchAndCrawl : fetchAndCrawlList) {
                        if (fetchAndCrawl.isRunning) {
                            System.out.println("isRunning");
                            isBreak = false;
                            break;
                        }
                    }
                }
                if (isBreak) {
                    System.out.println("not running");
                    if (crawlInfoQueue.isEmpty()) {
                        if (queryCrawlInfoList.size() > 0) {
                            // flush the remaining candidates even though the list is still short
                            List<LongTextCrawlInfo> addToQueueCrawlInfoList = new LinkedList<LongTextCrawlInfo>();
                            synchronized (queryCrawlInfoList) {
                                addToQueueCrawlInfoList.addAll(queryServerIsCrawled(queryCrawlInfoList));
                                queryCrawlInfoList.clear();
                            }
                            System.out.println("add to queue!");
                            for (LongTextCrawlInfo nextCrawlInfo : addToQueueCrawlInfoList) {
                                try {
                                    crawlInfoQueue.put(nextCrawlInfo);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }
                            isBreak = false;
                        } else {
                            System.out.println("break");
                            isBreak = true;
                        }
                    } else {
                        System.out.println("not empty");
                        isBreak = false;
                    }
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        // crawling has finished: flip every worker's loop flag so they exit
        for (FetchAndCrawl fetchAndCrawl : fetchAndCrawlList) {
            fetchAndCrawl.setBreak(true);
        }

        // shut down the thread pools
        threadPool.shutdown();
        addThreadPool.shutdown();
    }

    /**
     * Repeatedly takes a resource off the queue and crawls the page.
     * If URLs were extracted, wraps them in an AddQueue task and hands it
     * to addThreadPool for insertion into the queue.
     * @author asoar
     *
     */
    private class FetchAndCrawl implements Callable<Object> {
        private boolean isRunning;
        private boolean isBreak;

        @Override
        public Object call() throws Exception {
            OnePageCrawler onePageCrawler = null;
            List<LongTextCrawlInfo> nextDepthCrawlInfo = null;
            while (!isBreak) {
                // take an item off the queue
                LongTextCrawlInfo crawlInfo = crawlInfoQueue.poll();
                if (crawlInfo == null) {
                    // queue is empty: mark this worker idle and retry shortly
                    isRunning = false;
                    Thread.sleep(10);
                    continue;
                } else {
                    // otherwise mark the worker as crawling
                    isRunning = true;
                    System.out.println("crawl one page");
                    System.out.println(count.getAndIncrement());
                    onePageCrawler = new OnePageCrawler(crawlInfo);
                    onePageCrawler.crawl();
                    nextDepthCrawlInfo = onePageCrawler.getNextDepthCrawlInfo();
                    if (nextDepthCrawlInfo != null && nextDepthCrawlInfo.size() > 0) {
                        Future<Object> f = addThreadPool.submit(new AddQueue(nextDepthCrawlInfo));
                        futureList.add(f);
                    }
                }
            }
            System.out.println("return");
            return null;
        }

        private FetchAndCrawl() {
            this.isRunning = false;
            this.isBreak = false;
        }

        public void setBreak(boolean isBreak) {
            this.isBreak = isBreak;
        }

    }

    /**
     * Adds newly extracted resources to queryCrawlInfoList.
     * Once queryCrawlInfoList exceeds queryListMaxLength, asks the server which
     * URLs are still uncrawled and puts those on the queue.
     * @author asoar
     *
     */
    private class AddQueue implements Callable<Object> {
        private List<LongTextCrawlInfo> nextDepthCrawlInfo;

        @Override
        public Object call() throws Exception {
            System.out.println("add to query list!");
            queryCrawlInfoList.addAll(nextDepthCrawlInfo);
            List<LongTextCrawlInfo> addToQueueCrawlInfoList = new LinkedList<LongTextCrawlInfo>();
            synchronized (queryCrawlInfoList) {
                if (queryCrawlInfoList.size() > queryListMaxLength) {
                    System.out.println("query server which is not crawled");

                    addToQueueCrawlInfoList.addAll(queryServerIsCrawled(queryCrawlInfoList));

                    queryCrawlInfoList.clear();
                }
            }

            System.out.println("add to queue!");
            for (LongTextCrawlInfo nextCrawlInfo : addToQueueCrawlInfoList) {
                try {
                    crawlInfoQueue.put(nextCrawlInfo);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("add over");
            return "ok";
        }

        private AddQueue(List<LongTextCrawlInfo> nextDepthCrawlInfo) {
            this.nextDepthCrawlInfo = nextDepthCrawlInfo;
        }
    }

    /**
     * Asks the server which URLs are still uncrawled and returns them.
     * Stub implementation: no server is contacted and every candidate is
     * treated as uncrawled; replace this with a real query.
     * @param nextDepthCrawlInfo the candidate URLs
     * @return the candidates the server reports as uncrawled
     */
    private List<LongTextCrawlInfo> queryServerIsCrawled(List<LongTextCrawlInfo> nextDepthCrawlInfo) {
        System.out.println("query server which is not crawled");
        List<LongTextCrawlInfo> addToQueueCrawlInfoList = new LinkedList<LongTextCrawlInfo>();

        // copy from the parameter, not from the shared field the original read by mistake
        addToQueueCrawlInfoList.addAll(nextDepthCrawlInfo);

        return addToQueueCrawlInfoList;
    }

}
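
The queryServerIsCrawled stub above treats every candidate as uncrawled, so the same URL can be queued over and over. A minimal local stand-in for the dedup server, assuming no external server is available, could remember seen URLs in a concurrent set; the class and method names below are my own, hypothetical ones, not part of the original code:

package edu.fzu.ir.crawl;

import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import edu.fzu.ir.model.LongTextCrawlInfo;

/**
 * Hypothetical in-memory stand-in for the dedup server: keeps the set of
 * already-seen URLs and returns only the entries not seen before.
 * Thread-safe, so several AddQueue workers may call it at once.
 */
public class LocalCrawledFilter {
    // URLs that have already been queued or crawled
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    public List<LongTextCrawlInfo> filterUncrawled(List<LongTextCrawlInfo> candidates) {
        List<LongTextCrawlInfo> uncrawled = new LinkedList<LongTextCrawlInfo>();
        for (LongTextCrawlInfo info : candidates) {
            // Set.add() returns true only for the first insertion of this URL
            if (seen.add(info.getUrl())) {
                uncrawled.add(info);
            }
        }
        return uncrawled;
    }
}

A single shared instance of this filter could then be called from queryServerIsCrawled in place of the stub's pass-through copy.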


package edu.fzu.ir.crawl;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;

import edu.fzu.ir.model.LongTextCrawlInfo;
/**
 * Crawls a single page.
 * Fetches the current url first, then decides from the remaining crawl depth
 * whether to extract the URLs it contains.
 * @author asoar
 *
 */
public class OnePageCrawler {
    /**
     * The resource to crawl.
     */
    private LongTextCrawlInfo crawlInfo;
    /**
     * URLs extracted from the page; stays empty when the crawl depth is 0.
     */
    private List<LongTextCrawlInfo> nextDepthCrawlInfo;

    /**
     * Crawls the page and extracts its URLs.
     * The random URLs generated below merely simulate extraction for this demo.
     */
    public void crawl() {
        if (crawlInfo == null) {
            return;
        }

        // URLs are only extracted while crawl depth remains
        if (crawlInfo.getDepth() > 0) {
            for (int i = 0; i < 100; ++i) {
                nextDepthCrawlInfo.add(new LongTextCrawlInfo("url" + new Random().nextInt(10000)
                        + new Random().nextInt(1000) + new Random().nextInt(10000)
                        + new Random().nextInt(1000), crawlInfo.getDepth() - 1));
            }
        }
    }

    public LongTextCrawlInfo getCrawlInfo() {
        return crawlInfo;
    }

    public void setCrawlInfo(LongTextCrawlInfo crawlInfo) {
        this.crawlInfo = crawlInfo;
    }

    public List<LongTextCrawlInfo> getNextDepthCrawlInfo() {
        return nextDepthCrawlInfo;
    }

    public void setNextDepthCrawlInfo(List<LongTextCrawlInfo> nextDepthCrawlInfo) {
        this.nextDepthCrawlInfo = nextDepthCrawlInfo;
    }

    public OnePageCrawler(LongTextCrawlInfo crawlInfo) {
        this.crawlInfo = crawlInfo;
        this.nextDepthCrawlInfo = new LinkedList<LongTextCrawlInfo>();
    }

}
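
OnePageCrawler.crawl() above fakes both the fetch and the extraction. As a rough sketch of what a real implementation might look like, assuming the jsoup HTML parser is on the classpath (nothing below besides the field and class names is taken from the original code), the method body could fetch the page and pull out absolute links:

// Hypothetical crawl() body for OnePageCrawler, assuming jsoup.
// Requires, at the top of the file:
//   import org.jsoup.Jsoup;
//   import org.jsoup.nodes.Document;
//   import org.jsoup.nodes.Element;
public void crawl() {
    if (crawlInfo == null) {
        return;
    }
    try {
        // fetch and parse the page; the timeout keeps a slow host from stalling a worker
        Document doc = Jsoup.connect(crawlInfo.getUrl()).timeout(5000).get();
        // only extract outlinks while depth budget remains
        if (crawlInfo.getDepth() > 0) {
            for (Element link : doc.select("a[href]")) {
                String absUrl = link.attr("abs:href"); // resolve relative links against the page URL
                if (!absUrl.isEmpty()) {
                    nextDepthCrawlInfo.add(new LongTextCrawlInfo(absUrl, crawlInfo.getDepth() - 1));
                }
            }
        }
    } catch (java.io.IOException e) {
        // a failed fetch simply contributes no outlinks
        e.printStackTrace();
    }
}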


package edu.fzu.ir.model;

import java.io.Serializable;

/**
 * Basic model class for long-text crawl information:
 * a url to crawl together with its crawl depth.
 * @author asoar
 *
 */
public class LongTextCrawlInfo implements Serializable {

    private static final long serialVersionUID = -8542848699399696344L;
    private String url;
    private int depth;

    public String getUrl() {
        return url;
    }
    public void setUrl(String url) {
        this.url = url;
    }
    public int getDepth() {
        return depth;
    }
    public void setDepth(int depth) {
        this.depth = depth;
    }
    @Override
    public String toString() {
        return "LongTextCrawlInfo [url=" + url + ", depth=" + depth + "]";
    }
    public LongTextCrawlInfo(String url, int depth) {
        super();
        this.url = url;
        this.depth = depth;
    }

}
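
If uncrawled filtering keys on the URL, as in the set-based sketch earlier, it also helps to give LongTextCrawlInfo value semantics on its url field so instances can live in hash-based collections directly. A possible addition to the class (my suggestion, not part of the original):

// Candidate equals/hashCode for LongTextCrawlInfo, keyed on url only,
// so two infos for the same page compare equal regardless of depth.
@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (!(o instanceof LongTextCrawlInfo)) return false;
    LongTextCrawlInfo other = (LongTextCrawlInfo) o;
    return url == null ? other.url == null : url.equals(other.url);
}

@Override
public int hashCode() {
    return url == null ? 0 : url.hashCode();
}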
Tags: java, multi-depth crawling, architecture