您的位置:首页 > 编程语言 > Python开发

python日常笔记3--关于分布式任务的分配

2017-10-30 17:40 344 查看
分布式任务分配,目前流行的就是master-worker型,也就是有个master进程(或线程)来分配任务,其余的worker进程(或线程)来处理任务。我们一般都是采取多进程分布式处理任务,因为多进程比多线程要更加的稳定(起码在python中是这样)。

首先写master.py

import queue, random, time
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
pass

task_queue = queue.Queue()
result_queu = queue.Queue()
res_info = False

QueueManager.register('task_queue', callable=lambda :task_queue)
QueueManager.register('result_queue', callable=lambda :result_queu)
QueueManager.register('res_info', callable=lambda :res_info)

manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')
manager.start()

task = manager.task_queue()
result = manager.result_queue()
info = manager.res_info()
for i in range(10):
n =random.randint(0,1000)
print('put %s into the task queue' % n)
task.put(n)

print('waiting for an task to deal with the data...')

for v in range(10):
r = result.get(timeout=100)
print('the result is : %s',r)

manager.shutdown()
print('The task process has exited')

我们通过让继承BaseManager生成一个我们自己的类QueueManger,用来处理实现分布式服务器之间的网络通信。自己需要先定义各个变量,比如这里的task_queue(任务队列),然后在QueueManager中注册,就相当于是写一个接口函数,方便网络通信的双方可以正确接受数据。
之后即使通过QueueMagner实例化一个manager, 这里需要些必要的参数,ip地址,这里由于我是本机测试,所以ip地址只要是127.x.x.x都可以,但是,如果要部署到服务器上,ip地址最好为空'',这样worker端访问的ip地址就是服务器端ip地址。还有一个authkey, 是用来作为密令使用的, 防止第三方恶意破坏程序.

然后就是start, 之后通过manager获取一些数据,并进行传递。

worker.py

from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
pass

QueueManager.register('task_queue')
QueueManager.register('result_queue')
QueueManager.register('res_info')

manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')
print('going to connect...')
manager.connect()
print('connected....')
task = manager.task_queue()
result = manager.result_queue()
info = manager.res_info()

for value in range(10):
value = task.get(timeout=100)
k = value * value
print('finish the tast')
result.put('the result is %s' % k)

print('son task finished')
info = True

跟上面一样,实例化出manger时候, 调用manager.connect() 就完成了连接工作
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: