您的位置:首页 > 移动开发

Celery用户指引--------Application

2016-05-24 23:39 357 查看
ApplicationCelery库在使用前必须先实例化, 这个实例被称为application(或者简称为'app')。application是线程安全的,所以多个配置不同,组件和任务不同的实例可以同时存在于同一个进程空间。下面让我们创建一个实例:
>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>
最后一行是application的文本化表示,包括celery class的类名称(Celery),当前main模块的名称(__main__),和对象的内存地址。Main Name最重要的是主模块的名称,让我们看一下为什么很重要。当你在Celery中发送一个任务的时候,这个任务中不会包含任何代码,只有将要执行的任务名称。工作原理类似网络中的域名:每个worker都含有任务名到实际执行函数的映射,被称作任务注册。任何时候你注册了一个任务,这个任务都会被添加到本地注册中。
>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>
在这里,你又看到了__main__,当Celery无法检测到函数属于哪个模块时,它就会使用main模块名来生成任务的名称。在一些有限的情况下,这样做会有些问题:1.如果定义任务的模块被当作程序运行。2.应用程序在python shell(REPL)中创建。下面的例子中,任务所在的模块也通过app.worker_main() 来启动一个worker。tasks.py:
from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
app.worker_main()
当模块执行的时候,任务将会被命名为以"__main__"开头,但是当模块被其它进程导入时(比如要调用一个任务),这个任务将会被合名为以"tasks"开始(这个模块的真正名称):
>>> from tasks import add
>>> add.name
tasks.add
你也可以为主模块指定一个名字:
>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add
更多的信息请查看:Names配置有一些你可以设置的选项来改变Celery的运行。这些选项可以直接通过app实例来设置,或者通过专门的配置模块来进行设置。配置可以通过app.conf来获取 
>>> app.conf.timezone
'Europe/London'
也可以直接进行设置
>>> app.conf.enable_utc = True
也可以通过update方法来一次更新多个键值:
>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)
配置对象可由多个字典构成,它们之间起作用的顺序如下:1.在运行时配置2.任何配置模块3.默认配置(celery.app.defaults)你甚至可以通过
app.add_defaults()
 方法添加新的默认源。要查看完整的配置列表和它们的默认配置,请查看:Configurationreference config_from_object
app.config_from_object()方法从一个配置对象中加载配置
配置对象可以是一个模块或者任何含有配置属性的对象。
注意,任何先前设置的配置在调用config_from_object后都会被重新设置。如果你想添加额外的配置,你需要在调用这个方法之后。
举例1:使用模块和名称
app.config_from_object方法可以使用合法的python模块名,甚至还可以使用一个python属性,比如:"celeryconfig","myproj.config.celery"或者"myproj.config:CeleryConfig":
from celery import Celeryapp = Celery()app.config_from_object('celeryconfig')
celeryconfig模块可以如下所示:
celeryconfig.py
:
enable_utc = Truetimezone = 'Europe/London'
只要可以import celerconfig,app就可以使用它。
举例2:传递一个实际的模块对象
你也可以传递一个已经import导入的模块对象,但是这种做法是不推荐使用的。
说明:
推荐使用模块的名称,因为这意味着当使用prefork池时这个模块不必被序列化。如果你遇到了配置问题或者pickle错误,请使用模块名称代替使用导入的模块。
import celeryconfigfrom celery import Celeryapp = Celery()app.config_from_object(celeryconfig)
举例3:使用一个配置类或对象
from celery import Celeryapp = Celery()class Config:enable_utc = Truetimezone = 'Europe/London'app.config_from_object(Config)# or using the fully qualified name of the object:#   app.config_from_object('module:Config')
config_from_envvar
app.config_from_envvar()从环境变量中获取配置模块的名称。
比如---从环境变量“CELERY_CONFIG_MODULE”指定的模块名中加载配置。
import osfrom celery import Celery#: Set default configuration module nameos.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')app = Celery()app.config_from_envvar('CELERY_CONFIG_MODULE')
你也可以在命令行中指定模块名使用的环境变量
$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l info
检查配置如果你想通过打印配置来进行调试或者其它类似目的,你可能还希望过滤一些类似API密钥和密码的敏感信息。Celery提供了一些有用的工具来呈现配置信息,其中一个是
humanize()
:
>>> app.conf.humanize(with_defaults=False, censored=True)
这个方法表格的形式返回当前的配置。这里面将只会包含修改了默认配置的选项,但是你也可以通过with_defaults关键字参数来返回包含默认值的配置。如果你希望配置以字典的形式返回,你可以使用table方法:
table()
 method:
>>> app.conf.table(with_defaults=False, censored=True)
你需要注意Celery不能删除所有的敏感信息,它仅仅是使用一个正则表达式来查找常见的关键字。如果你自定义的配置中包含敏感信息,你需要使用Celery能够识别的敏感关键字作为键值。配置中包含下面这些字串,将会被认为是敏感的:
API
TOKEN
KEY
SECRET
PASS
SIGNATURE
DATABASE
惰性机制application实例是惰性的,这意味着只有在需要时才会对其求值。创建一个Celery实例只会做以下几件事:1.创建一个逻辑时钟,用于事件2.创建任务注册表3.将其设为当前app(除非set_as_current设为disabled)4.调用app.on_init()回调(默认情况下什么也不做)app.task装饰器在任务定义时并不会真正创建任务,只有当任务被使用或者application完成时才会创建这个任务。下面的例子说明了在使用任务或者访问任务的属性(这里是repr())之前任务不会被创建:
>>> @app.task>>> def add(x, y):...    return x + y>>> type(add)<class 'celery.local.PromiseProxy'>>>> add.__evaluated__()False>>> add        # <-- causes repr(add) to happen<@task: __main__.add>>>> add.__evaluated__()True
当调用app.finalize()或者访问app.tasks属性时,会触发app的完成。完成app对象将会发生以下几点:1.对必须在app之间共享的任务进行拷贝默认情况下任务在app之前是共享的,但是如果task装饰器里的shared关键字参数被禁用,这个task将会成为所绑定的app的私有任务。2.对所有挂起的task装饰器求值。3.确保所有的任务都绑定到当前的app由于任务被绑定到一个app,所以任务可以从配置中读取默认值。关于"default app":并非Celery的所有版本都有一个application,曾经的版本使用基于模块的API,为了向后兼容,老的API仍然存在。这种情况延续到了Celery5.0Celery总会创建一个特殊的app,被称为"default app"。如果没有其它指定的app实例那么将会使用它。celery.task模块是为了适应老的API,如果你使用一个指定的app则不应该使用它。你应该总是使用app实例的方法,而不应该使用基于模块的API.比如,老的Task基类允许许多兼容性的特性,而这些特性可能与新的特性不兼容,比如task方法:
from celery.task import Task   # << OLD Task base class.from celery import Task        # << NEW base class.
即使你使用老的模块API,我们也建议你使用新的基类。断链尽管我们可能需要依懒当前设置的app实例,最好的办法是在任何我们需要的地方传递它。我称这为“app链”,因为它根据正在传递的app实例创建了一个实例链。下面的例子是一种不好的做法:
from celery import current_appclass Scheduler(object):def run(self):app = current_app
它应该将app作为一个参数:
class Scheduler(object):def __init__(self, app):self.app = app
在Celery内部,它使用celery.app.app_or_default()函数,所以我们也可以通过模块兼容的API来实例我们的功能:
from celery.app import app_or_defaultclass Scheduler(object):def __init__(self, app=None):self.app = app_or_default(app)
在开发环境中,你可以设置CELERY_TRACE_APP环境变量,这样当app链中断时会抛出一个异常:
$ CELERY_TRACE_APP=1 celery worker -l info
API的演化Celery自从创建7年来发生发许多变化。比如,最开始使用回调函数来实现task:
def hello(to):return 'hello {0}'.format(to)>>> from celery.execute import apply_async>>> apply_async(hello, ('world!',))
或者,你可以通过设置一些选项来创建一个task类,或者覆盖类的行为:
from celery.task import Taskfrom celery.registry import tasksclass Hello(Task):send_error_emails = Truedef run(self, to):return 'hello {0}'.format(to)tasks.register(Hello)>>> Hello.delay('world!')
后来,Celery团队认为任意传递可调用的对象是一种“反模式”,因为这使得使用pickle以外的序列化工具十分困难,所以这个特性在2.0中移除了。取而代之的是使用装饰器:
from celery.task import task@task(send_error_emails=True)def hello(x):return 'hello {0}'.format(to)
抽象Tasks所有用task()装饰器装饰的task将会继承自application的task基类。你可以通过base关键字参数来指定一个其它基类:
@app.task(base=OtherTask):def add(x, y):return x + y
要自定义一个task类,你应该继承celery.Task这个中立的基类:
from celery import Taskclass DebugTask(Task):def __call__(self, *args, **kwargs):print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))return super(DebugTask, self).__call__(*args, **kwargs)
注意:如果你覆盖了__call__方法,调用super的__call__是十分重要的,因为这会设置一些直接调用task()时所必需的信息。这个中立的基类比较特殊,因为它不会绑定到任何特定的app实例。一旦一个task绑定到某个实例,它就会读取配置来设置默认值和其它信息。为了使用一个基类,你需要使用app.task()装饰器:
@app.task(base=DebugTask)def add(x, y):return x + y
你甚至可以通过app.Task()属性来改变一个实例的默认基类。
>>> from celery import Celery, Task>>> app = Celery()>>> class MyBaseTask(Task):...    send_error_emails = True>>> app.Task = MyBaseTask>>> app.Task<unbound MyBaseTask>>>> @app.task... def add(x, y):...     return x + y>>> add<@task: __main__.add>>>> add.__class__.mro()[<class add of <Celery __main__:0x1012b4410>>,<unbound MyBaseTask>,<unbound Task>,<type 'object'>]
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息