Celery分布式任务队列快速入门
2018-01-26 15:55
399 查看
一 Celery介绍和基本使用 |
需求场景
1. 对100台命令执行一条批量命令,命令执行需要很长时间,但是不想让主程序等着结果返回,而是给主程序返回一个任务ID,task_id
主程序过一段时间根据task_id,获取执行结果即可,再命令执行期间,主程序 可以继续做其他事情
2. 定时任务,比如每天检测一下所有的客户资料,发现是客户的生日,发个祝福短信
解决方案
1. 逻辑view 中启一个进程
父进程结束,子进程跟着结束,子进程任务没有完成,不符合需求
父进程结束,等着子进程结束,父进程需等着结果返回,不符合需求
小结:该方案解决不了阻塞问题,即需要等待
2. 启动 subprocess,任务托管给操作系统执行
实现task_id,实现异步,解决阻塞
小结:大批量高并发,主服务器会出现问题,解决不了并发
3. celery
celery提供多子节点,解决并发问题
celery介绍
celery是一个基于python开发的分布式异步消息队列,轻松实现任务的异步处理
celery在执行任务时需要一个消息中间件来接收和发送任务消息,以及存储任务结果,一般使用RabbitMQ 或 Redis
celery优点
简单:熟悉celery的工作流程后,配置使用简单
高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
快速:一个单进程的celery每分钟可处理上百万个任务
灵活:几乎celery的各个组件都可以被扩展及自定制
celery基本工作流程
其中中间队列用于分配任务以及存储执行结果
celery安装及使用
1. 安装python模块
pip3 install celery pip3 install redis
2. 安装redis服务
wget http://download.redis.io/releases/redis-3.2.8.tar.gz tar -zxvf redis-3.2.8.tar.gz cd redis-3.2.8 make src/redis-server # 启动redis 服务
3. 创建一个celery application 用来定义任务列表
创建一个任务 tasks.py
from celery import Celery app = Celery('TASK', broker='redis://localhost', backend='redis://localhost') @app.task def add(x,y): print("running...",x,y) return x+y
4. 启动celery worker 来开始监听并执行任务
celery -A tasks worker --loglevel=info
tasks 任务文件名,worker 任务角色,--loglevel=info 任务日志级别
5. 调用任务
打开另外终端,进入命令行模式,调用任务
6. celery常用接口
tasks.add(4,6) ---> 本地执行
tasks.add.delay(3,4) --> worker执行
t=tasks.add.delay(3,4) --> t.get() 获取结果,或卡住,阻塞
t.ready()---> False:未执行完,True:已执行完
t.get(propagate=False)
抛出简单异常,但程序不会停止
t.traceback 追踪完整异常
补充:如何使用第三方工具
1. 导入第三方包,如 from celery import Celery
2. 实例化第三方类,如 app = Celery(......)
3. 实例化的对象去关联执行任务的方法,如 @app.task
4. 分区角色 worker 执行任务,broker分配任务
二 项目中使用Celery |
1. 项目目录结构
project |-- __init__.py |-- celery.py # 配置文档 |-- tasks.py # 任务函数 |-- tasks2.py # 任务函数
2. 项目文件
project/celery.py
# from celery import Celery 默认当前路径,更改为绝对路径(当前路径有个celery.py文件啦) from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('project', broker='redis://localhost', backend='redis://localhost', include=['project.tasks','project.tasks2']) # 配置文件和任务文件分开了,可以写多个任务文件 # app 扩展配置 app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
celery.py作用相当于配置文件
project/tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y
project/tasks.py
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def hello(): return 'Hello World'
3. 启动项目worker
celery -A project worker -l info
其中 project 为项目名
另启终端,与project同目录进入python3
celery multi start w1 -A project -l info celery multi start w2 -A project -l info celery multi start w3 -A project -l info celery multi restart w1 -A project -l info celery multi stop w1 w2 w3 # 任务立刻停止 celery multi stopwait w1 w2 w3 # 任务执行完,停止
脚本celery.py
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('project', broker='redis://localhost', backend='redis://localhost', include=['project.periodic_task',]) app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
脚本periodic_task.py
from __future__ import absolute_import, unicode_literals from .celery import app from celery.schedules import crontab @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # 每10s调用 test('hello') sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # 每20s调用 test('world') sender.add_periodic_task(20.0, test.s('world'), expires=10) # 每周一早上7:30 执行 test('Happy Mondays!') sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), # 可灵活修改 test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg)
也可以在配置文件celery.py 里添加定时任务
app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'project.tasks.add', 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC'
每周1的早上7.30执行project.tasks.add任务
LearnCelery |-- app1 |-- tasks.py |-- models.py |-- app2 |-- tasks.py |-- models.py |-- LearnCelery |-- __init__.py |-- celery.py |-- settings.py
2. 脚本代码
LearnCelery/app/tasks.py # 必须叫这个名字
from __future__ import absolute_import, unicode_literals from celery import shared_task import time # 所有的app都可以调用 @shared_task def add(x, y): time.sleep(10) return x + y @shared_task def mul(x, y): time.sleep(10) return x * y
LearnCelery/LearnCelery/__init__.py
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app']
LearnCelery/LearnCelery/celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery # 单独脚本调用Django内容时,需配置脚本的环境变量 os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings') app = Celery('mysite') # CELERY_ 作为前缀,在settings中写配置 app.config_from_object('django.conf:settings', namespace='CELERY') # 到Django各个app下,自动发现tasks.py 任务脚本 app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
LearnCelery/LearnCelery/settings.py
# For celery CELERY_BROKER_URL = 'redis://localhost' CELERY_RESULT_BACKEND = 'redis://localhost'
3. 启动celery
celery -A LearnCelery worker -l debug
五 Django中使用计划任务 |
python manage.py migrate
4. 启动 celery beat
celery -A LearnCelery beat -l info -S django
定时任务存到数据库里,启动beat定时取任务放到队列里执行
5. admin管理
相关文章推荐
- python Celery 分布式任务队列快速入门
- Celery 分布式任务队列快速入门
- 分布式任务队列Celery入门与进阶
- 分布式任务队列与任务调度系统Celery入门
- Celery 分布式任务队列入门
- Celery:分布式任务队列 简单上手
- 异步任务神器 Celery 快速入门教程
- Celery ---- 分布式队列神器 ---- 入门
- Python开发【模块】:Celery 分布式异步消息任务队列
- Celery - 分布式任务队列
- 分布式任务队列Celery
- 快速入门分布式消息队列之 RabbitMQ(上)
- 分布式队列Celery入门
- 快速入门分布式消息队列之 RabbitMQ(1)
- 异步任务神器 Celery 快速入门教程
- 快速入门分布式消息队列之 RabbitMQ(下)
- 分布式任务队列 Celery —— 深入 Task
- Celery - 分布式任务队列
- 【Python】分布式任务队列Celery使用参考资料
- 【转】分布式异步任务队列 Celery + rabbitmq (or redis )