您的位置:首页 > 其它

分布式任务队列celery用法详解

2018-09-05 00:00 1306 查看
celery基础介绍:



这个图我们可以看出,celery基本结构也就是三部分
1 第一部分 broker也就是中间件消息队列,作用就是用来接收应用的请求
这一部分常见玩法可以是rabbitmq和redis等
2 第二部分 worker 也就是工作队列 也就是celery本身的任务队列服务,一般情况下大型的生产应用我们会结合supervisor来管理这么多的worker
3 第三部分 result 存储,就是把执行的结果,状态等信息进行存储,常规用法我们可以用rabbitmq redis,mysql,mongodb等来做
环境部署:
1pip install celery
2 安装 rabbitmq 这个我得博客有篇文章做了详细讲解
3 安装redis ,源码安装很简单,不做介绍

首先做一个基础的例子体验什么是celery
[root@localhost www]# cat tasks.py
#!/usr/bin/python
#coding:utf-8
from celery import Celery
app = Celery('tasks', broker='amqp://',backend='redis://')
#app.config_from_object('celeryconfig')

@app.task
def add(x, y):
return x + y
broker是接受的消息队列的地址我这里用的rabbitmq的地址
backend是后端的存储我这里用的是redis

启动task
celery -A tasks worker --loglevel=info
然后我们新开一个终端进入python命令行去调用task
#python
#from tasks import add
#add.delay(2,4)







可以看出我们每次在python终端调用add这个任务 celery的worker 信息里面就会收到操作并记录信息 同时redis里面记录相应的状态

======================================================

celery与tasks分离



[root@localhost test]# cat celery.py
#!/usr/bin/python
#coding:utf-8

from future import absolute_import ,unicode_literals
from celery import Celery

app = Celery(
'test',
broker='amqp://',
backend='redis://',
include=['test.tasks']
)

app.conf.update(
result_expires=3500,
)

if name == 'main':
app.start()

[root@localhost test]# cat tasks.py
#!/usr/bin/python
#coding:utf-8

from future import absolute_import ,unicode_literals

from test.celery import app

@app.task
def add(x,y):
return x + y[url=mailto:br/>@app.task
@app.task
return x * y

后台启动 celery
celery multi start w1 -A proj -l info --logfile=/var/log/celery.log

celery有一堆的配置参数来控制每一个task这里不做解说详情见官网

重点是思路:
1 以前批量执行paramiko脚本的时候,一旦执行的机器多了,后来优化成多线程用threading来做,看到这个异步的任务队列,我们可以把机器按队列进行分组,进行路由限制数量,流量控制等等来对大批量的task任务进行分批量并发操作
2 还可以结合python框架web界面 applicate的过程,celery的worker的过程状态和后端的结果状态都集成到一个页面,实时监测,界面化操作
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  celery任务队列