您的位置:首页 > 编程语言 > Python开发

Python多线程/进程:os、sys、Queue、multiprocessing、threading

2015-12-08 23:48 856 查看
当涉及到操作系统的时候,免不了要使用os模块,有时还要用到sys模块。

设计到并行程序,一般开单独的进程,而不是线程,原因是python解释器的全局解释器锁GIL(global interpreter lock),本文最后会讲到。使用进程可以实现完全并行,无GIL的限制,可充分利用多cpu多核的环境。

os/sys模块

1、os模块

os.system() 函数可以启动一个进程,执行完之后返回状态码。

os.fork() 复制一个进程,如果是子进程返回0,如果是父进程返回子进程的pid,使用这个函数的时候,建议你学习一下linux编程的知识。

os.popen 以管道的方式创建进程。

os.spawnl 也可以创建进程,并能指定环境变量。

os.kill(pid, sig) 关闭一个进程,pid是进程号,sig是信号。与fork配合使用,例如你刚才用fork创建了一个子进程,它的pid是11990, 那么调用

os.kill( 11990, signal.CTRL_BREAK_EVENT)

就以ctrl+c的方式杀死了这个进程。

os.wait() -> (pid, status)找到任一个僵死子进程,或者等待任一个子进程的SIGCHLD信号

os.waitpid(pid, options) -> (pid, status) 等待给定进程结束

除了利用os模块实现多进程和通信,还有一个模块multiprocessing封装了很多创建进程和进程间通信的操作,发挥多核的威力。

附:

文件操作也是与操作系统相关的操作,也被封装在os模块。

文件操作的详细内容见 /article/5983186.html

2、sys模块

同样是一个与系统相关的模块,它们都表示了程序运行的上下文环境。但是与os模块不同的是,os模块主要封装系统操作,sys模块主要封装系统中的各种环境参数。

比如文件操作、进程线程操作封装在os模块中;

标准输入输出stdout/stdin、命令行参数argv、环境变量path、平台platform等参数封装在sys模块中;

不过sys中也含有一些进程操作,比如sys.exit(n)和sys.exit('Unable to create first child.')

多进程multiprocessing

multiprocessing模块的内容:

multiprocessing.Process(target=run)类的实例表示一个进程,具有字段 pid,方法 start() join()等

multiprocessing.Pool(processes=4) 类的实例表示一个进程池

multiprocessing.Lock类的实例表示一个锁,具有acquire()和release() 方法

multiprocessing.Semaphore(2) 信号量类的实例表示一个信号量,可以指定初始值,具有 acquire() 和 release() 方法

multiprocessing.Event() 表示一个信号,用于实现多进程等待某一个进程的情况

进程间要实现通信,除了锁、信号量、事件,还有队列multiprocessing.Queue。

import multiprocessing

def writer_proc(q):
try:
q.put(1, block = False)
except:
pass

def reader_proc(q):
try:
print q.get(block = False)
except:
pass

if __name__ == "__main__":
q = multiprocessing.Queue()
writer = multiprocessing.Process(target=writer_proc, args=(q,))
writer.start()

reader = multiprocessing.Process(target=reader_proc, args=(q,))
reader.start()

reader.join()
writer.join()


multiprocessing.Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。

get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。

进程同步互斥实例:

#coding:utf-8
import threading, time

#最简单的启动线程的方式
def sayHi():
time.sleep(1)
print 'Hi, linuxapp'

th=threading.Thread(target=sayHi)
th.start()
th.join()

# 使用threading.Thread(),设置线程类实例的target属性,表示一个线程
def T():
print threading.current_thread().getName()

t1 = threading.Thread(target=T, name='tt11')
t1.start()
t1.join()

# 通过threading.Thread类的子类实例表示线程,注意父类构造方法__init__不能省略
class T2(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)

def run(self):
print "in run() of T2  " + threading.current_thread().getName()

# threading.Lock类的实例表示一个互斥锁,一个资源被加锁后,其他线程不能访问
class T3(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.counter = 0;
self.mutex = threading.Lock()

def run(self):
time.sleep(1)
if self.mutex.acquire():
self.counter += 1
print self.counter
self.mutex.release()

# 如果同一个线程需要多次获得资源,如果不使用 mutex = threading.RLock() ,就会死锁
class T4(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
self.counter = 0
self.mutex = threading.Lock()

def run(self):
time.sleep(1)
if self.mutex.acquire():
self.counter += 1
if self.mutex.acquire():
self.counter += 1
self.mutex.release()
self.mutex.release()

def main():
t = T3()
t.start()

if __name__ == '__main__':
main()

# threading.Condition类的实例表示一个条件变量,相当于多功能信号量
condition = threading.Condition()
products = 0

class Producer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)

def run(self):
global condition, products
while True:
if condition.acquire():
if products < 10:
products += 1;
print "Producer(%s):deliver one, now products:%s" %(self.name, products)
condition.notify()
else:
print "Producer(%s):already 10, stop deliver, now products:%s" %(self.name, products)
condition.wait();
condition.release()
time.sleep(2)

class Consumer(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)

def run(self):
global condition, products
while True:
if condition.acquire():
if products > 1:
products -= 1
print "Consumer(%s):consume one, now products:%s" %(self.name, products)
condition.notify()
else:
print "Consumer(%s):only 1, stop consume, products:%s" %(self.name, products)
condition.wait();
condition.release()
time.sleep(2)

if __name__ == "__main__":
for p in range(0, 2):
p = Producer()
p.start()

for c in range(0, 10):
c = Consumer()
c.start()

# threading.Event类的实例表示一个信号,如果信号signal为true,那么等待这个signal的所有线程都将可以运行
class MyThread(threading.Thread):
def __init__(self, signal):
threading.Thread.__init__(self)
self.singal = signal

def run(self):
print "I am %s,I will sleep ..."%self.name
# 进入等待状态
self.singal.wait()
print "I am %s, I awake..." %self.name

if __name__ == "__main__":
# 初始 为 False
singal = threading.Event()
for t in range(0, 3):
thread = MyThread(singal)
thread.start()

print "main thread sleep 3 seconds... "
time.sleep(3)
# 唤醒含有signal, 处于等待状态的线程
singal.set()


View Code

python多线程的限制
python多线程有个讨厌的限制,全局解释器锁(global interpreter lock),这个锁的意思是任一时间只能有一个线程使用解释器,跟单cpu跑多个程序也是一个意思,大家都是轮着用的,这叫“并发”,不是“并行”。手册上的解释是为了保证对象模型的正确性!这个锁造成的困扰是如果有一个计算密集型的线程占着cpu,其他的线程都得等着,试想你的多个线程中有这么一个线程,多线程生生被搞成串行;当然这个模块也不是毫无用处,手册上又说了:当用于IO密集型任务时,IO期间线程会释放解释器,这样别的线程就有机会使用解释器了!所以是否使用这个模块需要考虑面对的任务类型。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: