您的位置:首页 > 其它

ZeroMQ-Poller

2017-07-24 19:42 155 查看
In this program, we will create a command server that tells the worker when it should exit. The worker subscribes to a topic published by a publisher and prints each message it receives. It exits when it receives an “Exit” message from the command server.
# zmqpolling.py

import zmq
import time
import sys
import random
from  multiprocessing import Process

# PUSH server that sends commands telling the worker to continue working or to exit.
def server_push(port="5556"):
    """Run the PUSH command server that tells the worker when to stop.

    Binds a PUSH socket on ``port``, sends six "Continue" commands one
    second apart, then sends a single "Exit" command and returns.

    Args:
        port: TCP port to bind on (anything ``%s`` can format).
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:%s" % port)
    # Single-argument print(...) produces the same output on Python 2 and 3.
    print("Running server on port:  %s" % port)
    # Six "Continue" commands, each followed by a one-second pause.
    for _ in range(6):
        # Byte literals: pyzmq's send() rejects text strings on Python 3,
        # while on Python 2 b"..." is an ordinary str.
        socket.send(b"Continue")
        time.sleep(1)
    # The final command asks the worker to shut down; nothing is sent after it.
    socket.send(b"Exit")

# Publisher that publishes messages for topics "8", "9" and "10" in random order.
def server_pub(port="5558"):
    """Run the publisher that emits ten messages on random topics 8, 9 or 10.

    Binds a PUB socket on ``port`` and, once per second, publishes a message
    of the form ``"<topic> server#<publisher_id>"``. Each message is also
    printed to stdout. Returns after ten messages.

    Args:
        port: TCP port to bind on (anything ``%s`` can format).
    """
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:%s" % port)
    publisher_id = random.randrange(0, 9999)
    # Single-argument print(...) produces the same output on Python 2 and 3.
    print("Running server on port:  %s" % port)
    # The payload never changes between iterations, so build it once.
    messagedata = "server#%s" % publisher_id
    # Publishes exactly ten messages and then returns.
    for _ in range(10):
        # randrange() excludes its stop value, so 11 is needed for topic 10
        # to be reachable.
        topic = random.randrange(8, 11)
        line = "%d %s" % (topic, messagedata)
        print(line)
        # Encode explicitly: pyzmq's send() rejects text strings on Python 3.
        socket.send(line.encode("utf-8"))
        time.sleep(1)

# Worker that works on messages received for topic "9".
# We set up a zmq poller to poll for messages on the socket connections to both the command server and the publisher.
def client(port_push, port_sub):
    """Run the worker: print topic "9" messages until told to exit.

    Connects a PULL socket to the command server on ``port_push`` and a SUB
    socket (subscribed to "9") to the publisher on ``port_sub``, then polls
    both. Published messages are printed; the loop ends once the command
    server sends "Exit".

    Args:
        port_push: TCP port of the PUSH command server on localhost.
        port_sub: TCP port of the PUB publisher on localhost.
    """
    context = zmq.Context()
    socket_pull = context.socket(zmq.PULL)
    socket_pull.connect("tcp://localhost:%s" % port_push)
    print("Connected to server with port %s" % port_push)
    socket_sub = context.socket(zmq.SUB)
    socket_sub.connect("tcp://localhost:%s" % port_sub)
    # The subscription filter must be bytes on Python 3; b"9" is a plain str
    # on Python 2, so this is portable.
    socket_sub.setsockopt(zmq.SUBSCRIBE, b"9")
    print("Connected to publisher with port %s" % port_sub)

    # Initialize the poll set with both sockets, watching for readable input.
    poller = zmq.Poller()
    poller.register(socket_pull, zmq.POLLIN)
    poller.register(socket_sub, zmq.POLLIN)

    # Poll the sockets for pending messages and handle them. The worker
    # keeps going until it receives the exit command.
    should_continue = True
    while should_continue:
        socks = dict(poller.poll())

        if socks.get(socket_pull) == zmq.POLLIN:
            # recv() yields bytes on Python 3; decode so the comparison with
            # "Exit" can succeed there as well as on Python 2.
            message = socket_pull.recv().decode("utf-8", "replace")
            print("Recieved control command: %s" % message)
            if message == "Exit":
                print("Recieved exit command, client will stop recieving messages")
                should_continue = False

        # Checked independently of the control socket: a published message
        # that arrived in the same poll round is still processed.
        if socks.get(socket_sub) == zmq.POLLIN:
            string = socket_sub.recv().decode("utf-8", "replace")
            # Split at the first space only, so a payload that itself
            # contains spaces cannot break the unpacking.
            topic, _, messagedata = string.partition(" ")
            print("Processing ...  %s %s" % (topic, messagedata))

# Finally, we fire up all the processes.
if __name__ == "__main__":
    # Ports shared between the two servers and the worker.
    server_push_port = "5556"
    server_pub_port = "5558"
    # Each entry is (entry point, positional arguments); the processes are
    # started in this order: command server, publisher, then the worker.
    launch_plan = [
        (server_push, (server_push_port,)),
        (server_pub, (server_pub_port,)),
        (client, (server_push_port, server_pub_port)),
    ]
    for entry_point, arguments in launch_plan:
        Process(target=entry_point, args=arguments).start()
# result:

(D:\anaconda) C:\Users\admin\Desktop\opt>python zmqpolling.py
Running server on port:  5556
Running server on port:  5558
8 server#8364
Connected to server with port 5556
Connected to publisher with port 5558
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Continue
8 server#8364
Recieved control command: Continue
9 server#8364
Processing ...  9 server#8364
Recieved control command: Exit
Recieved exit command, client will stop recieving messages
8 server#8364
9 server#8364
9 server#8364
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ZeroMQ