您的位置:首页 > 其它

celery rabbit mq 详解

2017-10-30 14:39 351 查看

Celery介绍和基本使用

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

1)你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。
2)你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福

Celery 在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果, 一般使用rabbitMQ or Redis,后面会讲

1.1 Celery有以下优点:

  • 简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的
  • 高可用:当任务执行失败或执行过程中发生连接中断,celery 会自动尝试重新执行任务
  • 快速:一个单进程的celery每分钟可处理上百万个任务
  • 灵活: 几乎celery的各个组件都可以被扩展及自定制

指定队列

from kombu import Queue

CELERY_QUEUES = ( # 定义任务队列

Queue('default', routing_key='task.#'), # 路由键以“task.”开头的消息都进default队列

Queue('web_tasks', routing_key='web.#'), # 路由键以“web.”开头的消息都进web_tasks队列

)

CELERY_DEFAULT_EXCHANGE = 'tasks' # 默认的交换机名字为tasks

CELERY_DEFAULT_EXCHANGE_TYPE = 'topic' # 默认的交换类型是topic
CELERY_DEFAULT_ROUTING_KEY = 'task.default' # 默认的路由键是task.default,这个路由键符合上面的default队列
CELERY_ROUTES = {
'projq.test.add': { # tasks.add的消息会进入web_tasks队列
'queue': 'web_tasks',
'routing_key': 'web.add',
}
}

[p]现在用指定队列的方式启动消费者进程:

celery -A projq worker -Q web_tasks -l info

上述worker只会执行web_tasks中的任务,我们可以合理安排消费者数量,让web_tasks中任务的优先级更高。

使用任务调度

之前的例子都是由发布者触发的,本节展示一下使用Celery的Beat进程自动生成任务。基于proj目录下的源码,创建一个projb目录,对projb/celeryconfig.py添加如下配置:[/p]
CELERYBEAT_SCHEDULE = {
'add': {
'task': 'celery_test.add',
'schedule': timedelta(seconds=10), #每10秒执行一次
'args': (16, 16) #执行的参数是
}
}

启动Beat程序:

celery beat -A projb

然后启动Worker进程:

celery -A projb worker -l info

之后可以看到每10秒都会自动执行一次tasks.add。

注:Beat和Worker进程可以一并启动:

celery -B -A projb worker -l info

使用Django可以通过django-celery实现在管理后台创建、删除、更新任务,是因为它使用了自定义的调度类djcelery.schedulers.DatabaseScheduler,我们可以参考它实现Flask或者其他Web框架的管理后台来完成同样的功能。使用自定义调度类还可以实现动态添加任务。

任务绑定、记录日志和重试

任务绑定、记录日志和重试是Celery常用的3个高级属性。现在修改proj/tasks.py文件,添加div函数用于演示:

from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(bind=True)
def div(self, x, y):
logger.info(('Executing task id {0.id}, args: {0.args!r} '
'kwargs: {0.kwargs!r}').format(self.request))
try:
result = x / y
except ZeroDivisionError as e:
raise self.retry(exc=e, countdown=5, max_retries=3)
return result

当使用bind = True后,函数的参数发生变化,多出了参数self(第一个参数),相当于把div变成了一个已绑定的方法,通过self可以获得任务的上下文。

在IPython中调用div:

from proj.tasks import div
r = div.delay(2, 1)
[2017-09-20 15:50:31,853: INFO/Worker-1] proj.tasks.div[1da82fb8-20de-4d5a-9b48-045da6db0cda]: Executing task id 1da82fb8-20de-4d5a-9b48-045da6db0cda, args: [2, 1] kwargs: {}

换成能造成异常的参数:

In : r = div.delay(2, 0)

可以发现每5秒就会重试一次,一共重试3次(默认重复3次),然后抛出异常。

再来一张很给力的图 

 

 

 

 

 

 

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: