您的位置:首页 > 其它

多任务爬虫

2020-07-07 12:24 246 查看
  1. 基于线程的生产者与消费者模式爬虫案例
    1.1 进程与线程
    进程:进程是资源(cpu,内存)分配的基本单位,它是程序执行时的一个实例。程序运行时系统就会创建一个进程,并为它分配资源,然后把该进程放入进程就绪队列,进程调度器选中它的时候就会为它分配CPU时间,程序开始真正运行。
    线程:线程是一条执行路径,是程序执行时的最小单位,它是进程的一个执行流,是CPU调度和分派的基本单位,一个进程可以由很多个线程组成,线程间共享进程的所有资源,每个线程有自己的堆栈和局部变量。线程由CPU独立调度执行,在多CPU环境下就允许多个线程同时运行。同样多线程也可以实现并发操作,每个请求分配一个线程来处理。
    一个正在运行的应用(如迅雷)就是一个进程,一个进程可以同时运行多个任务( 迅雷软件可以同时下载多个文件,每个下载任务就是一个线程), 可以简单的认为进程是线程的集合。
    协程:是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。 协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。
    1.2 多线程加对列在生产者与消费者模式下爬虫的应用案例(王者荣耀壁纸爬取)
from urllib import parse
from urllib import request
import requests
import os
import threading
import queue
import time

class Producer(threading.Thread):
"""生产者"""
headers = {
'User-Agent': '',
'referer': 'https://pvp.qq.com/web201605/wallpaper.shtml'
}
def __init__(self,page_q,img_q,*args,**kwargs):
super(Producer,self).__init__(*args,**kwargs)
# 实例化两个对列
self.page_q = page_q
self.img_q = img_q

@staticmethod
def get_url(data):
"""获取图片url"""
urls=[]
for x in range(1,9):
# 获取每种图片的标号
img_no=f'sProdImgNo_{x}'
# 从data中获取对应标号的加密url后再进行解码,然后将200替换成0
# "sProdImgNo_1":"http%3A%2F%2Fshp%2Eqpic%2Ecn%2
# Fishow%2F2735032619%2F1585220459%5F84828260%5F9035%5FsProdImgNo%5F1%2Ejpg%2F200"
url=parse.unquote(data.get(img_no)).replace('200','0')
urls.append(url)
return urls

def run(self):
"""拿到图片列表页url对列中的url后遍历获取每页的图片地址"""
# 判断对列是否为空
while not self.page_q.empty():
# 取出每页的url
url=self.page_q.get()
# 提取字典格式的数据
text = requests.get(url, headers=self.headers).json()
# 取出key为List的所有数据
datas = text["List"]
for data in datas:
# 提取图片地址
urls =Producer.get_url(data)
# 提取名称 "sProdName":"%E9%98%BF%E8%BD%B2%2D%E8%BF%B7%E8%B8%AA%E4%B8%BD%E5%BD%B1%20"
prod_name = parse.unquote(data['sProdName']).strip()
# 创建存储路径
img_path = os.path.join('images', prod_name)
if not os.path.exists(img_path):
os.mkdir(img_path)
# 遍历出所有url并将对应索引及名称创建成字典形式添加到对列
for index, img_url in enumerate(urls):
self.img_q.put({'img_url':img_url,'prod_name':prod_name,'index':index})
# 打印当前执行的线程
print('%s线程执行完毕' %threading.current_thread().name)

class Consumer(threading.Thread):
def __init__(self,page_q,img_q,lock,*args,**kwargs):
super(Consumer,self).__init__(*args,**kwargs)
self.page_q=page_q
self.img_q=img_q
self.lock=threading.Lock()

def run(self) -> None:
"""循环获取图片队列中字典中的数据"""
while 1:
try:
# 从队中取出字典
img_obj=self.img_q.get(timeout=60)
img_url=img_obj.get('img_url')
name=img_obj.get('prod_name')
index=img_obj.get('index')
# 创建存储路径
img_path = os.path.join('images',name)
try:
request.urlretrieve(img_url, os.path.join(img_path, f"{index+1}.jpg"))
print((img_url,os.path.join(img_path,f"{index+1}.jpg"))+'下载完成')
except Exception as e:
print(e)
# 对列为空时返回报错信息
except  queue.Empty as e:
print(e)
time.sleep(0.1)
continue

def main():
# 存放每页url的对列
page_q=queue.Queue(20)
# 存放图片的对列
img_q=queue.Queue(1000)
# 实例化一个线程锁
lock=threading.Lock()
for x in range(18):
# 获取图片的接口
url='https://apps.game.qq.com/cgi-bin/ams/module/ishow/V1.0/query/workList_inc.cgi?activityId=2735&sVerifyCode=ABCD' \
'&sDataType=JSON&iListNum=4&totalpage=0&page={page}&iOrder=0&iSortNumClose=1&iAMSActivityId=51991&_everyRead=true' \
'&iTypeId=1&iFlowId=267733&iActId=2735&iModuleId=2735&_=1587033640385'.format(page=x)
page_q.put(url)

for x in range(10):
# 生产者线程,解析数据
t1=Producer(page_q,img_q,name=f"生产者线程{x}")
t1.start()
for x in range(5):
# 消费者线程,保存数据
t2=Consumer(page_q,img_q,lock,name=f"消费者线程{x}")
t2.start()

if __name__ == '__main__':
main()
  1. 基于异步协程的多任务爬虫案例
    2.1 asyncio模块
    特殊函数:函数被asyncio模块中async关键字修饰后内部操作不会立即执行,调用后返回协程对象,特殊函数内部不能存在不支持异步的模块的代码;
# 特殊函数
async def test():
print("Hello World")

任务对象:对协程对象进一步封装,可以绑定回调函数,在任务结束后执行,回调函数携带参数为其调用者(任务对象),任务对象调用result()返回特殊函数的return结果;

# 协程对象
c=test()
# 任务对象
task=asyncio.ensure_future(c)
# 绑定回调函数
task.add_done_callback(fun_callback)

循环事件对象:将多个任务对象注册到事件对象后启动事件可异步执行多个任务;

# 创建循环事件对象
loop=asyncio.get_event_loop()
# 注册并启动事件
loop.run_until_complete(asyncio.wait(tasks))

wait方法:赋予将任务对象挂起的权限并释放cpu的使用
await关键字:在特殊函数中的阻塞操作用此关键字修饰

2.2 aiohttp异步请求模块

import aiohttp
async def get_request(url):
# 实例化一个请求对象
async with aiohttp.ClientSession() as sess:
# 发起请求,返回响应对象
# get/post(url,headers,params/data,proxy="http://ip:port")
async with await sess.get(url,headers=headers) as response:
# text()获取字符串类型的响应数据
# read()获取byte类型的响应数据
res_text=await response.text()
return res_text

异步多任务网易云歌曲下载案例:

import requests
from fake_useragent import UserAgent
from lxml import etree
import asyncio
import aiohttp
from urllib import request

ua=UserAgent().random
headers={'user-agent':ua}
def get_urls():
url=" https://music.163.com/discover/playlist"
text=requests.get(url,headers=headers).text
html=etree.HTML(text)
url_list=html.xpath("//ul[@id='m-pl-container']/li/div/a/@href")
# print(url_list)
return url_list
# print(url_list)

async def get_request(url):
# 实例化一个请求对象
async with aiohttp.ClientSession() as sess:
# 发起请求,返回响应对象
async with await sess.get(url,headers=headers) as response:
# 获取网页响应数据
res_text=await response.text()
# print(res_text)
return res_text

def parse(t):
"""
解析数据
:param t: t为parse的调用者(特殊函数)
:return:
"""
# 特殊函数获取的源代码
text=t.result()
html=etree.HTML(text)
# 歌曲信息需在隐藏标签定位,可在response中分析
names=html.xpath("//ul[@class='f-hide']/li//text()")
song_urls=html.xpath("//ul[@class='f-hide']/li/a/@href")
# print(names,song_urls)
for name,song_url in zip(names,song_urls):
# 下载外链
url=f"http://music.163.com/song/media/outer/url?id={song_url.strip('/song?id=')}"+'.mp3'
request.urlretrieve(url, f'./musics/{name}.mp3')
print(f"{name}下载完成")

if __name__=='__main__':
tasks=[]
for herf in get_urls():
url="https://music.163.com/"+herf
# print(url)
c=get_request(url)
# 创建一个任务对象
task=asyncio.ensure_future(c)
# 给任务对象绑定回调函数
task.add_done_callback(parse)
tasks.append(task)
# 创建一个时间循环对象
loop=asyncio.get_event_loop()
# 注册并启动时间循环
loop.run_until_complete(asyncio.wait(tasks))
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: