celery rabbit mq 详解
Celery介绍和基本使用
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:
1)你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
2)你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福
Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis,后面会讲
1.1 Celery有以下优点:
- 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
- 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
- 快速:一个单进程的celery每分钟可处理上百万个任务
- 灵活: 几乎celery的各个组件都可以被扩展及自定制
指定队列
from kombu import Queue CELERY_QUEUES = ( # 定义任务队列 Queue('default', routing_key='task.#'), # 路由键以“task.”开头的消息都进default队列 Queue('web_tasks', routing_key='web.#'), # 路由键以“web.”开头的消息都进web_tasks队列 ) CELERY_DEFAULT_EXCHANGE = 'tasks' # 默认的交换机名字为tasks CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' # 默认的交换类型是topic CELERY_DEFAULT_ROUTING_KEY = 'task.default' # 默认的路由键是task.default,这个路由键符合上面的default队列 CELERY_ROUTES = { 'projq.test.add': { # tasks.add的消息会进入web_tasks队列 'queue': 'web_tasks', 'routing_key': 'web.add', } }
[p]现在用指定队列的方式启动消费者进程:
celery -A projq worker -Q web_tasks -l info
上述worker只会执行web_tasks中的任务,我们可以合理安排消费者数量,让web_tasks中任务的优先级更高。
使用任务调度
之前的例子都是由发布者触发的,本节展示一下使用Celery的Beat进程自动生成任务。基于proj目录下的源码,创建一个projb目录,对projb/celeryconfig.py添加如下配置:[/p]CELERYBEAT_SCHEDULE = { 'add': { 'task': 'celery_test.add', 'schedule': timedelta(seconds=10), #每10秒执行一次 'args': (16, 16) #执行的参数是 } }
启动Beat程序:
celery beat -A projb
然后启动Worker进程:
celery -A projb worker -l info
之后可以看到每10秒都会自动执行一次tasks.add。
注:Beat和Worker进程可以一并启动:
celery -B -A projb worker -l info
使用Django可以通过django-celery实现在管理后台创建、删除、更新任务,是因为它使用了自定义的调度类djcelery.schedulers.DatabaseScheduler,我们可以参考它实现Flask或者其他Web框架的管理后台来完成同样的功能。使用自定义调度类还可以实现动态添加任务。
任务绑定、记录日志和重试
任务绑定、记录日志和重试是Celery常用的3个高级属性。现在修改proj/tasks.py文件,添加div函数用于演示:
from celery.utils.log import get_task_logger logger = get_task_logger(__name__) @app.task(bind=True) def div(self, x, y): logger.info(('Executing task id {0.id}, args: {0.args!r} ' 'kwargs: {0.kwargs!r}').format(self.request)) try: result = x / y except ZeroDivisionError as e: raise self.retry(exc=e, countdown=5, max_retries=3) return result
当使用bind = True后,函数的参数发生变化,多出了参数self(第一个参数),相当于把div变成了一个已绑定的方法,通过self可以获得任务的上下文。
在IPython中调用div:
from proj.tasks import div r = div.delay(2, 1) [2017-09-20 15:50:31,853: INFO/Worker-1] proj.tasks.div[1da82fb8-20de-4d5a-9b48-045da6db0cda]: Executing task id 1da82fb8-20de-4d5a-9b48-045da6db0cda, args: [2, 1] kwargs: {}
换成能造成异常的参数:
In : r = div.delay(2, 0)
可以发现每5秒就会重试一次,一共重试3次(默认重复3次),然后抛出异常。
再来一张很给力的图
- 利用Celery实现Django博客PV统计功能详解
- Windows下安装Rabbit MQ步骤详解
- Django中使用Celery的教程详解
- rabbit MQ的rpc功能详解
- 分布式任务队列celery用法详解
- celery 分布式框架详解
- 分布式任务队列 Celery —— 详解工作流
- Python并行分布式框架Celery详解
- 详解Oracle DELETE和TRUNCATE 的区别
- 数据泵用户详解
- 关于java基础--日期类与日历类Calendar详解测试
- ORACLE WITH CHECK OPTION子句详解
- ssm框架整合详解
- Hadoop示例程序WordCount运行及详解
- 常用字符集编码详解:ASCII 、GB2312、GBK、GB18030
- HTTP协议详解
- Redis配置详解-yellowcong
- Swift语法-where关键字详解
- 04.21 三剑客之老大awk命令详解
- Go语言 Go1.1新调度器详解