您的位置:首页 > 编程语言 > Python开发

python--爬虫--积累--多图片网站抓取加速方案和调优记录

2019-07-16 14:27 1216 查看

最近在处理多图片数据网站的抓取案例。

对抓取的速度有一定的要求。短时间内需要获取大量的图片。

在部署分布式抓取时有很多个因素是需要调优的。

1、每次读取数据库mongodb需要处理的记录的条数
2、协程的pool大小。同时处理的个数。
3、代理ip如何使用
4、代理可使用的请求数–部分代理ip有请求数的限制
5、代理ip可用时间(60s)还是几分钟
6、对方的封锁机制–同一ip密集访问会有被封锁的风险。需要调试出 安全的阈值。比如同一个ip 1s内发出多少请求是合理的。
7、对方服务器的承受能力–这方面也需要测试。比如我1s内 同时发3000次请求,对对方服务器来说 类似ddos攻击。把对方的服务器带宽都占完了。 测试的方式是 用正常的网络是否能打开对方的页面 如果正常网络都打不开对方的页面,说明对方的服务达到上限挂了。 这个也需要调试出 安全的阈值。这就是我们抓取速度的极限。
8、对方服务器每次请求断开连接的时长—涉及到我们下载图片时 超时时间的设置—一般请求设置 8s 就已经很长了。但是对于图片下载类型的爬虫来说。比如 我一台机子同时发出50个请求,因为我自身机子的带宽有限,所以 50张图片同时下载时,如果设置了8s超时,50个请求会有很多个超时,起码有30多个请求时超时。所以我们设置成请求超时时间是60s会显著提高每次请求的下载成功率。
当然提高带宽也有一定帮助,问题是我们只能提升自己的带宽,不能提升对方的带宽。
可能60s还是下载不完50个请求的图片。
但对方服务有可能设置 最长连接时长是60s。
这种场景 表现出来的现象是 一到60s后 很多个请求都会同时报超时。

经过反复的调试 得出一个 效果还不错也比较稳定的抓取方案分享如下:

一、首先是代理ip的选择

很多代理ip 要么有个数的限制,要么有可用时间的限制,要么有 同一个ip请求数的限制,要么有获取ip的间隔时间的限制。

每家代理ip不一样。

所以 选用的代理ip不一样 自己的框架架构就不一样。

我选择的代理ip

可选 返回个数 但价格不同 一次请求返回一个ip 每月160元 一次请求返回5个ip 每月720元。价格依次递增。

同一个ip使用时的请求数没有限制。(这点比较不错)

每个ip可用时间2分钟左右。

缺点是

1、每次获取代理ip需要间隔200ms。 也就是1s内不能请求太多次获取代理ip。
2、只能在5个ip出口去获取 代理ip,超过5个ip需要重新购买新的。(这个要求 我们的分布式集群 在有限的几个ip内部署,不能一台机子用一个动态ip)

二、代码框架

基于代理ip 的特点, 每次读取数据库mongodb需要处理的记录的条数为10,协程池的大小也为10。

如果每次需要处理的记录为100,协程池大小为10,使用同一个ip,则 1s内 可能会 发出100个请求。因为协程是io才会等待,发出请求时不会等待的。

这方面 是需要注意的。

协程调度 代码如下,确保每次发出的请求是50个,处理完后再发出第二次50个。

使用python3.x+

引入的包为:

[tool.poetry.dependencies]
python = "^3.5"
pyhocon = "^0.3.48"
pymongo = "^3.7.2"
beautifulsoup4="^4.4.0"
pandas = "0.24.2"
xlrd="^1.0.0"
gevent = "1.4.0"
openpyxl = "2.6.2"
oss2 = "2.6.1"
pypinyin="0.30.0"
from spider.service.download_files_2_oss_multi  import *
from spider.settings import config
import datetime
import _thread

def init():
for i in range(0, 100000000000, 1):
proxy = int(config["proxy"])
if proxy == 0:
start = int(config["start"])
p = pool.Pool(5)
tasks_list = []
tasks_list.append(p.spawn(multi_run, start, 10, 10, 0))
tasks_list.append(p.spawn(multi_run, start + 10, 10, 10, 0))
tasks_list.append(p.spawn(multi_run, start + 20, 10, 10, 0))
tasks_list.append(p.spawn(multi_run, start + 30, 10, 10, 0))
tasks_list.append(p.spawn(multi_run, start + 40, 10, 10, 0))
gevent.joinall(tasks_list)
# multi_run(300, 100, 10)

# multi_run(0, 100, 10)
print("第" + str(i) + "轮" + "finish 100")
else:
start = int(config["start"])
time.sleep(0.2)
ipports = ferch_proxy_ips()
while not ipports:
print("代理ip获取失败重新获取")
ipports = ferch_proxy_ips()
# 创建两个线程
p = pool.Pool(5)
tasks_list = []
tasks_list.append(p.spawn(multi_run, start, 10, 10, ipports[0]))
tasks_list.append(p.spawn(multi_run, start + 10, 10, 10, ipports[1]))
tasks_list.append(p.spawn(multi_run, start + 20, 10, 10, ipports[2]))
tasks_list.append(p.spawn(multi_run, start + 30, 10, 10, ipports[3]))
tasks_list.append(p.spawn(multi_run, start + 40, 10, 10, ipports[4]))
gevent.joinall(tasks_list)
# multi_run(300, 100, 10)

# multi_run(0, 100, 10)
print("第" + str(i) + "轮" + "finish 100")

# init()

协程处理抓取如下:

import gevent

# 打上猴子补丁
from gevent import monkey, pool
monkey.patch_all()

from spider.service.basic import *
from spider.dao.pic import *
import json
from urllib import parse
import os
import datetime
import oss2
import pypinyin
from spider.settings import config
import asyncio

# 阿里云主账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM账号进行API访问或日常运维,请登录 https://ram.console.aliyun.com 创建RAM账号。
auth = oss2.Auth('12341234, '4233334242')
# Endpoint以杭州为例,其它Region请按实际情况填写。
bucket = oss2.Bucket(auth, 'http://oss-cn-hangzhou.aliyuncs.com', 'test-bucket')

def upload_2_oss2(objectname,path):
bucket.put_object_from_file(objectname, path)
print("上传文件%s,命名为%s" % (path, objectname))

def remove_local_file(path):
os.remove(path)
print("删除文件%s" % path)

def get_family_pic_link(skip,size):
names=find_family_pic_link(skip, size)
for c in names:
download_file_family_link(c)

def download_file_family_link(c,ippor
7ff7
t=0):
try:

print("start " + c["pic_id"] + datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'))
print("name"  + " " + str(c["pic_label"]) + " " + str(c["structure_label"]) + " " + str(c["pic_id"]))
print(c["pic_link"])
ids = str(c["fullImgPath"]).split("/")
id = ids[len(ids) - 1]
path = download_file(config["folder.root"],
id + "/" + str(py(str(c["pic_label"]).replace(" ", ""))).replace("?","") + "/" + str(
py(c["structure_label"])).replace("?",""), str(c["pic_id"]), ".jpg", c["pic_link"],ipport)
key = id + str(py(str(c["pic_label"]).replace(" ", ""))).replace("?","") + str(py(c["structure_label"])).replace("?","") + str(
c["pic_id"]) + ".jpg"
upload_2_oss2(key, path)
remove_local_file(path)
deal_family_pic_link_oss(c["_id"], key, id)
print("end " + c["pic_id"] + datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'))
except urllib.error.HTTPError as e:
print(e.code)
print(e.reason)
print(e.read())
if e.code == 404:
error_family_pic_link(c["_id"])
print("404 dealed 2")
except Exception as e:
print(str(e))
if str(e) == "timed out" and c["dealed"]==4:
error_family_pic_link(c["_id"])
print("double timeout dealed 5")
elif str(e) == "timed out":
timeout_family_pic_link(c["_id"])
print("timeout dealed 4")
print("爬取失败")

def mkdir(path):
folder = os.path.exists(path)
if not folder:  # 判断是否存在文件夹如果不存在则创建为文件夹
os.makedirs(path)  # makedirs 创建文件时如果路径不存在会创建这个路径
print("--- create new folder %s  ---" % path)
else:
print("---  already have this folder!  ---")

def download_file(root,folder,id,end,link,ipport=0):
folder = os.path.join(root,folder)
path = os.path.join(root,folder,id+end)
#
# try:
if not os.path.exists(folder):
mkdir(folder)
if not os.path.exists(path):
if ipport == 0:
r = fetch_raw_respone(link)
else:
r = fetch_raw_respone_proxy(link, ipport)
with open(path,'wb') as f:
f.write(r.read())
f.close()
print("文件保存成功")
return path
else :
print("文件已存在")
return path
# except:
#     print("爬取失败")

# 不带声调的(style=pypinyin.NORMAL)
def py(word):
s = ''
for i in pypinyin.pinyin(word, style=pypinyin.NORMAL):
s += ''.join(i)
return s

def download_file_from_oss(key,filename):
# 下载到本地文件
result = bucket.get_object_to_file(key, filename)

def multi_run(skip,limit,poolsize,ipport=0):
print("开始抓取skip" + str(skip) + datetime.datetime.strftime(datetime.datetime.now(), '%Y-%m-%d %H:%M:%S'))

p = pool.Pool(poolsize)
tasks_list = []

names=[]
try:
type = int(config["type"])
if type == 4:
names = find_family_pic_link_time_out(skip, limit)
else:
names = find_family_pic_link(skip, limit)
except Exception as e:
print(str(e))
print("读取mongodb失败")
n = 0
if len(names)>0:
for c in names:
tasks_list.append(p.spawn(download_file_family_link, c,ipport))

gevent.joinall(tasks_list)

mongodb数据库读取常用的几种

import pymongo
from pymongo import MongoClient
from spider.settings import config
from datetime import datetime

def init_mongo_client():
if config['mongo.username']:
uri = "mongodb://"+config['mongo.username']+":"+config['mongo.password']+"@"+config['mongo.host']+":"+config['mongo.port']+"/admin"
else:
uri = "mongodb://"+config[
'mongo.host'] + ":" + config['mongo.port']
print(uri)
client = MongoClient(uri)
return client

def count_no_deal_pic_link(work):
client = init_mongo_client()
db = client['db_pic']
collection = db['family_pic_link']
queryArgs = {'work': work,"dealed":0}
linkcount = collection.count(queryArgs)
return linkcount

def insert_name(insert_record):
client = init_mongo_client()
db = client['db_pic']
collection = db['pic_name']
collection.save(insert_record)

def find_family_pic_link_one(work):
familys = []
client = init_mongo_client()
db = client['db_pic']
collection = db['pic_link']
queryArgs = {'work': work}
searchRes = collection.find(queryArgs).skip(0).limit(1)
for x in searchRes:
# print("work查询为"+str(x))
familys.append(x)
return familys

def deal_pic_list_link(_id):
filterArgs = {'_id': _id}
updateArgs = {'$set': {'dealed': 1}}
client = init_mongo_client()
db = client['db_pic']
collection = db['list_link']
updateRes = collection.update_many(filter=filterArgs, update=updateArgs)

http抓取代码

import urllib
import urllib.request
import time
import socket
import json
from datetime import datetime
import calendar
import gzip
import io

def fetch_raw(link):
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = {'User-Agent': user_agent}
f = urllib.request.Request(link, headers=headers)
response = urllib.request.urlopen(f, timeout=6)
the_page = response.read()
content = the_page.decode("utf8")
time.sleep(1)
#print(content)
return content
#contentjson = json.loads(content)
#print(contentjson)
#print(contentjson['total_pages'])

def fetch_raw_respone(link):
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = {'User-Agent': user_agent}
f = urllib.request.Request(link, headers=headers)
response = urllib.request.urlopen(f, timeout=70)
# time.sleep(0.2)
#print(content)
return response

def fetch_raw_respone_proxy(link,ipport):
proxy_support = urllib.request.ProxyHandler({'http': ipport})
opener = urllib.request.build_opener(proxy_support)
# urllib.request.install_opener(opener)
print("使用代理ip"+ipport+"访问"+link)
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = {'User-Agent': user_agent}
f = urllib.request.Request(link, headers=headers)
response = opener.open(f, timeout=70)
# time.sleep(1)
return response

def fetch_raw_post_json(link, data):
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = {'User-Agent': user_agent, "Content-Type": "application/json; charset=UTF-8"}
print({'data': json.dumps(data)})
print({'headers': json.dumps(headers)})
jsondata= json.dumps(data)
params = jsondata.encode('utf-8')
r = urllib.request.Request(link, data=params, headers=headers)
response = urllib.request.urlopen(r, timeout=8)
the_page = response.read()
content = the_page.decode("utf8")
time.sleep(1)
return content

def fetch_raw_post(link, data):
user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
headers = {'User-Agent': user_agent}
print({'status': json.dumps(data)})
data = urllib.parse.urlencode(data).encode('utf-8')
r = urllib.request.Request(link, data=data, headers=headers)
response = urllib.request.urlopen(r, timeout=8)
the_page = response.read()
content = the_page.decode("utf8")
time.sleep(1)
#print(content)
return content
#contentjson = json.loads(content)
#print(contentjson)
#print(contentjson['total_pages'])

def utc_datetime_to_timestamp(utc_datetime):
"""将 utc 时间 (datetime 格式) 转为 utc 时间戳
:param utc_datetime: {datetime}2016-02-25 20:21:04.242000
:return: 13位 的毫秒时间戳 1456431664242
"""
utc_timestamp = int(calendar.timegm(utc_datetime.timetuple()) * 1000.0 + utc_datetime.microsecond / 1000.0)
return utc_timestamp

def fetch_raw_post_cookie(link, data):
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36',
'Cookie':"12343433"
}
print({'status': json.dumps(data)})
data = urllib.parse.urlencode(data).encode('utf-8')
r = urllib.request.Request(link, data=data, headers=headers)
response = urllib.request.urlopen(r, timeout=8)
the_page = response.read()
content = the_page.decode("utf8")
time.sleep(1)
#print(content)
return content

def fetch_proxy_ip():
try:
api="http://dailiip.com"
response = urllib.request.urlopen(api, timeout=8)
the_page = response.read()
content = the_page.decode("utf8")
print("获取代理ip"+content)
except Exception as e:
print(str("获取代理ip异常"+e))
content = ""
return content

def fetch_proxy_ips():
try:
api = "http://dailiip.com"
response = urllib.request.urlopen(api, timeout=8)
the_page = response.read()
content = the_page.decode("utf8")
print("获取代理ip" + content)
# 按照\n分割获取到的IP
ips = content.split('\n');
return ips
# 利用每一个IP
except Exception as e:
print(str("获取代理ip异常" + str(e)))
content = ""
return content

三、分布式部署抓取

使用10台服务器 使用虚拟环境安装pip3 whl包的方式

使用 poetry触发运行。

单台 一分钟50张图片 * 60 * 24 = 7w2 * 10 = 70w

每天获取的量在70w张左右 去掉 部分少量超时 损坏的 稳定获取60w左右。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: