您的位置:首页 > 数据库 > MySQL

Scrapy之md5加密,多线程mysql写入,item分步处理。

2018-02-01 22:14 597 查看

导入itemloader加载器

1.默认的xpath()/css()这种数据的提取方式是将数据的提取和过滤放在一起,代码比较凌乱。

2.可以将数据处理的函数单独定义,实现代码的重用。

from scrapy.loader import Itemloader
# 参数1:实例化的item对象,表明itemloader对象解析的数据,最终保存到item对象的哪一个字段中;
# 参数2:网页源码
item_loader = ItemLoader(item=item文件中返回item的类(), response=response)

# 对item_loader赋值的方法有三种
# 1.item_loader.add_xpath('键','xpath表达式')
# 2.item_loader.add_css(参考例1)
# 3.item_loader.add_value('键','值')

# 将itemloader加载器中保存的每一个field数据,收集起来,复制给item对象的field字段。
item = item_loader.load_item()

# 将item对象返回
yield item


在items文件中对item对象中要保存的数据进行处理。

from scrapy.loader.processors import MapCompose, TakeFirst, Join

class JobbolespiderItem(scrapy.Item):
# 当通过add_xpath()获取到原始数据,通过ItemLoader对象将原始数据传给input_processor()函数进行数据的处理,并将结果保存在ItemLoader中。
# MapCompose()类似于map(函数,列表),MapCompose会将List列表中的每一个元素分别作用于每一个处理函数。传进来的数据依次流过MapCompose()中的函数。
title = scrapy.Field(input_processor=MapCompose(函数1, lambda x:x+'----jobbole---'))
# 对应的还有output_processor=TakeFirst()数据在赋值给item之前进行处理。
# 由于处理过后返回的是列表。TakeFirst()获取列表中的第一个值。

# 如果想代码重用
# 定义类,继承于ItemLoader
class ArticleItemLoader(ItemLoader):
# 给ItemLoader对象指定一个默认的输出处理器
default_output_processor = TakeFirst()
# 在爬虫文件中将Itemloader换成ArticleItemLoader即可。


对于Url的保存

如果url比较长,可以考虑将url进行md5加密然后保存。

1.md5的长度是固定的,可以根据值进行去重

2.可以节省内存空间

import hashlib
def get_md5_by_url(url):
m = hashlib.md5()
m.update(url)
return m.hexdigest()


Mysql数据库的写入

# 数据库写入可以放在pipelines文件中。
import MySQLdb
# 图片的下载路径保存可以放在pipelines中
from scrapy.pipelines.images import ImagesPipeline
# 定义类继承于系统的图片下载类,对其进行内容扩充
class JobboleImagePipeline(ImagesPipeline):
def item_completed(self, results, item, info):
# 该方法会在图片经过管道下载成功之后,被调用。该函数会接收一些图片下载之后的相关信息。
#:param results: 元组列表,第一个元素success, 第二个元素file_info_and_error
#:param item: 爬虫类yield item对象
#:param info: 下载的信息
img_dict = results[0][1]
img_path = img_dict['path']
# 将img_path图片路径,保存到item对象的"img_path"属性中。
item['img_path'] = img_path
# 返回item对象,让后续的管道继续对item进行处理。
return item


1.正常写入

class MySQLPipeline(object):
def __init__(self):
# 初始化数据库对象
self.connect = MySQLdb.connect('localhost', 'root', '123456', 'article_db', charset='utf8')
self.cursor = self.connect.cursor()
def process_item(self, item, spider):
insert_sql = 'insert into jobbole (title, date_time, detail_url, url_object_id, img_src, img_path, ping_lun, shou_cang, dian_zan, tags, content) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
self.cursor.execute(insert_sql, (item['title'], item['date_time'], item['detail_url'], item['url_object_id'], item['img_src'], item['img_path'], item['ping_lun'], item['shou_cang'], item['dian_zan'], item['tags'], item['content']))
self.connect.commit()

def closed_spider(self, spider):
self.cursor.close()
self.connect.close()


2.异步写入

数据库的异步写入操作。因为execute()及commit()提交数据库的方式是同步插入数据,一旦数据量比较大,scrapy的解析是异步多线程的方式,解析速度非常快,而数据库的写入速度比较慢,可能会导致item中的数据插入数据库不及时,造成数据库写入的阻塞,最终导致数据库卡死或者数据丢失。

class MySQLTwistedPipeline(object):
def __init__(self, dbpool):
# 初始化线程池对象,用于维护操作mysql写入操作的线程
self.dbpool = dbpool
# from_settings()函数是固定写法,该函数是用于读取settings.py配置信息的函数,第二个参数settings是一个字典。
# 可以先在setting文件中设置数据库连接的配置信息
# MYSQL_HOST = 'localhost'
# MYSQL_DBNAME = 'article_db'
# MYSQL_USER = 'root'
# MYSQL_PASSWORD = '123456'
# MYSQL_CHARSET = 'utf8'
@classmethod
def from_settings(cls, settings):
args = dict(
host=settings['MYSQL_HOST'],
db=settings['MYSQL_DBNAME'],
user=settings['MYSQL_USER'],
passwd=settings['MYSQL_PASSWORD'],
charset=settings['MYSQL_CHARSET'],
# 指定用用于创建cursor游标的类
cursorclass=MySQLdb.cursors.DictCursor,
)
# 创建一个线程池对象
# 参数1:用于连接MySQL数据库的驱动
# 参数2:数据库的链接信息(host, port, user等)
dbpool = adbapi.ConnectionPool("MySQLdb", **args)

return cls(dbpool)
def process_item(self, item, spider):
# 在线程池dbpool中通过调用runInteraction()函数,来实现异步插入数据的操作。runInteraction()会insert_sql交由线程池中的某一个线程执行具体的插入操作。
query = self.dbpool.runInteraction(self.insert, item)
# addErrorback()数据库异步写入失败时,会执行addErrorback()内部的函数调用。
query.addErrback(self.handler_error)

def handler_error(self, failure):
print u'数据库插入数据失败:',failure

def insert(self, cursor, item):
insert_sql = 'insert into jobbole (title, date_time, detail_url, url_object_id, img_src, img_path, ping_lun, shou_cang, dian_zan, tags, content) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'
cursor.execute(insert_sql, (item['title'], item['date_time'], item['detail_url'], item['url_object_id'], item['img_src'], item['img_path'], item['ping_lun'], item['shou_cang'], item['dian_zan'], item['tags'], item['content']))
# 不需要执行commit()的操作了,会在线程池中自动指定提交的操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: