您的位置:首页 > 编程语言 > Python开发

Python协程-异步编程

2020-06-06 07:47 731 查看

使用python协程实现异步编程

协程的作用

  1. 由于Python全局解释器锁的原因,Python多线程效率不高,同一时刻只运行一个线程,但是在I/O操作
    不用CPU的操作
    频繁时,可以节省时间。
  2. 协程的作用就是在遇到I/O阻塞时,去执行其他协程任务,也可以充分的利用CPU的资源,相比线程,还减少了上下文切换的开销。
  3. CPU密集型任务应该使用多进程,多进程配合协程==
    起飞~

python3.4中加入了asyncio,然后可以通过asyncio.coroutine装饰器实现协程;
python3.5中引入了async 和await关键字,使得协程的实现更加简便;
python3.7之后,可以使用asyncio.run()完成事件循环。

协程的使用

await关键字:后面可以接

IO等待
:协程对象,Future,Task对象
示例1:

import asyncio
async def func():
print('开始I/O')
response = await asyncio.sleep(2)
# 使用sleep来模拟IO阻塞,在等待时,会去执行其他任务。
print('结束',response)
cor = func()
asyncio.run(cor)

实例2:

import asyncio
async def func():
print('开始I/O')
response = await asyncio.sleep(2)
# 使用sleep来模拟IO阻塞,在等待时,会去执行其他任务。
return ('结束')

async def mfun():
print('执行协程函数内部代码')
# 这里的func()是协程对象
response = await func()
# func的返回值就是response
print("IO请求结果:",response)

cor = mfunc()
asyncio.run(cor)

执行结果:
>>>执行协程函数内部代码
>>>开始I/O
>>>结束
>>>IO请求结果:结束
Task对象

asyncio.create_task()
可以在事件循环中添加多个任务.遇到IO就去执行列表中下一个函数。
示例1:

import asyncio

async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'

async def main():
print('开始')

# 创建一个task对象,将当前func函数立即加入到事件循环中
task1 = asyncio.create_task(func())
task2 = asyncio.create_task(func())

# await会等待任务执行完毕,但是在执行,但是在执行任务的时候遇到io,再去执行其他任务,直到当前任务完毕返回结果给ret
ret1  =await task1
ret2  =await task2
print(ret1,ret2 )
asyncio.run(main())

执行顺序,到await task1,执行func,打印1,阻塞,当前任务列表还有task2,所以去执行task2,print(1),阻塞,当前任务列表无其他任务,等待,task1执行完的时候,task2也执行完,然后返回结果,打印ret1,ret2

实例2:在上面的基础上,将任务放到列表中,让代码更清晰,但是await不能接列表对象,所以asyncio.wait()包装一下,得到的是一个元组

import asyncio

async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'

async def main():
task_list = [
asyncio.create_task(func(),name='帅帅'),
asyncio.create_task(func(),name='俊俊')
]
done,pending = await asyncio.wait(task_list)

asyncio.run(main())

done:是一个Task类型的集合,任务的返回值
pending:在async.wait(task_list,timeout=2),表示最多等2秒,未完成的东西状态之类的放在pending中。

示例3:将人物列表放外边

# 直接将task_list 拉到main函数外面会报错:
# 执行asyncio.create_task会将任务加到事件循环中,在外面还没有创建事件循环。

import asyncio

async def func():
print(1)
await asyncio.sleep(2)
print(2)
return '返回值'

# 所以不添加任务,添加协程对象
task_list = [
func(),
func()
]

asyncio.run(asyncio.wait(task_list))
# 不常用

asyncio Future对象 不太常用,偏向底层,是Task类的基类
示例1:

async def main():
# 获取当前的事件循环
loop = asyncio.get_running_loop()

# 创建一个Fulure对象,这个任务什么也不干
fut = loop.create_future()

# 等待任务最终结果(Funture对象)  没有结果就一直等待,程序一直运行
await fut
asyncio.run(main()

# 但是某一时刻future对象有值了,就可以了。

示例2:

import asyncio

async def set_after(fut):
await asyncio.sleep(2)
fut.set_result('666)

async def main():
loop = asyncio.get_running_loop()

fut = loop.create_future()

#  一个Task对象加入事件循环中,执行await fut阻塞执行此Task任务,为fut赋值,继续运行
await loop.create_task( set_after(fut))

data = await fut
print(data)
asyncio.run(main()

concurrent.futures.Future对象
使用线程池或者进程池实现异步操作时用到的对象

import time
from current.futures import Future
from current.futures.thread import ThreadPoolExecutor
from current.futures.process import ProcessPoolExecutor

def func(value):
time,sleep(1)
print(value)
return ret

# 创建一个线程池
pool = ThreadPoolExecutor(max_workers=5)

# 进程池
# pool = ProcessPoolExecutor(max_workers=5)

for i in range(10):
fut = pool.submit(func,i)  # 等待结果
print(fut)

部分程序才会用到两个Future相结合,例如:项目使用了协程,但是MySQL数据库不支持,就要使用线程和进程做异步编程
示例:

import time
import asyncio
import concurrent.futures
# 这是一个不支持协程的第三方模块
def func1():
# 耗时操作
time.sleep(2)
return 'SB'

# 这时本地的协程函数
async def main():
loop = asyncio.get_running_loop()

# 调用这个函数
fut = loop.run_in_executor(None,func1) # None表示默认创建线程池,要使用进程池及那个None替换为进程对象
# 默认创建一个线程池,返回一个Future对象,转换为asyncio的Future对象
result = await fut
print(result)

案例 requests库不支持异步,使用多线程和协程配合

  • 核心,获取当前loop,执行run_in_executor(None,func,*args)得到线程式的Future。
  • requests.get时,多线程并发执行,耗费的资源会多一些,不过没办法,第三方模块不支持的而出此下策
import asyncio
import requests

async def down_load(url:str):
print('开始下载',url)

loop = asyncio.get_running_loop()

future = loop.run_in_executor(None,requests.get,url)

response = await future
print('下载完成')
file_name = url.split('_')[-1]
with open(file_name,mode='wb') as f:
f.write(response.content)

url = ['https://img.moegirl.org/common/4/46/Shinomiya_Kaguya.jpg',
r'https://img.moegirl.org/common/4/46/Shinomiya_Kaguya.jpg']
tasks = [down_load(u) for u in url]

asyncio.run(asyncio.wait(tasks))

异步迭代器

  • 迭代器实现了__iter__和__next__方法,异步迭代器实现了__aiter__和__anext__方法。

示例:

import asyncio

class Reader():
def __init__(self):
self.count = 0
async def readline(self):
self.count += 1
if self.count ==100:
return None
return self.count
def __aiter__(self):
return self

async def __anext__(self):
val = await readline()
if val is None:
raise StopAsuncIteration
return val

# 使用这个迭代器要是用async for ,但是async必须放在协程中。
async def func():
obj = Reader()
async for i in obj:
print(i)

asyncio.run(func())

异步的上下文管理器
该对象定义了__aenter__()和__aexit__()方法来对async with 语句中的环境进行控制。

import asyncio

class AsyncContextManager:
def __init__(self):
pass

async def do_something(self):
pass
# 返回操作的结果,或者操作打开的对象

async def __aenter__(self):
pass
#返回操作的对象

async def __aexit__(self):
pass
# 退出
# 使用时 也要放在协程函数中
async def func():
async with AsyncContextManager() as f:
ret = await f.do_something()
print(ret)
asyncio.run(func())

uvloop
提高默认的asyncio的事件循环的效率
安装:

pip install uvloop

替换默认的:
async.set_event_loop_policy(uvloop.EventLoopPolicy())
但是这个模块在windows10上是不支持的

import asyncio
import uvloop

# 替换
async.set_event_loop_policy(uvloop.EventLoopPolicy())

asyncio.run( func() )
  • Django有一个asgi模块用于框架支持异步,使用uvicorn来启动。这个样子快的原因就是使用了uvloop()的事件循环。

实战案例

  • 异步连接redis
    pip install aioredis
import asyncio
import aioredis
async def execute(address, password):
print("开始执行", address)
# 网络IO操作:创建redis连接
redis = await aioredis.create_redis(address, password=password)
# 网络IO操作:在redis中设置哈希值car,内部在设三个键值对,即: redis = { car:{key1:1,key2:2,key3:3}}
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 网络IO操作:去redis中获取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
redis.close()
# 网络IO操作:关闭redis连接
await redis.wait_closed()
print("结束", address)
asyncio.run(execute('redis://47.93.4.198:6379', "root!2345"))
  • 异步连接mysql
    pip install aiomysql
import asyncio
import aiomysql
async def execute(host, password):
print("开始", host)
# 网络IO操作:先去连接 47.93.40.197,遇到IO则自动切换任务,去连接47.93.40.198:6379
conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql')
# 网络IO操作:遇到IO会自动切换任务
cur = await conn.cursor()
# 网络IO操作:遇到IO会自动切换任务
await cur.execute("SELECT Host,User FROM user")
# 网络IO操作:遇到IO会自动切换任务
result = await cur.fetchall()
print(result)
# 网络IO操作:遇到IO会自动切换任务
await cur.close()
conn.close()
print("结束", host)
task_list = [
execute('47.93.40.197', "root!2345"),
execute('47.93.40.197', "root!2345")
]
asyncio.run(asyncio.wait(task_list))
  • 异步连接数据库(没有支持的异步模块时)

使用基于线程池,进程池的协程来连接数据库。

  • Fastapi框架
import asyncio
import uvicorn
import aioredis
from aioredis import Redis
from fastapi import FastAPI
app = FastAPI()
REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10)
@app.get("/")
def index():
""" 普通操作接口 """
return {"message": "Hello World"}
@app.get("/red")
async def red():
""" 异步操作接口 """
print("请求来了")
await asyncio.sleep(3)
# 连接池获取一个连接
conn = await REDIS_POOL.acquire()
redis = Redis(conn)
# 设置值
await redis.hmset_dict('car', key1=1, key2=2, key3=3)
# 读取值
result = await redis.hgetall('car', encoding='utf-8')
print(result)
# 连接归还连接池
REDIS_POOL.release(conn)
return result
if __name__ == '__main__':
uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")
  • 异步爬虫
    pip3 install aiohttp
import aiohttp
import asyncio
async def fetch(session, url):
print("发送请求:", url)
async with session.get(url, verify_ssl=False) as response:
text = await response.text()
print("得到结果:", url, len(text))
async def main():
async with aiohttp.ClientSession() as session:
url_list = [
'https://python.org',
'https://www.baidu.com',
'https://www.pythonav.com'
]
tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]
await asyncio.wait(tasks)
if __name__ == '__main__':
asyncio.run(main())
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: