您的位置:首页 > 编程语言 > Python开发

Python并行化处理

2020-02-01 04:53 691 查看

Python并行化处理

原文:Parallel Processing in Python
作者:Frank Hofmann
原始翻译:Diwei
说明:原始翻译缺少代码示例,本翻译进行补充和验证

简介

当你在机器上启动某个程序时,它只是在自己的“bubble”里面运行,这个气泡的作用就是用来将同一时刻运行的所有程序进行分离。这个“bubble”也可以称之为进程,包含了管理该程序调用所需要的一切。

例如,这个所谓的进程环境包括该进程使用的内存页,处理该进程打开的文件,用户和组的访问权限,以及它的整个命令行调用,包括给定的参数。

此信息保存在UNIX/Linux系统的流程文件系统中,该系统是一个虚拟文件系统,可通过/proc目录进行访问。条目都已经根据进程ID排过序了,该ID是每个进程的唯一标识符。示例1显示了具有进程ID#177的任意选择的进程。

示例1:可用于进程的信息

root@system:/proc/177# ls
attr         cpuset   limits      net            projid_map   statm
autogroup    cwd      loginuid    ns             root         status
auxv         environ  map_files   numa_maps      sched        syscall
cgroup       exe      maps        oom_adj        sessionid    task
clear_refs   fd       mem         oom_score      setgroups    timers
cmdline      fdinfo   mountinfo   oom_score_adj  smaps        uid_map
comm         gid_map  mounts      pagemap        stack        wchan
coredump_filter       io          mountstats     personality  stat

实际运行结果:

构建程序代码以及数据

程序越复杂越容易将它分成更小的部分。这不仅仅涉及源代码,还涉及在您的计算机上执行的代码。对此的一个解决方案是将子过程与并行执行结合使用。这背后的想法是:

  • 单个流程涵盖了可以单独运行的一段代码
  • 某些代码段可以同时运行,并且原则上允许并行化
  • 使用现代处理器和操作系统的功能,例如处理器的每个核心,我们可以减少程序的总执行时间
  • 减少程序/代码的复杂性,并将工作外包给作为子进程的专用代理
    使用子进程需要重新考虑程序的执行方式,从线性到并行。它类似于将公司的工作视角从普通员工转变为经理——你必须关注谁在做什么,某个步骤需要多长时间,以及中间结果之间的依赖关系。

这有利于将代码分割成更小的部分,这些更小的部分可以由专门用于此任务的代理执行。如果还没有想清楚,试想一下数据集的构造原理,它也是同样的道理,这样就可以由单个代理进行有效的处理。但是这也引出了一些问题:

-为什么要将代码并行化?落实到具体案例中或者在努力的过程中,思考这个问题有意义吗?

  • 程序是否打算只运行一次,还是会定期运行在类似的数据集上?
  • 能把算法分成几个单独的执行步骤吗?
  • 数据是否允许并行化?如果不允许,那么数据组织将以何种方式进行调整?
  • 计算的中间结果是否相互依赖?
  • 需要对硬件进行调整吗?
  • 在硬件或算法中是否存在瓶颈,如何避免或者最小化这些因素的影响?
  • 并行化的其他副作用有哪些?

可能的用例就是主进程,以及后台运行的等待被激活的守护进程(主/从)。此外,这可能是启动按需运行的工作进程的一个主要过程。在实践中,主要的过程是一个馈线过程,它控制两个或多个被馈送数据部分的代理,并在给定的部分进行计算。

请记住,由于操作系统所需要的子进程的开销,并行操作既昂贵又耗时。与以线性方式运行两个或多个任务相比,在并行的情况下,根据您的用例,可以在每个子过程中节省25%到30%的时间。例如,如果在串行中执行了两项消耗5秒的任务,那么总共需要10秒的时间,但是在并行化的情况下,在多核机器上大概需要8秒。有3秒是用于各种开销,即这部分是无法压缩和优化的,所以速度提高是有极限的。

运行于python并行的函数

Python提供了四种可能的处理方式。首先可以使用multiprocessing模块并行执行功能。第二,进程的替代方法是线程。从技术上讲,这些都是轻量级的进程,不在本文的范围之内。想了解更加详细的内容,可以看看Python的线程模块。第三,可以使用

