您的位置:首页 > 编程语言 > Python开发

Python-19-协程、迭代器、生成器、asyncio、

2019-07-29 11:28 369 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/mk1843109092/article/details/97238977

协程

迭代器

  • 可迭代(Iterable):直接作用于for循环的变量
  • 迭代器(Iterator):不但可以被for循环调用,还可以被next调用
  • list是典型的可迭代对象,但不是迭代器
# Iterables: objects that can be used directly as the target of a for loop.
l = [i for i in range(10)]
# l is iterable, but it is NOT an iterator (it has no __next__).
for idx in l:
    print(idx)
# FIX: the original comment claimed range() is an iterator — it is not.
# range() returns an iterable; iter(range(10)) would yield an iterator.
for i in range(10):
    print(i)
  • 通常用 isinstance 判断某一个变量是否是 Iterable 的实例,以此判断它是否可迭代
# Check whether an object is iterable.
# FIX: the ABCs live in collections.abc; importing them from the
# top-level `collections` module was removed in Python 3.10.
from collections.abc import Iterable
ll = [1,2,3,4,5]
print(isinstance(ll, Iterable))
# Check whether it is an iterator.
from collections.abc import Iterator
print(isinstance(ll, Iterator))

>>>
True
False
  • 可迭代和迭代器之间转换 iter函数
# FIX: `from collections import Iterable` was removed in Python 3.10;
# both ABCs must come from collections.abc.
from collections.abc import Iterable
from collections.abc import Iterator
# iter() converts any iterable into an iterator.
s = 'i am liuneng'
print(isinstance(s, Iterable))       # True: str is iterable
print(isinstance(s, Iterator))       # False: str itself is not an iterator
s_iter = iter(s)
print(isinstance(s_iter, Iterable))  # True: every iterator is also iterable
print(isinstance(s_iter, Iterator))  # True

>>>
True
False
True
True

生成器

  • generator:一边循环一边计算下一个元素的算法
  • 需要满足三个条件: 每次调用都可以生成下一个元素
  • 如果到达最后一个元素之后,抛出StopIteration异常
  • 可以被next调用
  • 如何生成一个生成器
      直接使用
    • 如果函数中包含yield,则这个函数就叫做生成器
    • next调用函数,遇到yield返回
    # 直接使用生成器
    L = [x*x for x in range(5)]# 放在中括号中的是列表生成器
    G = (x*x for x in range(5))# 放在小括号中是生成器
    print(type(L))
    print(type(G))
    
    >>>
    <class 'list'>
    <class 'generator'>
    # 在函数中,yield负责返回
    def odd():
    print("Step 1")
    yield 1
    print("Step 2")
    yield 2
    print("Step 3")
    yield 3
    
    g = odd()
    print(next(g))
    print(next(g))
    print(next(g))
    
    >>>
    Step 1
    1
    Step 2
    2
    Step 3
    3
    # for循环调用生成器
    def fib(max):
    n, a, b = 0, 0, 1
    while n < max:
    yield b
    a, b= b, a+b
    n += 1
    #需要注意,爆出的异常的返回值是return的值
    return 'Done'
    
    g = fib(5)
    for i in range(6):
    rst = next(g)
    print(rst)
    
    >>>
    1
    1
    2
    3
    5
    ---------------------------------------------------------------------------
    StopIteration                             Traceback (most recent call last)
    <ipython-input-14-e02f38f4be18> in <module>()
    10 g = fib(5)
    11 for i in range(6):
    ---> 12     rst = next(g)
    13     print(rst)
    
    StopIteration: Done
    # 生成器典型用法
    ge = fib(10)
    for i in ge:
    print(i)

    协程

    • 历史历程 3.4引入协程,用yield实现
    • 3.5引入协程语法
    • 实现协程比较好的包有asyncio, tornado, gevent
  • 定义:协程是为了非抢占式多任务产生子程序的计算机程序组件,协程允许不同入口点在不同位置暂停或开始执行程序
  • 理解,协程就是一个可以暂停执行的函数
  • 协程的实现
      yield返回
    • send调用
    def simple_coroutine():
    print('-> start')
    x = yield
    print('-> recived', x)
    sc = simple_coroutine()
    print(111)
    # 可以使用sc.send(None),效果一样
    next(sc) # 预激
    print(2222)
    sc.send('zhexiao')
    • 协程的四个状态 inspect.getgeneratorstate(…) 函数确定,该函数会返回下述字符串中的一个:
    • GEN_CREATED:等待开始执行
    • GEN_RUNNING: 解释器正在执行
    • GEN_SUSPENDED:在yield表达式处暂停
    • GEN_CLOSED:执行结束
    • next预激(prime)
    def simple_corotine(a):
    print("-> start")
    b = yield a
    print("-> recived", a, b)
    c = yield a + b
    print("-> recived", a, b, c)
    
    sc = simple_corotine(5)
    aa = next(sc)
    print(aa)
    bb = sc.send(6)
    print(bb)
    cc = sc.send(7)
    print(cc)
    
    >>>
    -> start
    5
    -> recived 5 6
    11
    -> recived 5 6 7
    StopIteration  # 如果达到最后一个后,爆出StopIteration异常
    • 协程终止 协程中未处理的异常会向上冒泡,传给next函数或send方法的调用方
    • 终止协程的一种方式:发送某个哨兵值,让协程退出,内置的None和Ellipsis等常量经常用作哨兵值
  • yield from
      调用协程为了得到返回值,协程必须正常终止
    • 生成器正常终止会发出StopIteration异常,异常对象的value属性保存返回值
    • yield from从内部捕获StopIteration异常
    def gen():
    for c in 'AB':
    yield c
    
    # list直接用生成器作为参数
    print(list(gen()))
    def gen_new():
    yield from 'AB'
    print(list(gen_new()))
    
    >>>
    ['A', 'B']
    ['A', 'B']
    • 委派生成器 包含yield from表达式的一个生成器
    • 委派生成器在yield from表达式处暂停,调用方可以直接把数据发送给子生成器
    • 子生成器把产生的值发送给调用方
    • 子生成器在最后,解释器会抛出StopIteration异常,并且把返回值附加到异常对象上
    from collections import namedtuple
    ResClass = namedtuple('Res', 'count average')
    # 子生成器
    def averager():
    total = 0.0
    count = 0
    average = None
    while True:
    term = yield
    # None是哨兵值
    if term is None:
    break
    total += term
    count += 1
    average = total/count
    return ResClass(count, average)
    # 委派生成器
    def grouper(storages, key):
    while True:
    # 获取average返回的值
    storages[key] = yield from averager()
    # 客户端代码
    def client():
    process_data = {
    'boys_2':[23.4,25.6,56.4,6.5,6.75,67,7],
    'boys_1':[1.12,1.23,1.45,1.65,1.98,1.73,1.34]
    }
    storages = {}
    for k,v in process_data.items():
    # 获得协程
    coroutine = grouper(storages, k)
    # 预激协程
    next(coroutine)
    # 发送数据到协程
    for dt in v:
    coroutine.send(dt)
    # 终止协程
    coroutine.send(None)
    print(storages)
    # run
    client()

    asyncio

    • python3.4开始引入标准库中,内置对异步io的支持
    • asyncio本身是一个消息循环
    • 步骤: 创建消息循环
    • 把协程导入
    • 关闭
    import asyncio
    @asyncio.coroutine
    def hello():
    print("Hello world!")
    # 异步调用asyncio.sleep(1):
    print("Start......")
    r = yield from asyncio.sleep(3)
    print("Done....")
    print("Hello again!")
    # 获取EventLoop:
    loop = asyncio.get_event_loop()
    # 执行coroutine
    loop.run_until_complete(hello())
    loop.close()
    import threading
    import asyncio
    @asyncio.coroutine
    def hello():
    print('Hello world! (%s)' % threading.currentThread())
    print('Start..... (%s)' % threading.currentThread())
    yield from asyncio.sleep(10)
    print('Done..... (%s)' % threading.currentThread())
    print('Hello again! (%s)' % threading.currentThread())
    loop = asyncio.get_event_loop()
    tasks = [hello(), hello()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    import asyncio
    @asyncio.coroutine
    def wget(host):
    print('wget %s...' % host)
    connect = asyncio.open_connection(host, 80)
    reader, writer = yield from connect
    header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
    writer.write(header.encode('utf-8'))
    yield from writer.drain()
    while True:
    line = yield from reader.readline()
    if line == b'\r\n':
    break
    print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
    # Ignore the body, close the socket
    writer.close()
    loop = asyncio.get_event_loop()
    tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

    async和await

    • 为了更好地使用异步
    • python3.5引入
    • 让协程代码更简单
    • 使用上,可以简单的进行替换 用async替换@asyncio.coroutine
    • await 替换 yield from
    import threading
    import asyncio
    # @asyncio.coroutine
    async def hello():
    print('Hello world! (%s)' % threading.currentThread())
    print('Start..... (%s)' % threading.currentThread())
    await asyncio.sleep(10)
    print('Done..... (%s)' % threading.currentThread())
    print('Hello again! (%s)' % threading.currentThread())
    loop = asyncio.get_event_loop()
    tasks = [hello(), hello()]
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

    aiohttp

    • asyncio实现单线程的并发io,在客户端用处不大
    • 在服务器端可以asyncio+coroutine配合,因为http是io操作
    • asyncio实现了tcp,udp, ssl等协议
    • aiohttp是基于asyncio实现的http框架
    • pip install aiohttp安装
import asyncio
from aiohttp import web

async def index(request):
    """Handle GET /: respond with a fixed HTML body after a short delay."""
    await asyncio.sleep(0.5)
    return web.Response(body=b'<h1>Index</h1>')

async def hello(request):
    """Handle GET /hello/{name}: greet the captured ``name`` path segment."""
    await asyncio.sleep(0.5)
    text = '<h1>hello, %s!</h1>' % request.match_info['name']
    return web.Response(body=text.encode('utf-8'))

async def init(loop):
    """Build the aiohttp application, register routes, start the server."""
    # NOTE(review): Application(loop=...) and make_handler() are
    # deprecated/removed in modern aiohttp (use web.AppRunner or
    # web.run_app) — confirm the installed aiohttp version before reuse.
    app = web.Application(loop=loop)
    app.router.add_route('GET', '/', index)
    app.router.add_route('GET', '/hello/{name}', hello)
    srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
    print('Server started at http://127.0.0.1:8000...')
    return srv

# Start the server on the event loop and serve until interrupted.
loop = asyncio.get_event_loop()
loop.run_until_complete(init(loop))
loop.run_forever()

    concurrent.futures

    • python3新增的库
    • 类似其他语言的线程池的概念
    • 利用multiprocessing实现真正的并行计算
    • 核心原理:以子进程的形式,并行运行多个python解释器,从而令python程序可以利用多核cpu来提升执行速度。由于子进程与主进程解释器相分离,所以它们的全局解释器锁也是相互独立的,每个子进程都能够完整地使用一个cpu内核
    • concurrent.futures.Executor ThreadPoolExecutor
    • ProcessPoolExecutor
  • submit(fn, args, kwargs)
      fn:异步执行的函数
    • args,kwargs:参数
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def return_future(msg):
    time.sleep(3)
    return msg
    
    # 创建一个线程池
    pool = ThreadPoolExecutor(max_workers=2)
    # 往线程池加入2个task
    f1 = pool.submit(return_future, 'hello')
    f2 = pool.submit(return_future, 'world')
    print(f1.done())
    time.sleep(3)
    print(f2.done())
    print(f1.result())
    print(f2.result())

    concurrent中map函数

    • map(fn, *iterables, timeout=None)
    • 跟map函数类似
    • 函数需要异步执行
    • timeout: 超时时间
    • map跟submit使用一个就行
    import time,re
    import os,datetime
    from concurrent import futures
    data = ['1','2']
    def wait_on(argument):
    print(argument)
    time.sleep(2)
    return "ok"
    ex = futures.ThreadPoolExecutor(max_workers=2)
    for i in ex.map(wait_on,data):
    print(i)
    from concurrent.futures import ThreadPoolExecutor as Pool
    #import requests
    import urllib
    from urllib import request
    
    URLS = ['http://www.baidu.com', 'http://qq.com', 'http://sina.com']
    def task(url, timeout=20):
    #return requests.get(url, timeout=timeout)
    return request.urlopen(url, timeout=timeout)
    
    pool = Pool(max_workers=3)
    results = pool.map(task, URLS)
    
    import time
    time.sleep(20)
    for ret in results:
    print('%s, %s' % (ret.url, len(ret.read())))
    import time,re,fcntl
    import os,datetime
    from concurrent import futures
    
    count_list = list()
    MinuteNum = 1
    StartTime = datetime.datetime(2018, 5, 1, 19, 31, 0, 484870)
    NowTime = datetime.datetime.now()
    os.system(':>new.txt')
    f_new = open('new.txt','a')
    
    def conc(CountTimeFormat):
    f = open('push_slave.stdout', 'r')
    for line in f.readlines():
    if re.search(CountTimeFormat,line):
    #获得文件专用锁
    fcntl.flock(f_new, fcntl.LOCK_EX)
    f_new.writelines(line)
    f_new.flush()
    #释放文件锁
    fcntl.flock(f_new, fcntl.LOCK_UN)
    break
    
    while 1:
    AfterOneMinute = datetime.timedelta(minutes=MinuteNum)
    CountTime = AfterOneMinute + StartTime
    CountTimeFormat = CountTime.strftime('%Y-%m-%d %H:%M')
    MinuteNum = MinuteNum+1
    count_list.append(CountTimeFormat)
    if CountTimeFormat == "2018-05-2 16:00":
    break
    
    def exec_cmd():
    with futures.ProcessPoolExecutor(max_workers=24) as executor:
    dict(( executor.submit(conc, times), times) for times in count_list)
    
    if __name__ == '__main__':
    exec_cmd()
    f_new.close()

    Future

    • 未来需要完成的任务 future 实例由Executor.submit创建
    from concurrent.futures import ThreadPoolExecutor as Pool
    from concurrent.futures import as_completed
    import requests
    
    URLS = ['http://qq.com', 'http://sina.com', 'http://www.baidu.com', ]
    
    def task(url, timeout=10):
    return requests.get(url, timeout=timeout)
    
    with Pool(max_workers=3) as executor:
    future_tasks = [executor.submit(task, url) for url in URLS]
    for f in future_tasks:
    if f.running():
    print('%s is running' % str(f))
    for f in as_completed(future_tasks):
    try:
    ret = f.done()
    if ret:
    f_ret = f.result()
    print('%s, done, result: %s, %s' % (str(f), f_ret.url, len(f_ret.content)))
    except Exception as e:
    f.cancel()
    print(str(e))
  • 内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
    标签: