
Python 3 source code analysis: concurrent.futures.ThreadPoolExecutor

2017-12-15 13:54

0x00 Documentation path:

Python » 3.6.2 Documentation » The Python Standard Library » 17. Concurrent Execution »

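0x01 Module overview:

Added in Python 3.2.

Provides a high-level interface for executing callables asynchronously.

Below we dissect it through its concrete implementation.

0x02 Implementation

We start from the example given in the official documentation: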

from concurrent.futures import ThreadPoolExecutor


with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())


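Calling ThreadPoolExecutor(...) creates an Executor object (ThreadPoolExecutor is a subclass of Executor).

Source location:

Lib\concurrent\futures\thread.py, line 83

It takes two parameters: max_workers=None and thread_name_prefix=''. thread_name_prefix, new in 3.6, sets the name prefix of the worker threads.

max_workers

Specifies the maximum number of worker threads.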

if max_workers is None:
    # Use this number because ThreadPoolExecutor is often
    # used to overlap I/O instead of CPU work.
    max_workers = (os.cpu_count() or 1) * 5


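First, the constructor checks whether max_workers is None. If it was not specified, it defaults to the number of CPUs × 5, so on a quad-core machine the default is 20 threads. (This is the 3.6 behavior; since Python 3.8 the default is min(32, (os.cpu_count() or 1) + 4).)

Then it checks whether max_workers is less than or equal to 0, and if so raises ValueError.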
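The ValueError branch is easy to confirm against the real class. This small check only uses the public constructor; the exact error message may differ between Python versions, so it is printed rather than compared.

```python
from concurrent.futures import ThreadPoolExecutor

# Zero and negative pool sizes are rejected by the constructor itself.
rejected = []
for bad in (0, -1):
    try:
        ThreadPoolExecutor(max_workers=bad)
    except ValueError as exc:
        rejected.append(bad)
        print(bad, "->", exc)
```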

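Calling the ThreadPoolExecutor object's submit method

ThreadPoolExecutor inherits from the Executor class and overrides some of its methods, including submit and shutdown.

Source location:

Lib\concurrent\futures\thread.py, line 106

It takes three parameters: fn, *args, **kwargs.

submit schedules fn(*args, **kwargs) to run in a worker thread and immediately returns a Future object representing that pending call.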
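To make the mechanism behind submit concrete, here is a deliberately simplified model that follows the general shape of thread.py: a Future per call, a work item put on a queue, and worker threads started on demand up to max_workers. MiniThreadPool is a hypothetical class written for illustration only; it omits much of what the real ThreadPoolExecutor handles (thread naming, shutdown locking, interpreter-exit cleanup), and it creates Future() directly, which the docs reserve for executor implementations and tests.

```python
import queue
import threading
from concurrent.futures import Future


class MiniThreadPool:
    """Hypothetical, simplified model of a thread pool; not the CPython source."""

    def __init__(self, max_workers=2):
        self._max_workers = max_workers
        self._work_queue = queue.Queue()  # pending (future, fn, args, kwargs) items
        self._threads = []

    def submit(self, fn, *args, **kwargs):
        future = Future()  # a fresh, pending Future for this call
        self._work_queue.put((future, fn, args, kwargs))
        # Start another worker thread only while below the max_workers limit.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()
            self._threads.append(t)
        return future  # returned at once; fn runs later in a worker thread

    def _worker(self):
        while True:
            item = self._work_queue.get()
            if item is None:  # sentinel placed by shutdown(): stop this worker
                return
            future, fn, args, kwargs = item
            # Skip the call if the future was cancelled before it started.
            if not future.set_running_or_notify_cancel():
                continue
            try:
                result = fn(*args, **kwargs)
            except Exception as exc:
                future.set_exception(exc)  # result() will re-raise it
            else:
                future.set_result(result)

    def shutdown(self):
        # One sentinel per worker; work queued ahead of them still runs.
        for _ in self._threads:
            self._work_queue.put(None)
        for t in self._threads:
            t.join()


pool = MiniThreadPool(max_workers=2)
f = pool.submit(pow, 2, 10)
print(f.result())  # 1024
pool.shutdown()
```

The key design point is that submit never runs fn itself: it only enqueues the call and hands back the Future, which the worker thread later completes with set_result or set_exception.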

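Calling the Future object's result method

result returns the return value of the executed function.

Source location:

Lib\concurrent\futures\_base.py, line 378

It takes one parameter, timeout=None.

If timeout is not specified, there is no limit on the wait: result blocks until the function completes.

If the function has not completed within the given number of seconds, result raises concurrent.futures.TimeoutError. If the function itself raised an exception, result re-raises that exception.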
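A short demonstration of the timeout parameter, assuming a task that sleeps for about 0.5 s (slow_square is a made-up example function). Note that a timeout only stops the waiting; it does not cancel the call, which keeps running and can still be collected later.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError


def slow_square(x):
    time.sleep(0.5)  # simulate a slow call
    return x * x


timed_out = False
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 3)
    try:
        # Wait at most 0.1 s; the call needs about 0.5 s, so this times out.
        future.result(timeout=0.1)
    except TimeoutError:
        timed_out = True
        print("not finished within 0.1 s")
    # Without a timeout, result() blocks until the call completes.
    print(future.result())  # 9
```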

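Because the with statement uses the Executor as a context manager, its shutdown(wait=True) method is called automatically when the block exits; it waits for pending calls to finish and then releases the pool's resources.

So simply using a with statement is enough.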
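The with block is roughly equivalent to calling shutdown(wait=True) in a finally clause, which is what Executor.__exit__ does. A sketch comparing the two forms:

```python
from concurrent.futures import ThreadPoolExecutor

# Form 1: the executor as a context manager.
with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
print(future.done())  # True: leaving the block waited for the pending call

# Form 2: the explicit equivalent with try/finally.
executor = ThreadPoolExecutor(max_workers=1)
try:
    future2 = executor.submit(pow, 323, 1235)
finally:
    executor.shutdown(wait=True)
print(future2.done())  # True
```

Once shutdown has been called, any further submit on that executor raises RuntimeError.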

Another example

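from concurrent.futures import ThreadPoolExecutor
from telnetlib import Telnet

def detect_port(port):
    try:
        print('{} testing .. '.format(port))
        Telnet('127.0.0.1', port, timeout=5).close()
        print('{} opened '.format(port))
    except OSError:
        # connection refused or timed out: treat the port as closed
        pass

with ThreadPoolExecutor(100) as executor:
    executor.map(detect_port, range(500))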
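telnetlib was deprecated in Python 3.11 and removed in 3.13, so on current versions the script above no longer runs. A sketch of the same check using socket.create_connection instead (a swapped-in technique, not the author's code); is_port_open is a hypothetical name, and it returns a bool rather than printing so the results can be read back from map:

```python
import socket
from concurrent.futures import ThreadPoolExecutor


def is_port_open(port, host='127.0.0.1', timeout=5):
    """Hypothetical replacement for detect_port: True if a TCP connect succeeds."""
    try:
        # Raises OSError (refused, timed out, unreachable, ...) on failure.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


ports = range(1, 500)  # port 0 is not a usable destination port
with ThreadPoolExecutor(100) as executor:
    # map yields results in input order, so they line up with ports.
    open_ports = [port for port, is_open
                  in zip(ports, executor.map(is_port_open, ports))
                  if is_open]
print(open_ports)
```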


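The map method

ThreadPoolExecutor inherits it unchanged from Executor.

Source location:

Lib\concurrent\futures\_base.py

It takes four parameters: fn, *iterables, timeout=None, chunksize=1.

chunksize is only used by ProcessPoolExecutor (ThreadPoolExecutor ignores it); fn works as in submit above, each iterable supplies one positional argument per call, and timeout works as in result.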

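def map(self, fn, *iterables, timeout=None, chunksize=1):
    # Convert the relative timeout into an absolute deadline.
    if timeout is not None:
        end_time = timeout + time.time()

    # Submit every call up front; zip stops at the shortest iterable.
    fs = [self.submit(fn, *args) for args in zip(*iterables)]

    # The generator is hidden in a closure so that all futures are
    # submitted before the first result is requested.
    def result_iterator():
        try:
            for future in fs:
                if timeout is None:
                    yield future.result()
                else:
                    # Remaining time until the shared deadline.
                    yield future.result(end_time - time.time())
        finally:
            # When iteration ends or is abandoned (error, timeout, garbage
            # collection), cancel any futures that have not started yet.
            for future in fs:
                future.cancel()
    return result_iterator()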
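Because fs is built eagerly while result_iterator is a generator, every call is already submitted when map returns, but results (and exceptions) only appear as you iterate, in input order rather than completion order. So if the returned iterator is never consumed, as in the port-scanning example, an exception raised in fn is silently dropped. A small demonstration with a made-up work function:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(x):
    if x == 3:
        raise ValueError("bad input: 3")
    time.sleep(0.1 * (5 - x))  # later items finish sooner
    return x * 10


collected = []
with ThreadPoolExecutor(max_workers=5) as executor:
    # All five calls are submitted here, before any result is requested.
    results = executor.map(work, range(5))
    try:
        for value in results:  # yields in input order, not completion order
            collected.append(value)
    except ValueError as exc:
        # The exception surfaces only when iteration reaches item 3.
        print("collected before error:", collected, "error:", exc)
```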


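Inside map, the built-in function zip builds the argument tuple for each call, and zip stops as soon as the shortest iterable is exhausted: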

In [1]: print(*zip('abc',[1,2,3,4]))
('a', 1) ('b', 2) ('c', 3)
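The same truncation applies to executor.map, since it zips its iterables before submitting anything. For example, with the built-in pow:

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
    # zip pairs (2, 1) and (3, 2); the unmatched 4 is silently dropped.
    results = list(executor.map(pow, [2, 3, 4], [1, 2]))
print(results)  # [2, 9]
```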


For example:

from concurrent.futures import ThreadPoolExecutor

def detect_port(ip, port):
    ...  # check whether the port is open

with ThreadPoolExecutor() as executor:
    executor.map(detect_port,
                 ['127.0.0.1', '192.168.1.10'], [21, 22, 23])


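The intent was to check all three ports on each IP.

But zip pairs the two lists element by element, so only port 21 on 127.0.0.1 and port 22 on 192.168.1.10 are scanned; port 23 has no partner and is dropped.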
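To pair every IP with every port, one option is to build the Cartesian product with itertools.product and unzip it into the two argument sequences that map expects. detect_port below is a hypothetical stub that just echoes its arguments:

```python
import itertools
from concurrent.futures import ThreadPoolExecutor

ips = ['127.0.0.1', '192.168.1.10']
ports = [21, 22, 23]


def detect_port(ip, port):
    # Hypothetical stand-in: a real scanner would try to connect here.
    return (ip, port)


# product() yields all 6 (ip, port) pairs; zip(*pairs) splits them back
# into two equally long sequences that map() can zip together again.
pairs = list(itertools.product(ips, ports))
with ThreadPoolExecutor() as executor:
    scanned = list(executor.map(detect_port, *zip(*pairs)))
print(len(scanned))  # 6
```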

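# One task per IP in the outer pool, one task per port in each inner pool.
from concurrent.futures import ThreadPoolExecutor

def detect_port(ip, port):
    ...  # check whether the port is open

def detect_all_ip(ip):
    ports = [21, 22, 23]
    with ThreadPoolExecutor() as executor:
        for port in ports:
            executor.submit(detect_port, ip, port)  # pass port, not the whole list

def main():
    ips = ['127.0.0.1', '192.168.1.10']
    with ThreadPoolExecutor() as executor:
        executor.map(detect_all_ip, ips)

main()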
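Nesting pools multiplies threads, since each outer worker creates its own inner pool. An alternative (not the author's approach) is a single pool that receives one task per (ip, port) pair, with results gathered via concurrent.futures.as_completed. detect_port is again a hypothetical stub, and max_workers=20 is an arbitrary choice:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

ips = ['127.0.0.1', '192.168.1.10']
ports = [21, 22, 23]


def detect_port(ip, port):
    # Hypothetical stand-in for the real check; returns whether the port is open.
    return False


status = {}
with ThreadPoolExecutor(max_workers=20) as executor:
    # Map each future back to the (ip, port) it checks.
    future_to_target = {
        executor.submit(detect_port, ip, port): (ip, port)
        for ip in ips
        for port in ports
    }
    # as_completed yields futures as they finish, in completion order.
    for future in as_completed(future_to_target):
        status[future_to_target[future]] = future.result()
print(len(status))  # 6
```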


0x03 My own script

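Below is a script I wrote earlier that scans ports via telnet using ThreadPoolExecutor; it may serve as a reference.

Download: detectPortByTelnet.py

I have recently set up a new blog; anyone interested is welcome to visit: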

楼兰’s Blog