os
模块的
system()
方法或
subprocess
模块提供的方法调用外部程序,然后收集结果。
Python提供了四种可能的处理方式。首先可以使用multiprocessing模块并行执行功能。第二,进程的替代方法是线程。从技术上讲,这些都是轻量级的进程,不在本文的范围之内。想了解更加详细的内容,可以看看Python的线程模块。第三,可以使用os模块的system()方法或subprocess模块提供的方法调用外部程序,然后收集结果。

multiprocessing
模块涵盖了一系列方法来处理并行执行例程。这包括进程,代理池,队列以及管道。

清单1使用了五个代理程序池,同时处理三个值的块。对于代理的数量和对

chunksize
的值都是任意选择的,用于演示目的。根据处理器中核心的数量来调整这些值。

Pool.map()
方法需要三个参数 - 在数据集的每个元素上调用的函数,数据集本身和
chunksize
。在清单1中,我们使用
square
函数,并计算给定整数值的平方。此外,
chunksize
不是必须的。如果未明确设置,则默认
chunksize
为1。

请注意,代理商的执行订单不能保证,但结果集的顺序是正确的。它根据原始数据集的元素的顺序包含平方值。

清单1:并行运行函数

from multiprocessing import Pool

def square(x):
# calculate the square of the value of x
return x*x

if __name__ == '__main__':

# Define the dataset
dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

# Output the dataset
print ('Dataset: ' + str(dataset))

# Run this with a pool of 5 agents having a chunksize of 3 until finished
agents = 5
chunksize = 3
with Pool(processes=agents) as pool:
result = pool.map(square, dataset, chunksize)

# Output the result
print ('Result:  ' + str(result))

运行结果为:

$ python3 pool_multiprocessing.py
Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
Result:  [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]

实际操作结果为:

注意:我们使用python3来运行这些例子。

思考:在实际的编程中,如何设置

agents
chunksize
的值?
说明:

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一个控制worker pool的process pool对象处理被提交的工作。 它支持带有超时和回调的异步结果,并具有并行映射的功能。
(1)processes是要使用的worker processes数量。如果processes是

None
, 那么将使用
os.cpu_count()
的返回值(即系统环境下可用的CPU数量)。
(2)
map(func, iterable[, chunksize])

一个map()内置函数的并行等效函数,它只支持一个迭代的声明。该函数会关闭阻断知道直到得到结果。
这个方法将迭代变量切成一些块,它们被作为一些独立的任务提交给process pool. 这些块的大小由正整数
chunksize
来指定。
注意:对于很长的变量可能会导致很高的内存使用,可以考虑使用具有明确chunksize的 imap() 或者 imap_unordered()来提高效率。
参考说明:https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool

使用队列运行多个函数

作为数据结构,队列是非常普遍的,并且以多种方式存在。 它被组织为先进先出(FIFO)或先进先出(LIFO)/堆栈,以及有和没有优先级(优先级队列)。 数据结构被实现为具有固定数量条目的数组,或作为包含可变数量的单个元素的列表。

在列表2.1-2.7中,我们使用FIFO队列。 它被实现为已经由来自multiprocessing模块的相应类提供的列表。此外,time模块被加载并用于模拟工作负载。

清单2.1:要使用的模块

import multiprocessing
from time import sleep

接下来,定义一个worker函数(清单2.2)。 该函数实际上代表代理,需要三个参数。进程名称指示它是哪个进程,

tasks
results
都指向相应的队列。

在工作函数里面是一个无限的

while
循环。
tasks
results
都是在主程序中定义的队列。
tasks.get()
从要处理的任务队列中返回当前任务。小于0的任务值退出
while
循环,返回值为-1。任何其他任务值都将执行一个计算(平方),并返回此值。将值返回到主程序实现为
result.put()
。这将在
results
队列的末尾添加计算值。

# define worker function
def calculate(process_name, tasks, results):
print('[%s] evaluation routine starts' % process_name)

while True:
new_value = tasks.get()
if new_value < 0:
print('[%s] evaluation routine quits' % process_name)

# Indicate finished
results.put(-1)
break
else:
# Compute result and mimic a long-running task
compute = new_value * new_value
sleep(0.02*new_value)

# Output which process received the value
# and the calculation result
print('[%s] received value: %i' % (process_name, new_value))
print('[%s] calculated value: %i' % (process_name, compute))

# Add result to the queue
results.put(compute)

return

下一步是主循环(参见清单2.3)。首先,定义了进程间通信(IPC)的经理。接下来,添加两个队列,一个保留任务,另一个用于结果。

清单2.3:IPC和队列

if __name__ == "__main__":
# Define IPC manager
manager = multiprocessing.Manager()

# Define a list (queue) for tasks and computation results
tasks = manager.Queue()
results = manager.Queue()

完成此设置后,我们定义一个具有四个工作进程(代理)的进程池。我们使用类

multiprocessing.Pool()
,并创建一个它的实例。 接下来,我们定义一个空的进程列表(见清单2.4)。

清单2.4:定义一个进程池

# Create process pool with four processes
num_processes = 4
pool = multiprocessing.Pool(processes=num_processes)
processes = []

作为以下步骤,我们启动了四个工作进程(代理)。 为了简单起见,它们被命名为“P0”到“P3”。使用

multiprocessing.Pool()
完成创建四个工作进程。这将它们中的每一个连接到worker功能以及任务和结果队列。 最后,我们在进程列表的末尾添加新初始化的进程,并使用
new_process.start()
启动新进程(参见清单2.5)。
清单2.5:准备worker进程

# Initiate the worker processes
for i in range(num_processes):

# Set process name
process_name = 'P%i' % i

# Create the process, and connect it to the worker function
new_process = multiprocessing.Process(target=calculate, args=(process_name,tasks,results))

# Add new process to the list of processes
processes.append(new_process)

# Start the process
new_process.start()

我们的工作进程正在等待工作。我们定义一个任务列表,在我们的例子中是任意选择的整数。这些值将使用

tasks.put()
添加到任务列表中。每个工作进程等待任务,并从任务列表中选择下一个可用任务。 这由队列本身处理(见清单2.6)。

清单2.6:准备任务队列

# Fill task queue
task_list = [43, 1, 780, 256, 142, 68, 183, 334, 325, 3]
for single_task in task_list:
tasks.put(single_task)

# Wait while the workers process
sleep(5)

过了一会儿,我们希望我们的代理完成。 每个工作进程对值为-1的任务做出反应。 它将此值解释为终止信号,此后死亡。 这就是为什么我们在任务队列中放置尽可能多的-1,因为我们有进程运行。 在死机之前,终止的进程会在结果队列中放置-1。 这意味着是代理正在终止的主循环的确认信号。

在主循环中,我们从该队列读取,并计数-1。 一旦我们计算了我们有过程的终止确认数量,主循环就会退出。 否则,我们从队列中输出计算结果。

清单2.7:结果的终止和输出

# Quit the worker processes by sending them -1
for i in range(num_processes):
tasks.put(-1)

# Read calculation results
num_finished_processes = 0
while True:
# Read result
new_result = results.get()

# Have a look at the results
if new_result == -1:
# Process has finished
num_finished_processes += 1

if num_finished_processes == num_processes:
break
else:
# Output result
print('Result:' + str(new_result))

示例2显示了Python程序的输出。 运行程序不止一次,您可能会注意到,工作进程启动的顺序与从队列中选择任务的进程本身不可预测。 但是,一旦完成结果队列的元素的顺序与任务队列的元素的顺序相匹配。

示例2:

$ python3 queue_multiprocessing.py
[P0] evaluation routine starts
[P1] evaluation routine starts
[P2] evaluation routine starts
[P3] evaluation routine starts
[P1] received value: 1
[P1] calculated value: 1
[P0] received value: 43
[P0] calculated value: 1849
[P0] received value: 68
[P0] calculated value: 4624
[P1] received value: 142
[P1] calculated value: 20164
result: 1
result: 1849
result: 4624
result: 20164
[P3] received value: 256
[P3] calculated value: 65536
result: 65536
[P0] received value: 183
[P0] calculated value: 33489
result: 33489
[P0] received value: 3
[P0] calculated value: 9
result: 9
[P0] evaluation routine quits
[P1] received value: 334
[P1] calculated value: 111556
result: 111556
[P1] evaluation routine quits
[P3] received value: 325
[P3] calculated value: 105625
result: 105625
[P3] evaluation routine quits
[P2] received value: 780
[P2] calculated value: 608400
result: 608400
[P2] evaluation routine quits

实际测试结果示例如下:

注意:如前所述,由于执行顺序不可预测,您的输出可能与上面显示的输出不一致。

思考:上述结果是如何得出的?涉及到了哪些与并行化处理相关的函数?这些函数分别有哪些功能?

使用os.system()方法

system()
方法是os模块的一部分,它允许在与Python程序的单独进程中执行外部命令行程序。
system()
方法是一个阻塞调用,你必须等到调用完成并返回。 作为UNIX / Linux拜物教徒,您知道可以在后台运行命令,并将计算结果写入重定向到这样的文件的输出流(参见示例3):
示例3:带有输出重定向的命令

$ ./program >> outputfile &

在Python程序中,您只需简单地封装此调用,如下所示:
清单3:使用os模块进行简单的系统调用

import os

os.system("./program >> outputfile &")

此系统调用创建一个与当前Python程序并行运行的进程。 获取结果可能会变得有点棘手,因为这个调用可能会在你的Python程序结束后终止 - 你永远都不会知道。

使用这种方法比我描述的先前方法要贵得多。 首先,开销要大得多(进程切换),其次,它将数据写入物理内存,比如一个需要更长时间的磁盘。 虽然这是一个更好的选择,你的内存有限(像RAM),而是可以将大量输出数据写入固态磁盘。

使用子进程模块

该模块旨在替换

os.system()
os.spawn()
调用。子过程的想法是简化产卵过程,通过管道和信号与他们进行通信,并收集他们生成的输出包括错误消息。

从Python 3.5开始,子进程包含方法

subprocess.run()
来启动一个外部命令,它是底层
subprocess.Popen()
类的包装器。 作为示例,我们启动UNIX/Linux命令
df -h
,以查找机器的
/ home
分区上仍然有多少磁盘空间。在Python程序中,您可以执行如下所示的调用(清单4)。

清单4:运行外部命令的基本示例

import subprocess

ret = subprocess.run(["df", "-h", "/home"])
print(ret)

这是基本的调用,非常类似于在终端中执行的命令

df -h / home
。请注意,参数被分隔为列表而不是单个字符串。输出将与示例4相似。与此模块的官方Python文档相比,除了调用的返回值之外,它将调用结果输出到
stdout

示例4显示了我们的呼叫的输出。输出的最后一行显示命令的成功执行。调用

subprocess.run()
返回一个类
CompletedProcess
的实例,它有两个名为
args
(命令行参数)的属性和
returncode
(命令的返回值)。

示例4:运行清单4中的Python脚本

$ python3 diskfree.py
Filesystem   Size   Used  Avail Capacity  iused   ifree %iused  Mounted on
/dev/sda3  233Gi  203Gi   30Gi    88% 53160407 7818407   87%   /home
CompletedProcess(args=['df', '-h', '/home'], returncode=0)

要抑制输出到

stdout
,并捕获输出和返回值进行进一步的评估,
subprocess.run()
的调用必须稍作修改。没有进一步修改,
subprocess.run()
将执行的命令的输出发送到
stdout
,这是底层Python进程的输出通道。 要获取输出,我们必须更改此值,并将输出通道设置为预定义值
subprocess.PIPE
。清单5显示了如何做到这一点。

清单5:抓取管道中的输出

import subprocess

# Call the command
output = subprocess.run(["df", "-h", "/home"], stdout=subprocess.PIPE)

# Read the return code and the output data
print ("Return code: %i" % output.returncode)
print ("Output data: %s" % output.stdout)

如前所述,

subprocess.run()
返回一个类
CompletedProcess
的实例。在清单5中,这个实例是一个简单命名为
output
的变量。该命令的返回码保存在属性
output.returncode
中,打印到
stdout
的输出可以在属性
output.stdout
中找到。 请注意,这不包括处理错误消息,因为我们没有更改输出渠道。

结论

由于现在的硬件已经很厉害了,因此也给并行处理提供了绝佳的机会。Python也使得用户即使在非常复杂的级别,也可以访问这些方法。正如在

multiprocessing
subprocess
模块之前看到的那样,可以让你很轻松的对该主题有很深入的了解。

致谢

(1)感谢原来因为文献的作者Frank Hofmann 以及 Gerold Rupprecht
(2)感谢本文原始中文翻译Diwei

  • 点赞 1
  • 收藏
  • 分享
  • 文章举报
LonghaoJia1997 发布了2 篇原创文章 · 获赞 1 · 访问量 1176 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: