您的位置:首页 > 其它

zeroMQ初体验-7.优雅的卸载工作进程

2011-10-10 15:23 141 查看
关掉一个进程有很多种方式,而在ZeroMQ中则推崇通过使用信号通知,可控的卸载、关闭进程。在这里,要援引之前的"分而治之"例子(具体可以见这里)。

例图:



显然,信号发送是由能够掌握整个进度的"水槽"(下游)来控制,在原有基础上做少许变更即可。

Worker(数据处理):

Python代码

import sys
import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, "")

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
while True:
socks = dict(poller.poll())

if socks.get(receiver) == zmq.POLLIN:
message = receiver.recv()

workload = int(message)  # Workload in msecs
time.sleep(workload / 1000.0)
sender.send(message)

sys.stdout.write(".")
sys.stdout.flush()

if socks.get(controller) == zmq.POLLIN:
break


水槽(下游):

Python代码
import sys
import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

controller = context.socket(zmq.PUB)
controller.bind("tcp://*:5559")

receiver.recv()

tstart = time.time()

for task_nbr in xrange(100):
receiver.recv()
if task_nbr % 10 == 0:
sys.stdout.write(":")
else:
sys.stdout.write(".")
sys.stdout.flush()

tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print "Total elapsed time: %d msec" % total_msec

controller.send("KILL")
time.sleep(1)


注意:

正常情况下,即使进程被关闭,可能端口并没有被清除(那是有ZeroMQ维护的),原文中调用了这么两句

zmq_close (server)

zmq_term (context)

python中对应为zmq.close(),zmq.term(),不过python的垃圾回收会替俺们解决后顾之忧的~
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: