
Nutch 1.0 Fetcher Crawling Model Analysis

2010-06-01 23:15
1. Introduction
2. Fetch Flow Analysis
3. Conclusion
---------------
1. Introduction
Nutch is a subproject of Apache Lucene, used mainly to collect and index web page data. It builds on other Apache subprojects such as Hadoop and Lucene. A typical Nutch crawl cycle looks like this:
1. Inject the seed URLs into the crawlDb to prepare them for crawling
2. Use the generate module to select and filter URLs from the crawlDb into a fetchlist
3. Use the fetcher module to fetch the pages on the generated fetchlist
4. Use the parse module to parse the fetched pages
5. Use the crawlDb update tool to merge the outlinks found in the fetched pages back into the crawlDb, so they become seeds for the next round (a driver sketch of this cycle follows the list)
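
To make the cycle concrete, here is a condensed driver sketch, loosely modeled on the Crawl.java driver that ships with Nutch 1.0. The variables maxDepth, topN, threads, and parsing are placeholders, and the exact method signatures may differ slightly between versions:

Configuration conf = NutchConfiguration.create();
Path crawlDb = new Path("crawl/crawldb");
Path segments = new Path("crawl/segments");

new Injector(conf).inject(crawlDb, new Path("urls"));          // step 1: inject seeds
for (int i = 0; i < maxDepth; i++) {
  Path segment = new Generator(conf).generate(crawlDb, segments,
      -1, topN, System.currentTimeMillis());                   // step 2: generate a fetchlist
  if (segment == null) break;                                  // nothing left to fetch
  new Fetcher(conf).fetch(segment, threads, parsing);          // step 3: fetch the pages
  if (!parsing) new ParseSegment(conf).parse(segment);         // step 4: parse (if not done during fetch)
  new CrawlDb(conf).update(crawlDb, new Path[] { segment },
      true, true);                                             // step 5: feed outlinks back into crawlDb
}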
This article focuses on the Fetch phase of Nutch. In outline, Nutch fetches pages through a producer-consumer thread model built on multiple per-host queues, as the class Javadoc explains:
/**
 * A queue-based fetcher.
 *
 * <p>This fetcher uses a well-known model of one producer (a QueueFeeder)
 * and many consumers (FetcherThread-s).
 *
 * <p>QueueFeeder reads input fetchlists and populates a set of
 * FetchItemQueue-s, which hold FetchItem-s that describe the items to be
 * fetched. There are as many queues as there are unique hosts, but at any
 * given time the total number of fetch items in all queues is less than a
 * fixed number (currently set to a multiple of the number of threads).
 *
 * <p>As items are consumed from the queues, the QueueFeeder continues to
 * add new input items, so that their total count stays fixed
 * (FetcherThread-s may also add new items to the queues, e.g. as a result
 * of redirection) - until all input items are exhausted, at which point
 * the number of items in the queues begins to decrease. When this number
 * reaches 0, the fetcher will finish.
 *
 * <p>This fetcher implementation handles per-host blocking itself, instead
 * of delegating this work to protocol-specific plugins. Each per-host queue
 * handles its own "politeness" settings, such as the maximum number of
 * concurrent requests and the crawl delay between consecutive requests -
 * and also a list of requests in progress, and the time the last request
 * was finished. As FetcherThread-s ask for new items to be fetched, queues
 * may return eligible items or null if, for "politeness" reasons, this
 * host's queue is not yet ready.
 *
 * <p>If there are still unfetched items in the queues, but none of the
 * items are ready, FetcherThread-s will spin-wait until either some items
 * become available, or a timeout is reached (at which point the Fetcher
 * will abort, assuming the task is hung).
 *
 * @author Andrzej Bialecki
 */
2. Fetch Flow Analysis
The class org.apache.nutch.fetcher.Fetcher (Fetcher.java) is what fetches the pages produced by the generate step.
2.1 The main method
Fetcher has a main method that handles three kinds of arguments: segment, threads, and noParsing.

String usage = "Usage: Fetcher <segment> [-threads n] [-noParsing]";
if (args.length < 1) {
  System.err.println(usage);
  System.exit(-1);
}
Path segment = new Path(args[0]);
Configuration conf = NutchConfiguration.create();
int threads = conf.getInt("fetcher.threads.fetch", 10);
boolean parsing = true;
for (int i = 1; i < args.length; i++) {    // parse command line
  if (args[i].equals("-threads")) {        // found -threads option
    threads = Integer.parseInt(args[++i]);
  } else if (args[i].equals("-noParsing"))
    parsing = false;
}
conf.setInt("fetcher.threads.fetch", threads);
if (!parsing) {
  conf.setBoolean("fetcher.parse", parsing);
}
Fetcher fetcher = new Fetcher(conf);       // make a Fetcher
fetcher.fetch(segment, threads, parsing);  // run the Fetcher
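
For reference, the usage string above corresponds to a command-line invocation along the lines of bin/nutch fetch crawl/segments/20100601231500 -threads 20, where the segment path is just an illustrative example.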

2.2 The fetch method of the Fetcher class
This method mainly performs the initial MapReduce job setup and then kicks off Fetcher's run method. The main code is as follows:


// set input path and input format class
FileInputFormat.addInputPath(job, new Path(segment,
    CrawlDatum.GENERATE_DIR_NAME));
job.setInputFormat(InputFormat.class);   // the input format class (defined inside Fetcher) controls how the input is split
// set map runner class
job.setMapRunnerClass(Fetcher.class);    // Fetcher itself implements MapRunnable; the job is map-only, with no reduce phase
// set output path and output format class
FileOutputFormat.setOutputPath(job, segment);
job.setOutputFormat(FetcherOutputFormat.class); // the output handler, which extends OutputFormat<Text, NutchWritable>
// set the output key and value classes; both can be serialized to the
// file system (unless you define your own OutputFormat)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NutchWritable.class);
JobClient.runJob(job);                   // submit the job and run the map phase
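
The design point to notice is the setMapRunnerClass call: because Fetcher implements MapRunnable, Hadoop hands the entire RecordReader to Fetcher's run method instead of invoking a map function once per record, and that is what lets the fetcher manage its own feeder and fetcher threads inside a single map task.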

2.3 The run method of Fetcher
This method starts the producer-consumer thread model. The producer is the QueueFeeder, which reads the input records (URL metadata for the pages to fetch) and distributes them across multiple queues. Each queue ID is formed as queueID = proto + "://" + host; that is, the protocol type plus the host uniquely identify a queue (a small standalone demo of this rule follows the code below).

// crawl datum feeder thread, used to feed the queues from the
// RecordReader (the producer that supplies the items to fetch)
feeder = new QueueFeeder(input, fetchQueues, threadCount * 50); // threadCount * 50 is the queue capacity
// feeder.setPriority((Thread.MAX_PRIORITY + Thread.NORM_PRIORITY) / 2);
feeder.start();
// set non-blocking & no-robots mode for HTTP protocol plugins.
getConf().setBoolean(Protocol.CHECK_BLOCKING, false);
getConf().setBoolean(Protocol.CHECK_ROBOTS, false);
// spawn the consumer threads, which take items from the shared queues and fetch them
for (int i = 0; i < threadCount; i++) {
  new FetcherThread(getConf()).start();
}
// select a timeout that avoids a task timeout
long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000) / 2;
do {                                     // wait for threads to exit
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
  }
  reportStatus();
  LOG.info("-activeThreads=" + activeThreads + ", spinWaiting="
      + spinWaiting.get() + ", fetchQueues.totalSize="
      + fetchQueues.getTotalSize());
  if (!feeder.isAlive() && fetchQueues.getTotalSize() < 5) {
    fetchQueues.dump();
  }
  // some requests seem to hang, despite all intentions
  if ((System.currentTimeMillis() - lastRequestStart.get()) > timeout) {
    if (LOG.isWarnEnabled()) {
      LOG.warn("Aborting with " + activeThreads + " hung threads.");
    }
    return;
  }
} while (activeThreads.get() > 0);
LOG.info("-activeThreads=" + activeThreads);
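
The queue ID rule mentioned above is easy to demonstrate in isolation. Below is a minimal, self-contained sketch of the idea; the class and method names are mine, not Nutch internals:

import java.net.MalformedURLException;
import java.net.URL;

public class QueueIdDemo {
  // Derive a queue ID the way the text describes: proto + "://" + host.
  static String queueId(String urlString) throws MalformedURLException {
    URL u = new URL(urlString);
    return u.getProtocol().toLowerCase() + "://" + u.getHost().toLowerCase();
  }

  public static void main(String[] args) throws MalformedURLException {
    // Two pages on the same host map to the same per-host queue:
    System.out.println(queueId("http://example.com/a.html"));   // http://example.com
    System.out.println(queueId("http://EXAMPLE.com/b/c.html")); // http://example.com
  }
}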

2.4 The run method of QueueFeeder
The main code is as follows:

while (hasMore) {
  int feed = size - queues.getTotalSize();
  if (feed <= 0) {
    // queues are full - spin-wait until they have some free space
    try {
      Thread.sleep(1000);
    } catch (Exception e) {
    }
    continue;
  } else {
    LOG.debug("-feeding " + feed + " input urls ...");
    // add up to 'feed' fetch items to the queues, until the budget is used up
    while (feed > 0 && hasMore) {
      try {
        Text url = new Text();
        CrawlDatum datum = new CrawlDatum();
        hasMore = reader.next(url, datum); // read a key/value pair from the map input; false means the input is exhausted
        if (hasMore) {
          queues.addFetchItem(url, datum); // put the item into its per-host queue
          cnt++;
          feed--;
        }
      } catch (IOException e) {
        LOG.fatal("QueueFeeder error reading input, record " + cnt, e);
        return;
      }
    }
  }
}
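
Note the design choice here: the feeder only ever tops the queues up to the fixed capacity passed in from section 2.3 (threadCount * 50), so memory stays bounded no matter how large the input fetchlist is, and the spin-wait simply pauses the producer whenever the consumers fall behind.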

2.5 The run method of FetcherThread
This method takes a FetchItem from fetchQueues and fetches it. If the fetch succeeds, the item is removed from its queue; if not, it is handled according to the status code returned by the fetch protocol. The concrete protocol implementations are loaded from Nutch's plugin repository.
One more thing to note: FetchItemQueue actually keeps two lists. One, queue, stores pending items; the other, inProgress, stores items currently being fetched. When a fetch item is requested, it is moved out of queue and into inProgress, and once inProgress holds as many items as the maximum number of threads, the queue stops returning items. This keeps too many requests to one host from waiting in flight at once. FetchItemQueue also keeps a nextFetchTime, which enforces the crawl delay between consecutive requests (a sketch of this logic follows).
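
To make the politeness bookkeeping concrete, here is a minimal sketch of the two-list-plus-nextFetchTime logic described above. The field and method names are illustrative; the real FetchItemQueue in Nutch differs in detail:

import java.util.LinkedList;
import java.util.List;

class FetchItem {
  String url;
  FetchItem(String url) { this.url = url; }
}

class FetchItemQueueSketch {
  private final List<FetchItem> queue = new LinkedList<FetchItem>();      // pending items
  private final List<FetchItem> inProgress = new LinkedList<FetchItem>(); // items being fetched
  private final int maxThreads;   // max concurrent requests to this host
  private final long crawlDelay;  // ms to wait between consecutive requests
  private long nextFetchTime = 0; // earliest time the next request may start

  FetchItemQueueSketch(int maxThreads, long crawlDelay) {
    this.maxThreads = maxThreads;
    this.crawlDelay = crawlDelay;
  }

  synchronized void addFetchItem(FetchItem it) {
    queue.add(it);
  }

  // Returns an eligible item, or null if this host's queue is not yet
  // ready for "politeness" reasons (too many requests in flight, or the
  // crawl delay has not elapsed).
  synchronized FetchItem getFetchItem() {
    if (inProgress.size() >= maxThreads) return null;
    if (System.currentTimeMillis() < nextFetchTime) return null;
    if (queue.isEmpty()) return null;
    FetchItem it = queue.remove(0);
    inProgress.add(it); // move from pending to in-progress
    return it;
  }

  // Called when a fetch completes (success or failure): release the slot
  // and push nextFetchTime forward to enforce the crawl delay.
  synchronized void finishFetchItem(FetchItem it) {
    inProgress.remove(it);
    nextFetchTime = System.currentTimeMillis() + crawlDelay;
  }
}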
2.6 The output method of FetcherThread
This method writes the fetched data to the map output; the output key and value types are the ones set earlier in the job via job.setOutputKeyClass(Text.class) and job.setOutputValueClass(NutchWritable.class).
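
At its heart this boils down to a call on the map output collector. A heavily abridged sketch of the shape (the real output method also emits the page Content and, when parsing is enabled, the parse data, all wrapped in NutchWritable):

output.collect(key, new NutchWritable(datum)); // key is the page URL as Text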
3. Conclusion
This article is only a brief walkthrough of the Nutch fetch flow; some details, such as the use of classes like FetcherOutputFormat, have not been covered yet. I will write those up in a follow-up post once they are organized. Readers who are interested are welcome to join the discussion.