zeroMQ初体验-3.分而治之模式(push/pull)
2011-10-10 15:20
288 查看
push/pull模式:
模型描述:
1.上游(任务发布)
2.工人(中间,具体工作)
3.下游(信号采集或者工作结果收集)
上游代码:
Python代码
工作代码:
Python代码
下游代码:
Python代码
注意点:
这种模式与pub/sub模式一样都是单向的,区别有两点:
1,该模式下在没有消费者的情况下,发布者的信息是不会消耗的(由发布者进程维护)
2,多个消费者消费的是同一列信息,假设A得到了一条信息,则B将不再得到
这种模式主要针对在消费者能力不够的情况下,提供的多消费者并行消费解决方案(也算是之前的pub/sub模式的那个"堵塞问题"的一个解决策略吧)
由上面的模型图可以看出,这是一个N:N的模式,在1:N的情况下,各消费者并不是平均消费的,而在N:1的情况下,则有所不同,如下图:
这种模式主要关注点在于,可以扩展中间worker,来到达并发的目的。
模型描述:
1.上游(任务发布)
2.工人(中间,具体工作)
3.下游(信号采集或者工作结果收集)
上游代码:
Python代码
import zmq import random import time context = zmq.Context() # Socket to send messages on sender = context.socket(zmq.PUSH) sender.bind("tcp://*:5557") print "Press Enter when the workers are ready: " _ = raw_input() print "Sending tasks to workers..." # The first message is "0" and signals start of batch sender.send('0') # Initialize random number generator random.seed() # Send 100 tasks total_msec = 0 for task_nbr in range(100): # Random workload from 1 to 100 msecs workload = random.randint(1, 100) total_msec += workload sender.send(str(workload)) print "Total expected cost: %s msec" % total_msec
工作代码:
Python代码
import sys import time import zmq context = zmq.Context() # Socket to receive messages on receiver = context.socket(zmq.PULL) receiver.connect("tcp://localhost:5557") # Socket to send messages to sender = context.socket(zmq.PUSH) sender.connect("tcp://localhost:5558") # Process tasks forever while True: s = receiver.recv() # Simple progress indicator for the viewer sys.stdout.write('.') sys.stdout.flush() # Do the work time.sleep(int(s)*0.001) # Send results to sink sender.send('')
下游代码:
Python代码
import sys import time import zmq context = zmq.Context() # Socket to receive messages on receiver = context.socket(zmq.PULL) receiver.bind("tcp://*:5558") # Wait for start of batch s = receiver.recv() # Start our clock now tstart = time.time() # Process 100 confirmations total_msec = 0 for task_nbr in range(100): s = receiver.recv() if task_nbr % 10 == 0: sys.stdout.write(':') else: sys.stdout.write('.') # Calculate and report duration of batch tend = time.time() print "Total elapsed time: %d msec" % ((tend-tstart)*1000)
注意点:
这种模式与pub/sub模式一样都是单向的,区别有两点:
1,该模式下在没有消费者的情况下,发布者的信息是不会消耗的(由发布者进程维护)
2,多个消费者消费的是同一列信息,假设A得到了一条信息,则B将不再得到
这种模式主要针对在消费者能力不够的情况下,提供的多消费者并行消费解决方案(也算是之前的pub/sub模式的那个"堵塞问题"的一个解决策略吧)
由上面的模型图可以看出,这是一个N:N的模式,在1:N的情况下,各消费者并不是平均消费的,而在N:1的情况下,则有所不同,如下图:
这种模式主要关注点在于,可以扩展中间worker,来到达并发的目的。
相关文章推荐
- ZeroMQ(java)之push/pull模式
- ZeroMQ之PUSH/PULL模式
- zeromq/jzmq push/pull模式及java代码
- zeromq push-pull 模式
- ZeroMQ之push/pull模式
- ZeroMQ(java)之Push/Pull模式
- linux下用mython语言写的简单zeromq push/pull模式操作mysql数据
- 微博feed系统的推(push)模式和拉(pull)模式和时间分区拉模式架构探讨
- 微博feed系统的push和pull模式和时间分区拉模式架构探讨
- zeroMQ初体验-2.发布订阅模式(pub/sub)
- zeroMQ初体验-21.应答模式进阶(七)-云计算
- 消息队列的pull与push模式理解
- zeroMQ初体验-23.可靠性-懒惰的海盗模式
- zeroMQ初体验-24.可靠性-简单的海盗模式
- 微博feed系统的推(push)模式和拉(pull)模式和时间分区拉模式架构探讨
- pyzmq的4种模式(PUSH/PULL)笔记
- [水晶报表]PUSH与PULL模式
- c# 中的push和pull两种模式
- zeroMQ初体验-6.多模式数据来源处理方案(multi sockets)
- zeroMQ初体验-25.可靠性-偏执的海盗模式