zeroMQ初体验-18.应答模式进阶(四)-定制路由3
2011-10-10 15:30
591 查看
从经典到超越经典。
首先,先回顾下经典:
然后,扩展:
然后,变异:
Python代码
import threading
import time
import zmq
# Number of client threads started by main(); main() also uses it as the
# number of replies to forward before leaving the broker loop.
NBR_CLIENTS = 10
# Number of worker threads started by main(); the broker asserts that the
# ready-worker queue never grows beyond this.
NBR_WORKERS = 3
def worker_thread(worker_url, context, i):
    """Worker using a REQ socket to do LRU routing.

    Connects to the broker backend, announces itself with ``READY`` and then
    answers every routed request with ``OK`` until the context is terminated.

    Args:
        worker_url: endpoint of the broker's backend socket.
        context: zmq context used to create the worker socket.
        i: worker index, used to build the ``Worker-<i>`` socket identity.

    Raises:
        zmq.ZMQError: any socket error other than context termination.
    """
    socket = context.socket(zmq.REQ)
    identity = "Worker-%d" % (i)
    socket.setsockopt(zmq.IDENTITY, identity)  # set worker identity
    socket.connect(worker_url)

    try:
        # Tell the broker we are ready for work
        socket.send("READY")
        while True:
            # Only two frames are delivered here: the client address and the
            # request body (the empty delimiter frame never reaches us).
            address = socket.recv()
            request = socket.recv()
            print("%s: %s\n" % (identity, request))
            # Reply envelope: [client address][empty][payload]
            socket.send(address, zmq.SNDMORE)
            socket.send("", zmq.SNDMORE)
            socket.send("OK")
    except zmq.ZMQError as zerr:
        # Context terminated: quit silently. Compare the error code rather
        # than the human-readable message text.
        if zerr.errno != zmq.ETERM:
            raise
    finally:
        # Always release the socket so context.term() is not left waiting.
        socket.close()
def client_thread(client_url, context, i):
    """Basic request-reply client built on a REQ socket.

    Sends a single ``HELLO`` request to ``client_url``, blocks until the
    reply arrives and prints it prefixed with the ``Client-<i>`` identity.
    """
    req = context.socket(zmq.REQ)
    name = "Client-%d" % (i)
    # An explicit identity makes tracing easier.
    req.setsockopt(zmq.IDENTITY, name)
    req.connect(client_url)

    # One request out, one reply back
    req.send("HELLO")
    answer = req.recv()
    print("%s: %s\n" % (name, answer))
def main():
    """Run the LRU-queue broker demo.

    Binds a frontend (clients) and a backend (workers) XREP socket on inproc
    endpoints, starts NBR_WORKERS worker threads and NBR_CLIENTS client
    threads, then routes each client request to the least recently used idle
    worker. Returns after NBR_CLIENTS replies have been forwarded.
    """
    url_worker = "inproc://workers"
    url_client = "inproc://clients"
    client_nbr = NBR_CLIENTS

    # Prepare our context and sockets
    context = zmq.Context(1)
    frontend = context.socket(zmq.XREP)
    frontend.bind(url_client)
    backend = context.socket(zmq.XREP)
    backend.bind(url_worker)

    # Create worker and client threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_thread,
                                  args=(url_worker, context, i))
        thread.start()
    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_thread,
                                    args=(url_client, context, i))
        thread_c.start()

    # Logic of LRU loop
    # - Poll backend always, frontend only if 1+ worker ready
    # - If worker replies, queue worker as ready and forward reply
    #   to client if necessary
    # - If client requests, pop next worker and send request to it

    # Queue of available workers, oldest first
    workers_list = []

    # Always poll for worker activity on backend. The frontend is registered
    # only while at least one worker is idle; otherwise pending client
    # requests would make poll() return immediately and the loop would spin.
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)

    while True:
        socks = dict(poller.poll())

        # Handle worker activity on backend
        if socks.get(backend) == zmq.POLLIN:
            # Queue worker address for LRU routing
            worker_addr = backend.recv()
            assert len(workers_list) < NBR_WORKERS
            if not workers_list:
                # First idle worker: start accepting client requests
                poller.register(frontend, zmq.POLLIN)
            workers_list.append(worker_addr)

            # Second frame is empty
            empty = backend.recv()
            assert empty == ""

            # Third frame is READY or else a client reply address
            client_addr = backend.recv()

            # If client reply, send rest back to frontend
            if client_addr != "READY":
                # Following frame is empty
                empty = backend.recv()
                assert empty == ""
                reply = backend.recv()

                frontend.send(client_addr, zmq.SNDMORE)
                frontend.send("", zmq.SNDMORE)
                frontend.send(reply)

                client_nbr -= 1
                if client_nbr == 0:
                    break  # Exit after N messages

        # Route a client request only when a worker is available
        if workers_list and socks.get(frontend) == zmq.POLLIN:
            # Client request is [address][empty][request]
            client_addr = frontend.recv()
            empty = frontend.recv()
            assert empty == ""
            request = frontend.recv()

            # Dequeue the worker that has been idle longest (front of the
            # queue); pop() without an index would take the newest one.
            worker_id = workers_list.pop(0)

            backend.send(worker_id, zmq.SNDMORE)
            backend.send("", zmq.SNDMORE)
            backend.send(client_addr, zmq.SNDMORE)
            backend.send(request)

            if not workers_list:
                # No idle worker left: stop polling the frontend
                poller.unregister(frontend)

    # Out of the loop: do some housekeeping.
    # NOTE(review): the sleep presumably lets the last reply reach its client
    # before the sockets are closed - confirm.
    time.sleep(1)
    frontend.close()
    backend.close()
    context.term()
# Run the demo only when executed as a script. The dunder names are required:
# a bare `name` is undefined and would raise NameError.
if __name__ == "__main__":
    main()
client发出的数据结构:
路由处理成:
再转给worker成:
工人处理的数据:
从 worker 返回 client 的过程与上面正好相反(逆序拆封);不过由于 client 和 worker 两端使用的都是 REQ 类型的套接字,所以两个方向上的处理方式其实是一致的。
[补]:
通常,上层的 API 会帮我们做一些事,免去逐帧封装数据的麻烦。比如在 Python 中,最终代码会是这个样子:
Python代码
import threading
import time
import zmq
# Number of client threads started by main(); main() also uses it as the
# number of replies to forward before leaving the broker loop.
NBR_CLIENTS = 10
# Number of worker threads started by main(); the broker asserts that the
# ready-worker queue never grows beyond this.
NBR_WORKERS = 3
def worker_thread(worker_url, context, i):
    """Worker using a REQ socket to do LRU routing (multipart API).

    Connects to the broker backend, announces itself with ``READY`` and then
    answers every routed request with ``OK`` until the context is terminated.

    Args:
        worker_url: endpoint of the broker's backend socket.
        context: zmq context used to create the worker socket.
        i: worker index, used to build the ``Worker-<i>`` socket identity.

    Raises:
        zmq.ZMQError: any socket error other than context termination.
    """
    socket = context.socket(zmq.REQ)
    identity = "Worker-%d" % (i)
    socket.setsockopt(zmq.IDENTITY, identity)  # set worker identity
    socket.connect(worker_url)

    try:
        # Tell the broker we are ready for work
        socket.send("READY")
        while True:
            # Each request arrives as exactly two frames:
            # the client address and the request body.
            [address, request] = socket.recv_multipart()
            print("%s: %s\n" % (identity, request))
            # Reply envelope: [client address][empty][payload]
            socket.send_multipart([address, "", "OK"])
    except zmq.ZMQError as zerr:
        # Context terminated: quit silently. Compare the error code rather
        # than the human-readable message text.
        if zerr.errno != zmq.ETERM:
            raise
    finally:
        # Always release the socket so context.term() is not left waiting.
        socket.close()
def client_thread(client_url, context, i):
    """Basic request-reply client built on a REQ socket.

    Sends a single ``HELLO`` request to ``client_url``, blocks until the
    reply arrives and prints it prefixed with the ``Client-<i>`` identity.
    """
    req = context.socket(zmq.REQ)
    name = "Client-%d" % (i)
    # An explicit identity makes tracing easier.
    req.setsockopt(zmq.IDENTITY, name)
    req.connect(client_url)

    # One request out, one reply back
    req.send("HELLO")
    answer = req.recv()
    print("%s: %s\n" % (name, answer))
def main():
    """Run the LRU-queue broker demo (multipart API).

    Binds a frontend (clients) and a backend (workers) XREP socket on inproc
    endpoints, starts NBR_WORKERS worker threads and NBR_CLIENTS client
    threads, then routes each client request to the least recently used idle
    worker. Returns after NBR_CLIENTS replies have been forwarded.
    """
    url_worker = "inproc://workers"
    url_client = "inproc://clients"
    client_nbr = NBR_CLIENTS

    # Prepare our context and sockets
    context = zmq.Context(1)
    frontend = context.socket(zmq.XREP)
    frontend.bind(url_client)
    backend = context.socket(zmq.XREP)
    backend.bind(url_worker)

    # Create worker and client threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_thread,
                                  args=(url_worker, context, i))
        thread.start()
    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_thread,
                                    args=(url_client, context, i))
        thread_c.start()

    # Logic of LRU loop
    # - Poll backend always, frontend only if 1+ worker ready
    # - If worker replies, queue worker as ready and forward reply
    #   to client if necessary
    # - If client requests, pop next worker and send request to it

    # Queue of available workers, oldest first
    workers_list = []

    # Always poll for worker activity on backend. The frontend is registered
    # only while at least one worker is idle; otherwise pending client
    # requests would make poll() return immediately and the loop would spin.
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)

    while True:
        socks = dict(poller.poll())

        # Handle worker activity on backend
        if socks.get(backend) == zmq.POLLIN:
            # Worker message is [worker][empty][READY] or
            # [worker][empty][client][empty][reply]
            message = backend.recv_multipart()
            assert len(workers_list) < NBR_WORKERS
            worker_addr = message[0]
            if not workers_list:
                # First idle worker: start accepting client requests
                poller.register(frontend, zmq.POLLIN)
            workers_list.append(worker_addr)

            # Second frame is empty
            empty = message[1]
            assert empty == ""

            # Third frame is READY or else a client reply address
            client_addr = message[2]

            # If client reply, send rest back to frontend
            if client_addr != "READY":
                # Following frame is empty
                empty = message[3]
                assert empty == ""
                reply = message[4]

                frontend.send_multipart([client_addr, "", reply])

                client_nbr -= 1
                if client_nbr == 0:
                    break  # Exit after N messages

        # Route a client request only when a worker is available
        if workers_list and socks.get(frontend) == zmq.POLLIN:
            # Client request is [address][empty][request]
            [client_addr, empty, request] = frontend.recv_multipart()
            assert empty == ""

            # Dequeue the worker that has been idle longest (front of the
            # queue); pop() without an index would take the newest one.
            worker_id = workers_list.pop(0)

            backend.send_multipart([worker_id, "", client_addr, request])

            if not workers_list:
                # No idle worker left: stop polling the frontend
                poller.unregister(frontend)

    # Out of the loop: do some housekeeping.
    # NOTE(review): the sleep presumably lets the last reply reach its client
    # before the sockets are closed - confirm.
    time.sleep(1)
    frontend.close()
    backend.close()
    context.term()
# Run the demo only when executed as a script. The dunder names are required:
# a bare `name` is undefined and would raise NameError.
if __name__ == "__main__":
    main()
首先,先回顾下经典:
然后,扩展:
然后,变异:
Python代码
import threading
import time
import zmq
# Number of client threads started by main(); main() also uses it as the
# number of replies to forward before leaving the broker loop.
NBR_CLIENTS = 10
# Number of worker threads started by main(); the broker asserts that the
# ready-worker queue never grows beyond this.
NBR_WORKERS = 3
def worker_thread(worker_url, context, i):
    """Worker using a REQ socket to do LRU routing.

    Connects to the broker backend, announces itself with ``READY`` and then
    answers every routed request with ``OK`` until the context is terminated.

    Args:
        worker_url: endpoint of the broker's backend socket.
        context: zmq context used to create the worker socket.
        i: worker index, used to build the ``Worker-<i>`` socket identity.

    Raises:
        zmq.ZMQError: any socket error other than context termination.
    """
    socket = context.socket(zmq.REQ)
    identity = "Worker-%d" % (i)
    socket.setsockopt(zmq.IDENTITY, identity)  # set worker identity
    socket.connect(worker_url)

    try:
        # Tell the broker we are ready for work
        socket.send("READY")
        while True:
            # Only two frames are delivered here: the client address and the
            # request body (the empty delimiter frame never reaches us).
            address = socket.recv()
            request = socket.recv()
            print("%s: %s\n" % (identity, request))
            # Reply envelope: [client address][empty][payload]
            socket.send(address, zmq.SNDMORE)
            socket.send("", zmq.SNDMORE)
            socket.send("OK")
    except zmq.ZMQError as zerr:
        # Context terminated: quit silently. Compare the error code rather
        # than the human-readable message text.
        if zerr.errno != zmq.ETERM:
            raise
    finally:
        # Always release the socket so context.term() is not left waiting.
        socket.close()
def client_thread(client_url, context, i):
    """Basic request-reply client built on a REQ socket.

    Sends a single ``HELLO`` request to ``client_url``, blocks until the
    reply arrives and prints it prefixed with the ``Client-<i>`` identity.
    """
    req = context.socket(zmq.REQ)
    name = "Client-%d" % (i)
    # An explicit identity makes tracing easier.
    req.setsockopt(zmq.IDENTITY, name)
    req.connect(client_url)

    # One request out, one reply back
    req.send("HELLO")
    answer = req.recv()
    print("%s: %s\n" % (name, answer))
def main():
    """Run the LRU-queue broker demo.

    Binds a frontend (clients) and a backend (workers) XREP socket on inproc
    endpoints, starts NBR_WORKERS worker threads and NBR_CLIENTS client
    threads, then routes each client request to the least recently used idle
    worker. Returns after NBR_CLIENTS replies have been forwarded.
    """
    url_worker = "inproc://workers"
    url_client = "inproc://clients"
    client_nbr = NBR_CLIENTS

    # Prepare our context and sockets
    context = zmq.Context(1)
    frontend = context.socket(zmq.XREP)
    frontend.bind(url_client)
    backend = context.socket(zmq.XREP)
    backend.bind(url_worker)

    # Create worker and client threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_thread,
                                  args=(url_worker, context, i))
        thread.start()
    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_thread,
                                    args=(url_client, context, i))
        thread_c.start()

    # Logic of LRU loop
    # - Poll backend always, frontend only if 1+ worker ready
    # - If worker replies, queue worker as ready and forward reply
    #   to client if necessary
    # - If client requests, pop next worker and send request to it

    # Queue of available workers, oldest first
    workers_list = []

    # Always poll for worker activity on backend. The frontend is registered
    # only while at least one worker is idle; otherwise pending client
    # requests would make poll() return immediately and the loop would spin.
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)

    while True:
        socks = dict(poller.poll())

        # Handle worker activity on backend
        if socks.get(backend) == zmq.POLLIN:
            # Queue worker address for LRU routing
            worker_addr = backend.recv()
            assert len(workers_list) < NBR_WORKERS
            if not workers_list:
                # First idle worker: start accepting client requests
                poller.register(frontend, zmq.POLLIN)
            workers_list.append(worker_addr)

            # Second frame is empty
            empty = backend.recv()
            assert empty == ""

            # Third frame is READY or else a client reply address
            client_addr = backend.recv()

            # If client reply, send rest back to frontend
            if client_addr != "READY":
                # Following frame is empty
                empty = backend.recv()
                assert empty == ""
                reply = backend.recv()

                frontend.send(client_addr, zmq.SNDMORE)
                frontend.send("", zmq.SNDMORE)
                frontend.send(reply)

                client_nbr -= 1
                if client_nbr == 0:
                    break  # Exit after N messages

        # Route a client request only when a worker is available
        if workers_list and socks.get(frontend) == zmq.POLLIN:
            # Client request is [address][empty][request]
            client_addr = frontend.recv()
            empty = frontend.recv()
            assert empty == ""
            request = frontend.recv()

            # Dequeue the worker that has been idle longest (front of the
            # queue); pop() without an index would take the newest one.
            worker_id = workers_list.pop(0)

            backend.send(worker_id, zmq.SNDMORE)
            backend.send("", zmq.SNDMORE)
            backend.send(client_addr, zmq.SNDMORE)
            backend.send(request)

            if not workers_list:
                # No idle worker left: stop polling the frontend
                poller.unregister(frontend)

    # Out of the loop: do some housekeeping.
    # NOTE(review): the sleep presumably lets the last reply reach its client
    # before the sockets are closed - confirm.
    time.sleep(1)
    frontend.close()
    backend.close()
    context.term()
# Run the demo only when executed as a script. The dunder names are required:
# a bare `name` is undefined and would raise NameError.
if __name__ == "__main__":
    main()
import threading
import time

import zmq

# Number of client threads; also the number of replies the broker forwards
# before it exits.
NBR_CLIENTS = 10
# Number of worker threads; upper bound on the ready-worker queue.
NBR_WORKERS = 3


def worker_thread(worker_url, context, i):
    """Worker using a REQ socket to do LRU routing.

    Announces itself with ``READY`` and answers every routed request with
    ``OK`` until the context is terminated.
    """
    socket = context.socket(zmq.REQ)
    identity = "Worker-%d" % (i)
    socket.setsockopt(zmq.IDENTITY, identity)  # set worker identity
    socket.connect(worker_url)

    try:
        # Tell the broker we are ready for work
        socket.send("READY")
        while True:
            # Two frames are delivered: client address, then request body
            address = socket.recv()
            request = socket.recv()
            print("%s: %s\n" % (identity, request))
            socket.send(address, zmq.SNDMORE)
            socket.send("", zmq.SNDMORE)
            socket.send("OK")
    except zmq.ZMQError as zerr:
        # Context terminated: quit silently (check the code, not the text)
        if zerr.errno != zmq.ETERM:
            raise
    finally:
        socket.close()


def client_thread(client_url, context, i):
    """Basic request-reply client using a REQ socket."""
    socket = context.socket(zmq.REQ)
    identity = "Client-%d" % (i)
    # Set client identity. Makes tracing easier
    socket.setsockopt(zmq.IDENTITY, identity)
    socket.connect(client_url)

    # Send request, get reply
    socket.send("HELLO")
    reply = socket.recv()
    print("%s: %s\n" % (identity, reply))


def main():
    """Run the LRU-queue broker until NBR_CLIENTS replies were forwarded."""
    url_worker = "inproc://workers"
    url_client = "inproc://clients"
    client_nbr = NBR_CLIENTS

    # Prepare our context and sockets
    context = zmq.Context(1)
    frontend = context.socket(zmq.XREP)
    frontend.bind(url_client)
    backend = context.socket(zmq.XREP)
    backend.bind(url_worker)

    # Create worker and client threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_thread,
                                  args=(url_worker, context, i))
        thread.start()
    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_thread,
                                    args=(url_client, context, i))
        thread_c.start()

    # Logic of LRU loop
    # - Poll backend always, frontend only if 1+ worker ready
    # - If worker replies, queue worker as ready and forward reply
    #   to client if necessary
    # - If client requests, pop next worker and send request to it

    # Queue of available workers, oldest first
    workers_list = []

    # The frontend is registered only while a worker is idle, so pending
    # client requests cannot make poll() spin when nobody can serve them.
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)

    while True:
        socks = dict(poller.poll())

        # Handle worker activity on backend
        if socks.get(backend) == zmq.POLLIN:
            # Queue worker address for LRU routing
            worker_addr = backend.recv()
            assert len(workers_list) < NBR_WORKERS
            if not workers_list:
                poller.register(frontend, zmq.POLLIN)
            workers_list.append(worker_addr)

            # Second frame is empty
            empty = backend.recv()
            assert empty == ""

            # Third frame is READY or else a client reply address
            client_addr = backend.recv()

            # If client reply, send rest back to frontend
            if client_addr != "READY":
                # Following frame is empty
                empty = backend.recv()
                assert empty == ""
                reply = backend.recv()

                frontend.send(client_addr, zmq.SNDMORE)
                frontend.send("", zmq.SNDMORE)
                frontend.send(reply)

                client_nbr -= 1
                if client_nbr == 0:
                    break  # Exit after N messages

        # Route a client request only when a worker is available
        if workers_list and socks.get(frontend) == zmq.POLLIN:
            # Client request is [address][empty][request]
            client_addr = frontend.recv()
            empty = frontend.recv()
            assert empty == ""
            request = frontend.recv()

            # Take the worker that has been idle longest (true LRU order)
            worker_id = workers_list.pop(0)

            backend.send(worker_id, zmq.SNDMORE)
            backend.send("", zmq.SNDMORE)
            backend.send(client_addr, zmq.SNDMORE)
            backend.send(request)

            if not workers_list:
                poller.unregister(frontend)

    # Out of the loop: do some housekeeping
    time.sleep(1)
    frontend.close()
    backend.close()
    context.term()


if __name__ == "__main__":
    main()
client发出的数据结构:
路由处理成:
再转给worker成:
工人处理的数据:
从 worker 返回 client 的过程与上面正好相反(逆序拆封);不过由于 client 和 worker 两端使用的都是 REQ 类型的套接字,所以两个方向上的处理方式其实是一致的。
[补]:
通常,上层的 API 会帮我们做一些事,免去逐帧封装数据的麻烦。比如在 Python 中,最终代码会是这个样子:
Python代码
import threading
import time
import zmq
# Number of client threads started by main(); main() also uses it as the
# number of replies to forward before leaving the broker loop.
NBR_CLIENTS = 10
# Number of worker threads started by main(); the broker asserts that the
# ready-worker queue never grows beyond this.
NBR_WORKERS = 3
def worker_thread(worker_url, context, i):
    """Worker using a REQ socket to do LRU routing (multipart API).

    Connects to the broker backend, announces itself with ``READY`` and then
    answers every routed request with ``OK`` until the context is terminated.

    Args:
        worker_url: endpoint of the broker's backend socket.
        context: zmq context used to create the worker socket.
        i: worker index, used to build the ``Worker-<i>`` socket identity.

    Raises:
        zmq.ZMQError: any socket error other than context termination.
    """
    socket = context.socket(zmq.REQ)
    identity = "Worker-%d" % (i)
    socket.setsockopt(zmq.IDENTITY, identity)  # set worker identity
    socket.connect(worker_url)

    try:
        # Tell the broker we are ready for work
        socket.send("READY")
        while True:
            # Each request arrives as exactly two frames:
            # the client address and the request body.
            [address, request] = socket.recv_multipart()
            print("%s: %s\n" % (identity, request))
            # Reply envelope: [client address][empty][payload]
            socket.send_multipart([address, "", "OK"])
    except zmq.ZMQError as zerr:
        # Context terminated: quit silently. Compare the error code rather
        # than the human-readable message text.
        if zerr.errno != zmq.ETERM:
            raise
    finally:
        # Always release the socket so context.term() is not left waiting.
        socket.close()
def client_thread(client_url, context, i):
    """Basic request-reply client built on a REQ socket.

    Sends a single ``HELLO`` request to ``client_url``, blocks until the
    reply arrives and prints it prefixed with the ``Client-<i>`` identity.
    """
    req = context.socket(zmq.REQ)
    name = "Client-%d" % (i)
    # An explicit identity makes tracing easier.
    req.setsockopt(zmq.IDENTITY, name)
    req.connect(client_url)

    # One request out, one reply back
    req.send("HELLO")
    answer = req.recv()
    print("%s: %s\n" % (name, answer))
def main():
    """Run the LRU-queue broker demo (multipart API).

    Binds a frontend (clients) and a backend (workers) XREP socket on inproc
    endpoints, starts NBR_WORKERS worker threads and NBR_CLIENTS client
    threads, then routes each client request to the least recently used idle
    worker. Returns after NBR_CLIENTS replies have been forwarded.
    """
    url_worker = "inproc://workers"
    url_client = "inproc://clients"
    client_nbr = NBR_CLIENTS

    # Prepare our context and sockets
    context = zmq.Context(1)
    frontend = context.socket(zmq.XREP)
    frontend.bind(url_client)
    backend = context.socket(zmq.XREP)
    backend.bind(url_worker)

    # Create worker and client threads
    for i in range(NBR_WORKERS):
        thread = threading.Thread(target=worker_thread,
                                  args=(url_worker, context, i))
        thread.start()
    for i in range(NBR_CLIENTS):
        thread_c = threading.Thread(target=client_thread,
                                    args=(url_client, context, i))
        thread_c.start()

    # Logic of LRU loop
    # - Poll backend always, frontend only if 1+ worker ready
    # - If worker replies, queue worker as ready and forward reply
    #   to client if necessary
    # - If client requests, pop next worker and send request to it

    # Queue of available workers, oldest first
    workers_list = []

    # Always poll for worker activity on backend. The frontend is registered
    # only while at least one worker is idle; otherwise pending client
    # requests would make poll() return immediately and the loop would spin.
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)

    while True:
        socks = dict(poller.poll())

        # Handle worker activity on backend
        if socks.get(backend) == zmq.POLLIN:
            # Worker message is [worker][empty][READY] or
            # [worker][empty][client][empty][reply]
            message = backend.recv_multipart()
            assert len(workers_list) < NBR_WORKERS
            worker_addr = message[0]
            if not workers_list:
                # First idle worker: start accepting client requests
                poller.register(frontend, zmq.POLLIN)
            workers_list.append(worker_addr)

            # Second frame is empty
            empty = message[1]
            assert empty == ""

            # Third frame is READY or else a client reply address
            client_addr = message[2]

            # If client reply, send rest back to frontend
            if client_addr != "READY":
                # Following frame is empty
                empty = message[3]
                assert empty == ""
                reply = message[4]

                frontend.send_multipart([client_addr, "", reply])

                client_nbr -= 1
                if client_nbr == 0:
                    break  # Exit after N messages

        # Route a client request only when a worker is available
        if workers_list and socks.get(frontend) == zmq.POLLIN:
            # Client request is [address][empty][request]
            [client_addr, empty, request] = frontend.recv_multipart()
            assert empty == ""

            # Dequeue the worker that has been idle longest (front of the
            # queue); pop() without an index would take the newest one.
            worker_id = workers_list.pop(0)

            backend.send_multipart([worker_id, "", client_addr, request])

            if not workers_list:
                # No idle worker left: stop polling the frontend
                poller.unregister(frontend)

    # Out of the loop: do some housekeeping.
    # NOTE(review): the sleep presumably lets the last reply reach its client
    # before the sockets are closed - confirm.
    time.sleep(1)
    frontend.close()
    backend.close()
    context.term()
# Run the demo only when executed as a script. The dunder names are required:
# a bare `name` is undefined and would raise NameError.
if __name__ == "__main__":
    main()
相关文章推荐
- zeroMQ初体验-16.应答模式进阶(二)-定制路由1
- zeroMQ初体验-17.应答模式进阶(三)-定制路由2
- zeroMQ初体验-20.应答模式进阶(六)-多对多路由模式
- zeroMQ初体验-15.应答模式进阶(一)-数据的封装
- zeroMQ初体验-19.应答模式进阶(五)-异步式应答
- zeroMQ初体验-21.应答模式进阶(七)-云计算
- zeroMQ初体验-30.发布/订阅模式进阶-自裁的蜗牛订阅者
- zeroMQ初体验-13.发布/订阅模式 进阶
- zeroMQ初体验-31.发布/订阅模式进阶-黑盒的高速订阅者
- zeroMQ初体验-32.发布/订阅模式进阶-克隆模式-上
- zeroMQ初体验-34.发布/订阅模式进阶-克隆模式-下,结言
- zeroMQ初体验-29.可靠性-自由模式
- 《BREW进阶与精通――3G移动增值业务的运营、定制与开发》连载之18---商业模式
- ZeroMQ学习 (四)请求-应答模式
- vue-router菜鸟进阶!(路由组件传参 vs H5 History模式)
- RabbitMQ3.7.2入门到进阶之路由模式(Routing)
- ZeroMQ指南:第3章:高级请求-应答模式
- zeroMQ初体验-2.发布订阅模式(pub/sub)
- zeroMQ初体验-3.分而治之模式(push/pull)
- zeroMQ初体验-23.可靠性-懒惰的海盗模式