
Crawler Performance Notes

2017-10-24 17:56
1. Single process, single thread: each request blocks and waits
import requests

def fetch_async(url):
    response = requests.get(url)
    return response

url_list = ['http://www.github.com', 'http://www.baidu.com','http://www.bing.com']

for url in url_list:
    fetch_async(url)
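To see the cost of this serial approach, time the loop; the sketch below (a minimal timing harness, not part of the original) shows that the total is roughly the sum of the individual request latencies:

import time
import requests

def fetch_async(url):
    response = requests.get(url)
    return response

url_list = ['http://www.github.com', 'http://www.baidu.com', 'http://www.bing.com']

start = time.time()
for url in url_list:
    fetch_async(url)          # each call blocks until its response arrives
print('serial total: {:.2f}s'.format(time.time() - start))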

Multithreaded execution
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_async(url):
    response = requests.get(url)
    return response

url_list = ['http://www.github.com', 'http://www.baidu.com','http://www.bing.com']
pool = ThreadPoolExecutor(5)
for url in url_list:
    pool.submit(fetch_async, url)
pool.shutdown(wait=True)

Multithreading + callbacks
from concurrent.futures import ThreadPoolExecutor
import requests

def fetch_async(url):
    response = requests.get(url)
    return response

def callback(future):
    print(future.result())

url_list = ['http://www.github.com', 'http://www.baidu.com','http://www.bing.com']
pool = ThreadPoolExecutor(5)
for url in url_list:
    v = pool.submit(fetch_async, url)
    v.add_done_callback(callback)

pool.shutdown(wait=True)

Multiprocess execution
from concurrent.futures import ProcessPoolExecutor
import requests

def fetch_async(url):
    response = requests.get(url)
    return response

if __name__ == '__main__':
    url_list = ['http://www.github.com', 'http://www.baidu.com', 'http://www.bing.com']
    pool = ProcessPoolExecutor(5)
    for url in url_list:
        pool.submit(fetch_async, url)
    pool.shutdown(wait=True)

Example 2:
from concurrent.futures import ThreadPoolExecutor
import requests
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']
def load_url(url):
    res = requests.get(url)
    print('{0} page is {1} bytes'.format(url, len(res.text)))

executor = ThreadPoolExecutor(max_workers=3)

for url in urls:
    future = executor.submit(load_url, url)
    print(future.done())

print('main thread')

Output:
False
False
False
main thread
http://www.163.com page is 646107 bytes
https://www.baidu.com/ page is 2443 bytes
https://github.com/ page is 51203 bytes

We use the submit method to put a task into the thread pool. submit returns a Future object, which can be understood simply as an operation that will complete at some point in the future. Because the pool runs tasks asynchronously, the main thread does not wait for the worker threads to finish: print('main thread') executes while the submitted tasks are still running, so each future.done() call returns False.
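If the main thread does need a task's return value, Future.result() blocks until that task finishes. A minimal sketch of the Future lifecycle, reusing the same load_url idea:

from concurrent.futures import ThreadPoolExecutor
import requests

def load_url(url):
    res = requests.get(url)
    return len(res.text)

executor = ThreadPoolExecutor(max_workers=3)
future = executor.submit(load_url, 'https://www.baidu.com/')
print(future.done())      # usually False: the task was only just submitted
print(future.result())    # blocks until the task completes, then returns its value
print(future.done())      # True: result() only returns once the task is done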

Example 3
from concurrent.futures import ThreadPoolExecutor
import requests
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    res = requests.get(url)
    print('{0} page is {1} bytes'.format(url, len(res.text)))

executor = ThreadPoolExecutor(max_workers=3)
executor.map(load_url, urls)    # same usage as Python's built-in map
print('main thread')
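One thing worth knowing about executor.map: it returns an iterator over the return values, in the same order as the input, regardless of which request finishes first. A sketch (page_size is a hypothetical helper that returns a value instead of printing):

from concurrent.futures import ThreadPoolExecutor
import requests

urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def page_size(url):
    return len(requests.get(url).text)

executor = ThreadPoolExecutor(max_workers=3)
# results come back in input order, not completion order
for url, size in zip(urls, executor.map(page_size, urls)):
    print('{0} page is {1} bytes'.format(url, size))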

Example 4
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    res = requests.get(url)
    print('{0} page is {1} bytes'.format(url, len(res.text)))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in urls:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list))
print('main thread')

Note: wait returns a named tuple containing two sets, one of completed futures and one of uncompleted ones. The advantage of wait is the extra control it gives you: its return_when parameter accepts FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED, and defaults to ALL_COMPLETED. With the default ALL_COMPLETED, the call blocks until every task in the pool has finished, and only then does the main thread continue.

Example 5
from concurrent.futures import ThreadPoolExecutor,wait,as_completed
import requests
urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    res = requests.get(url)
    print('{0} page is {1} bytes'.format(url, len(res.text)))

executor = ThreadPoolExecutor(max_workers=3)

f_list = []
for url in urls:
    future = executor.submit(load_url, url)
    f_list.append(future)
print(wait(f_list, return_when='FIRST_COMPLETED'))    # with FIRST_COMPLETED, wait returns as soon as any task finishes instead of waiting for all of them
print('main thread')
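as_completed is imported in Examples 4 and 5 but never used; it yields each future as soon as it finishes, so fast responses can be handled before slow ones. A sketch:

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

urls = ['http://www.163.com', 'https://www.baidu.com/', 'https://github.com/']

def load_url(url):
    res = requests.get(url)
    return '{0} page is {1} bytes'.format(url, len(res.text))

executor = ThreadPoolExecutor(max_workers=3)
f_list = [executor.submit(load_url, url) for url in urls]
for future in as_completed(f_list):    # yields futures in completion order
    print(future.result())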

Multiprocessing + callbacks
from concurrent.futures import ProcessPoolExecutor
import requests

def fetch_async(url):
    response = requests.get(url)
    return response

def callback(future):
    print(future.result())

url_list = ['http://www.github.com', 'http://www.baidu.com', 'http://www.bing.com']
if __name__ == '__main__':
    pool = ProcessPoolExecutor(5)
    for url in url_list:
        v = pool.submit(fetch_async, url)
        v.add_done_callback(callback)
    pool.shutdown(wait=True)

2. The asyncio async-I/O library. asyncio's programming model is a message loop: we obtain an EventLoop reference from the asyncio module, throw the coroutines that need to run into that EventLoop, and we get asynchronous I/O.
Example 1
import asyncio

@asyncio.coroutine
def func1():
    print('before...func1......')
    yield from asyncio.sleep(5)
    print('end...func1......')

@asyncio.coroutine
def func2():
    print('before...func2......')
    yield from asyncio.sleep(5)
    print('end...func2......')

tasks = [func1(), func2()]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
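The @asyncio.coroutine / yield from style above is the legacy pre-Python-3.5 syntax (deprecated in 3.8 and removed in 3.11). On a modern interpreter the same example would be written with async/await, roughly:

import asyncio

async def func1():
    print('before...func1......')
    await asyncio.sleep(5)
    print('end...func1......')

async def func2():
    print('before...func2......')
    await asyncio.sleep(5)
    print('end...func2......')

async def main():
    await asyncio.gather(func1(), func2())

asyncio.run(main())    # Python 3.7+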

Example 2
import asyncio

@asyncio.coroutine
def fetch_async(host, url='/'):
    print(host, url)
    reader, writer = yield from asyncio.open_connection(host, 80)

    request_header_content = """GET {0} HTTP/1.0\r\nHost: {1}\r\n\r\n""".format(url, host)
    request_header_content = bytes(request_header_content, encoding='utf-8')

    writer.write(request_header_content)
    yield from writer.drain()
    text = yield from reader.read()
    print(host, url, text)
    writer.close()

tasks = [
    fetch_async('www.cnblogs.com', '/wupeiqi/'),
    fetch_async('dig.chouti.com', '/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()

Example 3
import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    yield from asyncio.sleep(5)
    print('Hello again! (%s)' % threading.currentThread())

loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

Example 4:
import time
import asyncio

now = lambda : time.time()
async def do_some_work(x):        # the async keyword defines a coroutine
    print('Waiting: ', x)
    return 'Done after {0}'.format(x)

def callback(future):
    print('Callback: ', future.result())

start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)
print('TIME: ', now() - start)

aiohttp + asyncio

import aiohttp
import asyncio

@asyncio.coroutine
def fetch_async(url):
    print(url)
    response = yield from aiohttp.request('GET', url)
    print(url, response)
    response.close()

tasks = [fetch_async('http://www.baidu.com'), fetch_async('http://www.chouti.com')]
event_loop = asyncio.get_event_loop()
results = event_loop.run_until_complete(asyncio.gather(*tasks))
event_loop.close()
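Note that aiohttp.request('GET', url) as used above is the old pre-1.0 aiohttp API; current aiohttp versions go through a ClientSession. A rough modern equivalent:

import aiohttp
import asyncio

async def fetch_async(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(url, response.status)
            return await response.text()

async def main():
    await asyncio.gather(fetch_async('http://www.baidu.com'),
                         fetch_async('http://www.chouti.com'))

asyncio.run(main())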

asyncio + requests
import asyncio
import requests

@asyncio.coroutine
def fetch_async(func, *args):
    loop = asyncio.get_event_loop()
    future = loop.run_in_executor(None, func, *args)
    response = yield from future
    print(response.url, response.content)

tasks = [
    fetch_async(requests.get, 'http://www.cnblogs.com/wupeiqi/'),
    fetch_async(requests.get, 'http://dig.chouti.com/pic/show?nid=4073644713430508&lid=10273091')
]

loop = asyncio.get_event_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
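run_in_executor(None, ...) runs the blocking requests.get in the loop's default thread pool, so this is really threads wearing an asyncio coat rather than true async I/O. You can pass your own executor to cap the number of worker threads; a sketch:

import asyncio
from concurrent.futures import ThreadPoolExecutor
import requests

async def fetch_async(executor, url):
    loop = asyncio.get_running_loop()
    # requests.get still blocks, but it blocks a pool thread, not the event loop
    response = await loop.run_in_executor(executor, requests.get, url)
    print(response.url, len(response.content))

async def main():
    executor = ThreadPoolExecutor(max_workers=2)
    await asyncio.gather(fetch_async(executor, 'http://www.cnblogs.com/wupeiqi/'),
                         fetch_async(executor, 'http://www.baidu.com/'))

asyncio.run(main())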

gevent + requests
import gevent
import requests
from gevent import monkey

monkey.patch_all()

def fetch_async(method, url, req_kwargs):
    print(method, url, req_kwargs)
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, response.content)

gevent.joinall([
    gevent.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
    gevent.spawn(fetch_async, method='get', url='https://www.sina.com/', req_kwargs={})
])
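If you need to bound concurrency (to avoid hammering a site), gevent ships a Pool whose spawn works like gevent.spawn but runs at most N greenlets at once. A sketch:

from gevent import monkey

monkey.patch_all()    # patch the standard library before importing requests

import gevent
import requests
from gevent.pool import Pool

def fetch_async(method, url, req_kwargs):
    response = requests.request(method=method, url=url, **req_kwargs)
    print(response.url, len(response.content))

pool = Pool(2)    # at most 2 greenlets run at the same time
gevent.joinall([
    pool.spawn(fetch_async, method='get', url='https://www.python.org/', req_kwargs={}),
    pool.spawn(fetch_async, method='get', url='https://www.baidu.com/', req_kwargs={}),
    pool.spawn(fetch_async, method='get', url='https://www.sina.com/', req_kwargs={}),
])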

grequests
import grequests

request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://fakedomain/'),
    grequests.get('http://httpbin.org/status/500')
]

# send all requests concurrently; failed requests come back as None by default
response_list = grequests.map(request_list)
print(response_list)
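The three URLs above are chosen to fail in different ways (timeout, bad DNS, HTTP 500). grequests.map also accepts an exception_handler callback so failures are reported instead of silently becoming None; a sketch:

import grequests

def exception_handler(request, exception):
    print('request failed:', request.url, exception)

request_list = [
    grequests.get('http://httpbin.org/delay/1', timeout=0.001),
    grequests.get('http://fakedomain/'),
    grequests.get('http://httpbin.org/status/500')
]
response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)    # e.g. [None, None, <Response [500]>]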

twisted
from twisted.web.client import getPage, defer
from twisted.internet import reactor

def all_done(arg):
    reactor.stop()

def callback(contents):
    print(contents)

deferred_list = []
url_list = ['http://www.bing.com', 'http://www.baidu.com',]
for url in url_list:
    deferred = getPage(bytes(url, encoding='utf8'))
    deferred.addCallback(callback)
    deferred_list.append(deferred)

dlist = defer.DeferredList(deferred_list)
dlist.addBoth(all_done)

reactor.run()
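getPage has been deprecated in modern Twisted; the supported HTTP client is twisted.web.client.Agent. A rough single-URL equivalent, assuming a current Twisted release:

from twisted.internet import reactor
from twisted.web.client import Agent, readBody

def all_done(arg):
    reactor.stop()

def on_response(response):
    # readBody returns a Deferred that fires with the complete body
    return readBody(response).addCallback(print)

agent = Agent(reactor)
deferred = agent.request(b'GET', b'http://www.baidu.com')
deferred.addCallback(on_response)
deferred.addBoth(all_done)

reactor.run()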

tornado
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop

def handle_response(response):
    if response.error:
        print("Error:", response.error)
    else:
        print(response.body)

def func():
    url_list = [
        'http://www.baidu.com',
        'http://www.bing.com',
    ]
    for url in url_list:
        print(url)
        http_client = AsyncHTTPClient()
        http_client.fetch(HTTPRequest(url), handle_response)

ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
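As written, the IOLoop never exits: nothing calls stop() after the responses arrive. One common fix (a sketch, assuming a pre-6.0 Tornado where fetch still accepts a callback) is to count outstanding requests and stop the loop when the counter hits zero:

from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado import ioloop

COUNT = 0

def handle_response(response):
    global COUNT
    COUNT -= 1
    if response.error:
        print("Error:", response.error)
    else:
        print(len(response.body))
    if COUNT == 0:
        ioloop.IOLoop.current().stop()    # all requests done, exit the loop

def func():
    global COUNT
    url_list = ['http://www.baidu.com', 'http://www.bing.com']
    COUNT = len(url_list)
    http_client = AsyncHTTPClient()
    for url in url_list:
        http_client.fetch(HTTPRequest(url), handle_response)

ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()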

twisted: POST example
from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse

def one_done(arg):
    print(arg)
    reactor.stop()

post_data = urllib.parse.urlencode({'check_data': 'adf'})
post_data = bytes(post_data, encoding='utf8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(bytes('http://dig.chouti.com/login',encoding='utf8'),
                   method=bytes('POST', encoding='utf8'),
                   postdata=post_data,
                   cookies={},
                   headers=headers)
response.addBoth(one_done)

reactor.run()

Everything above is an async-I/O request module, built into Python or provided by third parties. They are simple to use and dramatically improve efficiency; under the hood, an async I/O request boils down to [non-blocking sockets] + [I/O multiplexing].
3. The badass select module
Server example
import socket
import select

sk1 = socket.socket()
sk1.bind(('0.0.0.0', 8001))
sk1.listen()

sk2 = socket.socket()
sk2.bind(('0.0.0.0', 8002))
sk2.listen()

sk3 = socket.socket()
sk3.bind(('0.0.0.0', 8003))
sk3.listen()

inputs = [sk1,sk2,sk3]

while True:
    r_list, w_list, e_list = select.select(inputs, [], inputs, 1)
    for sk in r_list:
        conn, address = sk.accept()
        conn.sendall(bytes('hello', encoding='utf-8'))
        conn.close()
    for sk in e_list:
        inputs.remove(sk)

Explanation:
# select monitors the three objects sk1, sk2, and sk3, watching whether any of the handles changes state; the ones that change are put into r_list.
# If a client connects to sk1, then r_list = [sk1].
# If clients connect to sk1 and sk2, then r_list = [sk1, sk2].
# The 1st argument to select: handles in inputs that become readable are placed in r_list.
# The 2nd argument: handles to watch for writability; here it is [], so w_list is always empty.
# The 3rd argument: handles in inputs on which an error occurs are placed in e_list.
# The final argument, 1, is the timeout: select returns after at most 1 second even if nothing changed.
# When a client connects, r_list contains something like [<socket.socket fd=220, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 8001)>]

Client
import socket
obj = socket.socket()
obj.connect(('127.0.0.1', 8001))
content = str(obj.recv(1024), encoding='utf-8')
print(content)

obj.close()
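select itself is the oldest and weakest multiplexing call (linear scan, fd limits). The standard-library selectors module picks the best mechanism the OS offers (epoll on Linux, kqueue on BSD/macOS, select as a fallback); a sketch of the same accept-and-greet server on top of it:

import selectors
import socket

sel = selectors.DefaultSelector()    # epoll/kqueue/select, whichever is best

for port in (8001, 8002, 8003):
    sk = socket.socket()
    sk.bind(('0.0.0.0', port))
    sk.listen()
    sel.register(sk, selectors.EVENT_READ)

while True:
    # blocks until a listening socket is readable, i.e. a client connected
    for key, events in sel.select(timeout=1):
        conn, address = key.fileobj.accept()
        conn.sendall(bytes('hello', encoding='utf-8'))
        conn.close()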