一个Java实现的多层爬取器架构
2016-09-04 22:03
253 查看
一个Java实现的多层爬取器架构
// Note: the code is commented throughout and should be fairly easy to follow.
package edu.fzu.ir.crawl;

import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

import edu.fzu.ir.model.LongTextCrawlInfo;

/**
 * Multi-depth crawler.
 * Starts from an initial list of (url, depth) seeds, crawls each resource, and
 * feeds every extracted link back into the work queue with depth - 1 until all
 * branches reach depth 0.
 *
 * @author asoar
 */
public class MultiDepthCrawler {

    public static void main(String[] args) {
        List<LongTextCrawlInfo> crawlInfoList = new LinkedList<LongTextCrawlInfo>();
        for (int i = 0; i < 10; ++i) {
            String url = "url" + i;
            int depth = 2;
            crawlInfoList.add(new LongTextCrawlInfo(url, depth));
        }
        MultiDepthCrawler mdc = new MultiDepthCrawler();
        mdc.start(crawlInfoList);
        System.out.println(new Date());
    }

    /** queryCrawlInfoList must grow past this length before the server is queried. */
    private static final int queryListMaxLength = 500;

    /** Queue of pending resources (url + depth) waiting to be crawled. */
    private BlockingQueue<LongTextCrawlInfo> crawlInfoQueue;

    /** Pool of crawler tasks; each repeatedly takes an item off the queue and crawls it. */
    private ExecutorService threadPool;

    /** Size of {@link #threadPool}. */
    private int threadPoolSize;

    /**
     * Pool running AddQueue tasks: extracted urls are appended to
     * queryCrawlInfoList, and once that buffer exceeds queryListMaxLength the
     * server is asked which of them are still uncrawled; those go back into the
     * work queue.
     */
    private ExecutorService addThreadPool;

    /** Size of {@link #addThreadPool}. */
    private int addThreadPoolSize;

    /** Crawl tasks, kept so the shutdown check can see whether each one is busy. */
    private List<FetchAndCrawl> fetchAndCrawlList;

    /** Futures of submitted AddQueue tasks, used to detect when all enqueue work is done. */
    private List<Future<Object>> futureList;

    /** Urls extracted from crawled pages, buffered until the server is queried, then cleared. */
    private List<LongTextCrawlInfo> queryCrawlInfoList;

    /**
     * Number of pages crawled so far.
     * FIX: was an {@code Integer} guarded by {@code synchronized (count)} — but
     * {@code count++} rebinds the field to a NEW boxed object, so concurrent
     * threads were locking different monitors and the guard was useless.
     * AtomicInteger provides the same counter without the broken lock.
     */
    private AtomicInteger count = new AtomicInteger();

    /** Creates a crawler and initializes all state. */
    public MultiDepthCrawler() {
        init();
    }

    /** Initializes the queue, thread pools and bookkeeping lists. */
    private void init() {
        threadPoolSize = 8;
        addThreadPoolSize = 8;
        crawlInfoQueue = new LinkedBlockingQueue<LongTextCrawlInfo>(20);
        threadPool = Executors.newFixedThreadPool(threadPoolSize);
        addThreadPool = Executors.newFixedThreadPool(addThreadPoolSize);
        fetchAndCrawlList = new LinkedList<FetchAndCrawl>();
        futureList = Collections.synchronizedList(new LinkedList<Future<Object>>());
        queryCrawlInfoList = Collections.synchronizedList(new LinkedList<LongTextCrawlInfo>());
        count.set(0);
    }

    /**
     * Runs the multi-depth crawl and blocks until it finishes.
     *
     * @param crawlInfoList initial seed resources
     */
    public void start(List<LongTextCrawlInfo> crawlInfoList) {
        // Seed the work queue.
        for (LongTextCrawlInfo crawlInfo : crawlInfoList) {
            try {
                crawlInfoQueue.put(crawlInfo);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt(); // FIX: preserve interrupt status
            }
        }

        // Create and launch the crawler tasks.
        for (int i = 0; i < threadPoolSize; ++i) {
            fetchAndCrawlList.add(new FetchAndCrawl());
        }
        for (FetchAndCrawl fetchAndCrawl : fetchAndCrawlList) {
            threadPool.submit(fetchAndCrawl);
        }

        /*
         * Poll until the crawl is finished. The crawl is done only when ALL of:
         *   - every AddQueue task has completed,
         *   - no FetchAndCrawl task is mid-crawl (isRunning == false),
         *   - the work queue is empty,
         *   - queryCrawlInfoList is empty (no urls still waiting to be enqueued).
         */
        boolean isBreak = false;
        while (!isBreak) {
            isBreak = true;
            synchronized (crawlInfoQueue) {
                for (Future<Object> future : futureList) {
                    if (!future.isDone()) {
                        System.out.println("not done");
                        isBreak = false;
                        break; // FIX: removed stray "d4ab" token that broke compilation here
                    }
                }
                if (isBreak) {
                    System.out.println("done");
                    for (FetchAndCrawl fetchAndCrawl : fetchAndCrawlList) {
                        if (fetchAndCrawl.isRunning) {
                            System.out.println("isRunning");
                            isBreak = false;
                            break;
                        }
                    }
                }
                if (isBreak) {
                    System.out.println("not running");
                    if (crawlInfoQueue.isEmpty()) {
                        if (queryCrawlInfoList.size() > 0) {
                            // Flush the leftover buffered urls: ask the server which are
                            // uncrawled and push those back into the queue for another round.
                            List<LongTextCrawlInfo> addToQueueCrawlInfoList =
                                    new LinkedList<LongTextCrawlInfo>();
                            synchronized (queryCrawlInfoList) {
                                addToQueueCrawlInfoList.addAll(queryServerIsCrawled(queryCrawlInfoList));
                                queryCrawlInfoList.clear();
                            }
                            System.out.println("add to queue!");
                            for (LongTextCrawlInfo nextCrawlInfo : addToQueueCrawlInfoList) {
                                try {
                                    crawlInfoQueue.put(nextCrawlInfo);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                    Thread.currentThread().interrupt();
                                }
                            }
                            isBreak = false;
                        } else {
                            System.out.println("break");
                            isBreak = true;
                        }
                    } else {
                        System.out.println("not empty");
                        isBreak = false;
                    }
                }
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }

        // Crawl finished: flip each task's loop flag so it exits, then shut down the pools.
        for (FetchAndCrawl fetchAndCrawl : fetchAndCrawlList) {
            fetchAndCrawl.setBreak(true);
        }
        threadPool.shutdown();
        addThreadPool.shutdown();
    }

    /**
     * Crawl task: repeatedly polls one resource from the queue and crawls it.
     * When a page yields new urls, an AddQueue task is submitted to enqueue them.
     *
     * @author asoar
     */
    private class FetchAndCrawl implements Callable<Object> {

        // FIX: both flags are written by one thread and read by another —
        // volatile guarantees the writes are visible across threads.
        private volatile boolean isRunning;
        private volatile boolean isBreak;

        @Override
        public Object call() throws Exception {
            OnePageCrawler onePageCrawler = null;
            List<LongTextCrawlInfo> nextDepthCrawlInfo = null;
            while (!isBreak) {
                // Take the next resource; poll() returns null when the queue is empty.
                LongTextCrawlInfo crawlInfo = crawlInfoQueue.poll();
                if (crawlInfo == null) {
                    // Queue momentarily empty: mark idle and back off briefly.
                    isRunning = false;
                    Thread.sleep(10);
                    continue;
                } else {
                    // Busy crawling.
                    isRunning = true;
                    System.out.println("crawl one page");
                    System.out.println(count.getAndIncrement());
                    onePageCrawler = new OnePageCrawler(crawlInfo);
                    onePageCrawler.crawl();
                    nextDepthCrawlInfo = onePageCrawler.getNextDepthCrawlInfo();
                    if (nextDepthCrawlInfo != null && nextDepthCrawlInfo.size() > 0) {
                        Future<Object> f = addThreadPool.submit(new AddQueue(nextDepthCrawlInfo));
                        futureList.add(f);
                    }
                }
            }
            System.out.println("return");
            return null;
        }

        private FetchAndCrawl() {
            this.isRunning = false;
            this.isBreak = false;
        }

        /** Tells the task loop to exit. */
        public void setBreak(boolean isBreak) {
            this.isBreak = isBreak;
        }
    }

    /**
     * Appends newly extracted resources to queryCrawlInfoList; once the buffer
     * exceeds queryListMaxLength, asks the server which entries are uncrawled
     * and puts those back into the work queue.
     *
     * @author asoar
     */
    private class AddQueue implements Callable<Object> {

        private List<LongTextCrawlInfo> nextDepthCrawlInfo;

        @Override
        public Object call() throws Exception {
            System.out.println("add to query list!");
            queryCrawlInfoList.addAll(nextDepthCrawlInfo);
            List<LongTextCrawlInfo> addToQueueCrawlInfoList = new LinkedList<LongTextCrawlInfo>();
            synchronized (queryCrawlInfoList) {
                if (queryCrawlInfoList.size() > queryListMaxLength) {
                    System.out.println("query server which is not crawled");
                    addToQueueCrawlInfoList.addAll(queryServerIsCrawled(queryCrawlInfoList));
                    queryCrawlInfoList.clear();
                }
            }
            System.out.println("add to queue!");
            for (LongTextCrawlInfo nextCrawlInfo : addToQueueCrawlInfoList) {
                try {
                    crawlInfoQueue.put(nextCrawlInfo);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("add over");
            return "ok";
        }

        private AddQueue(List<LongTextCrawlInfo> nextDepthCrawlInfo) {
            this.nextDepthCrawlInfo = nextDepthCrawlInfo;
        }
    }

    /**
     * Asks the server which of the given resources have not been crawled yet.
     * (Stub: currently returns every entry, i.e. assumes nothing was crawled.)
     *
     * @param nextDepthCrawlInfo candidate resources
     * @return the subset that still needs crawling
     */
    private List<LongTextCrawlInfo> queryServerIsCrawled(List<LongTextCrawlInfo> nextDepthCrawlInfo) {
        System.out.println("query server which is not crawled");
        // FIX: copy the parameter, not the queryCrawlInfoList field — the original
        // ignored its argument, which only worked because every caller happened
        // to pass that same field.
        List<LongTextCrawlInfo> addToQueueCrawlInfoList = new LinkedList<LongTextCrawlInfo>();
        addToQueueCrawlInfoList.addAll(nextDepthCrawlInfo);
        return addToQueueCrawlInfoList;
    }
}
package edu.fzu.ir.crawl;

import java.util.LinkedList;
import java.util.List;
import java.util.Random;

import edu.fzu.ir.model.LongTextCrawlInfo;

/**
 * Single-page crawler.
 * Crawls the given url, and if the crawl depth is still positive, extracts
 * follow-up urls (simulated here with random strings) at depth - 1.
 *
 * @author asoar
 */
public class OnePageCrawler {

    /** Resource (url + depth) to crawl. */
    private LongTextCrawlInfo crawlInfo;

    /** Urls extracted from the page; stays empty when the crawl depth is 0. */
    private List<LongTextCrawlInfo> nextDepthCrawlInfo;

    /**
     * Crawls the page and extracts the next-depth urls.
     * Does nothing when no crawl target was set.
     */
    public void crawl() {
        if (crawlInfo == null) {
            return;
        }
        // Only extract urls while there is remaining depth.
        if (crawlInfo.getDepth() > 0) {
            // FIX: one shared Random — the original created FOUR new Random
            // instances per iteration (400 per page), which is wasteful and can
            // yield correlated seeds when constructed in quick succession.
            Random random = new Random();
            for (int i = 0; i < 100; ++i) {
                nextDepthCrawlInfo.add(new LongTextCrawlInfo(
                        "url" + random.nextInt(10000) + random.nextInt(1000)
                                + random.nextInt(10000) + random.nextInt(1000),
                        crawlInfo.getDepth() - 1));
            }
        }
    }

    public LongTextCrawlInfo getCrawlInfo() {
        return crawlInfo;
    }

    public void setCrawlInfo(LongTextCrawlInfo crawlInfo) {
        this.crawlInfo = crawlInfo;
    }

    public List<LongTextCrawlInfo> getNextDepthCrawlInfo() {
        return nextDepthCrawlInfo;
    }

    public void setNextDepthCrawlInfo(List<LongTextCrawlInfo> nextDepthCrawlInfo) {
        this.nextDepthCrawlInfo = nextDepthCrawlInfo;
    }

    /**
     * @param crawlInfo resource to crawl; the extracted-url list starts empty
     */
    public OnePageCrawler(LongTextCrawlInfo crawlInfo) {
        this.crawlInfo = crawlInfo;
        this.nextDepthCrawlInfo = new LinkedList<LongTextCrawlInfo>();
    }
}
package edu.fzu.ir.model;

import java.io.Serializable;

/**
 * Basic unit of long-text crawl work: a target url together with the number of
 * link-following levels still allowed from it.
 *
 * @author asoar
 */
public class LongTextCrawlInfo implements Serializable {

    private static final long serialVersionUID = -8542848699399696344L;

    /** Target url to crawl. */
    private String url;

    /** Remaining crawl depth; 0 means crawl the page but extract no links. */
    private int depth;

    /**
     * @param url   target url
     * @param depth remaining crawl depth
     */
    public LongTextCrawlInfo(String url, int depth) {
        super();
        this.url = url;
        this.depth = depth;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public int getDepth() {
        return depth;
    }

    public void setDepth(int depth) {
        this.depth = depth;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("LongTextCrawlInfo [url=");
        sb.append(url).append(", depth=").append(depth).append("]");
        return sb.toString();
    }
}
相关文章推荐
- 一个实现MD5的简洁的java类
- 一个实现MD5的简洁的java类 Jagie 原创
- 一个用Dijkstra算法实现的路由算法的java程序——3 STDijkstra类
- 一个用Dijkstra算法实现的路由算法的java程序——8 GraphMain类
- 一个用Dijkstra算法实现的路由算法的java程序——7 GraphShowAdv类
- 用多层架构构建一个简易留言本
- 一个实现MD5的简洁的java类
- 一个用Dijkstra算法实现的路由算法的java程序——1 GraphAdjList类
- 一个用Dijkstra算法实现的路由算法的java程序——9 图信息文件示例
- 推荐一个关于AOP的AOP@WORK的专栏作者的AOP专题系列文章,主要关于Java下的AOP实现,毕竟Java下的AOP实现确实要比.Net下成熟得多,共12篇
- Java.NET --一个基于Java的Microsoft.NET框架的实现
- 一个将数据文件转换成excel文件打印的java实现方法的代码片断(Struts+poi)
- 求助 用java写一个实现键盘功能按钮的小程序
- 一个用Dijkstra算法实现的路由算法的java程序——2 GraphFromFile类
- 一个实现MD5的简洁的java类
- 一个实现排列和组合的JavaBean
- 用JAVA实现一个分页类
- 一个简单的用JAVA实现的屏幕抓图(源代码)
- 一个用Dijkstra算法实现的路由算法的java程序——6 STDijkstraAdv类
- 一个用Dijkstra算法实现的路由算法的java程序——5 GraphShow类