您的位置:首页 > 编程语言 > Python开发

Python多进程

2011-03-25 23:07 381 查看
在以前只是接触过PYTHON的多线程机制,今天搜了一下多进程,相关文章好像不是特别多。看了几篇,小试了一把。程序如下,主要内容就是通过PRODUCER读一个本地文件,一行一行的放到队列中去。然后会有相应的WORKER从队列中取出这些行。

import multiprocessing

import os

import sys

import Queue

import time

def writeQ(q,obj):

        q.put(obj,True,None)

        print "put size: ",q.qsize()

def readQ(q):

        ret = q.get(True,1)

        print "get size: ",q.qsize()

        return ret

def producer(q):

        time.sleep(5)   #让进行休息几秒 方便ps命令看到相关内容

        pid = os.getpid()

        handle_file = '/home/dwapp/joe.wangh/test/multiprocess/datafile'

        with open(handle_file,'r') as f:      #with...as... 这个用法今天也是第一次看到的

                for line in f:

                        print "producer <" ,pid , "> is doing: ",line

                        writeQ(q,line.strip())

        q.close()

def worker(q):

        time.sleep(5)   #让进行休息几秒 方便ps命令看到相关内容

        pid = os.getpid()

        empty_count = 0

        while True:

                try:

                        task = readQ(q)

                        print "worker <" , pid , "> is doing: " ,task

                        '''

                        如果这里不休眠的话 一般情况下所有行都会被同一个子进程读取到 为了使实验效果更加清楚 在这里让每个进程读取完

一行内容时候休眠5s 这样就可以让其他的进程到队列中进行读取

                        '''

                        time.sleep(5)   

                except Queue.Empty:

                        empty_count += 1

                        if empty_count == 3:

                                print "queue is empty, quit"

                                q.close()

                                sys.exit(0)

def main():

        concurrence = 3

        q = multiprocessing.Queue(10)

        funcs = [producer , worker]

        for i in range(concurrence-1):

                funcs.append(worker)

        for item in funcs:

                print str(item)

        nfuncs = range( len(funcs) )

        processes = []        

        for i in nfuncs:

                p = multiprocessing.Process(target=funcs[i] , args=(q,))

                processes.append(p)

        print "concurrence worker is : ",concurrence," working start"

        for i in nfuncs:

                processes[i].start()

        for i in nfuncs:

                processes[i].join()

        print "all DONE"

if __name__ == '__main__':

        main()

 

实验结果如下

dwapp@pttest1:/home/dwapp/joe.wangh/test/multiprocess>python 1.py 

<function producer at 0xb7b9141c>

<function worker at 0xb7b91454>

<function worker at 0xb7b91454>

<function worker at 0xb7b91454>

concurrence worker is :  3  working start

producer < 28320 > is doing:  line 1

put size:  1

producer < 28320 > is doing:  line 2

put size:  2

producer < 28320 > is doing:  line 3

put size:  3

producer < 28320 > is doing:  line 4

put size:  3

producer < 28320 > is doing:  line 5

get size:  3

put size:  4

worker < 28321 > is doing:  line 1

get size:  3

worker < 28322 > is doing:  line 2

get size:  2

worker < 28323 > is doing:  line 3

get size:  1

worker < 28321 > is doing:  line 4

get size:  0

worker < 28322 > is doing:  line 5

queue is empty, quit

queue is empty, quit

queue is empty, quit

all DONE

 

程序运行期间在另外一个窗口进行ps命令 可以观测到一些进程的信息

dwapp@pttest1:/home/dwapp/joe.wangh/test/multiprocess>ps -ef | grep python

dwapp    13735 11830  0 Nov20 pts/12   00:00:05 python

dwapp    28319 27481  8 14:04 pts/0    00:00:00 python 1.py

dwapp    28320 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28321 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28322 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28323 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28325 27849  0 14:04 pts/13   00:00:00 grep python

dwapp@pttest1:/home/dwapp/joe.wangh/test/multiprocess>ps -ef | grep python

dwapp    13735 11830  0 Nov20 pts/12   00:00:05 python         #此时28320进程 也就是PRODUCER进程已经结束

dwapp    28319 27481  1 14:04 pts/0    00:00:00 python 1.py

dwapp    28321 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28322 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28323 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28328 27849  0 14:04 pts/13   00:00:00 grep python

dwapp@pttest1:/home/dwapp/joe.wangh/test/multiprocess>ps -ef | grep python

dwapp    13735 11830  0 Nov20 pts/12   00:00:05 python

dwapp    28319 27481  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28321 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28322 28319  0 14:04 pts/0    00:00:00 python 1.py

dwapp    28323 28319  0 14:04 pts/0    00:00:00 [python] <defunct>   #这里应该是代表28323进程(WORKER)已经运行结束了

dwapp    28331 27849  0 14:04 pts/13   00:00:00 grep python

dwapp@pttest1:/home/dwapp/joe.wangh/test/multiprocess>ps -ef | grep python

dwapp    13735 11830  0 Nov20 pts/12   00:00:05 python

dwapp    28337 27849  0 14:05 pts/13   00:00:00 grep python
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息