
Python crawler --> download caching

2017-07-03 13:25
In the previous post we covered how to extract the data we are interested in from downloaded pages and how to save it to a spreadsheet. But what if we suddenly want to scrape another field as well? Surely we should not have to rewrite the program and crawl everything again. For a small website a fresh crawl is acceptable, but for a site with millions of pages it takes far too long. This post therefore adds a caching scheme for the pages we crawl.

The task in this post is to download every page reachable within a crawl depth of maxdepth and to cache the results, first on disk and then in a database. The tricky part is mapping each URL to a unique filename.

The crawler in this post is designed around the following flow:

1) link_crawler.py collects every URL within crawl depth maxdepth that matches the link regular expression.

2) As link_crawler walks those URLs, it uses downloader.py to download the page for each one.

3) Before downloading a page, the downloader first checks whether the cache already holds it and, if so, whether that copy has expired; only when there is no usable cached copy does it download the page again and store the result in the cache.

Previously, download throttled every single request. With a cache in place, requests served from the cache should not be throttled; we only need to rate-limit when a real download happens, so the download function has to change. To avoid passing a long list of arguments on every call, we take this opportunity to refactor download into a class: the parameters are supplied once when the object is created, whereas a plain function would need them passed in on every call.
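The pattern itself is simple: store the settings in __init__ and make the object callable. A stripped-down sketch of the idea (hypothetical names, not the full class shown below):

class SimpleDownloader:
    def __init__(self, delay=5, user_agent='wswp'):
        # settings are supplied once and stored on the instance
        self.delay = delay
        self.user_agent = user_agent

    def __call__(self, url):
        # each call only needs the url; the stored settings are available here
        print 'would fetch %s as %s after up to %ds delay' % (url, self.user_agent, self.delay)

D = SimpleDownloader(delay=5, user_agent='wswp')  # parameters passed in one place
D('http://example.webscraping.com/')              # then used like a plain function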

Since downloads can fail, it helps to know what the HTTP status codes mean. In short, 1xx codes are informational, 2xx mean success, 3xx are redirects, 4xx are client errors (retrying will not help), and 5xx are server errors, which may succeed on a retry; this is why the code below only retries when 500 <= code < 600.

Here is download.py:

#coding:utf-8

import urlparse
import urllib2
import random
import time
from datetime import datetime, timedelta
import socket

DEFAULT_AGENT = 'wswp'
DEFAULT_DELAY = 5
DEFAULT_RETRIES = 1
DEFAULT_TIMEOUT = 60

class Downloader:
    def __init__(self, delay=DEFAULT_DELAY, user_agent=DEFAULT_AGENT, proxies=None, num_retries=DEFAULT_RETRIES,
                 timeout=DEFAULT_TIMEOUT, opener=None, cache=None):
        socket.setdefaulttimeout(timeout)  # default timeout for network connections
        self.throttle = Throttle(delay)
        self.user_agent = user_agent
        self.proxies = proxies
        self.num_retries = num_retries
        self.opener = opener
        self.cache = cache  # cache object passed in by the caller

    def __call__(self, url):
        '''
        First try to load the result for this url from the cache; a cache hit
        needs neither a download nor throttling.
        On a cache miss the page has to be downloaded again, throttled beforehand.
        '''
        result = None
        if self.cache:
            try:
                result = self.cache[url]
            except KeyError:
                # this url is not yet in the cache
                pass
            else:
                if self.num_retries > 0 and 500 <= result['code'] < 600:
                    # the cached result was a server error, so if retries
                    # remain, ignore it and download the page again
                    result = None
        if result is None:
            # an actual download is about to happen (not a cache hit),
            # so throttle it to avoid getting banned
            self.throttle.wait(url)
            proxy = random.choice(self.proxies) if self.proxies else None
            headers = {'User-agent': self.user_agent}
            result = self.download(url, headers, proxy=proxy, num_retries=self.num_retries)
            if self.cache:
                # store the downloaded page in the cache
                self.cache[url] = result
        return result['html']

    def download(self, url, headers, proxy, num_retries, data=None):
        print 'Downloading:', url
        request = urllib2.Request(url, data, headers or {})
        opener = self.opener or urllib2.build_opener()
        if proxy:
            proxy_params = {urlparse.urlparse(url).scheme: proxy}
            opener.add_handler(urllib2.ProxyHandler(proxy_params))
        try:
            response = opener.open(request)
            html = response.read()
            code = response.code
        except Exception as e:
            print 'Download error:', str(e)
            html = ''
            if hasattr(e, 'code'):
                code = e.code
                if num_retries > 0 and 500 <= code < 600:
                    # server error and retries remaining, so download again
                    return self.download(url, headers, proxy, num_retries - 1, data)
            else:
                code = None
        # return the HTTP status code along with the html so callers can
        # tell whether the result is usable
        return {'html': html, 'code': code}

class Throttle:
    """
    Crawling too fast risks overloading the server or getting the crawler's IP
    banned. To avoid that, this class enforces a minimum delay between two
    requests to the same domain (note: per domain, not per URL).
    """

    def __init__(self, delay):
        # amount of delay between downloads for each domain
        self.delay = delay
        # timestamp of when a domain was last accessed
        self.domains = {}

    def wait(self, url):
        """Delay if have accessed this domain recently
        """
        domain = urlparse.urlsplit(url).netloc
        last_accessed = self.domains.get(domain)
        if self.delay > 0 and last_accessed is not None:
            sleep_secs = self.delay - (datetime.now() - last_accessed).seconds
            if sleep_secs > 0:
                time.sleep(sleep_secs)
        self.domains[domain] = datetime.now()

In the Downloader code above, note the __call__ special method. It first checks whether a cache was supplied; if so, it looks up whether this url has already been cached, and if it has, it checks whether that earlier download hit a server error. Only when none of these checks fail is the cached result considered usable.

Inside __call__ the cache is read with result = cache[url] and written with cache[url] = result, which is exactly how Python's built-in dict is used. To support that interface, our cache classes need to define the special methods __getitem__() and __setitem__().
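Any object with that dictionary interface will do. For instance, a throwaway in-memory cache (a minimal sketch for illustration only, not part of the project files) could be passed as the cache argument:

class MemoryCache:
    """Keeps results in a plain dict; everything is lost when the process exits."""
    def __init__(self):
        self.data = {}

    def __getitem__(self, url):
        return self.data[url]  # raises KeyError on a cache miss, as Downloader expects

    def __setitem__(self, url, result):
        self.data[url] = result

# usage: D = Downloader(cache=MemoryCache())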

To support caching, the link crawler needs a few small tweaks. Here is link_crawler.py:

#coding:utf-8
import re
import urlparse
import urllib2
import time
import datetime
import robotparser
from  downloader import Downloader

def link_crawler(seed_url, link_regex=None, delay=5, max_depth=-1, max_urls=-1, user_agent='wswp', proxies=None,
                 num_retries=1, scrape_callback=None, cache=None):
    """Crawl from the given seed URL following links matched by link_regex
    """
    # the queue of URL's that still need to be crawled
    crawl_queue = [seed_url]
    # the URL's that have been seen and at what depth
    seen = {seed_url: 0}
    # track how many URL's have been downloaded
    num_urls = 0
    rp = get_robots(seed_url)
    # create a single Downloader object here and pass all the settings once,
    # so the calls below only need the url
    D = Downloader(delay=delay, user_agent=user_agent, proxies=proxies, num_retries=num_retries, cache=cache)

    while crawl_queue:
        url = crawl_queue.pop()
        depth = seen[url]
        # check url passes robots.txt restrictions
        if rp.can_fetch(user_agent, url):
            html = D(url)  # download the html page for this url
            links = []
            if scrape_callback:
                links.extend(scrape_callback(url, html) or [])

            if depth != max_depth:
                # can still crawl further
                if link_regex:
                    # filter for links matching our regular expression
                    links.extend(link for link in get_links(html) if re.search(link_regex, link))

                for link in links:
                    link = normalize(seed_url, link)
                    # check whether already crawled this link
                    if link not in seen:
                        seen[link] = depth + 1
                        # check link is within same domain
                        if same_domain(seed_url, link):
                            # success! add this new link to queue
                            crawl_queue.append(link)

            # check whether have reached downloaded maximum
            num_urls += 1
            if num_urls == max_urls:
                break
        else:
            print 'Blocked by robots.txt:', url

def normalize(seed_url, link):
    """Normalize this URL by removing hash and adding domain
    """
    link, _ = urlparse.urldefrag(link)  # remove hash to avoid duplicates
    return urlparse.urljoin(seed_url, link)

def same_domain(url1, url2):
    """Return True if both URL's belong to same domain
    """
    return urlparse.urlparse(url1).netloc == urlparse.urlparse(url2).netloc

def get_robots(url):
    """Initialize robots parser for this domain
    """
    rp = robotparser.RobotFileParser()
    rp.set_url(urlparse.urljoin(url, '/robots.txt'))
    rp.read()
    return rp

def get_links(html):
    """Return a list of links from html
    """
    # a regular expression to extract all links from the webpage
    webpage_regex = re.compile('<a[^>]+href=["\'](.*?)["\']', re.IGNORECASE)
    # list of all links from the webpage
    return webpage_regex.findall(html)

if __name__ == '__main__':
    link_crawler('http://example.webscraping.com', '/(index|view)', delay=0, num_retries=1, user_agent='BadCrawler')
    link_crawler('http://example.webscraping.com', '/(index|view)', delay=0, num_retries=1, max_depth=1,
                 user_agent='GoodCrawler')


So how do we store the cached data locally? Let's start with caching to the local disk.



To stay within the limits of the major filesystems, we need to do the following:

① Replace every character that is not a digit, a letter, or a basic symbol with an underscore.

② Limit each path segment to at most 255 characters.

③ Name the local file after the URL, so that, as far as possible, every URL maps to a distinct filename.

Consider the following URLs:

- http://example.webscraping.com/places/default/index/

- http://example.webscraping.com/places/default/index/1

- http://example.webscraping.com/places/default/index/2

We want all of these URLs to be cached. For the second and third URL the natural layout is to use index as a directory with 1 or 2 as the filename inside it. But what about the first URL? It ends with a slash, and the empty string after the slash is not a valid filename; if we instead used index itself as the filename, the other URLs that treat index as a directory could no longer be stored. The solution is to append index.html as the filename in that case, and if the URL has no path at all, to append /index.html.
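As a quick sanity check of that mapping (a sketch assuming the url_to_path method of the DiskCache class shown below; the exact output also depends on the platform's path separator):

cache = DiskCache(cache_dir='cache')
print cache.url_to_path('http://example.webscraping.com/places/default/index/')
# cache/example.webscraping.com/places/default/index/index.html
print cache.url_to_path('http://example.webscraping.com/places/default/index/1')
# cache/example.webscraping.com/places/default/index/1
print cache.url_to_path('http://example.webscraping.com/')
# cache/example.webscraping.com/index.html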

Pages cached on disk can go stale over time, because web content changes. We therefore set an expiry interval as a timedelta: once a cached page has been stored for longer than this interval, it has to be downloaded and cached again.
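The expiry check itself is just a timedelta comparison; roughly (a trivial sketch of the idea):

from datetime import datetime, timedelta

expires = timedelta(days=30)
saved_at = datetime.utcnow() - timedelta(days=31)  # pretend this entry is 31 days old
print datetime.utcnow() > saved_at + expires       # True: the entry has expired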

Here is the cache class in disk_cache.py:

#coding:utf-8
import os
import re
import urlparse
import shutil
import zlib
from datetime import datetime, timedelta

try:
    import cPickle as pickle
except ImportError:
    import pickle
from link_crawler import link_crawler

class DiskCache:

    def __init__(self, cache_dir='cache', expires=timedelta(days=30), compress=True):
        """
        cache_dir: root directory for the cached data
        expires: cached data older than this is considered stale and discarded
        compress: whether to compress the data before writing it to the cache
        """
        self.cache_dir = cache_dir
        self.expires = expires
        self.compress = compress

    def __getitem__(self, url):
        """Load the data for this url from disk
        """
        path = self.url_to_path(url)  # map the url to its storage path
        if os.path.exists(path):
            with open(path, 'rb') as fp:
                data = fp.read()
                if self.compress:
                    data = zlib.decompress(data)
                result, timestamp = pickle.loads(data)
                if self.has_expired(timestamp):  # check whether the loaded data is stale
                    raise KeyError(url + ' has expired')
                return result
        else:
            # this url has not been cached yet
            raise KeyError(url + ' does not exist')

    def __setitem__(self, url, result):
        """Save data to disk for this url
        """
        path = self.url_to_path(url)
        folder = os.path.dirname(path)
        if not os.path.exists(folder):
            os.makedirs(folder)

        # store the result together with the current time, so we can later
        # check whether the entry has expired; pickle serialises the data
        # to a string before it is written to disk
        data = pickle.dumps((result, datetime.utcnow()))
        if self.compress:  # optionally compress before storing
            data = zlib.compress(data)
        with open(path, 'wb') as fp:
            fp.write(data)

    def __delitem__(self, url):
        """Remove the value at this key and any empty parent sub-directories
        """
        path = self.url_to_path(url)
        try:
            os.remove(path)
            os.removedirs(os.path.dirname(path))
        except OSError:
            pass

    def url_to_path(self, url):
        """Create a filesystem path for this url, used to store its data locally
        """
        components = urlparse.urlsplit(url)
        # when empty path set to /index.html
        path = components.path
        if not path:
            path = '/index.html'
        elif path.endswith('/'):
            path += 'index.html'

        filename = components.netloc + path + components.query
        # replace anything that is not a digit, letter or basic symbol with '_'
        filename = re.sub('[^/0-9a-zA-Z\-.,;_ ]', '_', filename)
        # limit each path segment to 255 characters
        filename = '/'.join(segment[:255] for segment in filename.split('/'))

        return os.path.join(self.cache_dir, filename)  # cache_dir/filename

    def has_expired(self, timestamp):
        """Return whether this cache entry has expired
        """
        return datetime.utcnow() > timestamp + self.expires

    def clear(self):
        """Remove all the cached values
        """
        if os.path.exists(self.cache_dir):
            shutil.rmtree(self.cache_dir)

if __name__ == '__main__':
    cache = DiskCache()
    cache.clear()
    link_crawler('http://example.webscraping.com/', '/(index|view)', cache=DiskCache())


The resulting structure on the local filesystem then looks something like this (an illustrative excerpt for the URLs above):

cache/
    example.webscraping.com/
        index.html
        places/
            default/
                index/
                    index.html
                    1
                    2

Limitations and drawbacks of the disk cache

1) Several distinct URLs can map to the same file. Consider:

http://example.com/?a+b

http://example.com/?a*b

http://example.com/?a=b

http://example.com/?a!=b

The ?, +, *, = and ! characters are all replaced by "_", so once one of these URLs is cached, the other three are treated as cached too.

2) What about long URLs that differ only after the 255th character? Their truncated versions map to the same filename.

These limits can be avoided by using a hash of the URL as the filename, which improves the situation to a degree. But each disk volume and each directory can only hold so many files, which caps the number of pages we can store; splitting the cache across several directories helps, yet a large website can have well over a hundred million pages. To make DiskCache genuinely general, we would need to merge many cached pages into a single file and index them with something like a B+ tree. Rather than implement that ourselves, we can cache in a database, which already provides such indexing.
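A rough sketch of the hash-based workaround (not part of the original project code; it simply swaps the body of url_to_path for an MD5 digest of the URL, trading readable paths for collision-free names):

import hashlib
import os

def hashed_url_to_path(url, cache_dir='cache'):
    # hash the full url so that ?a+b, ?a*b, ?a=b and ?a!=b no longer collide,
    # and long urls no longer depend only on their first 255 characters
    digest = hashlib.md5(url).hexdigest()
    # use the first two hex characters as a sub-directory to spread the files
    # across 256 folders instead of one huge directory
    return os.path.join(cache_dir, digest[:2], digest)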

Database cache

For a really large dataset, the cache becomes too big to keep on a single server and needs to scale across several machines, so here we use the MongoDB database for caching.

Here is mongo_cache.py:

#coding:utf-8
try:
    import cPickle as pickle
except ImportError:
    import pickle
import zlib
from datetime import datetime, timedelta
from pymongo import MongoClient
from bson.binary import Binary
from link_crawler import link_crawler

class MongoCache:

    def __init__(self, client=None, expires=timedelta(days=30)):
        """
        client: mongo database client
        expires: cached records older than this are considered stale and removed
        """
        # if a client object is not passed
        # then try connecting to mongodb at the default localhost port
        self.client = MongoClient('localhost', 27017) if client is None else client
        # create a collection to store the cached webpages,
        # which is the equivalent of a table in a relational database
        self.db = self.client.cache
        # create an index on timestamp so records are removed automatically
        # once they are older than the given expiry interval
        self.db.webpage.create_index('timestamp', expireAfterSeconds=expires.total_seconds())

    def __contains__(self, url):
        try:
            self[url]
        except KeyError:
            return False
        else:
            return True

    def __getitem__(self, url):
        """Load value at this URL
        """
        record = self.db.webpage.find_one({'_id': url})
        if record:
            # return record['result']
            return pickle.loads(zlib.decompress(record['result']))
        else:
            raise KeyError(url + ' does not exist')

    def __setitem__(self, url, result):
        """Save value for this URL
        """
        # record = {'result': result, 'timestamp': datetime.utcnow()}
        record = {'result': Binary(zlib.compress(pickle.dumps(result))), 'timestamp': datetime.utcnow()}
        # to avoid duplicates we use the URL as the _id and perform an upsert:
        # update the record if it already exists, otherwise insert it, so that
        # storing the same url twice updates its content instead of creating
        # redundant data. link_crawler already uses `seen` to avoid crawling a
        # url twice, but upsert=True keeps us safe regardless
        self.db.webpage.update({'_id': url}, {'$set': record}, upsert=True)

    def clear(self):
        self.db.webpage.drop()

if __name__ == '__main__':
    cache = MongoCache()
    cache.clear()
    link_crawler('http://example.webscraping.com/', '/(index|view)', cache=MongoCache())
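A quick interactive check of the cache interface might look like this (a small sketch; it assumes a MongoDB instance is running on localhost:27017):

cache = MongoCache()
cache['http://example.webscraping.com/'] = {'html': '<html>...</html>', 'code': 200}
print 'http://example.webscraping.com/' in cache        # True, via __contains__
print cache['http://example.webscraping.com/']['code']  # 200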