Python并行处理——多线程
2016-04-30 11:09
507 查看
Parallel Processing
并行处理利用多个CPU在同一时间处理多条指令,这是个很有用的技术,但是它带来的问题也是很多的,比如多个处理器怎么共享数据?这个会造成很多读写冲突,因此需要学会管理这些因素,写出更强大的代码来进行快速并有意义的数据分析。Mutability
在python中,有些变量像intergers,是不可变的。这意味着他们的值是不可变的。def add1(x): x = x + 1 x = 5 add1(x) print(x) ''' 5 '''
但是像字典和列表是可变的,可以代表一些会改变的信息,并行处理中可变变量是很有用的,因为我们并行处理中会共享变量,而且几个处理器会编辑同一个变量。下面创建一个Counter类,看看变量是如何变化的:
class Counter(): def __init__(self): self.count = 0 def increment(self): self.count += 1 def get_count(self): return self.count def count_up_100000(counter): for i in range(100000): counter.increment() counter = Counter() initial_count = counter.get_count() count_up_100000(counter) final_count = counter.get_count() print(final_count) ''' 100000 '''
Multithreading
上面那个例子中发现counter这个实例的count值已经变为100000了,我们发现在count_up_100000这个函数中,一直在调用Counter的一个方法,对counter的count值进行+1,这个操作实际上可以并行处理,假设我们有100000个CPU,那么我们一次就可以运行所有循环,这就是多线程(multithreading)。一个线程(thread)就是一条执行路径。在Python中我们可以利用threading.Thread()来创建新的线程,作为一个独立的过程来执行一个给定的函数,比如上一个例子中,我们创建新的线程来执行count_up_100000这个函数,启动一个线程的方法是thread.start()。当这个线程结束执行后才会执行thread.join(),将其join到主线程中。
import threading counter = Counter() count_thread = threading.Thread(target=count_up_100000, args=[counter]) count_thread.start() count_thread.join() after_join = counter.get_count() print(after_join) ''' 100000 '''
Determinism
大部分单线程操作的结果是确定的(deterministic ),因为我们是一步一步执行程序,结果是可以预测的。现在考虑一个情况,你让你的一个朋友来数数,数到100000,你是主线程,创建了这样一个数数线程,然后当它数完就会告诉你,这类似与join主线程。但是在他还没有数完的过程中,count的结果是不确定的(nondeterministic)。import threading def conduct_trial(): counter = Counter() count_thread = threading.Thread(target=count_up_100000, args=[counter]) count_thread.start() intermediate_value = counter.get_count() count_thread.join() return intermediate_value trial1 = conduct_trial() print(trial1) trial2 = conduct_trial() print(trial2) trial3 = conduct_trial() print(trial3) ''' 24145 24762 24881 '''
在还未join时,也就是线程还未结束时,此时的count的值是不确定的。
Enforcing Determinism
互斥锁同步线程同步能够保证多个线程安全访问竞争资源,最简单的同步机制是引入互斥锁。互斥锁为资源引入一个状态:锁定/非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为“锁定”,其他线程不能更改;直到该线程释放资源,将资源的状态变成“非锁定”,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。其中,锁定方法acquire可以有一个超时时间的可选参数timeout。如果设定了timeout,则在超时后通过返回值可以判断是否得到了锁,从而可以进行一些其他的处理。
同步阻塞
当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“同步阻塞”。直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
import threading def count_up_100000(counter, lock): for i in range(10000): lock.acquire() for i in range(10): counter.increment() lock.release() def conduct_trial(): counter = Counter() lock = threading.Lock() count_thread = threading.Thread(target=count_up_100000, args=[counter, lock]) count_thread.start() lock.acquire() intermediate_value = counter.get_count() lock.release() count_thread.join() return intermediate_value trial1 = conduct_trial() print(trial1) trial2 = conduct_trial() print(trial2) trial3 = conduct_trial() print(trial3) ''' 20940 21100 21030 '''
上面这段代码将+10锁起来,也就是获取的值都是10的倍数,只有+10了这个线程才可能释放,因此无论何时访问count,都是10的倍数。
Counting Twice
def count_up_100000(counter): for i in range(100000): counter.increment() counter = Counter() def count_up_100000(counter): for i in range(100000): counter.increment() counter = Counter() count_up_100000(counter) count_up_100000(counter) final_count = counter.get_count() print(final_count) ''' 200000 '''
Splitting Our Count Into Two Threads
为了完成上面那个功能,我们其实可以引入两个线程,每个计数100000,然后和就是200000.def count_up_100000(counter): for i in range(100000): counter.increment() def conduct_trial(): counter = Counter() count_thread1 = threading.Thread(target=count_up_100000, args=[counter]) count_thread2 = threading.Thread(target=count_up_100000, args=[counter]) count_thread1.start() count_thread2.start() count_thread1.join() count_thread2.join() final_count = counter.get_count() return final_count trial1 = conduct_trial() print(trial1) trial2 = conduct_trial() print(trial2) trial3 = conduct_trial() print(trial3) ''' 200000 200000 134250 '''
Atomicity
在上一个操作中我们发现结果是不确定的,counter.increment()函数内部self.count += 1实际包含两行代码: old_count = self.count以及self.count = old_count + 1,因此当一个线程执行这个函数的时候,可能只是执行到第一行,就被第二个线程调用counter.increment()函数,而此时两个线程同时+1,导致这两个+1操作发生在同一个count上,因此两次+1变成+1不是+2.因此虽然调用了counter.increment()200000次,但是最终的结果可能并不是+200000.因此需要采用原子操作(atomic operation)。将这个counter.increment()设置为原子操作,其它线程必须等原子操作执行完毕可能执行。A对count+1后B才对其+1,而不会出现前面count=10002时,两个同时对其+1,结果是10003。
import threading class Counter(): def __init__(self): self.count = 0 self.lock = threading.Lock() def increment(self): self.lock.acquire() old_count = self.count self.count = old_count + 1 self.lock.release() def get_count(self): return self.count def count_up_100000(counter): for i in range(100000): counter.increment() def conduct_trial(): counter = Counter() count_thread1 = threading.Thread(target=count_up_100000, args=[counter]) count_thread2 = threading.Thread(target=count_up_100000, args=[counter]) count_thread1.start() count_thread2.start() count_thread1.join() count_thread2.join() final_count = counter.get_count() return final_count trial1 = conduct_trial() print(trial1) trial2 = conduct_trial() print(trial2) trial3 = conduct_trial() print(trial3) ''' 200000 200000 200000 '''
相关文章推荐
- IronPython 设置包路径
- EM算法--python代码和注意事项
- python_正则表达式(一)基础标识符
- Python数据存储
- Python变量及数据类型
- python本地文件
- python 导入自定义模块及使用
- 【工具使用系列】Python FAQs: Spyder(winPython, Python(x,y), Anaconda)下,如何使用ipython --pylab!
- python面向对象的3个特点
- Python多线程下的_strptime问题
- Python多线程下的_strptime问题
- Python调用C库
- Python调用C库
- Python按位运算符,字典,运算符优先级的基本总结
- Python - 字母算术谜题
- Python str内部功能介绍
- python 多线程 join 的 细节问题 注意使用事项
- Python开发环境配置
- 如何在MAC 指定文件夹打开终端(terminal)
- python 函数超时装饰器