您的位置:首页 > 编程语言 > Java开发

java爬虫框架webmagic-core-0.5.3源码分析

2016-10-12 14:27 453 查看
webmagic框架使用java语言,抽象出爬虫常用操作,封装了爬虫中使用频繁的库类,使整个爬取逻辑更加清晰有条理。

关于webmagic的使用指南在官网描述的很清楚,简单介绍下项目结构。

先看下官网的一个流程图介绍:



这里展现了爬虫的4个组件,Scheduler、Downloader、PageProcessor、Pipeline,它们相互独立,协同合作。它们直接通过Request、Page、ResultItem来进行通信。

再看看webmagic-core源码的结构:

Page.java
Request.java
ResultItems.java
Site.java
Spider.java
SpiderListener.java
Task.java

Downloader

AbstractDownloader.java
Downloader.java
HttpClientDownloader.java
HttpClientGenerator.java

Scheduler

DuplicateRemovedScheduler.java
MonitorableScheduler.java
PriorityScheduler.java
QueueScheduler.java
Scheduler.java

Pipeline

CollectorPipeline.java
ConsolePipeline.java
FilePipeline.java        
Pipeline.java                
ResultItemsCollectorPipeline.java

Processor

PageProcessor.java
SimplePageProcessor.java

Proxy

Proxy.java                
ProxyPool.java        
SimpleProxyPool.java

Selector

AbstractSelectable.java        
CssSelector.java                
HtmlNode.java                        
OrSelector.java                        
RegexSelector.java                
Selector.java                        
XpathSelector.java
AndSelector.java                
ElementSelector.java                
Json.java                        
PlainText.java                        
ReplaceSelector.java                
Selectors.java                
BaseElementSelector.java        
Html.java                        
JsonPathSelector.java                
RegexResult.java                
Selectable.java                        
SmartContentSelector.java

Thread

CountableThreadPool.java

Utils

Experimental.java        
FilePersistentBase.java        
HttpConstant.java        
NumberUtils.java        
ProxyUtils.java                
UrlUtils.java

Spider是实现了Task,创建一个Spider就是新建一个任务,这个类中调用其他组件,是整体爬取流程的核心。
Page是downloader下载页面后的保存结果,之后要将Page传入processor中根据自己需求进行处理。
Site是对爬取页面进行配置,如设置起始页地址,重试次数,重试间隔等等。
ResultItems是Page处理之后的结果,之后要将ResultItems传入Pipeline中进行结果处理。
SpiderListener是对Spider进行监听,处理成功还是失败。利用这个功能,你可以查看爬虫的执行情况——已经下载了多少页面、还有多少页面、启动了多少线程等信息。该功能通过JMX实现,你可以使用Jconsole等JMX工具查看本地或者远程的爬虫信息。
Request是对一次请求的抽象,其中包含请求的url和method等属性。

downloader下载器,默认HttpClientDownloader。
secheduler负责待抓取urls队列的维护。
pipeline主要负责对结果ResultItems的处理,默认ConsolePipeline,(可由用户自定义)。
processor负责解析页面,抽取结果保存到ResultItems,(可由用户自定义)。
proxy代理相关配置。
selector页面元素选择器,包括css、正则、xpath等等。
thread线程池管理。
utils一些工具类。

这里以maven安装方式,举一个简单的例子来描述各个类的作用。
以爬取这个网址的列表为例http://www.dailianmeng.com/p2pblacklist/index.html,我们需要把这个列表全部爬取并保存,使用webmagic只需要新建三个文件。

App.java

public class App{
Logger logger;

App() {
InputStream inputStream = App.class.getResourceAsStream("/log4j.properties");
if (inputStream != null)
PropertyConfigurator.configure(inputStream);
else
BasicConfigurator.configure();

logger = LoggerFactory.getLogger(getClass());
}

public static void main(String[] args) {
App app = new App();
app.run();
}

void run() {

logger.info("spider for dlm start....");

Spider.create(new DLMPageprocessor())
.addUrl(DLMPageprocessor.START_PAGE)
.addPipeline(new DLMPipeline())
.addPipeline(new ConsolePipeline())
.thread(5)
.run();
}
}

DLMPageprocessor.java

package com.xxx.dailianmeng;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.Page;
import us.codecraft.webmagic.Site;
import us.codecraft.webmagic.processor.PageProcessor;
import us.codecraft.webmagic.selector.Selectable;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class DLMPageprocessor implements PageProcessor {
private Site site = Site.me().setRetryTimes(3).setSleepTime(1000);
static Logger logger = LoggerFactory.getLogger(DLMPageprocessor.class);
public static String START_PAGE = "http://www.dailianmeng.com/p2pblacklist/index.html";

@Override
public void process(Page page) {
page.addTargetRequests(page.getHtml().links().regex("/p2pblacklist/index.html\\?P2pBlacklist_page=\\w+").all());

ArrayList list = new ArrayList();

Selectable selectable = page.getHtml().xpath("//div[@class='table-responsive']//table//tbody//tr");
for (Selectable sel : selectable.nodes()) {
String[] str = new String[10];
sel.xpath("//td/html()").all().toArray(str);

Map map = new HashMap<>();
map.put("name", str[0].replace(" ", ""));
map.put("idcard", str[1].replace(" ", ""));
map.put("mobile", str[2].replace(" ", ""));
map.put("addr", str[3].replace(" ", ""));
map.put("totalAmount", str[4].replace(" ", "0.00"));
map.put("paidAmount", str[5].replace(" ", "0.00"));
map.put("unpaidAmount", str[6].replace(" ", "0.00"));
map.put("debtDate", str[7].replace(" ", "0000-00-00"));
map.put("debtTerms", str[8].replace(" ", ""));
map.put("extId", sel.xpath("//td//a").links().get());

list.add(map);
}

page.putField("mysql", list);
}

@Override
public Site getSite() {
return site;
}
}

DLMPipeline.java

package com.xxx.dailianmeng;

import com.u51.xsg.mysql.Mysql;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.ResultItems;
import us.codecraft.webmagic.Task;
import us.codecraft.webmagic.pipeline.Pipeline;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;

public class DLMPipeline implements Pipeline {
Logger logger = LoggerFactory.getLogger(DLMPipeline.class);

public void process(ResultItems resultItems, Task task) {
Iterator iterator = resultItems.getAll().entrySet().iterator();

while (iterator.hasNext()) {
//处理结果
}
}
}

爬取操作非常简单,App.java中一个main函数,Pageprocessor对爬取的页面进行处理,Pipeline对处理结果进行操作,用户只需关心业务逻辑,而不用关心爬虫实现。

那么在简单操作的背后,具体是怎么实现的呢?

首先看App.java中创建了一个Spider对象,Spider也控制着整个爬取流程。所以我们可以看下Spider的run方法里都做了什么。

@Override
public void run() {
checkRunningStat();
initComponent();
logger.info("Spider " + getUUID() + " started!");
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
Request request = scheduler.poll(this);
if (request == null) {
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break;
}
// wait until new url added
waitNewUrl();
} else {
final Request requestFinal = request;
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
processRequest(requestFinal);
onSuccess(requestFinal);
} catch (Exception e) {
onError(requestFinal);

d2a6
logger.error("process request " + requestFinal + " error", e);
} finally {
if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
site.returnHttpProxyToPool((HttpHost) requestFinal.getExtra(Request.PROXY), (Integer) requestFinal
.getExtra(Request.STATUS_CODE));
}
pageCount.incrementAndGet();
signalNewUrl();
}
}
});
}
}
stat.set(STAT_STOPPED);
// release some resources
if (destroyWhenExit) {
close();
}
}run方法总大致分成俩步操作,先初始化各个组件initComponent(),再从线程池中取一个Request出来处理。
1、initComponent():

protected void initComponent() {
if (downloader == null) {
this.downloader = new HttpClientDownloader();
}
if (pipelines.isEmpty()) {
pipelines.add(new ConsolePipeline());
}
downloader.setThread(threadNum);
if (threadPool == null || threadPool.isShutdown()) {
if (executorService != null && !executorService.isShutdown()) {
threadPool = new CountableThreadPool(threadNum, executorService);
} else {
threadPool = new CountableThreadPool(threadNum);
}
}
if (startRequests != null) {
for (Request request : startRequests) {
scheduler.push(request, this);
}
startRequests.clear();
}
startTime = new Date();
}初始化默认的downloader,添加pipelines默认为ConsolePipeline(),初始化线程池,如果设置了startRequests就会把里面的request推入schedule队列中。

2、如果当前线程没有被打断,而且状态是RUNNING那么就从schedule去一个request出来处理,如果schedule取出的request是空,那么等待新的请求加进来,如果设置了没任务就结束爬取进程那么此时就break结束。
在线程池中启用一个线程来进行爬取,执行processRequest()方法。

protected void processRequest(Request request) {
Page page = downloader.download(request, this);
if (page == null) {
sleep(site.getRetrySleepTime());
onError(request);
return;
}
// for cycle retry
if (page.isNeedCycleRetry()) {
extractAndAddRequests(page, true);
sleep(site.getRetrySleepTime());
return;
}
pageProcessor.process(page);
extractAndAddRequests(page, spawnUrl);
if (!page.getResultItems().isSkip()) {
for (Pipeline pipeline : pipelines) {
pipeline.process(page.getResultItems(), this);
}
}
//for proxy status management
request.putExtra(Request.STATUS_CODE, page.getStatusCode());
sleep(site.getSleepTime());
}
首先调用downloader下载页面,如果需要周期下载,那么将页面中的地址提取出来添加到待抓取队列。

然后将page丢给pageProcessor的process方法,上面这个例子我们的pageProcessor = new DLMPageprocessor(),在process方法中

page.addTargetRequests(page.getHtml().links().regex("/p2pblacklist/index.html\\?P2pBlacklist_page=\\w+").all());


我们将下一步需要爬取的地址存入targetRequests变量中,这里再用extractAndAddRequests()方法从targetRequests变量中提取地址,放入到schedule中。

process方法中的

page.putField("mysql", list);


会将处理的结果存入到ResultItems变量里,这里循环pipelines将Resultem和当前Spider丢个各个注册的pipeline进行处理,这里便是能注册多个pipeline的原因。
最后将请求状态码存入Request.extras变量中, 对代理管理有用。

到这里简单的介绍了一下爬取的流程,现在总结以下几个点。

1、所谓的pageprocessor、pipeline组件化就是将这些对象设为可变的变量,定义统一接口。

2、爬取的流程都在Spider类中,逻辑比较清晰不复杂。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: