ZeroMQ-Poller
2017-07-24 19:42
155 查看
In this program, we create a command server that tells the worker when it should exit. The worker subscribes to a topic published by a publisher and prints each message it receives on that topic. The worker exits when it receives an "Exit" message from the command server.
# zmqpolling.py
#
# Demonstrates zmq.Poller: one worker process listens on two sockets at once,
# a PULL socket carrying control commands and a SUB socket carrying published
# data, and stops when the command server sends "Exit".
#
# The code runs unchanged on Python 2.6+ and Python 3: payloads are sent as
# bytes and every print() call takes a single pre-formatted argument.
import zmq
import time
import sys
import random
from multiprocessing import Process


# PUSH server that sends commands to workers to continue working or exit.
def server_push(port="5556"):
    """Bind a PUSH socket and send control commands to the worker.

    Sends six "Continue" commands, one per second, followed by a single
    "Exit" command, then closes the socket and terminates the context.

    Args:
        port: TCP port to bind on, as a string.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    try:
        socket.bind("tcp://*:%s" % port)
        print("Running server on port:  %s" % port)
        for _ in range(6):
            # bytes literals: pyzmq refuses unicode str payloads on Python 3.
            socket.send(b"Continue")
            time.sleep(1)
        socket.send(b"Exit")
    finally:
        # Release the socket and context explicitly instead of relying on
        # garbage collection at process exit.
        socket.close()
        context.term()


# Publisher that publishes messages under randomly chosen numeric topics.
def server_pub(port="5558"):
    """Bind a PUB socket and publish ten "<topic> server#<id>" messages.

    One message is published per second.  The topic comes from
    random.randrange(8, 10), whose upper bound is exclusive, so it is
    always 8 or 9.

    Args:
        port: TCP port to bind on, as a string.
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    try:
        socket.bind("tcp://*:%s" % port)
        publisher_id = random.randrange(0, 9999)
        print("Running server on port:  %s" % port)
        for _ in range(10):
            topic = random.randrange(8, 10)
            messagedata = "server#%s" % publisher_id
            print("%s %s" % (topic, messagedata))
            payload = "%d %s" % (topic, messagedata)
            socket.send(payload.encode("utf-8"))
            time.sleep(1)
    finally:
        socket.close()
        context.term()


# Worker that works on messages received for topic "9".
# We set up a zmq poller to poll for messages on the socket connections to
# both the command server and the publisher.
def client(port_push, port_sub):
    """Process topic "9" publications until an "Exit" command arrives.

    Args:
        port_push: port of the command (PUSH) server on localhost.
        port_sub: port of the publisher (PUB) server on localhost.
    """
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_sub = context.socket(zmq.SUB)
    try:
        socket_pull.connect("tcp://localhost:%s" % port_push)
        print("Connected to server with port %s" % port_push)

        socket_sub.connect("tcp://localhost:%s" % port_sub)
        # The subscription filter must be bytes on Python 3.
        socket_sub.setsockopt(zmq.SUBSCRIBE, b"9")
        print("Connected to publisher with port %s" % port_sub)

        # Initialize poll set
        poller = zmq.Poller()
        poller.register(socket_pull, zmq.POLLIN)
        poller.register(socket_sub, zmq.POLLIN)

        # We poll the sockets to check if we have messages to recv and work
        # on them.  The worker continues until it receives the exit command.
        # Work on requests from both server and publisher
        should_continue = True
        while should_continue:
            # poll() with no timeout blocks until a socket is readable.
            socks = dict(poller.poll())

            if socks.get(socket_pull) == zmq.POLLIN:
                command = socket_pull.recv().decode("utf-8")
                print("Recieved control command: %s" % command)
                if command == "Exit":
                    print("Recieved exit command, client will stop recieving messages")
                    should_continue = False

            if socks.get(socket_sub) == zmq.POLLIN:
                published = socket_sub.recv().decode("utf-8")
                # Split on the first whitespace run only: topic, then data.
                topic, messagedata = published.split(None, 1)
                print("Processing ...  %s %s" % (topic, messagedata))
    finally:
        socket_sub.close()
        socket_pull.close()
        context.term()


# Finally, we fire up all the processes.
if __name__ == "__main__":
    # Now we can run a few servers
    server_push_port = "5556"
    server_pub_port = "5558"
    Process(target=server_push, args=(server_push_port,)).start()
    Process(target=server_pub, args=(server_pub_port,)).start()
    Process(target=client, args=(server_push_port, server_pub_port,)).start()
# result: (D:\anaconda) C:\Users\admin\Desktop\opt>python zmqpolling.py Running server on port: 5556 Running server on port: 5558 8 server#8364 Connected to server with port 5556 Connected to publisher with port 5558 Recieved control command: Continue 9 server#8364 Processing ... 9 server#8364 Recieved control command: Continue 8 server#8364 Recieved control command: Continue 8 server#8364 Recieved control command: Continue 9 server#8364 Processing ... 9 server#8364 Recieved control command: Continue 8 server#8364 Recieved control command: Continue 9 server#8364 Processing ... 9 server#8364 Recieved control command: Exit Recieved exit command, client will stop recieving messages 8 server#8364 9 server#8364 9 server#8364
相关文章推荐
- zeromq安装手册
- zeroMQ初体验-6.多模式数据来源处理方案(multi sockets)
- zeroMQ初体验-25.可靠性-偏执的海盗模式
- ZeroMQ全面介绍
- zeromq的主要特点.
- zeromq的几种模式
- ZeroMQ消息队列技术研究
- zeromq client-server 异步模式
- ZeroMQ 消息队列 及 jzmq (java binding) 安装及代码测试
- 高性能的通讯库-zeroMQ的几个高性能特征
- 云风开发笔记(3) Redis, Google Protobuffer, ZeroMQ
- [架构] ZeroMQ 深度探索(二)
- ubuntu下安装 ZeroMQ, JZMQ
- ZeroMQ 简介
- zeromq的内部架构
- ZeroMQ(JAVA)中的数据流,SessionBase与SocketBase
- 安装使用zeromq
- ZeroMQ(java)之Publish/Subscribe模式
- [原]zeromq框架测试报告
- zeromq的几种模式