您的位置:首页 > 编程语言 > Python开发

流畅python学习笔记:第十七章:并发处理二

2018-03-14 22:44 531 查看

本章讨论python3.2引入的concurrent.futures模块。future是中文名叫期物。期物是一种对象,表示异步执行的操作

在很多任务中,特别是处理网络I/O。需要使用并发,因为网络有很高的延迟。所以为了不浪费CPU周期去等待,最好在收到网络响应之前做些其他的事。

首先来看下并发和非并发的两个脚本,来对比下各自的运行效率。在这个程序中,我们通过脚本去网站下载各个国家的国旗。网址是http://flupy.org/data/flags/cn/cn.gif

这里http://flupy.org/是基本URL,后面接/data/flags/国家名称/国家名称.gif

首先来看下顺序下载的代码:

country=('CN IN US ID BR PK NG BD JP MX PH VN ET EG DE IR TR CD FR').split()

 

BASE_URL='http://flupy.org/data/flags'

 

DEST_URL='downloads/'

 

def save_flag(img,filename):

    path=os.path.join(DEST_URL,filename)

    with open(path,'wb') as f:

        f.write(img)

 

def get_flag(cc):

    url='{}/{cc}/{cc}.gif'.format(BASE_URL,cc=cc.lower())

    resp=requests.get(url)

    return resp.content

 

def show(text):

    print(text)

    sys.stdout.flush()

 

def download_many(cc_list):

    for cc in sorted(cc_list):

        img=get_flag(cc)

        show(cc)

        save_flag(img,cc.lower()+'.gif')

    return len(cc_list)

 

def main(download_many):

    t1=time.time()

    count=download_many(country)

    elapsed=time.time()-t1

    msg='\n{} flags downloaded in {:.2f}s'

    print(msg.format(count,elapsed))

 

 

 

if __name__=="__main__":

    main(download_many)

 

代码中通过requests对图片进行下载并且保存到downloads文件夹下面。并且在main中统计代码运行的时间。运行结果如下:

/usr/bin/python3.6 /home/zhf/py_prj/function_test/test.py

BD

BR

CD

CN

DE

EG

ET

FR

ID

IN

IR

JP

MX

NG

PH

PK

TR

US

VN

19 flags downloaded in 15.29s

下载了19个图片总共花费15.29秒。接下来我们用concurrent.futures模块来对代码进行改造。在这里添加download_one和download_many_futures两个函数

在ThreadPoolExecutor中设置最大运行的线程max_workers为3个

executor.submit中传入单个的回调函数和参数

future.result()返回的是每个线程运行完后的结果,在这里就是download_one的返回值

futures.as_completed是一个迭代器,在期物运行结束后产出期物

def download_one(cc):

    image=get_flag(cc)

    show(cc)

    save_flag(image,cc.lower()+'.gif')

    return cc

 

def download_many_futures(cc_list):

    with futures.ThreadPoolExecutor(max_workers=3) as executor:

        to_do=[]

        for cc in sorted(cc_list):

            future=executor.submit(download_one,cc)

            to_do.append(future)

            msg='Scheduled for {}:{}'

            print(msg.format(cc,future))

 

        results=[]

        for future in futures.as_completed(to_do):

            res=future.result()

            msg='{} result:{!r}'

            print(msg.format(future,res))

            results.append(res)

return len(results)

来看下运行的结果:总共耗时6.72秒,比之前的顺序下载节省了一半的时间

Scheduled for BD:<Future at 0x7f51f9c7e908 state=running>

Scheduled for BR:<Future at 0x7f51f9c7ef60 state=running>

Scheduled for CD:<Future at 0x7f51f8a20518 state=running>

Scheduled for CN:<Future at 0x7f51f8a20ef0 state=pending>

Scheduled for DE:<Future at 0x7f51f8a20f60 state=pending>

Scheduled for EG:<Future at 0x7f51f8a2c048 state=pending>

Scheduled for ET:<Future at 0x7f51f8a2c0f0 state=pending>

Scheduled for FR:<Future at 0x7f51f8a2c198 state=pending>

Scheduled for ID:<Future at 0x7f51f8a2c278 state=pending>

Scheduled for IN:<Future at 0x7f51f8a2c358 state=pending>

Scheduled for IR:<Future at 0x7f51f8a2c438 state=pending>

Scheduled for JP:<Future at 0x7f51f8a2c518 state=pending>

Scheduled for MX:<Future at 0x7f51f8a2c5f8 state=pending>

Scheduled for NG:<Future at 0x7f51f8a2c6d8 state=pending>

Scheduled for PH:<Future at 0x7f51f8a2c7b8 state=pending>

Scheduled for PK:<Future at 0x7f51f8a2c898 state=pending>

Scheduled for TR:<Future at 0x7f51f8a2c978 state=pending>

Scheduled for US:<Future at 0x7f51f8a2ca58 state=pending>

Scheduled for VN:<Future at 0x7f51f8a2cb38 state=pending>

BD

<Future at 0x7f51f9c7e908 state=finished returned str> result:'BD'

BR

<Future at 0x7f51f9c7ef60 state=finished returned str> result:'BR'

CD

<Future at 0x7f51f8a20518 state=finished returned str> result:'CD'

CN

<Future at 0x7f51f8a20ef0 state=finished returned str> result:'CN'

DE

<Future at 0x7f51f8a20f60 state=finished returned str> result:'DE'

EG

<Future at 0x7f51f8a2c048 state=finished returned str> result:'EG'

FR

<Future at 0x7f51f8a2c198 state=finished returned str> result:'FR'

ID

ET

<Future at 0x7f51f8a2c0f0 state=finished returned str> result:'ET'

<Future at 0x7f51f8a2c278 state=finished returned str> result:'ID'

IN

<Future at 0x7f51f8a2c358 state=finished returned str> result:'IN'

JP

<Future at 0x7f51f8a2c518 state=finished returned str> result:'JP'

IR

<Future at 0x7f51f8a2c438 state=finished returned str> result:'IR'

NG

<Future at 0x7f51f8a2c6d8 state=finished returned str> result:'NG'

MX

<Future at 0x7f51f8a2c5f8 state=finished returned str> result:'MX'

PK

<Future at 0x7f51f8a2c898 state=finished returned str> result:'PK'

PH

<Future at 0x7f51f8a2c7b8 state=finished returned str> result:'PH'

TR

<Future at 0x7f51f8a2c978 state=finished returned str> result:'TR'

US

<Future at 0x7f51f8a2ca58 state=finished returned str> result:'US'

VN

<Future at 0x7f51f8a2cb38 state=finished returned str> result:'VN'

 

19 flags downloaded in 6.72s。如果我们将max_workers设置为更大的值,比如设置为10, 得到的运行时间将会更快。通过运行的结果最快的时候达到1.9秒。

 

我们都知道python有全局解释器锁GIL,一次只允许使用一个线程执行python字节码。因此一个Python进程通过不能同时使用多个CPU核。关于GIL的解释可以参考http://cenalulu.github.io/python/gil-in-python/这篇帖子。

那么如果我们有多个CPU核(os.cpu_count可以查看有多少个CPU核),我们该怎么利用呢。这里就要用到futures.ProcessPoolExecutor,ProcessPoolExecutor将工作分配给多个python进程处理。因此如果需要做CPU密集型处理,使用这个模块能够绕开GIL,利用所有可用的CPU核。我们将代码修改为ProcessPoolExecutor来执行看下结果

19 flags downloaded in 4.92s。这个比设置

futures.ThreadPoolExecutor(max_workers=10)的时候还要慢一些。这是由于我的电脑只有4个CPU核,因此限制只能4个并发下载。但是线程版本使用的是10个线程。在用线程运行的时候,所有阻塞型的I/O函数都会释放GIL,允许其他线程运行。time.sleep()也会释放GIL,因此尽管有GIL,python的线程还是能发挥作用。

在一章中我们将介绍asynchio包处理并发

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: