您的位置:首页 > 编程语言 > Python开发

multiprocessing在python中的高级应用-IPC 之 Pipe

2015-08-30 13:58 721 查看
作为使用队列的另一种形式,还可以使用管道在进程回见执行消息传递。

Pipe( [ duplex])

在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1和conn2是表示管道两端的Connection对象。默认情况下,管道是双向的。如果将duplex置为False,conn1只能用于接收,而conn2只能用于发送。必须在创建和启动使用管道的Process对象之前调用Pipe()方法。

Pipe()方法返回的Connection对象的实例c具有以下方法和属性。

c.close()

关闭连接。如果c被垃圾收集,将自动调用此方法。

c.fileno()

返回连接使用的证书文件描述符。

c.poll( [timeout] )

如果连接上的数据可用,返回True.timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout置为None,操作将无限期地等待数据到达。

c.recv()

接收c.send()方法返回的对象。如果连接的另一端已经关闭,再也不存在任何数据,将引发EOFError异常。

c.recv_bytes( [maxlength] )

接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息超过了这个最大数,将引发IOEError异常。

c.recv_bytes_into(buffer [,offset] )

接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区放置消息处的字节位移。返回值是接收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferToolShoot异常。

c.send(obj)

通过连接发送对象。obj是与序列化兼容的任意对象。

c.send_bytes(buffer [,offset [,size] ] ) )

通过连接发送字节数据缓冲区。buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收。

可以通过与队列类似的方式使用管道。下面这个例子说明如何使用管道实现前面的生产者-使用者问题:

import multiprocessing

def consumer(pipe):
    output_p,input_p=pipe
    input_p.close() #关闭管道的输入端
    while True:
        try:
            item=output_p.recv()
        except EOFError:
            break
        #处理项目
        print item #可替换有用的工作
        #关闭
        print "Consumer close"

#生产项目并将其放置到队列上,sequence是代表要处理项目的可迭代对象
def producer(sequence,input_p):
    for item in sequence:
        #将项目放置在队列上
        input_p.send(item)
if __name__=="__main__":
    (output_p,input_p)=multiprocessing.Pipe()
    #启动使用者进程
    cons_p=multiprocessing.Process(target=consumer,args=((output_p,input_p),))
    cons_p.start()

    #关闭生产者中的输出管道
    output_p.close()
    #生产项目
    sequence=[1,2,3,4]
    producer(sequence,input_p)
    #关闭输入管道,表示完成
    input_p.close()
    #等待使用者进程关闭
    cons_p.join()


应该特别注意管道端点的正确管理问题。如果是生产者或消费者中都没有使用管道的某个端点,就应将它关闭。这也说明了为何在生产者中关闭了管道的输出端,在消费者中关闭管道的输入端。如果忘记执行这些步骤,程序可能在消费者中的recv()操作上挂起。管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。因此,在生产者中关闭管道不会有任何效果,除非消费者也关闭了相同的管道端点。

管道可用于双向通信。利用通常在客户端/服务器计算中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序,例如:

import multiprocessing

def adder(pipe):
    server_p,client_p=pipe
    client_p.close()
    while True:
        try:
            x,y=server_p.recv()
        except EOFError:
            break
        result=x+y
        server_p.send(result)
    #关闭
        print "server done"
if __name__=="__main__":
    (server_p,client_p)=multiprocessing.Pipe()
    #启动服务器进程
    adder_p=multiprocessing.Process(target=adder,args=((server_p,client_p),))
    adder_p.start()
    #关闭客户端中的服务器管道
    server_p.close()
    #在服务器上提出一些请求
    client_p.send((3,4))
    print client_p.recv()

    client_p.send(("hello","world"))
    print client_p.recv()
    #完成,关闭管道
    client_p.close()
    #等待消费者进程关闭
    adder_p.join()


在这个例子中,adder()函数以服务器的形式运行,等待消息到达管道的端点。收到之后,它会执行一些处理并将结果发送回给管道。要记住,send()和recv()方法使用pickle模块对对象进行序列化。在本例中,服务器接收到原则(x,y)并将其作为输入,然后返回结果x+y。但对于使用远程调用的高级应用程序而言,应该使用下一博客描述的进程池。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: