python日常笔记3--关于分布式任务的分配
2017-10-30 17:40
344 查看
分布式任务分配,目前流行的就是master-worker型,也就是有个master进程(或线程)来分配任务,其余的worker进程(或线程)来处理任务。我们一般都是采取多进程分布式处理任务,因为多进程比多线程要更加的稳定(起码在python中是这样)。
首先写master.py
import queue, random, time
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
task_queue = queue.Queue()
result_queu = queue.Queue()
res_info = False
QueueManager.register('task_queue', callable=lambda :task_queue)
QueueManager.register('result_queue', callable=lambda :result_queu)
QueueManager.register('res_info', callable=lambda :res_info)
manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')
manager.start()
task = manager.task_queue()
result = manager.result_queue()
info = manager.res_info()
for i in range(10):
n =random.randint(0,1000)
print('put %s into the task queue' % n)
task.put(n)
print('waiting for an task to deal with the data...')
for v in range(10):
r = result.get(timeout=100)
print('the result is : %s',r)
manager.shutdown()
print('The task process has exited')
我们通过让继承BaseManager生成一个我们自己的类QueueManger,用来处理实现分布式服务器之间的网络通信。自己需要先定义各个变量,比如这里的task_queue(任务队列),然后在QueueManager中注册,就相当于是写一个接口函数,方便网络通信的双方可以正确接受数据。
之后即使通过QueueMagner实例化一个manager, 这里需要些必要的参数,ip地址,这里由于我是本机测试,所以ip地址只要是127.x.x.x都可以,但是,如果要部署到服务器上,ip地址最好为空'',这样worker端访问的ip地址就是服务器端ip地址。还有一个authkey, 是用来作为密令使用的, 防止第三方恶意破坏程序.
然后就是start, 之后通过manager获取一些数据,并进行传递。
worker.py
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('task_queue')
QueueManager.register('result_queue')
QueueManager.register('res_info')
manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')
print('going to connect...')
manager.connect()
print('connected....')
task = manager.task_queue()
result = manager.result_queue()
info = manager.res_info()
for value in range(10):
value = task.get(timeout=100)
k = value * value
print('finish the tast')
result.put('the result is %s' % k)
print('son task finished')
info = True
跟上面一样,实例化出manger时候, 调用manager.connect() 就完成了连接工作
首先写master.py
import queue, random, time
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
task_queue = queue.Queue()
result_queu = queue.Queue()
res_info = False
QueueManager.register('task_queue', callable=lambda :task_queue)
QueueManager.register('result_queue', callable=lambda :result_queu)
QueueManager.register('res_info', callable=lambda :res_info)
manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')
manager.start()
task = manager.task_queue()
result = manager.result_queue()
info = manager.res_info()
for i in range(10):
n =random.randint(0,1000)
print('put %s into the task queue' % n)
task.put(n)
print('waiting for an task to deal with the data...')
for v in range(10):
r = result.get(timeout=100)
print('the result is : %s',r)
manager.shutdown()
print('The task process has exited')
我们通过让继承BaseManager生成一个我们自己的类QueueManger,用来处理实现分布式服务器之间的网络通信。自己需要先定义各个变量,比如这里的task_queue(任务队列),然后在QueueManager中注册,就相当于是写一个接口函数,方便网络通信的双方可以正确接受数据。
之后即使通过QueueMagner实例化一个manager, 这里需要些必要的参数,ip地址,这里由于我是本机测试,所以ip地址只要是127.x.x.x都可以,但是,如果要部署到服务器上,ip地址最好为空'',这样worker端访问的ip地址就是服务器端ip地址。还有一个authkey, 是用来作为密令使用的, 防止第三方恶意破坏程序.
然后就是start, 之后通过manager获取一些数据,并进行传递。
worker.py
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('task_queue')
QueueManager.register('result_queue')
QueueManager.register('res_info')
manager = QueueManager(address=('127.23.122.11', 5000), authkey=b'abc')
print('going to connect...')
manager.connect()
print('connected....')
task = manager.task_queue()
result = manager.result_queue()
info = manager.res_info()
for value in range(10):
value = task.get(timeout=100)
k = value * value
print('finish the tast')
result.put('the result is %s' % k)
print('son task finished')
info = True
跟上面一样,实例化出manger时候, 调用manager.connect() 就完成了连接工作
相关文章推荐
- Python学习笔记(二)关于Python的IDE选择
- Go学习笔记:关于Java、Python、Go编程思想的不同
- 分布式系统笔记:利用zookeeper实现分布式任务锁(Java)
- 关于python的一些笔记 2018-03-01 00:04:54
- 一些关于Python的基础知识 - 千月的python linux 系统管理指南学习笔记(3)
- Activiti 学习笔记12:分配个人任务
- 【python学习笔记】关于sys.path与pythonpath
- python学习笔记 --- 关于openers和handlers
- 【小甲鱼Python入门笔记】一个文件任务
- Python分布式爬虫前菜(2):关于提取网页源码中特定信息的技巧
- 【实战\聚焦Python分布式爬虫必学框架Scrapy 打造搜索引擎项目笔记】第4章 scrapy爬取知名技术文章网站(2)
- 使用python的分布式任务队列huey实现任务的异步化
- 【实战\聚焦Python分布式爬虫必学框架Scrapy 打造搜索引擎项目笔记】第4章 scrapy爬取知名技术文章网站(1)
- 【Python 学习笔记】关于 Iterator
- Django后台开发笔记①:关于DjangoUeditor与python3存在兼容性问题的解决方案(即解决图片无法上传问题)
- Python学习笔记--关于split的分割问题
- 【Python学习笔记】关于if __name__ == '__main__'
- 数人云|关于分布式任务调度平台,数人云的经验都在这里了
- 关于Python+Opencv实现人脸检测的实验笔记(调用摄像头篇)
- 分配任务,汇报进度,以及日常会议事项