
Using Celery in Tornado for Asynchronous Task Processing (Part 1)

2016-01-14 20:11


1. Introduction

tornado-celery is a non-blocking Celery client for the Tornado web framework.

With tornado-celery, time-consuming work can be pushed onto a task queue for processing:

you define the tasks in Celery, and Tornado can then call them much like it calls AsyncHTTPClient.



Celery has two central concepts: the broker and the backend.

Broker: essentially the message queue mentioned above, used to send and receive task messages.

Several broker options are available: RabbitMQ, Redis, a database, etc.

Backend: stores task results and state; it can be a database backend or a cache backend.

The available backend options are listed at:
http://docs.celeryproject.org/en/latest/configuration.html#celery-result-backend
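To make the split concrete, here is a toy, stdlib-only sketch of the broker/backend idea (an in-memory stand-in for illustration only, not Celery, RabbitMQ, or Redis): the broker carries task messages from the caller to a worker, and the backend stores each task's result under its task id.

```python
import queue
import uuid

broker = queue.Queue()   # toy "broker": carries task messages
backend = {}             # toy "backend": maps task_id -> result

def send_task(func_name, args):
    """Caller side: enqueue a message and return a task id."""
    task_id = str(uuid.uuid4())
    broker.put({"id": task_id, "task": func_name, "args": args})
    return task_id

def worker_step(registry):
    """Worker side: consume one message, run it, store the result."""
    msg = broker.get()
    result = registry[msg["task"]](*msg["args"])
    backend[msg["id"]] = result

# usage
tasks = {"add": lambda x, y: x + y}
tid = send_task("add", (2, 3))
worker_step(tasks)
print(backend[tid])  # -> 5
```

Celery does the same thing at scale: `add.delay(2, 3)` puts a message on the broker, a worker executes it, and `result.get()` reads the backend.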


2. Asynchronous task processing with Tornado and Redis


2.1 Installing the environment


1. Install Tornado

See the article "Installing Python 2.7.3 and Tornado on CentOS 6.4".


2. Install tornado-celery

Installing tornado-celery is simple:

$ pip install tornado-celery

Downloading/unpacking tornado-celery

Downloading tornado-celery-0.3.4.tar.gz

Running setup.py egg_info for package tornado-celery

...

Successfully installed tornado-celery celery pika pytz billiard kombu anyjson amqp

Cleaning up...


3. Install celery

$ pip install celery

Requirement already satisfied (use --upgrade to upgrade): celery in /usr/local/python2.7.3/lib/python2.7/site-packages

Requirement already satisfied (use --upgrade to upgrade): pytz>dev in /usr/local/python2.7.3/lib/python2.7/site-packages (from celery)

Requirement already satisfied (use --upgrade to upgrade): billiard>=3.3.0.19,<3.4 in /usr/local/python2.7.3/lib/python2.7/site-packages (from celery)

Requirement already satisfied (use --upgrade to upgrade): kombu>=3.0.24,<3.1 in /usr/local/python2.7.3/lib/python2.7/site-packages (from celery)

Requirement already satisfied (use --upgrade to upgrade): anyjson>=0.3.3 in /usr/local/python2.7.3/lib/python2.7/site-packages (from kombu>=3.0.24,<3.1->celery)

Requirement already satisfied (use --upgrade to upgrade): amqp>=1.4.6,<2.0 in /usr/local/python2.7.3/lib/python2.7/site-packages (from kombu>=3.0.24,<3.1->celery)

Cleaning up...


4. Install tornado-redis

$ pip install tornado-redis

Downloading/unpacking tornado-redis

Downloading tornado-redis-2.4.18.tar.gz

Running setup.py egg_info for package tornado-redis

Installing collected packages: tornado-redis

Running setup.py install for tornado-redis

Successfully installed tornado-redis

Cleaning up...


5. Install Redis

$ yum install redis



As a NoSQL store, Redis offers good response times and hit rates.

Our project needed a centralized, horizontally scalable cache framework, so we did a little research.

Although Redis and memcached differ in raw efficiency (see http://timyang.net/data/mcdb-tt-redis/ for a detailed comparison),

either would satisfy the project's current needs. Redis is still the more flexible choice: it supports list and set operations and pattern-based key lookups.

Most of what this project caches is lists, and when a list needs items added or modified,

Redis shows a clear advantage (memcached can only reload the whole list, while Redis can add to or modify it in place).

1). Download Redis

Download address: http://code.google.com/p/redis/downloads/list

2). Install Redis

Unpack the downloaded archive

$ tar zxvf redis-2.6.14.tar.gz

into any directory, e.g. /usr/local/redis-2.6.14.

After unpacking, enter the redis directory

$ cd /usr/local/redis-2.6.14

$ make

Copy the files

# this is the configuration file Redis starts with

$ cp redis.conf /etc/

# the following is very handy: the binaries can then be run from anywhere, without the ./ prefix

$ cp ./src/redis-benchmark ./src/redis-cli ./src/redis-server /usr/bin/

Set the memory allocation policy (optional; adjust to your server's actual situation)

$ echo 1 > /proc/sys/vm/overcommit_memory

Possible values: 0, 1, 2.

0: the kernel checks whether enough free memory is available for the requesting process;

if there is, the allocation is granted; otherwise it fails and an error is returned to the process.

1: the kernel allows allocating all physical memory, regardless of the current memory state.

2: the kernel allows allocations exceeding the sum of all physical memory and swap space.

Alternatively, add the following options to /etc/sysctl.conf, after which memory usage will not keep growing:

vm.dirty_ratio = 1

vm.dirty_background_ratio=1

vm.dirty_writeback_centisecs=2

vm.dirty_expire_centisecs=3

vm.drop_caches=3

vm.swappiness =100

vm.vfs_cache_pressure=163

vm.overcommit_memory=1

vm.lowmem_reserve_ratio=32 32 8

kern.maxvnodes=3

$ sysctl -p

Note that when Redis dumps its data it forks a child process, and in theory the child occupies as much memory as the parent:

if the parent uses 8 GB, another 8 GB must nominally be available for the child. If memory cannot cover this,

the Redis server often goes down or suffers high I/O load and degraded performance.

So the better allocation policy here is 1 (the kernel allows allocating all physical memory, regardless of the current memory state).

Open the Redis port by editing the firewall configuration file:

$ vi /etc/sysconfig/iptables

...

# add the port rule

# For redis

-A INPUT -m state --state NEW -m tcp -p tcp --dport 6379 -j ACCEPT

...

COMMIT

[save and quit]

Reload the rules:

service iptables restart

3). Start the Redis service

[root@Architect redis-2.6.14]# pwd

/usr/local/redis-2.6.14

[root@Architect redis-2.6.14]# redis-server /etc/redis.conf &

redis-server /etc/redis.conf

(Redis ASCII-art startup banner)

Redis 2.6.14 (00000000/0) 64 bit

Running in stand alone mode

Port: 6379

PID: 5897

http://redis.io

[5897] 10 Dec 17:00:42.661 # Server started, Redis version 2.6.14

[5897] 10 Dec 17:00:42.661 * The server is now ready to accept connections on port 6379

Check the process list to confirm that Redis has started:

[root@Architect redis-2.6.14]# ps -ef | grep redis

root 401 29222 0 18:06 pts/3 00:00:00 grep redis

root 29258 1 0 16:23 ? 00:00:00 redis-server /etc/redis.conf



$ redis-cli ping

PONG

If starting the Redis service fails here, the cause is usually a problem in redis.conf;

check the file, or overwrite it with a known-good configuration to save yourself some detours.

One recommendation here: edit /etc/redis.conf and set the Redis process to run as a background daemon:

# By default Redis does not run as a daemon. Use 'yes' if you need it.

# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.

daemonize yes

4). Test Redis with an interactive session

[root@Architect redis-2.6.14]# redis-cli

redis> set name songbin

OK

redis> get name

"songbin"

5). Stop the Redis service

redis-cli shutdown

When the Redis service shuts down, the cached data is automatically dumped to disk, at the path set by the dbfilename option in redis.conf (dump.rdb).

To force a backup of the data to disk, use:

$ redis-cli save

or, specifying a port:

$ redis-cli -p 6380 save


2.2 Example program


1. Install the Python Redis client

$ pip install redis


2. Edit tasks.py

Create it in the current directory:

# vim tasks.py

#!/usr/bin/env python
# File: tasks.py
#

from time import sleep
from celery import Celery

backend = 'redis://127.0.0.1:6379/0'
broker = 'redis://127.0.0.1:6379/1'

app = Celery('tasks', backend=backend, broker=broker)

@app.task
def add(x, y):
    sleep(10)
    return x + y


3. Run the celery worker

$ celery -A tasks worker --loglevel=info

Running a worker with superuser privileges when the

worker accepts messages serialized with pickle is a very bad idea!

If you really want to continue then you have to set the C_FORCE_ROOT

environment variable (but please think about this before you do).

User information: uid=0 euid=0 gid=0 egid=0

This message appears because the worker is being run as root, not because Redis failed to start. To proceed anyway:

[root]$ export C_FORCE_ROOT="true"

[root]$ celery -A tasks worker --loglevel=debug

celery -A tasks worker --loglevel=info

/usr/local/python2.7.3/lib/python2.7/site-packages/celery/platforms.py:766: RuntimeWarning:

You are running the worker with superuser privileges, which is

absolutely not recommended!

Please specify a different user using the -u option.

User information: uid=0 euid=0 gid=0 egid=0

uid=uid, euid=euid, gid=gid, egid=egid,

[2014-12-10 23:02:26,993: WARNING/MainProcess] /usr/local/python2.7.3/lib/python2.7/site-packages/celery/apps/worker.py:161:

CDeprecationWarning:

Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers

the ability to execute any command. It's important to secure

your broker from unauthorized access when using pickle, so we think

that enabling pickle should require a deliberate action and not be

the default choice.

If you depend on pickle then you should set a setting to disable this

warning and to be sure that everything will continue working

when you upgrade to Celery 3.2::

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

-------------- celery@ltv_13 v3.1.17 (Cipater)

---- **** -----

--- * *** * -- Linux-2.6.18-194.el5-x86_64-with-glibc2.3

-- * - **** ---

- ** ---------- [config]

- ** ---------- .> app: tasks:0x15ea8250

- ** ---------- .> transport: redis://127.0.0.1:6379/1

- ** ---------- .> results: redis://127.0.0.1:6379/0

- *** --- * --- .> concurrency: 8 (prefork)

-- ******* ----

--- ***** ----- [queues]

-------------- .> celery exchange=celery(direct) key=celery

[tasks]

. tasks.add

[2014-12-10 23:02:27,516: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1

[2014-12-10 23:02:27,524: INFO/MainProcess] mingle: searching for neighbors

[2014-12-10 23:02:29,074: INFO/MainProcess] mingle: all alone

[2014-12-10 23:02:29,080: WARNING/MainProcess] celery@ltv_13 ready.

This means the worker started successfully.

If instead you see the following warning:

[2014-12-11 16:04:08,223: WARNING/MainProcess] /usr/local/python2.7.3/lib/python2.7/site-packages/celery/apps/worker.py:161:

CDeprecationWarning:

Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers

the ability to execute any command. It's important to secure

your broker from unauthorized access when using pickle, so we think

that enabling pickle should require a deliberate action and not be

the default choice.

If you depend on pickle then you should set a setting to disable this

warning and to be sure that everything will continue working

when you upgrade to Celery 3.2::

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

then you need to configure the serializers in tasks.py:

# vim tasks.py

#!/usr/bin/env python
# File: tasks.py
#

from time import sleep
from celery import Celery

backend = 'redis://127.0.0.1:6379/0'
broker = 'redis://127.0.0.1:6379/1'

app = Celery('tasks', backend=backend, broker=broker)

#app.conf.CELERY_TASK_SERIALIZER='json'
#app.conf.CELERY_ACCEPT_CONTENT=['json']
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_RESULT_SERIALIZER='json',
)

@app.task
def add(x, y):
    sleep(10)
    return x + y
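With CELERY_TASK_SERIALIZER='json', task arguments and results must survive a JSON round-trip, so stick to JSON-friendly types (note that tuples come back as lists). A quick stdlib illustration; the message shape here is simplified for the example, not Celery's actual wire format:

```python
import json

# simplified task-message shape (hypothetical, for illustration only)
message = json.dumps({"task": "tasks.add", "args": [4, 4], "kwargs": {}})
decoded = json.loads(message)
print(decoded["args"])  # -> [4, 4]
```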

Test code:

[root]$ cat test.py
from tasks import add

if __name__ == '__main__':
    for i in range(100):
        for j in range(100):
            kk = add.delay(i, j)
            kk.ready()
            kk.get()

[root]$ python ./test.py

You can watch the messages being consumed in the celery worker:

[2014-12-11 15:43:04,136: INFO/MainProcess] Received task: tasks.add[a0d1facd-39e8-44f6-9dd9-8980dbfca41b]

[2014-12-11 15:43:14,138: INFO/MainProcess] Task tasks.add[a0d1facd-39e8-44f6-9dd9-8980dbfca41b] succeeded in 10.0008870028s: 0

[2014-12-11 15:43:14,638: INFO/MainProcess] Received task: tasks.add[6357f049-ae5a-4690-8ac7-2ff91b9d21c9]

[2014-12-11 15:43:24,639: INFO/MainProcess] Task tasks.add[6357f049-ae5a-4690-8ac7-2ff91b9d21c9] succeeded in 10.0008919984s: 1

[2014-12-11 15:43:25,140: INFO/MainProcess] Received task: tasks.add[787039c5-bf6d-49e3-980b-912c0b743351]

[2014-12-11 15:43:35,141: INFO/MainProcess] Task tasks.add[787039c5-bf6d-49e3-980b-912c0b743351] succeeded in 10.0006869994s: 2

[2014-12-11 15:43:35,642: INFO/MainProcess] Received task: tasks.add[71826656-1b25-425d-884d-423d642ad6fe]

[2014-12-11 15:43:45,643: INFO/MainProcess] Task tasks.add[71826656-1b25-425d-884d-423d642ad6fe] succeeded in 10.000723999s: 3

[2014-12-11 15:43:46,144: INFO/MainProcess] Received task: tasks.add[eea8cbb3-c526-4c27-94b2-2cb1446b78f1]

[2014-12-11 15:43:56,145: INFO/MainProcess] Task tasks.add[eea8cbb3-c526-4c27-94b2-2cb1446b78f1] succeeded in 10.0006980002s: 4

[2014-12-11 15:43:56,646: INFO/MainProcess] Received task: tasks.add[b04058d7-9ac1-4979-a4ce-eb262c9ad2a4]

[2014-12-11 15:44:06,647: INFO/MainProcess] Task tasks.add[b04058d7-9ac1-4979-a4ce-eb262c9ad2a4] succeeded in 10.0008420013s: 5

[2014-12-11 15:44:07,148: INFO/MainProcess] Received task: tasks.add[ca5ebf48-591b-43dc-b542-a36a5bdc66b5]

[2014-12-11 15:44:17,149: INFO/MainProcess] Task tasks.add[ca5ebf48-591b-43dc-b542-a36a5bdc66b5] succeeded in 10.0005079992s: 6

[2014-12-11 15:44:17,649: INFO/MainProcess] Received task: tasks.add[0ec250b1-07b5-4df6-a06e-94ad232d5e73]

[2014-12-11 15:44:27,650: INFO/MainProcess] Task tasks.add[0ec250b1-07b5-4df6-a06e-94ad232d5e73] succeeded in 10.0003799982s: 7

...


4. Testing from the Python shell

$ python

>>> from tasks import add

>>> r = add.delay(4, 4)

>>> r.ready() # False while the task is still running (add sleeps 10 s); True after 10 s
False

>>> r.status # task state
'PENDING'

>>> r.result # the task's return value (None until the task finishes)
8

>>> r.status

'SUCCESS'

If these checks fail, Redis is not working properly.


5. Testing with Tornado

The complete test code is as follows:

#!/usr/bin/env python
#
# -*- coding:utf-8 -*-
#
# Author : Hank
# E-mail :
# File   : tornado_redis.py
#

import json
import time

import tornado.httpserver
import tornado.ioloop
import tornado.web
import tornado.gen
import tornado.httpclient

import tcelery
import tasks
from tasks import add

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self):
        print "CALLING get()"
        xxx = 10
        yyy = 2
        tasks.add.apply_async(args=[xxx, yyy])

application = tornado.web.Application([
    (r"/", MainHandler),
])

if __name__ == "__main__":
    application.listen(10001)
    tornado.ioloop.IOLoop.instance().start()

Invoke it:

$ curl -i "http://10.2.175.13:10001" &

$ curl -i "http://10.2.175.13:10001" &

Check the results in the celery worker; its log shows the messages arriving:

[2014-12-15 11:33:52,869: INFO/MainProcess] Received task: tasks.add[8336c4cc-84f4-4f5c-92b1-a54289526f56]

[2014-12-15 11:34:00,686: INFO/MainProcess] Received task: tasks.add[4971754a-3d62-40e6-8bfa-6666f83a2d2d]

[2014-12-15 11:34:02,871: INFO/MainProcess] Task tasks.add[8336c4cc-84f4-4f5c-92b1-a54289526f56] succeeded in 10.0005960017s: 12

[2014-12-15 11:34:10,687: INFO/MainProcess] Task tasks.add[4971754a-3d62-40e6-8bfa-6666f83a2d2d] succeeded in 10.0011309981s: 12


6. Limitations of using Tornado and Celery asynchronously

I consulted a great deal of material and tried many approaches; the conclusion I have reached so far is the following (anyone who finds a way around it is welcome to prove me wrong):

when Tornado and Celery are combined, there is no way to register a callback that the celery worker invokes back into Tornado,

i.e. something of this form:

tasks.add.apply_async(args=[xxx, yyy], callback=self.on_result)

def on_result(self, resp):
    self.write(resp)
    self.finish()

So all you can do is send the time-consuming task to Celery asynchronously; Celery processes it once received,

and you then check the task's result later by some other means.

For a way to achieve the callback-style behaviour, see my write-up "Implementing Asynchronous Tasks in Tornado".
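One simple way to "check the result later by some other means" is to poll the result object until it is ready. A minimal, duck-typed sketch; it only assumes the object has .ready() and .get(), like Celery's AsyncResult (in a real Tornado handler you would schedule the checks with non-blocking IOLoop timeouts rather than time.sleep):

```python
import time

def wait_for_result(result, timeout=30.0, poll_interval=0.5):
    """Poll until the task result is ready, or raise after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.ready():
            return result.get()
        time.sleep(poll_interval)
    raise RuntimeError("task did not finish within %.1f s" % timeout)
```

With the tasks.py above, wait_for_result(add.delay(4, 4)) would return 8 after roughly ten seconds.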


2.3 The Celery 3.1 documentation

http://docs.jinkan.org/docs/celery/userguide/calling.html#guide-calling

Calling Tasks

Contents:

. Basics

. Linking (callbacks/errbacks)

. ETA and countdown

. Expiration

. Message Sending Retry

. Serializers

. Compression

. Connections

. Routing options


1. Basics

This document describes Celery’s uniform “Calling API” used by task instances and the canvas.

The API defines a standard set of execution options, as well as three methods:

. apply_async(args[, kwargs[, …]])

Sends a task message.

. delay(*args, **kwargs)

Shortcut to send a task message, but does not support execution options.

. calling (__call__)

Applying an object supporting the calling API (e.g. add(2, 2)) means that the task will be executed

in the current process, and not by a worker (a message will not be sent).


2. Quick Cheat Sheet

. T.delay(arg, kwarg=value)

always a shortcut to .apply_async.

. T.apply_async((arg, ), {'kwarg': value})

. T.apply_async(countdown=10)

executes 10 seconds from now.

. T.apply_async(eta=now + timedelta(seconds=10))

executes 10 seconds from now, specified using eta

. T.apply_async(countdown=60, expires=120)

executes in one minute from now, but expires after 2 minutes.

. T.apply_async(expires=now + timedelta(days=2))

expires in 2 days, set using datetime.
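The relationship between countdown and eta in the cheat sheet is simple: countdown=N is shorthand for an eta N seconds in the future. Sketched with the stdlib:

```python
from datetime import datetime, timedelta

def eta_from_countdown(now, countdown):
    """countdown=N is equivalent to eta = now + N seconds."""
    return now + timedelta(seconds=countdown)

now = datetime(2014, 12, 15, 12, 0, 0)
print(eta_from_countdown(now, 10))  # -> 2014-12-15 12:00:10
```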


3. Example

The delay() method is convenient as it looks like calling a regular function:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

Using apply_async() instead you have to write:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

So delay is clearly convenient, but if you want to set additional execution

options you have to use apply_async.

The rest of this document will go into the task execution options in detail.

All examples use a task called add, returning the sum of two arguments:

@app.task
def add(x, y):
    return x + y

Tip

If the task is not registered in the current process you can use send_task() to call the task by name instead.

There’s another way…

You will learn more about this later while reading about the Canvas,

but subtask‘s are objects used to pass around the signature of a task invocation,

(for example to send it over the network), and they also support the Calling API:

task.s(arg1, arg2, kwarg1='x', kwarg2='y').apply_async()


4. Linking (callbacks/errbacks)

Celery supports linking tasks together so that one task follows another.

The callback task will be applied with the result of the parent task as a partial argument:

add.apply_async((2, 2), link=add.s(16))

What is s?

The add.s call used here is called a subtask, I talk more about subtasks in the canvas guide,

where you can also learn about chain, which is a simpler way to chain tasks together.

In practice the link execution option is considered an internal primitive,

and you will probably not use it directly, but rather use chains instead.

Here the result of the first task (4) will be sent to a new task that adds 16 to the previous result,

forming the expression (2 + 2) + 16 = 20
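The way a linked subtask receives the parent's result as a partial argument is essentially partial application. A plain-Python analogy of the link above (stdlib only, not Celery code):

```python
from functools import partial

def add(x, y):
    return x + y

# add.s(16) is roughly a callable waiting for the parent's result.
# (Celery prepends the parent result as the first argument; for
# addition the argument order makes no difference.)
callback = partial(add, 16)

parent_result = add(2, 2)       # the "parent task": 2 + 2 = 4
print(callback(parent_result))  # -> 20, i.e. (2 + 2) + 16
```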

You can also cause a callback to be applied if task raises an exception (errback),

but this behaves differently from a regular callback in that it will be passed the

id of the parent task, not the result. This is because it may not always be possible

to serialize the exception raised, and so this way the error callback requires a result

backend to be enabled, and the task must retrieve the result of the task instead.

This is an example error callback:

@app.task
def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          uuid, exc, result.traceback))

it can be added to the task using the link_error execution option:

add.apply_async((2, 2), link_error=error_handler.s())

In addition, both the link and link_error options can be expressed as a list:

add.apply_async((2, 2), link=[add.s(16), other_task.s()])

The callbacks/errbacks will then be called in order, and all callbacks will

be called with the return value of the parent task as a partial argument.


5. ETA and countdown

The ETA (estimated time of arrival) lets you set a specific date and time

that is the earliest time at which your task will be executed. countdown is a shortcut

to set eta by seconds into the future.

>>> result = add.apply_async((2, 2), countdown=3)

>>> result.get() # this takes at least 3 seconds to return

4

The task is guaranteed to be executed at some time after the specified date and time,

but not necessarily at that exact time. Possible reasons for broken deadlines may include

many items waiting in the queue, or heavy network latency. To make sure your

tasks are executed in a timely manner you should monitor the queue for congestion.

Use Munin, or similar tools, to receive alerts, so appropriate action can be

taken to ease the workload. See Munin.

While countdown is an integer, eta must be a datetime object,

specifying an exact date and time (including millisecond precision,

and timezone information):

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)

>>> add.apply_async((2, 2), eta=tomorrow)

Expiration

The expires argument defines an optional expiry time, either as seconds after task publish,

or a specific date and time using datetime:

>>> # Task expires after one minute from now.

>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime

>>> from datetime import datetime, timedelta

>>> add.apply_async((10, 10),

...     expires=datetime.now() + timedelta(days=1))

When a worker receives an expired task it will mark the task as REVOKED (TaskRevokedError).


6. Message Sending Retry

Celery will automatically retry sending messages in the event of connection failure,

and retry behavior can be configured – like how often to retry, or a maximum number

of retries – or disabled altogether.

To disable retry you can set the retry execution option to False:

add.apply_async((2, 2), retry=False)

Related Settings:

CELERY_TASK_PUBLISH_RETRY

CELERY_TASK_PUBLISH_RETRY_POLICY

Retry Policy

A retry policy is a mapping that controls how retries behave, and can contain the following keys:

. max_retries

Maximum number of retries before giving up, in this case

the exception that caused the retry to fail will be raised.

A value of 0 or None means it will retry forever.

The default is to retry 3 times.

. interval_start

Defines the number of seconds (float or integer) to wait between retries.

Default is 0, which means the first retry will be instantaneous.

. interval_step

On each consecutive retry this number will be added to

the retry delay (float or integer). Default is 0.2.

. interval_max

Maximum number of seconds (float or integer) to wait between retries. Default is 0.2.

For example, the default policy correlates to:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

the maximum time spent retrying will be 0.4 seconds.

It is set relatively short by default because a connection failure could lead

to a retry pile effect if the broker connection is down:

e.g. many web server processes waiting to retry blocking other incoming requests.
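The 0.4-second figure above follows directly from the policy: the per-retry delays are 0, then 0.2, then 0.4 capped at interval_max=0.2, summing to 0 + 0.2 + 0.2 = 0.4 s. A quick stdlib check of that arithmetic (a sketch of the policy semantics, not Celery's internal code):

```python
def total_retry_wait(max_retries, interval_start, interval_step, interval_max):
    """Sum the per-retry delays: delay n = start + n*step, capped at interval_max."""
    total = 0.0
    for n in range(max_retries):
        total += min(interval_start + n * interval_step, interval_max)
    return total

print(total_retry_wait(3, 0, 0.2, 0.2))  # -> 0.4 (the default policy)
```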


7. Serializers

Security

The pickle module allows for execution of arbitrary functions, please see the security guide.

Celery also comes with a special serializer that uses cryptography to sign your messages.

Data transferred between clients and workers needs to be serialized, so every message

in Celery has a content_type header that describes the serialization method used to encode it.

The default serializer is pickle, but you can change this using the CELERY_TASK_SERIALIZER setting,

or for each individual task, or even per message.

There’s built-in support for pickle, JSON, YAML and msgpack, and you can also add your own

custom serializers by registering them into the Kombu serializer registry (see ref:kombu:guide-serialization).

Each option has its advantages and disadvantages.

json – JSON is supported in many programming languages, is now

a standard part of Python (since 2.6), and is fairly fast to decode using the modern Python

libraries such as cjson or simplejson.

The primary disadvantage to JSON is that it limits you to the following data types:

strings, Unicode, floats, boolean, dictionaries, and lists. Decimals and dates are notably missing.

Also, binary data will be transferred using Base64 encoding, which will cause the

transferred data to be around 34% larger than an encoding which supports native binary types.

However, if your data fits inside the above constraints and you need cross-language support,

the default setting of JSON is probably your best choice.

See http://json.org for more information.
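The ~34% Base64 overhead mentioned above can be checked with the stdlib: every 3 bytes of binary data become 4 ASCII characters, a 4/3 expansion of roughly 33% (slightly more with padding):

```python
import base64
import os

raw = os.urandom(3000)                 # arbitrary binary payload
encoded = base64.b64encode(raw)
print(len(encoded) / float(len(raw)))  # -> ~1.33, i.e. about a third larger
```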

pickle – If you have no desire to support any language other than

Python, then using the pickle encoding will gain you the support of all

built-in Python data types (except class instances), smaller messages

when sending binary files, and a slight speedup over JSON processing.

See http://docs.python.org/library/pickle.html for more information.

yaml – YAML has many of the same characteristics as json,

except that it natively supports more data types (including dates, recursive references, etc.)

However, the Python libraries for YAML are a good bit slower than the libraries for JSON.

If you need a more expressive set of data types and need to maintain cross-language compatibility,

then YAML may be a better fit than the above.

See http://yaml.org/ for more information.

msgpack – msgpack is a binary serialization format that is closer to JSON

in features. It is very young however, and support should be considered experimental at this point.

See http://msgpack.org/ for more information.

The encoding used is available as a message header, so the worker knows how to deserialize any task.

If you use a custom serializer, this serializer must be available for the worker.

The following order is used to decide which serializer to use when sending a task:

1) The serializer execution option.

2) The Task.serializer attribute

3) The CELERY_TASK_SERIALIZER setting.

Example setting a custom serializer for a single task invocation:

>>> add.apply_async((10, 10), serializer='json')

8. Compression

Celery can compress the messages using either gzip, or bzip2.

You can also create your own compression schemes and register them in the kombu compression registry.

The following order is used to decide which compression scheme to use when sending a task:

1). The compression execution option.

2). The Task.compression attribute.

3). The CELERY_MESSAGE_COMPRESSION attribute.

Example specifying the compression used when calling a task:

>>> add.apply_async((2, 2), compression='zlib')

9. Connections

Automatic Pool Support

Since version 2.3 there is support for automatic connection pools,

so you don’t have to manually handle connections and publishers to reuse connections.

The connection pool is enabled by default since version 2.5.

See the BROKER_POOL_LIMIT setting for more information.

You can handle the connection manually by creating a publisher:

results = []
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        for args in numbers:
            res = add.apply_async((2, 2), publisher=publisher)
            results.append(res)
print([res.get() for res in results])

Though this particular example is much better expressed as a group:

>>> from celery import group

>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]

>>> res = group(add.subtask(n) for n in numbers).apply_async()

>>> res.get()

[4, 8, 16, 32]

10. Routing options

Celery can route tasks to different queues.

Simple routing (name <-> name) is accomplished using the queue option:

add.apply_async(queue='priority.high')

You can then assign workers to the priority.high queue by using the workers -Q argument:

$ celery worker -l info -Q celery,priority.high

See also

Hard-coding queue names in code is not recommended,

the best practice is to use configuration routers (CELERY_ROUTES).

To find out more about routing, please see Routing Tasks.

11. Advanced Options

These options are for advanced users who want to take use of AMQP’s full routing capabilities.

Interested parties may read the routing guide.

. exchange

Name of exchange (or a kombu.entity.Exchange) to send the message to.

. routing_key

Routing key used to determine the destination queue.

. priority

A number between 0 and 9, where 0 is the highest priority.

Supported by: redis, beanstalk