Scrapy阅读源码分析<三>
2017-07-06 10:39
477 查看
爬虫类
接着上次代码讲,上次的运行入口执行到最后是执行了Crawler的
crawl方法:
1234567891011121314151617181920212223 | @defer.inlineCallbacksdef crawl(self, *args, **kwargs): assert not self.crawling, "Crawling already taking place" self.crawling = True try: # 从spiderloader中找到爬虫类,并实例化爬虫实例 self.spider = self._create_spider(*args, **kwargs) # 创建引擎 self.engine = self._create_engine() 4000 # 调用爬虫类的start_requests方法 start_requests = iter(self.spider.start_requests()) # 执行引擎的open_spider,并传入爬虫实例和初始请求 yield self.engine.open_spider(self.spider, start_requests) yield defer.maybeDeferred(self.engine.start) except Exception: if six.PY2: exc_info = sys.exc_info() self.crawling = False if self.engine is not None: yield self.engine.close() if six.PY2: six.reraise(*exc_info) raise |
依次来看,爬虫类是如何实例化的?上文已讲解过,在
Crawler实例化时,会创建
SpiderLoader,它会根据用户的配置文件
settings.py找到存放爬虫的位置,我们写的爬虫都会放在这里。
然后
SpiderLoader会扫描这里的所有文件,并找到父类是
scrapy.Spider爬虫类,然后根据爬虫类中的
name属性(在编写爬虫时,这个属性是必填的),最后生成一个
{spider_name: spider_cls}的字典,然后根据
scrapy crawl <spider_name>命令,根据
spider_name找到对应的爬虫类,然后实例化它,在这里就是调用了
_create_spider方法:
123 | def _create_spider(self, *args, **kwargs): # 调用类方法from_crawler实例化 return self.spidercls.from_crawler(self, *args, **kwargs) |
from_crawler进行的初始化,找到
scrapy.Spider类:
1234567891011 | @classmethoddef from_crawler(cls, crawler, *args, **kwargs): spider = cls(*args, **kwargs) spider._set_crawler(crawler) return spiderdef _set_crawler(self, crawler): self.crawler = crawler # 把settings对象赋给spider实例 self.settings = crawler.settings crawler.signals.connect(self.close, signals.spider_closed) |
settings配置,来看构造方法干了些什么?
1234567891011121314 | class Spider(object_ref): name = None custom_settings = None def __init__(self, name=None, **kwargs): # name必填 if name is not None: self.name = name elif not getattr(self, 'name', None): raise ValueError("%s must have a name" % type(self).__name__) self.__dict__.update(kwargs) # 如果没有设置start_urls,默认是[] if not hasattr(self, 'start_urls'): self.start_urls = [] |
name、
start_urls、
custom_settings。
name:在运行爬虫时通过它找到对应的爬虫脚本而使用;
start_urls:定义种子URL;
custom_settings:从字面意思可以看出,爬虫自定义配置,会覆盖配置文件的配置项;
引擎
分析完爬虫类的初始化后,还是回到Crawler的
crawl方法,紧接着就是创建引擎对象,也就是
_create_engine方法,这里直接进行了引擎初始化操作,看看都发生了什么?
1234567891011121314151617181920212223 | class ExecutionEngine(object): """引擎""" def __init__(self, crawler, spider_closed_callback): self.crawler = crawler # 这里也把settings配置保存到引擎中 self.settings = crawler.settings # 信号 self.signals = crawler.signals # 日志格式 self.logformatter = crawler.logformatter self.slot = None self.spider = None self.running = False self.paused = False # 从settings中找到Scheduler调度器,找到Scheduler类 self.scheduler_cls = load_object(self.settings['SCHEDULER']) # 同样,找到Downloader下载器类 downloader_cls = load_object(self.settings['DOWNLOADER']) # 实例化Downloader self.downloader = downloader_cls(crawler) # 实例化Scraper,它是引擎连接爬虫类的桥梁 self.scraper = Scraper(crawler) self._spider_closed_callback = spider_closed_callback |
Scheduler、
Downloader、
Scrapyer,其中
Scheduler只进行了类定义,没有实例化。
调度器
调度器初始化发生在引擎的open_spider方法中,我们提前来看一下调度器的初始化完成了哪些工作?
123456789101112131415161718192021222324252627282930313233 | class Scheduler(object): """调度器""" def __init__(self, dupefilter, jobdir=None, dqclass=None, mqclass=None, logunser=False, stats=None, pqclass=None): # 指纹过滤器 self.df = dupefilter # 任务队列文件夹 self.dqdir = self._dqdir(jobdir) # 优先级任务队列类 self.pqclass = pqclass # 磁盘任务队列类 self.dqclass = dqclass # 内存任务队列类 self.mqclass = mqclass # 日志是否序列化 self.logunser = logunser self.stats = stats @classmethod def from_crawler(cls, crawler): settings = crawler.settings # 从配置文件中获取指纹过滤器类 dupefilter_cls = load_object(settings['DUPEFILTER_CLASS']) # 实例化指纹过滤器 dupefilter = dupefilter_cls.from_settings(settings) # 从配置文件中依次获取优先级任务队列类、磁盘队列类、内存队列类 pqclass = load_object(settings['SCHEDULER_PRIORITY_QUEUE']) dqclass = load_object(settings['SCHEDULER_DISK_QUEUE']) mqclass = load_object(settings['SCHEDULER_MEMORY_QUEUE']) # 请求日志序列化开关 logunser = settings.getbool('LOG_UNSERIALIZABLE_REQUESTS', settings.getbool('SCHEDULER_DEBUG')) return cls(dupefilter, jobdir=job_dir(settings), logunser=logunser, stats=crawler.stats, pqclass=pqclass, dqclass=dqclass, mqclass=mqclass) |
实例化请求指纹过滤器:用来过滤重复请求,可自己重写替换之;
定义各种不同类型的任务队列:优先级任务队列、基于磁盘的任务队列、基于内存的任务队列;
请求指纹过滤器
先来看请求指纹过滤器是什么?在配置文件中定义的默认指纹过滤器是
RFPDupeFilter:
12345678910111213141516171819 | class RFPDupeFilter(BaseDupeFilter): """请求指纹过滤器""" def __init__(self, path=None, debug=False): self.file = None # 指纹集合,使用的是set,基于内存 self.fingerprints = set() self.logdupes = True self.debug = debug self.logger = logging.getLogger(__name__) # 请求指纹可存入磁盘 if path: self.file = open(os.path.join(path, 'requests.seen'), 'a+') self.file.seek(0) self.fingerprints.update(x.rstrip() for x in self.file) @classmethod def from_settings(cls, settings): debug = settings.getbool('DUPEFILTER_DEBUG') return cls(job_dir(settings), debug) |
set,而且可以控制这些指纹是否存入磁盘供下次重复使用。
指纹过滤器的主要职责是:过滤重复请求,可自定义过滤规则。
在下篇文章中会介绍到,每个请求是根据什么规则生成指纹,进而实现重复请求过滤逻辑的。
任务队列
调度器默认定义的2种队列类型:基于磁盘的任务队列:在配置文件可配置存储路径,每次执行后会把队列任务保存到磁盘上;
基于内存的任务队列:每次都在内存中执行,下次启动则消失;
配置文件默认定义如下:
123456 | # 基于磁盘的任务队列(后进先出)SCHEDULER_DISK_QUEUE = 'scrapy.squeues.PickleLifoDiskQueue'# 基于内存的任务队列(后进先出)SCHEDULER_MEMORY_QUEUE = 'scrapy.squeues.LifoMemoryQueue'# 优先级队列SCHEDULER_PRIORITY_QUEUE = 'queuelib.PriorityQueue' |
JOBDIR,那么则每次把任务队列保存在磁盘中,下次启动时自动加载。
如果没有定义,那么则使用的是内存队列。
细心的你会发现,默认定义的这些队列结构都是后进先出的,什么意思呢?
也就是说:Scrapy默认的采集规则是深度优先采集!
如何改变这种机制,变为广度优先采集呢?那么你可以看一下
scrapy.squeues模块,其中定义了:
12345678910111213141516 | # 先进先出磁盘队列(pickle序列化)PickleFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue, \ _pickle_serialize, pickle.loads)# 后进先出磁盘队列(pickle序列化)PickleLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, \ _pickle_serialize, pickle.loads)# 先进先出磁盘队列(marshal序列化)MarshalFifoDiskQueue = _serializable_queue(queue.FifoDiskQueue, \ marshal.dumps, marshal.loads)# 后进先出磁盘队列(marshal序列化)MarshalLifoDiskQueue = _serializable_queue(queue.LifoDiskQueue, \ marshal.dumps, marshal.loads)# 先进先出内存队列FifoMemoryQueue = queue.FifoMemoryQueue# 后进先出内存队列LifoMemoryQueue = queue.LifoMemoryQueue |
如果你想追究这些队列是如何实现的,可以参考scrapy作者写的scrapy/queuelib模块。
下载器
回头引擎的初始化,来看下载器是如何初始化的。在默认的配置文件
default_settings.py中,下载器配置如下:
1 | DOWNLOADER = 'scrapy.core.downloader.Downloader' |
Downloader实例化:
12345678910111213141516171819202122 | class Downloader(object): """下载器""" def __init__(self, crawler): # 同样的,拿到settings对象 self.settings = crawler.settings self.signals = crawler.signals self.slots = {} self.active = set() # 初始化DownloadHandlers self.handlers = DownloadHandlers(crawler) # 从配置中获取设置的并发数 self.total_concurrency = self.settings.getint('CONCURRENT_REQUESTS') # 同一域名并发数 self.domain_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_DOMAIN') # 同一IP并发数 self.ip_concurrency = self.settings.getint('CONCURRENT_REQUESTS_PER_IP') # 随机延迟下载时间 self.randomize_delay = self.settings.getbool('RANDOMIZE_DOWNLOAD_DELAY') # 初始化下载器中间件 self.middleware = DownloaderMiddlewareManager.from_crawler(crawler) self._slot_gc_loop = task.LoopingCall(self._slot_gc) self._slot_gc_loop.start(60) |
下载器
DownloadHandlers是做什么的?
下载器中间件
DownloaderMiddlewareManager初始化发生了什么?
下载处理器
12345678910111213141516 | class DownloadHandlers(object): """下载器处理器""" def __init__(self, crawler): self._crawler = crawler self._schemes = {} # 存储scheme对应的类路径,后面用于实例化 self._handlers = {} # 存储scheme对应的下载器 self._notconfigured = {} # 从配置中找到DOWNLOAD_HANDLERS_BASE,构造下载处理器 # 注意:这里是调用getwithbase方法,取的是配置中的XXXX_BASE配置 handlers = without_none_values( crawler.settings.getwithbase('DOWNLOAD_HANDLERS')) # 存储scheme对应的类路径,后面用于实例化 for scheme, clspath in six.iteritems(handlers): self._schemes[scheme] = clspath crawler.signals.connect(self._close, signals.engine_stopped) |
12345678910 | # 用户可自定义的下载处理器DOWNLOAD_HANDLERS = {}# 默认的下载处理器DOWNLOAD_HANDLERS_BASE = { 'file': 'scrapy.core.downloader.handlers.file.FileDownloadHandler', 'http': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler', 'https': 'scrapy.core.downloader.handlers.http.HTTPDownloadHandler', 's3': 'scrapy.core.downloader.handlers.s3.S3DownloadHandler', 'ftp': 'scrapy.core.downloader.handlers.ftp.FTPDownloadHandler',} |
http和
https对应的处理器。
从这里你也能看出,scrapy的架构是非常低耦合的,任何涉及到的组件及模块都是可重写和配置的。scrapy提供了基础的服务组件,你也可以自己实现其中的某些组件,修改配置即可达到替换的目的。
到这里,大概就能明白,下载处理器的工作就是:管理着各种资源对应的下载器,在真正发起网络请求时,选取对应的下载器进行资源下载。
但是请注意,在这个初始化过程中,这些下载器是没有被实例化的,也就是说,在真正发起网络请求时,才会进行初始化,而且只会初始化一次,后面会讲到。
下载器中间件管理器
下面来看下载器中间件DownloaderMiddlewareManager初始化,同样的这里又调用了类方法
from_crawler进行初始化,
DownloaderMiddlewareManager继承了
MiddlewareManager类,来看它在初始化做了哪些工作:
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 | class MiddlewareManager(object): """所有中间件的父类,提供中间件公共的方法""" component_name = 'foo middleware' @classmethod def from_crawler(cls, crawler): # 调用from_settings return cls.from_settings(crawler.settings, crawler) @classmethod def from_settings(cls, settings, crawler=None): # 调用子类_get_mwlist_from_settings得到所有中间件类的模块 mwlist = cls._get_mwlist_from_settings(settings) middlewares = [] enabled = [] # 依次实例化 for clspath in mwlist: try: # 加载这些中间件模块 mwcls = load_object(clspath) # 如果此中间件类定义了from_crawler,则调用此方法实例化 if crawler and hasattr(mwcls, 'from_crawler'): mw = mwcls.from_crawler(crawler) # 如果此中间件类定义了from_settings,则调用此方法实例化 elif hasattr(mwcls, 'from_settings'): mw = mwcls.from_settings(settings) # 上面2个方法都没有,则直接调用构造实例化 else: mw = mwcls() middlewares.append(mw) enabled.append(clspath) except NotConfigured as e: if e.args: clsname = clspath.split('.')[-1] logger.warning("Disabled %(clsname)s: %(eargs)s", {'clsname': clsname, 'eargs': e.args[0]}, extra={'crawler': crawler}) logger.info("Enabled %(componentname)ss:\n%(enabledlist)s", {'componentname': cls.component_name, 'enabledlist': pprint.pformat(enabled)}, extra={'crawler': crawler}) # 调用构造方法 return cls(*middlewares) @classmethod def _get_mwlist_from_settings(cls, settings): # 具体有哪些中间件类,子类定义 raise NotImplementedError def __init__(self, *middlewares): self.middlewares = middlewares # 定义中间件方法 self.methods = defaultdict(list) for mw in middlewares: self._add_middleware(mw) def _add_middleware(self, mw): # 默认定义的,子类可覆盖 # 如果中间件类有定义open_spider,则加入到methods if hasattr(mw, 'open_spider'): self.methods['open_spider'].append(mw.open_spider) # 如果中间件类有定义close_spider,则加入到methods # methods就是一串中间件的方法链,后期会依次调用 if hasattr(mw, 'close_spider'): self.methods['close_spider'].insert(0, mw.close_spider) |
DownloaderMiddlewareManager实例化:
123456789101112131415161718 | class DownloaderMiddlewareManager(MiddlewareManager): """下载中间件管理器""" component_name = 'downloader middleware' @classmethod def _get_mwlist_from_settings(cls, settings): # 从配置文件DOWNLOADER_MIDDLEWARES_BASE和DOWNLOADER_MIDDLEWARES获得所有下载器中间件 return build_component_list( settings.getwithbase('DOWNLOADER_MIDDLEWARES')) def _add_middleware(self, mw): # 定义下载器中间件请求、响应、异常一串方法 if hasattr(mw, 'process_request'): self.methods['process_request'].append(mw.process_request) if hasattr(mw, 'process_response'): self.methods['process_response'].insert(0, mw.process_response) if hasattr(mw, 'process_exception'): self.methods['process_exception'].insert(0, mw.process_exception) |
MiddlewareManager类,然后重写了
_add_middleware方法,为下载行为定义默认的下载前、下载后、异常时对应的处理方法。
中间件的职责是什么?从这里能大概看出,从某个组件流向另一个组件时,会经过一系列中间件,每个中间件都定义了自己的处理流程,相当于一个个管道,输入时可以针对数据进行处理,然后送达到另一个组件,另一个组件处理完逻辑后,又经过这一系列中间件,这些中间件可再针对这个响应结果进行处理,最终输出。
Scraper
下载器实例化完了之后,回到引擎的初始化方法中,然后是实例化Scraper,在Scrapy源码分析(一)架构概览中已经大概说到,这个类没有在架构图中出现,但这个类其实是处于
Engine、
Spiders、
Pipeline之间,是连通这3个组件的桥梁。
来看它的初始化:
123456789101112131415 | class Scraper(object): def __init__(self, crawler): self.slot = None # 实例化爬虫中间件管理器 self.spidermw = SpiderMiddlewareManager.from_crawler(crawler) # 从配置文件中加载Pipeline处理器类 itemproc_cls = load_object(crawler.settings['ITEM_PROCESSOR']) # 实例化Pipeline处理器 self.itemproc = itemproc_cls.from_crawler(crawler) # 从配置文件中获取同时处理输出的任务个数 self.concurrent_items = crawler.settings.getint('CONCURRENT_ITEMS') self.crawler = crawler self.signals = crawler.signals self.logformatter = crawler.logformatter |
爬虫中间件管理器
SpiderMiddlewareManager初始化:
1234567891011121314151617181920 | class SpiderMiddlewareManager(MiddlewareManager): """爬虫中间件管理器""" component_name = 'spider middleware' @classmethod def _get_mwlist_from_settings(cls, settings): # 从配置文件中SPIDER_MIDDLEWARES_BASE和SPIDER_MIDDLEWARES获取默认的爬虫中间件类 return build_component_list(settings.getwithbase('SPIDER_MIDDLEWARES')) def _add_middleware(self, mw): super(SpiderMiddlewareManager, self)._add_middleware(mw) # 定义爬虫中间件处理方法 if hasattr(mw, 'process_spider_input'): self.methods['process_spider_input'].append(mw.process_spider_input) if hasattr(mw, 'process_spider_output'): self.methods['process_spider_output'].insert(0, mw.process_spider_output) if hasattr(mw, 'process_spider_exception'): self.methods['process_spider_exception'].insert(0, mw.process_spider_exception) if hasattr(mw, 'process_start_requests'): self.methods['process_start_requests'].insert(0, mw.process_start_requests) |
配置文件中定义的默认的爬虫中间件类如下:
12345678 | SPIDER_MIDDLEWARES_BASE = { # 默认的爬虫中间件类 'scrapy.spidermiddlewares.httperror.HttpErrorMiddleware': 50, 'scrapy.spidermiddlewares.offsite.OffsiteMiddleware': 500, 'scrapy.spidermiddlewares.referer.RefererMiddleware': 700, 'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware': 800, 'scrapy.spidermiddlewares.depth.DepthMiddleware': 900,} |
HttpErrorMiddleware:会针对响应不是200错误进行逻辑处理;
OffsiteMiddleware:如果Spider中定义了
allowed_domains,会自动过滤初次之外的域名请求;
RefererMiddleware:追加
Referer头信息;
UrlLengthMiddleware:控制过滤URL长度超过配置的请求;
DepthMiddleware:过滤超过配置深入的抓取请求;
当然,你也可以定义自己的爬虫中间件,来处理自己需要的逻辑。
Pipeline管理器
爬虫中间件管理器初始化完之后,然后就是Pipeline组件的初始化,默认的
Pipeline组件是
ItemPipelineManager:
123456789101112131415161718 | class ItemPipelineManager(MiddlewareManager): component_name = 'item pipeline' @classmethod def _get_mwlist_from_settings(cls, settings): # 从配置文件加载ITEM_PIPELINES_BASE和ITEM_PIPELINES类 return build_component_list(settings.getwithbase('ITEM_PIPELINES')) def _add_middleware(self, pipe): super(ItemPipelineManager, self)._add_middleware(pipe) # 定义默认的pipeline处理逻辑 if hasattr(pipe, 'process_item'): self.methods['process_item'].append(pipe.process_item) def process_item(self, item, spider): # 依次调用所有子类的process_item方法 return self._process_chain('process_item', item, spider) |
ItemPipelineManager也是一个中间件管理器的子类,由于它的行为非常类似于中间件,但由于功能较为独立,所以属于核心组件之一。
从
Scraper的初始化能够看到,它管理着
Spiders和
Pipeline相关的交互逻辑。
总结
到这里,所有组件:引擎、下载器、调度器、爬虫类、输出处理器都依次初始化完成,每个核心组件下其实都包含一些小的组件在里面,帮助处理某一环节的各种流程。
相关文章推荐
- Scrapy阅读源码分析<三>
- Scrapy阅读源码分析<二>
- Scrapy阅读源码分析<四>
- Scrapy源码阅读分析<一>
- Hadoop-datanode存储结构及源码分析<转>
- caffe源码分析 vector<Blob<Dtype>*>& bottom
- .net源码分析 - ConcurrentDictionary<TKey, TValue>
- Java集合源码分析(六)TreeSet<E>
- u-boot-2010.06 源码分析<3>--第二阶段
- ceph-deploy源码分析(一)——源码结构与cli <转>
- JDK8中的Comparable<T>接口源码分析
- Android开发-数据存储SharedPreferences工具类、Set<String>保存问题、源码分析
- Android大图加载,缩放,滑动浏览--SubsamplingScaleImageView 源码分析<一>大图加载
- 招聘数据分析<三>
- android Ipc----Binder<Aldl>源码分析
- Java集合源码分析(五)HashSet<E>
- QUnit系列 -- 5.QUnit源码分析之<大致结构>
- Nimbus<二>storm启动nimbus源码分析-nimbus.clj
- jquery Selector 源码分析<转>
- 雄踞AppStore榜首的游戏<别踩到白块儿>源码分析和下载(一)