WebMagic源码分析
2016-09-09 00:00
106 查看
摘要: 涉及内容包括:各组件初始化及可扩展,如何实现多线程,多线程并发控制,程序如何终止,HttpClient使用http连接池发送http请求, URL在Scheduler中去重 ,代理池如何实现反爬虫
WebMagic项目代码分为核心和扩展两部分。核心部分(webmagic-core)是一个精简的、模块化的爬虫实现,而扩展部分则包括一些便利的、实用性的功能。WebMagic的架构设计参照了Scrapy,目标是尽量的模块化,并体现爬虫的功能特点。
这部分提供非常简单、灵活的API,在基本不改变开发模式的情况下,编写一个爬虫。
扩展部分(webmagic-extension)提供一些便捷的功能,例如注解模式编写爬虫等。同时内置了一些常用的组件,便于爬虫开发。
另外WebMagic还包括一些外围扩展和一个正在开发的产品化项目webmagic-avalon。
##2. 核心组件
###2.1 结构图
###2.2 四大组件
1.Downloader:下载器
2.PageProcessor:抽取器
3.Scheduler:调度器
4.Pipeline:结果处理器
##3. 源码分析(主类Spider)
###3.1 各组件初始化及可扩展
####3.1.1
初始化Scheduler:(默认QueueScheduler)
采用新的Scheduler:
####3.1.2
初始化Downloader:(默认HttpClientDownloader)
####3.1.3
初始化Pipeline:(默认ConsolePipeline)
####3.1.4
初始化PageProcessor:(用户自定义完成)
###3.2 如何实现多线程
####3.2.1
初始化线程池(默认Executors.newFixedThreadPool(threadNum))
Executors.newFixedThreadPool作用:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待.
####3.2.2
多线程并发控制
####3.2.3 Java中的ReentrantLock和synchronized两种锁定机制的对比
ReentrantLock默认情况下为不公平锁
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
Request request = scheduler.poll(this);
//当scheduler内目标URL为空时
if (request == null) {
//已经没有线程在运行了, exitWhenComplete默认为true
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break;
}
// wait until new url added
waitNewUrl();
}
public void setThread(int thread) {
httpClientGenerator.setPoolSize(thread);
}
public HttpClientGenerator setPoolSize(int poolSize) {
// 将最大连接数增加为poolSize
connectionManager.setMaxTotal(poolSize);
return this;
}
public void push(Request request, Task task) {
logger.trace("get a candidate url {}", request.getUrl());
if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) {
logger.debug("push to queue {}", request.getUrl());
pushWhenNoDuplicate(request, task);
}
}
//从以往保存的本地文件中读取代理信息作为新的代理池
public SimpleProxyPool() {
this(null, true);
}
//以往保存的本地文件中读取代理+用户输入的httpProxyList合并为新的代理池
public SimpleProxyPool(List<String[]> httpProxyList) {
this(httpProxyList, true);
}
//以往保存的本地文件中读取代理+用户输入的httpProxyList合并为新的代理池(后者可认为操控)
public SimpleProxyPool(List<String[]> httpProxyList, boolean isUseLastProxy) {
if (httpProxyList != null) {
addProxy(httpProxyList.toArray(new String[httpProxyList.size()][]));
}
if (isUseLastProxy) {
if (!new File(proxyFilePath).exists()) {
setFilePath();
}
readProxyList();
timer.schedule(saveProxyTask, 0, saveProxyInterval);
}
}
String[] source = { "::0.0.0.1:0", "::0.0.0.2:0", "::0.0.0.3:0", "::0.0.0.4:0" };
for (String line : source) {
httpProxyList.add(new String[] {line.split(":")[0], line.split(":")[1], line.split(":")[2], line.split(":")[3] });
}
if (isUseLastProxy) {
if (!new File(proxyFilePath).exists()) {
setFilePath();
}
readProxyList();
timer.schedule(saveProxyTask, 0, saveProxyInterval);
}
public void returnProxy(HttpHost host, int statusCode) {
Proxy p = allProxy.get(host.getAddress().getHostAddress());
if (p == null) {
return;
}
switch (statusCode) {
//成功
case Proxy.SUCCESS:
p.setReuseTimeInterval(reuseInterval);
p.setFailedNum(0);
p.setFailedErrorType(new ArrayList<Integer>());
p.recordResponse();
p.successNumIncrement(1);
break;
//失败
case Proxy.ERROR_403:
// banned,try longer interval
p.fail(Proxy.ERROR_403);
p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
break;
//代理被禁
case Proxy.ERROR_BANNED:
p.fail(Proxy.ERROR_BANNED);
p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum());
logger.warn("this proxy is banned >>>> " + p.getHttpHost());
logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
break;
//404
case Proxy.ERROR_404:
// p.fail(Proxy.ERROR_404);
// p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
break;
default:
p.fail(statusCode);
break;
}
//当前代理失败次数超过20:reviveTime = 2 * 60 * 60 * 1000;
if (p.getFailedNum() > 20) {
p.setReuseTimeInterval(reviveTime);
logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
return;
}
//检验代理ip符合下列要求的:
if (p.getFailedNum() > 0 && p.getFailedNum() % 5 == 0) {
if (!ProxyUtils.validateProxy(host)) {
p.setReuseTimeInterval(reviveTime);
logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
return;
}
}
try {
proxyQueue.put(p);
} catch (InterruptedException e) {
logger.warn("proxyQueue return proxy error", e);
}
}
public static boolean validateProxy(HttpHost p) {
if (localAddr == null) {
logger.error("cannot get local IP");
return false;
}
boolean isReachable = false;
Socket socket = null;
try {
socket = new Socket();
socket.bind(new InetSocketAddress(localAddr, 0));
InetSocketAddress endpointSocketAddr = new InetSocketAddress(p.getAddress().getHostAddress(), p.getPort());
socket.connect(endpointSocketAddr, 3000);
logger.debug("SUCCESS - connection established! Local: " + localAddr.getHostAddress() + " remote: " + p);
isReachable = true;
} catch (IOException e) {
logger.warn("FAILRE - CAN not connect! Local: " + localAddr.getHostAddress() + " remote: " + p);
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
logger.warn("Error occurred while closing socket of validating proxy", e);
}
}
}
return isReachable;
}
Webmagic
##1. WebMagic概览WebMagic项目代码分为核心和扩展两部分。核心部分(webmagic-core)是一个精简的、模块化的爬虫实现,而扩展部分则包括一些便利的、实用性的功能。WebMagic的架构设计参照了Scrapy,目标是尽量的模块化,并体现爬虫的功能特点。
这部分提供非常简单、灵活的API,在基本不改变开发模式的情况下,编写一个爬虫。
扩展部分(webmagic-extension)提供一些便捷的功能,例如注解模式编写爬虫等。同时内置了一些常用的组件,便于爬虫开发。
另外WebMagic还包括一些外围扩展和一个正在开发的产品化项目webmagic-avalon。
##2. 核心组件
###2.1 结构图
###2.2 四大组件
1.Downloader:下载器
2.PageProcessor:抽取器
3.Scheduler:调度器
4.Pipeline:结果处理器
##3. 源码分析(主类Spider)
###3.1 各组件初始化及可扩展
####3.1.1
初始化Scheduler:(默认QueueScheduler)
protected Scheduler scheduler = new QueueScheduler();
采用新的Scheduler:
public Spider setScheduler(Scheduler scheduler) { checkIfRunning(); Scheduler oldScheduler = this.scheduler; this.scheduler = scheduler; if (oldScheduler != null) { Request request; while ((request = oldScheduler.poll(this)) != null) { //复制原来的url到新的scheduler this.scheduler.push(request, this); } } return this; }
####3.1.2
初始化Downloader:(默认HttpClientDownloader)
protected void initComponent() { if (downloader == null) { //用户没有自定义Downloader this.downloader = new HttpClientDownloader(); } if (pipelines.isEmpty()) { //用户没有自定义Pipeline pipelines.add(new ConsolePipeline()); } downloader.setThread(threadNum); if (threadPool == null || threadPool.isShutdown()) { if (executorService != null && !executorService.isShutdown()) { threadPool = new CountableThreadPool(threadNum, executorService); } else { threadPool = new CountableThreadPool(threadNum); } } if (startRequests != null) { for (Request request : startRequests) { scheduler.push(request, this); } startRequests.clear(); } startTime = new Date(); }
####3.1.3
初始化Pipeline:(默认ConsolePipeline)
####3.1.4
初始化PageProcessor:(用户自定义完成)
###3.2 如何实现多线程
####3.2.1
初始化线程池(默认Executors.newFixedThreadPool(threadNum))
Executors.newFixedThreadPool作用:创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待.
public CountableThreadPool(int threadNum) { this.threadNum = threadNum; this.executorService = Executors.newFixedThreadPool(threadNum); }
####3.2.2
多线程并发控制
public void execute(final Runnable runnable) { if (threadAlive.get() >= threadNum) { try { reentrantLock.lock();//同步锁 下面为保护代码块 while (threadAlive.get() >= threadNum) { try { condition.await(); } catch (InterruptedException e) { } } } finally { reentrantLock.unlock(); } } threadAlive.incrementAndGet(); executorService.execute(new Runnable() { @Override public void run() { try { runnable.run(); } finally { try { reentrantLock.lock(); threadAlive.decrementAndGet(); //线程数量减少一个时,通过signal()方法通知前面condition.await()的线程 condition.signal(); } finally { reentrantLock.unlock(); } } } }); }
####3.2.3 Java中的ReentrantLock和synchronized两种锁定机制的对比
ReentrantLock默认情况下为不公平锁
private ReentrantLock lock = new ReentrantLock(true); //公平锁 try { lock.lock(); //如果被其它资源锁定,会在此等待锁释放,达到暂停的效果 //操作 } finally { lock.unlock(); }``` 不公平锁与公平锁的区别: 公平情况下,操作会排一个队按顺序执行,来保证执行顺序。(会消耗更多的时间来排队) 不公平情况下,是无序状态允许插队,jvm会自动计算如何处理更快速来调度插队。(如果不关心顺序,这个速度会更快) #### 3.2.4 AtomicInteger && CAS >AtomicInteger,一个提供原子操作的Integer的类。在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字。而AtomicInteger则通过一种线程安全的加减操作接口。 首先要说一下,AtomicInteger类compareAndSet通过原子操作实现了CAS操作,最底层基于汇编语言实现 CAS是Compare And Set的一个简称,如下理解: 1,已知当前内存里面的值current和预期要修改成的值new传入 2,内存中AtomicInteger对象地址对应的真实值(因为有可能别修改)real与current对比,相等表示real未被修改过,是“安全”的,将new赋给real结束然后返回;不相等说明real已经被修改,结束并重新执行1直到修改成功 ####3.2.5 程序如何终止
while (!Thread.currentThread().isInterrupted() && stat.get() == STAT_RUNNING) {
Request request = scheduler.poll(this);
//当scheduler内目标URL为空时
if (request == null) {
//已经没有线程在运行了, exitWhenComplete默认为true
if (threadPool.getThreadAlive() == 0 && exitWhenComplete) {
break;
}
// wait until new url added
waitNewUrl();
}
上述while循环结束,则程序完成任务并终止 ###3.3 HttpClient使用http连接池发送http请求 将用户设置的线程数设置为httpclient最大连接池数
public void setThread(int thread) {
httpClientGenerator.setPoolSize(thread);
}
public HttpClientGenerator setPoolSize(int poolSize) {
// 将最大连接数增加为poolSize
connectionManager.setMaxTotal(poolSize);
return this;
}
###3.4 URL在Scheduler中去重 将下载结果页面中的链接抽取出来并放入scheduler中
public void push(Request request, Task task) {
logger.trace("get a candidate url {}", request.getUrl());
if (!duplicatedRemover.isDuplicate(request, task) || shouldReserved(request)) {
logger.debug("push to queue {}", request.getUrl());
pushWhenNoDuplicate(request, task);
}
}
####3.4.1 redischeduler URL去重复 ```boolean isDuplicate = jedis.sismember(getSetKey(task), request.getUrl());``` RedisScheduler 中判断url是否重复的方法,因为一个Spider就是对应只有一个UUID,故上述的判断则是:判断当前的url是否是uuid集合的元素 >System.out.println(jedis.sismember("sname", "minxr"));// 判断 minxr是否是sname集合的元素 ####3.4.2 bloomFilter URL去重复 ```boolean isDuplicate = bloomFilter.mightContain(getUrl(request));``` ####3.4.3 hashset URL去重复 ```public boolean isDuplicate(Request request, Task task) { return !urls.add(getUrl(request)); }``` 优点: 1)节约缓存空间(空值的映射),不再需要空值映射。 2)减少数据库或缓存的请求次数。 3)提升业务的处理效率以及业务隔离性。 缺点: 1)存在误判的概率。 2)传统的Bloom Filter不能作删除操作。 ###3.5 抽取部分API | 方法 | 说明 | 示例 | | ------------- |:-------------:| -----:| | xpath(String xpath) | 使用XPath选择| html.xpath("//div[@class='title']")| | $(String selector) | 使用Css选择器选择| html.$("div.title") | | css(String selector) | 功能同$(),使用Css选择器选择 | html.css("div.title") | |regex(String regex) | 使用正则表达式抽取 | html.regex("\(.*?)\") | | replace(String regex, String replacement) | 替换内容 | html.replace("\","") | 这部分抽取API返回的都是一个Selectable接口,意思是说,抽取是支持链式调用的。 ###3.6 代理池 ####3.6.1 代理池初始化:
//从以往保存的本地文件中读取代理信息作为新的代理池
public SimpleProxyPool() {
this(null, true);
}
//以往保存的本地文件中读取代理+用户输入的httpProxyList合并为新的代理池
public SimpleProxyPool(List<String[]> httpProxyList) {
this(httpProxyList, true);
}
//以往保存的本地文件中读取代理+用户输入的httpProxyList合并为新的代理池(后者可认为操控)
public SimpleProxyPool(List<String[]> httpProxyList, boolean isUseLastProxy) {
if (httpProxyList != null) {
addProxy(httpProxyList.toArray(new String[httpProxyList.size()][]));
}
if (isUseLastProxy) {
if (!new File(proxyFilePath).exists()) {
setFilePath();
}
readProxyList();
timer.schedule(saveProxyTask, 0, saveProxyInterval);
}
}
####3.6.2 httpProxyList怎么传值
String[] source = { "::0.0.0.1:0", "::0.0.0.2:0", "::0.0.0.3:0", "::0.0.0.4:0" };
for (String line : source) {
httpProxyList.add(new String[] {line.split(":")[0], line.split(":")[1], line.split(":")[2], line.split(":")[3] });
}
####3.6.3 本地文件Proxy获存储与获取:定时任务
if (isUseLastProxy) {
if (!new File(proxyFilePath).exists()) {
setFilePath();
}
readProxyList();
timer.schedule(saveProxyTask, 0, saveProxyInterval);
}
saveProxyTask()函数负责把最新的代理池ip写入到本地指定文件 ####3.6.4 使用DelayQueue管理Proxy 目的:可以根据compareTo方法制定的优先取出代理池中使用间隔较短的代理(一开始默认都为1.5s)优先取出并执行. 目前代理池的策略是: * 1. 在添加时连接相应端口做校验 * 2. 每个代理有1.5S的使用间隔 * 3. 每次失败后,下次取出代理的时间改为1.5S*失败次数 * 4. 如果代理失败次数超过20次,则直接丢弃
public void returnProxy(HttpHost host, int statusCode) {
Proxy p = allProxy.get(host.getAddress().getHostAddress());
if (p == null) {
return;
}
switch (statusCode) {
//成功
case Proxy.SUCCESS:
p.setReuseTimeInterval(reuseInterval);
p.setFailedNum(0);
p.setFailedErrorType(new ArrayList<Integer>());
p.recordResponse();
p.successNumIncrement(1);
break;
//失败
case Proxy.ERROR_403:
// banned,try longer interval
p.fail(Proxy.ERROR_403);
p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
break;
//代理被禁
case Proxy.ERROR_BANNED:
p.fail(Proxy.ERROR_BANNED);
p.setReuseTimeInterval(10 * 60 * 1000 * p.getFailedNum());
logger.warn("this proxy is banned >>>> " + p.getHttpHost());
logger.info(host + " >>>> reuseTimeInterval is >>>> " + p.getReuseTimeInterval() / 1000.0);
break;
//404
case Proxy.ERROR_404:
// p.fail(Proxy.ERROR_404);
// p.setReuseTimeInterval(reuseInterval * p.getFailedNum());
break;
default:
p.fail(statusCode);
break;
}
//当前代理失败次数超过20:reviveTime = 2 * 60 * 60 * 1000;
if (p.getFailedNum() > 20) {
p.setReuseTimeInterval(reviveTime);
logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
return;
}
//检验代理ip符合下列要求的:
if (p.getFailedNum() > 0 && p.getFailedNum() % 5 == 0) {
if (!ProxyUtils.validateProxy(host)) {
p.setReuseTimeInterval(reviveTime);
logger.error("remove proxy >>>> " + host + ">>>>" + p.getFailedType() + " >>>> remain proxy >>>> " + proxyQueue.size());
return;
}
}
try {
proxyQueue.put(p);
} catch (InterruptedException e) {
logger.warn("proxyQueue return proxy error", e);
}
}
使用Socket来校验代理是否有效,客户端为本地.创建与代理的连接
public static boolean validateProxy(HttpHost p) {
if (localAddr == null) {
logger.error("cannot get local IP");
return false;
}
boolean isReachable = false;
Socket socket = null;
try {
socket = new Socket();
socket.bind(new InetSocketAddress(localAddr, 0));
InetSocketAddress endpointSocketAddr = new InetSocketAddress(p.getAddress().getHostAddress(), p.getPort());
socket.connect(endpointSocketAddr, 3000);
logger.debug("SUCCESS - connection established! Local: " + localAddr.getHostAddress() + " remote: " + p);
isReachable = true;
} catch (IOException e) {
logger.warn("FAILRE - CAN not connect! Local: " + localAddr.getHostAddress() + " remote: " + p);
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
logger.warn("Error occurred while closing socket of validating proxy", e);
}
}
}
return isReachable;
}
相关文章推荐
- java对世界各个时区(TimeZone)的通用转换处理方法(转载)
- java-注解annotation
- java-模拟tomcat服务器
- java-用HttpURLConnection发送Http请求.
- java-WEB中的监听器Lisener
- Android IPC进程间通讯机制
- Android Native 绘图方法
- Android java 与 javascript互访(相互调用)的方法例子
- 从源码安装Mysql/Percona 5.5
- 介绍一款信息管理系统的开源框架---jeecg
- 聚类算法之kmeans算法java版本
- java实现 PageRank算法
- PropertyChangeListener简单理解
- c++11 + SDL2 + ffmpeg +OpenAL + java = Android播放器
- 插入排序
- 冒泡排序