A First Taste of zeroMQ - 21. Advanced Request-Reply (7): Cloud Computing
2011-10-10 15:32
This post works through the currently much-hyped "cloud computing" as an example.
Definition:
A large number of workers run on all sorts of hardware, and any single worker can solve a problem on its own. A cluster contains hundreds or thousands of such machines, and a dozen or so clusters are in turn interconnected; the whole collection is called a "cloud", and the service it provides, "cloud computing".
If any machine or cluster in the "cloud" can come and go freely, and any crashed worker can be detected and restarted, then we can more or less call it dependable cloud computing.
First, a single stand-alone cluster:
Look familiar? We have covered this setup before.
Next, scale it out to several clusters:
This picture has one obvious problem: how do the clients and workers of the two clusters reach each other?
There are two candidate solutions:
1. Cross-connect them, so that each cluster's clients and workers talk to the brokers of both clusters. This looks workable, but it has a "the busy get busier" flaw: a worker announces "I'm free", both routers hear it, and both may hand it a task at the same moment. That is not what we want.
2. Let the brokers talk to each other. This looks much cleaner: only the middlemen exchange resources with one another to reach the goal. It is essentially a slightly more elaborate version of the "broker" algorithm, in which each broker also plays subcontractor for its peers.
Let's go with the second approach. That still leaves several ways to interconnect the two brokers. The simplest (and the one we have used all along) is plain request-reply: compose the brokers into a client/server-style pair.
This immediately raises a new problem (being too simple is a problem in itself): a traditional client/server request-reply pair services one request at a time, and then... nothing else can happen. So for the broker-to-broker link, an asynchronous connection is the more dependable choice.
Beyond that, the guide also describes a DNS-like scheme: brokers exchange their resource status via publish/subscribe, and exchange the actual tasks via asynchronous request-reply. That is the design built below.
Before the example itself, a good naming convention is essential.
Three groups of "sockets" appear here (a minimal setup sketch follows the list):
1. req/rep inside a cluster: localfe, localbe
2. req/rep between clusters: cloudfe, cloudbe
3. resource state between clusters: statefe, statebe
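To make the naming concrete, here is a minimal setup sketch in Python (pyzmq). The cluster names DC1/DC2/DC3 are placeholders of mine, and the ipc:// endpoints simply mirror the examples below; the sketch only shows how the six sockets pair up, not the routing logic.
<pre name="code" class="python">
import zmq

ctx = zmq.Context()
me, peers = "DC1", ["DC2", "DC3"]   # hypothetical cluster names

# 1. Local req/rep: our clients connect to localfe, our workers to localbe
localfe = ctx.socket(zmq.ROUTER)
localfe.bind("ipc://%s-localfe.ipc" % me)
localbe = ctx.socket(zmq.ROUTER)
localbe.bind("ipc://%s-localbe.ipc" % me)

# 2. Cloud req/rep: cloudfe accepts requests from peers, cloudbe dials them;
#    the identity lets a peer route replies back to us by name
cloudfe = ctx.socket(zmq.ROUTER)
cloudfe.setsockopt(zmq.IDENTITY, me)
cloudfe.bind("ipc://%s-cloud.ipc" % me)
cloudbe = ctx.socket(zmq.ROUTER)
cloudbe.setsockopt(zmq.IDENTITY, me)
for peer in peers:
    cloudbe.connect("ipc://%s-cloud.ipc" % peer)

# 3. State: we publish our capacity on statebe, hear our peers' on statefe
statebe = ctx.socket(zmq.PUB)
statebe.bind("ipc://%s-state.ipc" % me)
statefe = ctx.socket(zmq.SUB)
statefe.setsockopt(zmq.SUBSCRIBE, '')
for peer in peers:
    statefe.connect("ipc://%s-state.ipc" % peer)
</pre>
Clients and workers only ever see localfe/localbe; everything cloud-facing stays inside the broker.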
Putting it together, the broker ends up looking like this:
Below, we pull the broker's sockets apart piece by piece.
1. Resource state:
<pre name="code" class="python">
import zmq
import time
import random

def main(args):
    myself = args[1]
    print "Hello, I am", myself

    context = zmq.Context()

    # State Back-End: publish our own status
    statebe = context.socket(zmq.PUB)

    # State Front-End: subscribe to every peer's status
    statefe = context.socket(zmq.SUB)
    statefe.setsockopt(zmq.SUBSCRIBE, '')

    bind_address = "ipc://" + myself + "-state.ipc"
    statebe.bind(bind_address)

    for i in range(len(args) - 2):
        endpoint = "ipc://" + args[i + 2] + "-state.ipc"
        statefe.connect(endpoint)

    # Give the connections a moment to establish
    time.sleep(1.0)

    poller = zmq.Poller()
    poller.register(statefe, zmq.POLLIN)

    while True:
        ########## Solution with poll() ##########
        socks = dict(poller.poll(1000))
        try:
            # Handle incoming status message
            if socks[statefe] == zmq.POLLIN:
                msg = statefe.recv_multipart()
                print 'Received:', msg
        except KeyError:
            # No peer spoke up within the timeout: send our address
            # and a random value for worker availability
            msg = []
            msg.append(bind_address)
            msg.append(str(random.randrange(1, 10)))
            statebe.send_multipart(msg)
        ##########################################

        ######### Solution with select() #########
        # (pollin, pollout, pollerr) = zmq.select([statefe], [], [], 1)
        #
        # if len(pollin) > 0 and pollin[0] == statefe:
        #     # Handle incoming status message
        #     msg = statefe.recv_multipart()
        #     print 'Received:', msg
        #
        # else:
        #     # Send our address and a random value
        #     # for worker availability
        #     msg = []
        #     msg.append(bind_address)
        #     msg.append(str(random.randrange(1, 10)))
        #     statebe.send_multipart(msg)
        ##########################################

if __name__ == '__main__':
    import sys
    if len(sys.argv) < 2:
        print "Usage: peering.py <myself> <peer_1> ... <peer_N>"
        raise SystemExit
    main(sys.argv)
</pre>
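To try this state-flow prototype, run one instance per simulated cluster in separate terminals, each naming itself first and then its peers, e.g. python peering.py DC1 DC2 DC3, python peering.py DC2 DC1 DC3, and python peering.py DC3 DC1 DC2 (the DC* names are arbitrary). Each process then prints the status messages it hears from the other two.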
2. Asynchronous req/rep:
<pre name="code" class="lua">
require"zmq"
require"zmq.poller"
require"zmq.threads"
require"zmsg"
require"zhelpers"

local tremove = table.remove

local NBR_CLIENTS = 10
local NBR_WORKERS = 3

local pre_code = [[
    local self, seed = ...
    local zmq = require"zmq"
    local zmsg = require"zmsg"
    require"zhelpers"
    math.randomseed(seed)
    local context = zmq.init(1)
]]

-- Request-reply client using REQ socket
--
local client_task = pre_code .. [[
    local client = context:socket(zmq.REQ)
    local endpoint = string.format("ipc://%s-localfe.ipc", self)
    assert(client:connect(endpoint))

    while true do
        -- Send request, get reply
        local msg = zmsg.new("HELLO")
        msg:send(client)
        msg = zmsg.recv(client)
        printf("I: client status: %s\n", msg:body())
    end
    -- We never get here but if we did, this is how we'd exit cleanly
    client:close()
    context:term()
]]

-- Worker using REQ socket to do LRU routing
--
local worker_task = pre_code .. [[
    local worker = context:socket(zmq.REQ)
    local endpoint = string.format("ipc://%s-localbe.ipc", self)
    assert(worker:connect(endpoint))

    -- Tell broker we're ready for work
    local msg = zmsg.new("READY")
    msg:send(worker)

    while true do
        msg = zmsg.recv(worker)
        -- Do some 'work'
        s_sleep(1000)
        msg:body_fmt("OK - %04x", randof(0x10000))
        msg:send(worker)
    end
    -- We never get here but if we did, this is how we'd exit cleanly
    worker:close()
    context:term()
]]

-- First argument is this broker's name
-- Other arguments are our peers' names
--
s_version_assert(2, 1)
if (#arg < 1) then
    printf("syntax: peering2 me doyouend...\n")
    os.exit(-1)
end
-- Our own name; in practice this'd be configured per node
local self = arg[1]
printf("I: preparing broker at %s...\n", self)
math.randomseed(os.time())

-- Prepare our context and sockets
local context = zmq.init(1)

-- Bind cloud frontend to endpoint
local cloudfe = context:socket(zmq.XREP)
local endpoint = string.format("ipc://%s-cloud.ipc", self)
cloudfe:setopt(zmq.IDENTITY, self)
assert(cloudfe:bind(endpoint))

-- Connect cloud backend to all peers
local cloudbe = context:socket(zmq.XREP)
cloudbe:setopt(zmq.IDENTITY, self)
local peers = {}
for n=2,#arg do
    local peer = arg[n]
    -- add peer name to peers list.
    peers[#peers + 1] = peer
    peers[peer] = true -- map peer's name to 'true' for fast lookup
    printf("I: connecting to cloud frontend at '%s'\n", peer)
    local endpoint = string.format("ipc://%s-cloud.ipc", peer)
    assert(cloudbe:connect(endpoint))
end

-- Prepare local frontend and backend
local localfe = context:socket(zmq.XREP)
local endpoint = string.format("ipc://%s-localfe.ipc", self)
assert(localfe:bind(endpoint))

local localbe = context:socket(zmq.XREP)
local endpoint = string.format("ipc://%s-localbe.ipc", self)
assert(localbe:bind(endpoint))

-- Get user to tell us when we can start...
printf("Press Enter when all brokers are started: ")
io.read('*l')

-- Start local workers
local workers = {}
for n=1,NBR_WORKERS do
    local seed = os.time() + math.random()
    workers[n] = zmq.threads.runstring(nil, worker_task, self, seed)
    workers[n]:start(true)
end

-- Start local clients
local clients = {}
for n=1,NBR_CLIENTS do
    local seed = os.time() + math.random()
    clients[n] = zmq.threads.runstring(nil, client_task, self, seed)
    clients[n]:start(true)
end

-- Interesting part
-- -------------------------------------------------------------
-- Request-reply flow
-- - Poll backends and process local/cloud replies
-- - While worker available, route localfe to local or cloud

-- Queue of available workers
local worker_queue = {}
local backends = zmq.poller(2)

local function send_reply(msg)
    local address = msg:address()
    -- Route reply to cloud if it's addressed to a broker
    if peers[address] then
        msg:send(cloudfe) -- reply is for a peer.
    else
        msg:send(localfe) -- reply is for a local client.
    end
end

backends:add(localbe, zmq.POLLIN, function()
    local msg = zmsg.recv(localbe)
    -- Use worker address for LRU routing
    worker_queue[#worker_queue + 1] = msg:unwrap()
    -- if reply is not "READY" then route reply back to client.
    if (msg:address() ~= "READY") then
        send_reply(msg)
    end
end)

backends:add(cloudbe, zmq.POLLIN, function()
    local msg = zmsg.recv(cloudbe)
    -- We don't use peer broker address for anything
    msg:unwrap()
    -- send reply back to client.
    send_reply(msg)
end)

local frontends = zmq.poller(2)
local localfe_ready = false
local cloudfe_ready = false
frontends:add(localfe, zmq.POLLIN, function() localfe_ready = true end)
frontends:add(cloudfe, zmq.POLLIN, function() cloudfe_ready = true end)

while true do
    -- If we have no workers anyhow, wait indefinitely
    local timeout = (#worker_queue > 0) and 1000000 or -1
    local rc = backends:poll(timeout)
    assert(rc >= 0)

    -- Now route as many clients requests as we can handle
    --
    while (#worker_queue > 0) do
        local rc = frontends:poll(0)
        assert(rc >= 0)
        local reroutable = false
        local msg
        -- We'll do peer brokers first, to prevent starvation
        if (cloudfe_ready) then
            cloudfe_ready = false -- reset flag
            msg = zmsg.recv(cloudfe)
            reroutable = false
        elseif (localfe_ready) then
            localfe_ready = false -- reset flag
            msg = zmsg.recv(localfe)
            reroutable = true
        else
            break -- No work, go back to backends
        end

        -- If reroutable, send to cloud 20% of the time
        -- Here we'd normally use cloud status information
        --
        local percent = randof(5)
        if (reroutable and #peers > 0 and percent == 0) then
            -- Route to random broker peer
            local random_peer = randof(#peers) + 1
            msg:wrap(peers[random_peer], nil)
            msg:send(cloudbe)
        else
            -- Dequeue and drop the next worker address
            local worker = tremove(worker_queue, 1)
            msg:wrap(worker, "")
            msg:send(localbe)
        end
    end
end

-- We never get here but clean up anyhow
localbe:close()
cloudbe:close()
localfe:close()
cloudfe:close()
context:term()
</pre>
Note:
This is Lua code; the official guide provides no Python version for this step. I'll fill one in some day~
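In the meantime, here is my own rough pyzmq sketch of just the broker core (it does not spawn the client/worker threads; start those separately). The frame handling follows my reading of the Lua above under ØMQ 2.x REQ/ROUTER framing, and ROUTER is simply the newer name for XREP — treat this as a sketch, not a reference implementation.
<pre name="code" class="python">
import sys
import random
import zmq

def route_reply(localfe, cloudfe, peers, msg):
    # A reply whose first address frame names a peer goes back to the cloud
    if msg[0] in peers:
        cloudfe.send_multipart(msg)
    else:
        localfe.send_multipart(msg)

def main(argv):
    myself, peers = argv[1], argv[2:]
    ctx = zmq.Context()

    # Cloud frontend/backend: ROUTER sockets carrying our name as identity
    cloudfe = ctx.socket(zmq.ROUTER)
    cloudfe.setsockopt(zmq.IDENTITY, myself)
    cloudfe.bind("ipc://%s-cloud.ipc" % myself)
    cloudbe = ctx.socket(zmq.ROUTER)
    cloudbe.setsockopt(zmq.IDENTITY, myself)
    for peer in peers:
        cloudbe.connect("ipc://%s-cloud.ipc" % peer)

    # Local frontend/backend for this cluster's clients and workers
    localfe = ctx.socket(zmq.ROUTER)
    localfe.bind("ipc://%s-localfe.ipc" % myself)
    localbe = ctx.socket(zmq.ROUTER)
    localbe.bind("ipc://%s-localbe.ipc" % myself)

    raw_input("Press Enter when all brokers are started: ")

    workers = []                      # LRU queue of ready worker addresses
    backends = zmq.Poller()
    backends.register(localbe, zmq.POLLIN)
    backends.register(cloudbe, zmq.POLLIN)
    frontends = zmq.Poller()
    frontends.register(localfe, zmq.POLLIN)
    frontends.register(cloudfe, zmq.POLLIN)

    while True:
        # Wait forever if no worker is free; otherwise check back often
        events = dict(backends.poll(1000 if workers else None))

        if localbe in events:
            msg = localbe.recv_multipart()
            workers.append(msg[0])    # worker address -> back of LRU queue
            reply = msg[2:]           # strip address + empty delimiter
            if reply != ["READY"]:
                route_reply(localfe, cloudfe, peers, reply)
        if cloudbe in events:
            msg = cloudbe.recv_multipart()
            # Drop the peer broker's address (no delimiter on this hop)
            route_reply(localfe, cloudfe, peers, msg[1:])

        while workers:
            events = dict(frontends.poll(0))
            reroutable = False
            if cloudfe in events:     # serve peers first, to avoid starvation
                msg = cloudfe.recv_multipart()
            elif localfe in events:
                msg = localfe.recv_multipart()
                reroutable = True
            else:
                break
            if reroutable and peers and random.randint(0, 4) == 0:
                # Simulate cloud routing: ~20% of local requests go to a peer
                cloudbe.send_multipart([random.choice(peers)] + msg)
            else:
                localbe.send_multipart([workers.pop(0), ""] + msg)

if __name__ == '__main__':
    if len(sys.argv) < 2:
        print "Usage: peering2.py <myself> <peer_1> ... <peer_N>"
        raise SystemExit
    main(sys.argv)
</pre>
The key trick is the same as in the Lua version: both cloud sockets set zmq.IDENTITY to the broker's own name, so a reply wrapped with a peer's name can simply be sent out cloudfe and ROUTER routing delivers it to the right cluster.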
3. Putting it all together:
<pre name="code" class="lua">
require"zmq"
require"zmq.poller"
require"zmq.threads"
require"zmsg"
require"zhelpers"

local tremove = table.remove

local NBR_CLIENTS = 10
local NBR_WORKERS = 5

local pre_code = [[
    local self, seed = ...
    local zmq = require"zmq"
    local zmsg = require"zmsg"
    require"zhelpers"
    math.randomseed(seed)
    local context = zmq.init(1)
]]

-- Request-reply client using REQ socket
-- To simulate load, clients issue a burst of requests and then
-- sleep for a random period.
--
local client_task = pre_code .. [[
    require"zmq.poller"
    local client = context:socket(zmq.REQ)
    local endpoint = string.format("ipc://%s-localfe.ipc", self)
    assert(client:connect(endpoint))
    local monitor = context:socket(zmq.PUSH)
    local endpoint = string.format("ipc://%s-monitor.ipc", self)
    assert(monitor:connect(endpoint))

    local poller = zmq.poller(1)
    local task_id = nil
    poller:add(client, zmq.POLLIN, function()
        local msg = zmsg.recv(client)
        -- Worker is supposed to answer us with our task id
        assert(msg:body() == task_id)
        -- mark task as processed.
        task_id = nil
    end)

    local is_running = true
    while is_running do
        s_sleep(randof(5) * 1000)
        local burst = randof(15)
        while (burst > 0) do
            burst = burst - 1
            -- Send request with random hex ID
            task_id = string.format("%04X", randof(0x10000))
            local msg = zmsg.new(task_id)
            msg:send(client)

            -- Wait max ten seconds for a reply, then complain
            local rc = poller:poll(10 * 1000000)
            assert(rc >= 0)

            if task_id then
                local msg = zmsg.new()
                msg:body_fmt("E: CLIENT EXIT - lost task %s", task_id)
                msg:send(monitor)
                -- exit event loop
                is_running = false
                break
            end
        end
    end
    -- We never get here but if we did, this is how we'd exit cleanly
    client:close()
    monitor:close()
    context:term()
]]

-- Worker using REQ socket to do LRU routing
--
local worker_task = pre_code .. [[
    local worker = context:socket(zmq.REQ)
    local endpoint = string.format("ipc://%s-localbe.ipc", self)
    assert(worker:connect(endpoint))

    -- Tell broker we're ready for work
    local msg = zmsg.new("READY")
    msg:send(worker)

    while true do
        -- Workers are busy for 0/1/2 seconds
        msg = zmsg.recv(worker)
        s_sleep(randof(2) * 1000)
        msg:send(worker)
    end
    -- We never get here but if we did, this is how we'd exit cleanly
    worker:close()
    context:term()
]]

-- First argument is this broker's name
-- Other arguments are our peers' names
--
s_version_assert(2, 1)
if (#arg < 1) then
    printf("syntax: peering3 me doyouend...\n")
    os.exit(-1)
end
-- Our own name; in practice this'd be configured per node
local self = arg[1]
printf("I: preparing broker at %s...\n", self)
math.randomseed(os.time())

-- Prepare our context and sockets
local context = zmq.init(1)

-- Bind cloud frontend to endpoint
local cloudfe = context:socket(zmq.XREP)
local endpoint = string.format("ipc://%s-cloud.ipc", self)
cloudfe:setopt(zmq.IDENTITY, self)
assert(cloudfe:bind(endpoint))

-- Bind state backend / publisher to endpoint
local statebe = context:socket(zmq.PUB)
local endpoint = string.format("ipc://%s-state.ipc", self)
assert(statebe:bind(endpoint))

-- Connect cloud backend to all peers
local cloudbe = context:socket(zmq.XREP)
cloudbe:setopt(zmq.IDENTITY, self)
for n=2,#arg do
    local peer = arg[n]
    printf("I: connecting to cloud frontend at '%s'\n", peer)
    local endpoint = string.format("ipc://%s-cloud.ipc", peer)
    assert(cloudbe:connect(endpoint))
end

-- Connect statefe to all peers
local statefe = context:socket(zmq.SUB)
statefe:setopt(zmq.SUBSCRIBE, "", 0)
local peers = {}
for n=2,#arg do
    local peer = arg[n]
    -- add peer name to peers list.
    peers[#peers + 1] = peer
    peers[peer] = 0 -- set peer's initial capacity to zero.
    printf("I: connecting to state backend at '%s'\n", peer)
    local endpoint = string.format("ipc://%s-state.ipc", peer)
    assert(statefe:connect(endpoint))
end

-- Prepare local frontend and backend
local localfe = context:socket(zmq.XREP)
local endpoint = string.format("ipc://%s-localfe.ipc", self)
assert(localfe:bind(endpoint))

local localbe = context:socket(zmq.XREP)
local endpoint = string.format("ipc://%s-localbe.ipc", self)
assert(localbe:bind(endpoint))

-- Prepare monitor socket
local monitor = context:socket(zmq.PULL)
local endpoint = string.format("ipc://%s-monitor.ipc", self)
assert(monitor:bind(endpoint))

-- Start local workers
local workers = {}
for n=1,NBR_WORKERS do
    local seed = os.time() + math.random()
    workers[n] = zmq.threads.runstring(nil, worker_task, self, seed)
    workers[n]:start(true)
end

-- Start local clients
local clients = {}
for n=1,NBR_CLIENTS do
    local seed = os.time() + math.random()
    clients[n] = zmq.threads.runstring(nil, client_task, self, seed)
    clients[n]:start(true)
end

-- Interesting part
-- -------------------------------------------------------------
-- Publish-subscribe flow
-- - Poll statefe and process capacity updates
-- - Each time capacity changes, broadcast new value
-- Request-reply flow
-- - Poll primary and process local/cloud replies
-- - While worker available, route localfe to local or cloud

-- Queue of available workers
local local_capacity = 0
local cloud_capacity = 0
local worker_queue = {}
local backends = zmq.poller(2)

local function send_reply(msg)
    local address = msg:address()
    -- Route reply to cloud if it's addressed to a broker
    if peers[address] then
        msg:send(cloudfe) -- reply is for a peer.
    else
        msg:send(localfe) -- reply is for a local client.
    end
end

backends:add(localbe, zmq.POLLIN, function()
    local msg = zmsg.recv(localbe)
    -- Use worker address for LRU routing
    local_capacity = local_capacity + 1
    worker_queue[local_capacity] = msg:unwrap()
    -- if reply is not "READY" then route reply back to client.
    if (msg:address() ~= "READY") then
        send_reply(msg)
    end
end)

backends:add(cloudbe, zmq.POLLIN, function()
    local msg = zmsg.recv(cloudbe)
    -- We don't use peer broker address for anything
    msg:unwrap()
    -- send reply back to client.
    send_reply(msg)
end)

backends:add(statefe, zmq.POLLIN, function()
    local msg = zmsg.recv(statefe)
    -- TODO: track capacity for each peer
    cloud_capacity = tonumber(msg:body())
end)

backends:add(monitor, zmq.POLLIN, function()
    local msg = zmsg.recv(monitor)
    printf("%s\n", msg:body())
end)

local frontends = zmq.poller(2)
local localfe_ready = false
local cloudfe_ready = false
frontends:add(localfe, zmq.POLLIN, function() localfe_ready = true end)
frontends:add(cloudfe, zmq.POLLIN, function() cloudfe_ready = true end)

local MAX_BACKEND_REPLIES = 20

while true do
    -- If we have no workers anyhow, wait indefinitely
    local timeout = (local_capacity > 0) and 1000000 or -1
    local rc, err = backends:poll(timeout)
    assert(rc >= 0, err)

    -- Track if capacity changes during this iteration
    local previous = local_capacity

    -- Now route as many clients requests as we can handle
    -- - If we have local capacity we poll both localfe and cloudfe
    -- - If we have cloud capacity only, we poll just localfe
    -- - Route any request locally if we can, else to cloud
    --
    while ((local_capacity + cloud_capacity) > 0) do
        local rc, err = frontends:poll(0)
        assert(rc >= 0, err)

        if (localfe_ready) then
            localfe_ready = false
            msg = zmsg.recv(localfe)
        elseif (cloudfe_ready and local_capacity > 0) then
            cloudfe_ready = false
            -- we have local capacity, poll cloud frontend for work.
            msg = zmsg.recv(cloudfe)
        else
            break -- No work, go back to primary
        end

        if (local_capacity > 0) then
            -- Dequeue and drop the next worker address
            local worker = tremove(worker_queue, 1)
            local_capacity = local_capacity - 1
            msg:wrap(worker, "")
            msg:send(localbe)
        else
            -- Route to random broker peer
            printf("I: route request %s to cloud...\n", msg:body())
            local random_peer = randof(#peers) + 1
            msg:wrap(peers[random_peer], nil)
            msg:send(cloudbe)
        end
    end

    if (local_capacity ~= previous) then
        -- Broadcast new capacity
        local msg = zmsg.new()
        -- TODO: send our name with capacity.
        msg:body_fmt("%d", local_capacity)
        -- We stick our own address onto the envelope
        msg:wrap(self, nil)
        msg:send(statebe)
    end
end

-- We never get here but clean up anyhow
localbe:close()
cloudbe:close()
localfe:close()
cloudfe:close()
statefe:close()
monitor:close()
context:term()
</pre>
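As with the earlier examples, you would start one broker per simulated cluster, e.g. lua peering3.lua DC1 DC2 DC3 alongside lua peering3.lua DC2 DC1 DC3 and lua peering3.lua DC3 DC1 DC2; each broker process spawns its own client and worker threads, so a handful of processes is enough to exercise the whole "cloud".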
OK. At last, a complete "cloud" has taken shape (even though each cluster runs as a single process). And for all its length, the code splits cleanly into the modules above.
Still, this unavoidably raises the question of data safety: what happens when one of the other clusters goes down? Publishing state updates more frequently? That treats the symptom, not the cause. Perhaps a reply channel could settle it. Well, that's a problem for another day.
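For what it's worth, one common direction (my own assumption, not something these examples implement) is to attach a time-to-live to the state updates: a peer that has gone silent is treated as dead and its advertised capacity ignored. A toy sketch in Python:
<pre name="code" class="python">
import time

PEER_TTL = 5.0     # hypothetical: seconds of silence before a peer is "dead"
last_seen = {}     # peer name -> timestamp of its last state update
capacity = {}      # peer name -> last advertised capacity

def on_state_update(peer, value):
    # Call this whenever a state message arrives on statefe
    capacity[peer] = value
    last_seen[peer] = time.time()

def usable_cloud_capacity():
    # Only count peers heard from recently; silent peers contribute nothing
    now = time.time()
    return sum(v for p, v in capacity.items()
               if now - last_seen.get(p, 0.0) < PEER_TTL)
</pre>
This only stops us routing new work to a dead cluster; recovering requests already in flight is exactly the retry/reply-channel problem deferred above.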