Python协程-异步编程
使用python协程实现异步编程
协程的作用
- 由于Python全局解释器锁的原因,Python多线程效率不高,同一时刻只运行一个线程,但是在I/O操作
不用CPU的操作
频繁时,可以节省时间。 - 协程的作用就是在遇到I/O阻塞时,去执行其他协程任务,也可以充分的利用CPU的资源,相比线程,还减少了上下文切换的开销。
- CPU密集型任务应该使用多进程,多进程配合协程==
起飞~
python3.4中加入了asyncio,然后可以通过asyncio.coroutine装饰器实现协程;
python3.5中引入了async 和await关键字,使得协程的实现更加简便;
python3.7之后,可以使用asyncio.run()完成事件循环。
协程的使用
await关键字:后面可以接
IO等待:协程对象,Future,Task对象
示例1:
import asyncio async def func(): print('开始I/O') response = await asyncio.sleep(2) # 使用sleep来模拟IO阻塞,在等待时,会去执行其他任务。 print('结束',response) cor = func() asyncio.run(cor)
实例2:
import asyncio async def func(): print('开始I/O') response = await asyncio.sleep(2) # 使用sleep来模拟IO阻塞,在等待时,会去执行其他任务。 return ('结束') async def mfun(): print('执行协程函数内部代码') # 这里的func()是协程对象 response = await func() # func的返回值就是response print("IO请求结果:",response) cor = mfunc() asyncio.run(cor)
执行结果:
>>>执行协程函数内部代码
>>>开始I/O
>>>结束
>>>IO请求结果:结束
Task对象
asyncio.create_task()可以在事件循环中添加多个任务.遇到IO就去执行列表中下一个函数。
示例1:
import asyncio async def func(): print(1) await asyncio.sleep(2) print(2) return '返回值' async def main(): print('开始') # 创建一个task对象,将当前func函数立即加入到事件循环中 task1 = asyncio.create_task(func()) task2 = asyncio.create_task(func()) # await会等待任务执行完毕,但是在执行,但是在执行任务的时候遇到io,再去执行其他任务,直到当前任务完毕返回结果给ret ret1 =await task1 ret2 =await task2 print(ret1,ret2 ) asyncio.run(main())
执行顺序,到await task1,执行func,打印1,阻塞,当前任务列表还有task2,所以去执行task2,print(1),阻塞,当前任务列表无其他任务,等待,task1执行完的时候,task2也执行完,然后返回结果,打印ret1,ret2
实例2:在上面的基础上,将任务放到列表中,让代码更清晰,但是await不能接列表对象,所以asyncio.wait()包装一下,得到的是一个元组
import asyncio async def func(): print(1) await asyncio.sleep(2) print(2) return '返回值' async def main(): task_list = [ asyncio.create_task(func(),name='帅帅'), asyncio.create_task(func(),name='俊俊') ] done,pending = await asyncio.wait(task_list) asyncio.run(main())
done:是一个Task类型的集合,任务的返回值
pending:在async.wait(task_list,timeout=2),表示最多等2秒,未完成的东西状态之类的放在pending中。
示例3:将人物列表放外边
# 直接将task_list 拉到main函数外面会报错: # 执行asyncio.create_task会将任务加到事件循环中,在外面还没有创建事件循环。 import asyncio async def func(): print(1) await asyncio.sleep(2) print(2) return '返回值' # 所以不添加任务,添加协程对象 task_list = [ func(), func() ] asyncio.run(asyncio.wait(task_list)) # 不常用
asyncio Future对象 不太常用,偏向底层,是Task类的基类
示例1:
async def main(): # 获取当前的事件循环 loop = asyncio.get_running_loop() # 创建一个Fulure对象,这个任务什么也不干 fut = loop.create_future() # 等待任务最终结果(Funture对象) 没有结果就一直等待,程序一直运行 await fut asyncio.run(main() # 但是某一时刻future对象有值了,就可以了。
示例2:
import asyncio async def set_after(fut): await asyncio.sleep(2) fut.set_result('666) async def main(): loop = asyncio.get_running_loop() fut = loop.create_future() # 一个Task对象加入事件循环中,执行await fut阻塞执行此Task任务,为fut赋值,继续运行 await loop.create_task( set_after(fut)) data = await fut print(data) asyncio.run(main()
concurrent.futures.Future对象
使用线程池或者进程池实现异步操作时用到的对象
import time from current.futures import Future from current.futures.thread import ThreadPoolExecutor from current.futures.process import ProcessPoolExecutor def func(value): time,sleep(1) print(value) return ret # 创建一个线程池 pool = ThreadPoolExecutor(max_workers=5) # 进程池 # pool = ProcessPoolExecutor(max_workers=5) for i in range(10): fut = pool.submit(func,i) # 等待结果 print(fut)
部分程序才会用到两个Future相结合,例如:项目使用了协程,但是MySQL数据库不支持,就要使用线程和进程做异步编程
示例:
import time import asyncio import concurrent.futures # 这是一个不支持协程的第三方模块 def func1(): # 耗时操作 time.sleep(2) return 'SB' # 这时本地的协程函数 async def main(): loop = asyncio.get_running_loop() # 调用这个函数 fut = loop.run_in_executor(None,func1) # None表示默认创建线程池,要使用进程池及那个None替换为进程对象 # 默认创建一个线程池,返回一个Future对象,转换为asyncio的Future对象 result = await fut print(result)
案例 requests库不支持异步,使用多线程和协程配合
- 核心,获取当前loop,执行run_in_executor(None,func,*args)得到线程式的Future。
- requests.get时,多线程并发执行,耗费的资源会多一些,不过没办法,第三方模块不支持的而出此下策
import asyncio import requests async def down_load(url:str): print('开始下载',url) loop = asyncio.get_running_loop() future = loop.run_in_executor(None,requests.get,url) response = await future print('下载完成') file_name = url.split('_')[-1] with open(file_name,mode='wb') as f: f.write(response.content) url = ['https://img.moegirl.org/common/4/46/Shinomiya_Kaguya.jpg', r'https://img.moegirl.org/common/4/46/Shinomiya_Kaguya.jpg'] tasks = [down_load(u) for u in url] asyncio.run(asyncio.wait(tasks))
异步迭代器
- 迭代器实现了__iter__和__next__方法,异步迭代器实现了__aiter__和__anext__方法。
示例:
import asyncio class Reader(): def __init__(self): self.count = 0 async def readline(self): self.count += 1 if self.count ==100: return None return self.count def __aiter__(self): return self async def __anext__(self): val = await readline() if val is None: raise StopAsuncIteration return val # 使用这个迭代器要是用async for ,但是async必须放在协程中。 async def func(): obj = Reader() async for i in obj: print(i) asyncio.run(func())
异步的上下文管理器
该对象定义了__aenter__()和__aexit__()方法来对async with 语句中的环境进行控制。
import asyncio class AsyncContextManager: def __init__(self): pass async def do_something(self): pass # 返回操作的结果,或者操作打开的对象 async def __aenter__(self): pass #返回操作的对象 async def __aexit__(self): pass # 退出 # 使用时 也要放在协程函数中 async def func(): async with AsyncContextManager() as f: ret = await f.do_something() print(ret) asyncio.run(func())
uvloop
提高默认的asyncio的事件循环的效率
安装:
pip install uvloop
替换默认的:
async.set_event_loop_policy(uvloop.EventLoopPolicy())但是这个模块在windows10上是不支持的
import asyncio import uvloop # 替换 async.set_event_loop_policy(uvloop.EventLoopPolicy()) asyncio.run( func() )
- Django有一个asgi模块用于框架支持异步,使用uvicorn来启动。这个样子快的原因就是使用了uvloop()的事件循环。
实战案例
- 异步连接redis
pip install aioredis
import asyncio import aioredis async def execute(address, password): print("开始执行", address) # 网络IO操作:创建redis连接 redis = await aioredis.create_redis(address, password=password) # 网络IO操作:在redis中设置哈希值car,内部在设三个键值对,即: redis = { car:{key1:1,key2:2,key3:3}} await redis.hmset_dict('car', key1=1, key2=2, key3=3) # 网络IO操作:去redis中获取值 result = await redis.hgetall('car', encoding='utf-8') print(result) redis.close() # 网络IO操作:关闭redis连接 await redis.wait_closed() print("结束", address) asyncio.run(execute('redis://47.93.4.198:6379', "root!2345"))
- 异步连接mysql
pip install aiomysql
import asyncio import aiomysql async def execute(host, password): print("开始", host) # 网络IO操作:先去连接 47.93.40.197,遇到IO则自动切换任务,去连接47.93.40.198:6379 conn = await aiomysql.connect(host=host, port=3306, user='root', password=password, db='mysql') # 网络IO操作:遇到IO会自动切换任务 cur = await conn.cursor() # 网络IO操作:遇到IO会自动切换任务 await cur.execute("SELECT Host,User FROM user") # 网络IO操作:遇到IO会自动切换任务 result = await cur.fetchall() print(result) # 网络IO操作:遇到IO会自动切换任务 await cur.close() conn.close() print("结束", host) task_list = [ execute('47.93.40.197', "root!2345"), execute('47.93.40.197', "root!2345") ] asyncio.run(asyncio.wait(task_list))
- 异步连接数据库(没有支持的异步模块时)
使用基于线程池,进程池的协程来连接数据库。
- Fastapi框架
import asyncio import uvicorn import aioredis from aioredis import Redis from fastapi import FastAPI app = FastAPI() REDIS_POOL = aioredis.ConnectionsPool('redis://47.193.14.198:6379', password="root123", minsize=1, maxsize=10) @app.get("/") def index(): """ 普通操作接口 """ return {"message": "Hello World"} @app.get("/red") async def red(): """ 异步操作接口 """ print("请求来了") await asyncio.sleep(3) # 连接池获取一个连接 conn = await REDIS_POOL.acquire() redis = Redis(conn) # 设置值 await redis.hmset_dict('car', key1=1, key2=2, key3=3) # 读取值 result = await redis.hgetall('car', encoding='utf-8') print(result) # 连接归还连接池 REDIS_POOL.release(conn) return result if __name__ == '__main__': uvicorn.run("luffy:app", host="127.0.0.1", port=5000, log_level="info")
- 异步爬虫
pip3 install aiohttp
import aiohttp import asyncio async def fetch(session, url): print("发送请求:", url) async with session.get(url, verify_ssl=False) as response: text = await response.text() print("得到结果:", url, len(text)) async def main(): async with aiohttp.ClientSession() as session: url_list = [ 'https://python.org', 'https://www.baidu.com', 'https://www.pythonav.com' ] tasks = [asyncio.create_task(fetch(session, url)) for url in url_list] await asyncio.wait(tasks) if __name__ == '__main__': asyncio.run(main())
- python——asyncio模块实现协程、异步编程(三)
- python——asyncio模块实现协程、异步编程(一)
- python——asyncio模块实现协程、异步编程(二)
- 利用python yielding创建协程将异步编程同步化
- 利用 Python yield 创建协程将异步编程同步化
- Python异步编程之协程任务的调度操作实例分析
- Python中的协程和异步编程
- python并发编程&协程
- Python的协程,线程,进程的理解及实现
- python协程的实现(greenlet源码分析)
- python之协程
- python --- 协程
- python中的协程
- Python 多进程 多线程 协程 I/O多路复用
- 【python学习笔记】Python实现协程yield方法和gevent库
- python异步编程之线程异步
- python多任务——协程的使用
- 基础入门_Python-进线协程.分分钟玩转multiprocessing多进程编程?
- python3-----多进程、多线程、多协程
- Python协程与asyncio