您的位置:首页 > 大数据 > 云计算

zeroMQ初体验-21.应答模式进阶(七)-云计算

2011-10-10 15:32 627 查看
这里给出了一个最近很火的"云计算"案例。

定义:

在各种各样的硬件设备上运行着N多的worker,而任意一个worker都能够独立解决一个问题。每一个集群有这样的设备成千上百个,而同时又有一打这样的集群互相连接交互,于是,这么一个总的集合称为“云”,而其提供的服务称为“云计算”。

在“云中”的任一设备或集群都可以做到"进出自由"、任何崩溃的worker都能被检测和重启,那么,基本上就可以称为靠谱的云计算了。

首先,是一个独立的集群:



是不是很眼熟?其实这里已经有过介绍。

然后,进行扩展到多个集群:



这张图中有一个很明显的问题:两个集群间的client和worker如何互相访问?

此处有两种解决方案:

1.



这个看起来还不错,不过却有"忙者恒忙"的坏处:一个worker说“我ok了”,两个路由都知道了,同一时刻都分配了任务给他.这不是我们想要的。

2.



这个看上去更加简洁,只有中间商之间互相交换资源以达成目标。这其实是一个较"经济人"算法复杂些的算法--“经济人”互相又是"分包商"的角色。

现在,咱们选择第二种方案,那么两个中间件互联的方案选择又会衍生出好几种方式,在这里,先给出最简单的(也是我们一直在用的)“应答方案”,将中间件再组合成类c/s应答形式:



如此,似乎又产生了一个新问题(太过简单本身也是个问题啊):传统的c/s应答模式一次只能响应一个请求,然后。。就没有然后了。so,这里中间件的连接更靠谱的是使用异步连接。

除此之外,文中还给出了一个类似DNS的方案,中间件之间以“发布/订阅”的方式来交换各自的资源情况,再以“异步应答”来交换task。

在即将的案例前,良好的命名规范是非常必要的。

这里会出现三组“插座”:

1.集群内部的req/res:localfe,localbe

2.集群间的req/res:cloudfe,cloudbe

3.集群间的资源状态:statefe,statebe

最终,这个中间件会是这个样子:



下面,我们会将中间件的插座适当的分离。

1.资源状态:



<pre name="code" class="python">

import zmq

import time

import random

def main(args):

    myself = args[1]

    print "Hello, I am", myself

    context = zmq.Context()

    # State Back-End

    statebe = context.socket(zmq.PUB)

    # State Front-End

    statefe = context.socket(zmq.SUB)

    statefe.setsockopt(zmq.SUBSCRIBE, '')

    bind_address = "ipc://" + myself + "-state.ipc"

    statebe.bind(bind_address)

    for i in range(len(args) - 2):

        endpoint = "ipc://" + args[i + 2] + "-state.ipc"

        statefe.connect(endpoint)

        time.sleep(1.0)

    poller = zmq.Poller()

    poller.register(statefe, zmq.POLLIN)

    while True:

########## Solution with poll() ##########

        socks = dict(poller.poll(1000))

        try:

            # Handle incoming status message

            if socks[statefe] == zmq.POLLIN:

                msg = statefe.recv_multipart()

                print 'Received:', msg

        except KeyError:

            # Send our address and a random value

            # for worker availability

            msg = []

            msg.append(bind_address)

            msg.append(str(random.randrange(1, 10)))

            statebe.send_multipart(msg)

##################################

######### Solution with select() #########

#        (pollin, pollout, pollerr) = zmq.select([statefe], [], [], 1)

#

#        if len(pollin) > 0 and pollin[0] == statefe:

#            # Handle incoming status message

#            msg = statefe.recv_multipart()

#            print 'Received:', msg

#

#        else:

#            # Send our address and a random value

#            # for worker availability

#            msg = []

#            msg.append(bind_address)

#            msg.append(str(random.randrange(1, 10)))

#            statebe.send_multipart(msg)

##################################

    poller.unregister(statefe)

    time.sleep(1.0)

if name == 'main':

    import sys

    if len(sys.argv) < 2:

        print "Usage: peering.py <myself> <peer_1> … <peer_N>"

        raise SystemExit

    main(sys.argv)

</pre>

2.异步Req/res:



<pre name="code" class="java">

require"zmq"

require"zmq.poller"

require"zmq.threads"

require"zmsg"

local tremove = table.remove

local NBR_CLIENTS  = 10

local NBR_WORKERS  = 3

local pre_code = [[

    local self, seed = …

    local zmq = require"zmq"

    local zmsg = require"zmsg"

    require"zhelpers"

    math.randomseed(seed)

    local context = zmq.init(1)

]]

--  Request-reply client using REQ socket

--

local client_task = pre_code .. [[

    local client = context:socket(zmq.REQ)

    local endpoint = string.format("ipc://%s-localfe.ipc", self)

    assert(client:connect(endpoint))

    while true do

        --  Send request, get reply

        local msg = zmsg.new ("HELLO")

        msg:send(client)

        msg = zmsg.recv (client)

        printf ("I: client status: %s\n", msg:body())

    end

    --  We never get here but if we did, this is how we'd exit cleanly

    client:close()

    context:term()

]]

--  Worker using REQ socket to do LRU routing

--

local worker_task = pre_code .. [[

    local worker = context:socket(zmq.REQ)

    local endpoint = string.format("ipc://%s-localbe.ipc", self)

    assert(worker:connect(endpoint))

    --  Tell broker we're ready for work

    local msg = zmsg.new ("READY")

    msg:send(worker)

    while true do

        msg = zmsg.recv (worker)

        --  Do some 'work'

        s_sleep (1000)

        msg:body_fmt("OK - %04x", randof (0x10000))

        msg:send(worker)

    end

    --  We never get here but if we did, this is how we'd exit cleanly

    worker:close()

    context:term()

]]

--  First argument is this broker's name

--  Other arguments are our peers' names

--

s_version_assert (2, 1)

if (#arg < 1) then

    printf ("syntax: peering2 me doyouend…\n")

    os.exit(-1)

end

--  Our own name; in practice this'd be configured per node

local self = arg[1]

printf ("I: preparing broker at %s…\n", self)

math.randomseed(os.time())

--  Prepare our context and sockets

local context = zmq.init(1)

--  Bind cloud frontend to endpoint

local cloudfe = context:socket(zmq.XREP)

local endpoint = string.format("ipc://%s-cloud.ipc", self)

cloudfe:setopt(zmq.IDENTITY, self)

assert(cloudfe:bind(endpoint))

--  Connect cloud backend to all peers

local cloudbe = context:socket(zmq.XREP)

cloudbe:setopt(zmq.IDENTITY, self)

local peers = {}

for n=2,#arg do

    local peer = arg

    -- add peer name to peers list.

    peers[#peers + 1] = peer

    peers[peer] = true -- map peer's name to 'true' for fast lookup

    printf ("I: connecting to cloud frontend at '%s'\n", peer)

    local endpoint = string.format("ipc://%s-cloud.ipc", peer)

    assert(cloudbe:connect(endpoint))

end

--  Prepare local frontend and backend

local localfe = context:socket(zmq.XREP)

local endpoint = string.format("ipc://%s-localfe.ipc", self)

assert(localfe:bind(endpoint))

local localbe = context:socket(zmq.XREP)

local endpoint = string.format("ipc://%s-localbe.ipc", self)

assert(localbe:bind(endpoint))

--  Get user to tell us when we can start…

printf ("Press Enter when all brokers are started: ")

io.read('*l')

--  Start local workers

local workers = {}

for n=1,NBR_WORKERS do

    local seed = os.time() + math.random()

    workers
 = zmq.threads.runstring(nil, worker_task, self, seed)

    workers
:start(true)

end

--  Start local clients

local clients = {}

for n=1,NBR_CLIENTS do

    local seed = os.time() + math.random()

    clients
 = zmq.threads.runstring(nil, client_task, self, seed)

    clients
:start(true)

end

--  Interesting part

--  -------------------------------------------------------------

--  Request-reply flow

--  - Poll backends and process local/cloud replies

--  - While worker available, route localfe to local or cloud

--  Queue of available workers

local worker_queue = {}

local backends = zmq.poller(2)

local function send_reply(msg)

    local address = msg:address()

    -- Route reply to cloud if it's addressed to a broker

    if peers[address] then

        msg:send(cloudfe) -- reply is for a peer.

    else

        msg:send(localfe) -- reply is for a local client.

    end

end

backends:add(localbe, zmq.POLLIN, function()

    local msg = zmsg.recv(localbe)

    --  Use worker address for LRU routing

    worker_queue[#worker_queue + 1] = msg:unwrap()

    -- if reply is not "READY" then route reply back to client.

    if (msg:address() ~= "READY") then

        send_reply(msg)

    end

end)

backends:add(cloudbe, zmq.POLLIN, function()

    local msg = zmsg.recv(cloudbe)

    --  We don't use peer broker address for anything

    msg:unwrap()

    -- send reply back to client.

    send_reply(msg)

end)

local frontends = zmq.poller(2)

local localfe_ready = false

local cloudfe_ready = false

frontends:add(localfe, zmq.POLLIN, function() localfe_ready = true end)

frontends:add(cloudfe, zmq.POLLIN, function() cloudfe_ready = true end)

while true do

    local timeout = (#worker_queue > 0) and 1000000 or -1

    --  If we have no workers anyhow, wait indefinitely

    rc = backends:poll(timeout)

    assert (rc >= 0)

    --  Now route as many clients requests as we can handle

    --

    while (#worker_queue > 0) do

        rc = frontends:poll(0)

        assert (rc >= 0)

        local reroutable = false

        local msg

        --  We'll do peer brokers first, to prevent starvation

        if (cloudfe_ready) then

            cloudfe_ready = false -- reset flag

            msg = zmsg.recv (cloudfe)

            reroutable = false

        elseif (localfe_ready) then

            localfe_ready = false -- reset flag

            msg = zmsg.recv (localfe)

            reroutable = true

        else

            break;      --  No work, go back to backends

        end

        --  If reroutable, send to cloud 20% of the time

        --  Here we'd normally use cloud status information

        --

        local percent = randof (5)

        if (reroutable and #peers > 0 and percent == 0) then

            --  Route to random broker peer

            local random_peer = randof (#peers) + 1

            msg:wrap(peers[random_peer], nil)

            msg:send(cloudbe)

        else

            --  Dequeue and drop the next worker address

            local worker = tremove(worker_queue, 1)

            msg:wrap(worker, "")

            msg:send(localbe)

        end

    end

end

--  We never get here but clean up anyhow

localbe:close()

cloudbe:close()

localfe:close()

cloudfe:close()

context:term()

</pre>

注意:

这里是lua代码,官方没有给出Python,改天补齐~

3.合并:

<pre name="code" class="java">

require"zmq"

require"zmq.poller"

require"zmq.threads"

require"zmsg"

local tremove = table.remove

local NBR_CLIENTS  = 10

local NBR_WORKERS  = 5

local pre_code = [[

    local self, seed = …

    local zmq = require"zmq"

    local zmsg = require"zmsg"

    require"zhelpers"

    math.randomseed(seed)

    local context = zmq.init(1)

]]

--  Request-reply client using REQ socket

--  To simulate load, clients issue a burst of requests and then

--  sleep for a random period.

--

local client_task = pre_code .. [[

    require"zmq.poller"

    local client = context:socket(zmq.REQ)

    local endpoint = string.format("ipc://%s-localfe.ipc", self)

    assert(client:connect(endpoint))

    local monitor = context:socket(zmq.PUSH)

    local endpoint = string.format("ipc://%s-monitor.ipc", self)

    assert(monitor:connect(endpoint))

    local poller = zmq.poller(1)

    local task_id = nil

    poller:add(client, zmq.POLLIN, function()

        local msg = zmsg.recv (client)

        --  Worker is supposed to answer us with our task id

        assert (msg:body() == task_id)

        -- mark task as processed.

        task_id = nil

    end)

    local is_running = true

    while is_running do

        s_sleep (randof (5) * 1000)

        local burst = randof (15)

        while (burst > 0) do

            burst = burst - 1

            --  Send request with random hex ID

            task_id = string.format("%04X", randof (0x10000))

            local msg = zmsg.new(task_id)

            msg:send(client)

            --  Wait max ten seconds for a reply, then complain

            rc = poller:poll(10 * 1000000)

            assert (rc >= 0)

            if task_id then

                local msg = zmsg.new()

                msg:body_fmt(

                    "E: CLIENT EXIT - lost task %s", task_id)

                msg:send(monitor)

                -- exit event loop

                is_running = false

                break

            end

        end

    end

    --  We never get here but if we did, this is how we'd exit cleanly

    client:close()

    monitor:close()

    context:term()

]]

--  Worker using REQ socket to do LRU routing

--

local worker_task = pre_code .. [[

    local worker = context:socket(zmq.REQ)

    local endpoint = string.format("ipc://%s-localbe.ipc", self)

    assert(worker:connect(endpoint))

    --  Tell broker we're ready for work

    local msg = zmsg.new ("READY")

    msg:send(worker)

    while true do

        --  Workers are busy for 0/1/2 seconds

        msg = zmsg.recv (worker)

        s_sleep (randof (2) * 1000)

        msg:send(worker)

    end

    --  We never get here but if we did, this is how we'd exit cleanly

    worker:close()

    context:term()

]]

--  First argument is this broker's name

--  Other arguments are our peers' names

--

s_version_assert (2, 1)

if (#arg < 1) then

    printf ("syntax: peering3 me doyouend…\n")

    os.exit(-1)

end

--  Our own name; in practice this'd be configured per node

local self = arg[1]

printf ("I: preparing broker at %s…\n", self)

math.randomseed(os.time())

--  Prepare our context and sockets

local context = zmq.init(1)

--  Bind cloud frontend to endpoint

local cloudfe = context:socket(zmq.XREP)

local endpoint = string.format("ipc://%s-cloud.ipc", self)

cloudfe:setopt(zmq.IDENTITY, self)

assert(cloudfe:bind(endpoint))

--  Bind state backend / publisher to endpoint

local statebe = context:socket(zmq.PUB)

local endpoint = string.format("ipc://%s-state.ipc", self)

assert(statebe:bind(endpoint))

--  Connect cloud backend to all peers

local cloudbe = context:socket(zmq.XREP)

cloudbe:setopt(zmq.IDENTITY, self)

for n=2,#arg do

    local peer = arg

    printf ("I: connecting to cloud frontend at '%s'\n", peer)

    local endpoint = string.format("ipc://%s-cloud.ipc", peer)

    assert(cloudbe:connect(endpoint))

end

--  Connect statefe to all peers

local statefe = context:socket(zmq.SUB)

statefe:setopt(zmq.SUBSCRIBE, "", 0)

local peers = {}

for n=2,#arg do

    local peer = arg

    -- add peer name to peers list.

    peers[#peers + 1] = peer

    peers[peer] = 0 -- set peer's initial capacity to zero.

    printf ("I: connecting to state backend at '%s'\n", peer)

    local endpoint = string.format("ipc://%s-state.ipc", peer)

    assert(statefe:connect(endpoint))

end

--  Prepare local frontend and backend

local localfe = context:socket(zmq.XREP)

local endpoint = string.format("ipc://%s-localfe.ipc", self)

assert(localfe:bind(endpoint))

local localbe = context:socket(zmq.XREP)

local endpoint = string.format("ipc://%s-localbe.ipc", self)

assert(localbe:bind(endpoint))

--  Prepare monitor socket

local monitor = context:socket(zmq.PULL)

local endpoint = string.format("ipc://%s-monitor.ipc", self)

assert(monitor:bind(endpoint))

--  Start local workers

local workers = {}

for n=1,NBR_WORKERS do

    local seed = os.time() + math.random()

    workers
 = zmq.threads.runstring(nil, worker_task, self, seed)

    workers
:start(true)

end

--  Start local clients

local clients = {}

for n=1,NBR_CLIENTS do

    local seed = os.time() + math.random()

    clients
 = zmq.threads.runstring(nil, client_task, self, seed)

    clients
:start(true)

end

--  Interesting part

--  -------------------------------------------------------------

--  Publish-subscribe flow

--  - Poll statefe and process capacity updates

--  - Each time capacity changes, broadcast new value

--  Request-reply flow

--  - Poll primary and process local/cloud replies

--  - While worker available, route localfe to local or cloud

--  Queue of available workers

local local_capacity = 0

local cloud_capacity = 0

local worker_queue = {}

local backends = zmq.poller(2)

local function send_reply(msg)

    local address = msg:address()

    -- Route reply to cloud if it's addressed to a broker

    if peers[address] then

        msg:send(cloudfe) -- reply is for a peer.

    else

        msg:send(localfe) -- reply is for a local client.

    end

end

backends:add(localbe, zmq.POLLIN, function()

    local msg = zmsg.recv(localbe)

    --  Use worker address for LRU routing

    local_capacity = local_capacity + 1

    worker_queue[local_capacity] = msg:unwrap()

    -- if reply is not "READY" then route reply back to client.

    if (msg:address() ~= "READY") then

        send_reply(msg)

    end

end)

backends:add(cloudbe, zmq.POLLIN, function()

    local msg = zmsg.recv(cloudbe)

    --  We don't use peer broker address for anything

    msg:unwrap()

    -- send reply back to client.

    send_reply(msg)

end)

backends:add(statefe, zmq.POLLIN, function()

    local msg = zmsg.recv (statefe)

    -- TODO: track capacity for each peer

    cloud_capacity = tonumber(msg:body())

end)

backends:add(monitor, zmq.POLLIN, function()

    local msg = zmsg.recv (monitor)

    printf("%s\n", msg:body())

end)

local frontends = zmq.poller(2)

local localfe_ready = false

local cloudfe_ready = false

frontends:add(localfe, zmq.POLLIN, function() localfe_ready = true end)

frontends:add(cloudfe, zmq.POLLIN, function() cloudfe_ready = true end)

local MAX_BACKEND_REPLIES = 20

while true do

    -- If we have no workers anyhow, wait indefinitely

    local timeout = (local_capacity > 0) and 1000000 or -1

    local rc, err = backends:poll(timeout)

    assert (rc >= 0, err)

    --  Track if capacity changes during this iteration

    local previous = local_capacity

    --  Now route as many clients requests as we can handle

    --  - If we have local capacity we poll both localfe and cloudfe

    --  - If we have cloud capacity only, we poll just localfe

    --  - Route any request locally if we can, else to cloud

    --

    while ((local_capacity + cloud_capacity) > 0) do

        local rc, err = frontends:poll(0)

        assert (rc >= 0, err)

        if (localfe_ready) then

            localfe_ready = false

            msg = zmsg.recv (localfe)

        elseif (cloudfe_ready and local_capacity > 0) then

            cloudfe_ready = false

            -- we have local capacity poll cloud frontend for work.

            msg = zmsg.recv (cloudfe)

        else

            break;      --  No work, go back to primary

        end

        if (local_capacity > 0) then

            --  Dequeue and drop the next worker address

            local worker = tremove(worker_queue, 1)

            local_capacity = local_capacity - 1

            msg:wrap(worker, "")

            msg:send(localbe)

        else

            --  Route to random broker peer

            printf ("I: route request %s to cloud…\n",

                msg:body())

            local random_peer = randof (#peers) + 1

            msg:wrap(peers[random_peer], nil)

            msg:send(cloudbe)

        end

    end

    if (local_capacity ~= previous) then

        --  Broadcast new capacity

        local msg = zmsg.new()

        -- TODO: send our name with capacity.

        msg:body_fmt("%d", local_capacity)

        --  We stick our own address onto the envelope

        msg:wrap(self, nil)

        msg:send(statebe)

    end

end

--  We never get here but clean up anyhow

localbe:close()

cloudbe:close()

localfe:close()

cloudfe:close()

statefe:close()

monitor:close()

context:term()

</pre>

ok,终于,一个完整的“云端”呈现了出来(虽然只用了一个进程)。不过从代码中,可以很清晰的划分各个模块。

不过,这里还是不可避免的涉及到了数据的安全性:如果其他的集群down了怎么办?通过更短时间的状态更新?似乎并不治本。或许一个回复链路可以解决。好吧,那是之后要解决的问题了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: