您的位置:首页 > 编程语言 > Python开发

Python并行处理——多线程

2016-04-30 11:09 507 查看

Parallel Processing

并行处理利用多个CPU在同一时间处理多条指令,这是个很有用的技术,但是它带来的问题也是很多的,比如多个处理器怎么共享数据?这个会造成很多读写冲突,因此需要学会管理这些因素,写出更强大的代码来进行快速并有意义的数据分析。

Mutability

在python中,有些变量像intergers,是不可变的。这意味着他们的值是不可变的。

def add1(x):
x = x + 1
x = 5
add1(x)
print(x)
'''
5
'''


但是像字典和列表是可变的,可以代表一些会改变的信息,并行处理中可变变量是很有用的,因为我们并行处理中会共享变量,而且几个处理器会编辑同一个变量。下面创建一个Counter类,看看变量是如何变化的:

class Counter():
def __init__(self):
self.count = 0
def increment(self):
self.count += 1
def get_count(self):
return self.count

def count_up_100000(counter):
for i in range(100000):
counter.increment()

counter = Counter()
initial_count = counter.get_count()
count_up_100000(counter)
final_count = counter.get_count()
print(final_count)
'''
100000
'''


Multithreading

上面那个例子中发现counter这个实例的count值已经变为100000了,我们发现在count_up_100000这个函数中,一直在调用Counter的一个方法,对counter的count值进行+1,这个操作实际上可以并行处理,假设我们有100000个CPU,那么我们一次就可以运行所有循环,这就是多线程(multithreading)。

一个线程(thread)就是一条执行路径。在Python中我们可以利用threading.Thread()来创建新的线程,作为一个独立的过程来执行一个给定的函数,比如上一个例子中,我们创建新的线程来执行count_up_100000这个函数,启动一个线程的方法是thread.start()。当这个线程结束执行后才会执行thread.join(),将其join到主线程中。

import threading
counter = Counter()
count_thread = threading.Thread(target=count_up_100000, args=[counter])
count_thread.start()
count_thread.join()
after_join = counter.get_count()
print(after_join)
'''
100000
'''


Determinism

大部分单线程操作的结果是确定的(deterministic ),因为我们是一步一步执行程序,结果是可以预测的。现在考虑一个情况,你让你的一个朋友来数数,数到100000,你是主线程,创建了这样一个数数线程,然后当它数完就会告诉你,这类似与join主线程。但是在他还没有数完的过程中,count的结果是不确定的(nondeterministic)。

import threading
def conduct_trial():
counter = Counter()
count_thread = threading.Thread(target=count_up_100000, args=[counter])
count_thread.start()
intermediate_value = counter.get_count()
count_thread.join()
return intermediate_value

trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)
'''
24145
24762
24881
'''


在还未join时,也就是线程还未结束时,此时的count的值是不确定的。

Enforcing Determinism

互斥锁同步

线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。其中,锁定方法acquire可以有一个超时时间的可选参数timeout。如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁,从而可以进行一些其他的处理。

同步阻塞

当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”。直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。

import threading
def count_up_100000(counter, lock):
for i in range(10000):
lock.acquire()
for i in range(10):
counter.increment()
lock.release()

def conduct_trial():
counter = Counter()
lock = threading.Lock()
count_thread = threading.Thread(target=count_up_100000, args=[counter, lock])
count_thread.start()
lock.acquire()
intermediate_value = counter.get_count()
lock.release()
count_thread.join()
return intermediate_value

trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)
'''
20940
21100
21030
'''


上面这段代码将+10锁起来,也就是获取的值都是10的倍数,只有+10了这个线程才可能释放,因此无论何时访问count,都是10的倍数。

Counting Twice

def count_up_100000(counter):
for i in range(100000):
counter.increment()

counter = Counter()
def count_up_100000(counter):
for i in range(100000):
counter.increment()

counter = Counter()
count_up_100000(counter)
count_up_100000(counter)
final_count = counter.get_count()
print(final_count)
'''
200000
'''


Splitting Our Count Into Two Threads

为了完成上面那个功能,我们其实可以引入两个线程,每个计数100000,然后和就是200000.

def count_up_100000(counter):
for i in range(100000):
counter.increment()

def conduct_trial():
counter = Counter()
count_thread1 = threading.Thread(target=count_up_100000, args=[counter])
count_thread2 = threading.Thread(target=count_up_100000, args=[counter])

count_thread1.start()
count_thread2.start()
count_thread1.join()
count_thread2.join()
final_count = counter.get_count()
return final_count

trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)
'''
200000
200000
134250
'''


Atomicity

在上一个操作中我们发现结果是不确定的,counter.increment()函数内部self.count += 1实际包含两行代码: old_count = self.count以及self.count = old_count + 1,因此当一个线程执行这个函数的时候,可能只是执行到第一行,就被第二个线程调用counter.increment()函数,而此时两个线程同时+1,导致这两个+1操作发生在同一个count上,因此两次+1变成+1不是+2.因此虽然调用了counter.increment()200000次,但是最终的结果可能并不是+200000.

因此需要采用原子操作(atomic operation)。将这个counter.increment()设置为原子操作,其它线程必须等原子操作执行完毕可能执行。A对count+1后B才对其+1,而不会出现前面count=10002时,两个同时对其+1,结果是10003。

import threading
class Counter():
def __init__(self):
self.count = 0
self.lock = threading.Lock()
def increment(self):
self.lock.acquire()
old_count = self.count
self.count = old_count + 1
self.lock.release()
def get_count(self):
return self.count

def count_up_100000(counter):
for i in range(100000):
counter.increment()

def conduct_trial():
counter = Counter()
count_thread1 = threading.Thread(target=count_up_100000, args=[counter])
count_thread2 = threading.Thread(target=count_up_100000, args=[counter])

count_thread1.start()
count_thread2.start()

count_thread1.join()
count_thread2.join()

final_count = counter.get_count()
return final_count

trial1 = conduct_trial()
print(trial1)
trial2 = conduct_trial()
print(trial2)
trial3 = conduct_trial()
print(trial3)
'''
200000
200000
200000
'''
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: