您的位置:首页 > 其它

Celery -- 分布式任务队列 及实例

2017-02-21 19:56 393 查看
Celery使用场景及实例 

Celery介绍和基本使用在项目中如何使用celery启用多个workersCelery定时任务与django结合通过django配置celeryperiodictask
一、Celery介绍和基本使用Celery是一个基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery,举几个实例场景中可用的例子:

你想对100台机器执行一条批量命令,可能会花很长时间,但你不想让你的程序等着结果返回,而是给你返回一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果,在任务执行ing进行时,你可以继续做其它的事情。你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天是客户的生日,就给他发个短信祝福
Celery在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果,一般使用rabbitMQorRedis,后面会讲

1.1Celery有以下优点:

简单:一单熟悉了celery的工作流程后,配置和使用还是比较简单的高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务快速:一个单进程的celery每分钟可处理上百万个任务灵活:几乎celery的各个组件都可以被扩展及自定制
Celery基本工作流程图:





1.2Celery安装使用Celery的默认broker是RabbitMQ,仅需配置一行就可以

broker_url='amqp://guest:guest@localhost:5672//'

rabbitMQ没装的话请装一下,安装看这里http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3

使用Redis做broker也可以

安装redis组件

$pipinstall-U"celery[redis]"

配置

Configurationiseasy,justconfigurethelocationofyourRedisdatabase:

app.conf.broker_url='redis://localhost:6379/0'

WheretheURLisintheformatof:

redis://:password@hostname:port/db_number

allfieldsaftertheschemeareoptional,andwilldefaultto
localhost
onport6379,usingdatabase0.

如果想获取每个任务的执行结果,还需要配置一下把任务结果存在哪

IfyoualsowanttostorethestateandreturnvaluesoftasksinRedis,youshouldconfigurethesesettings:

app.conf.result_backend='redis://localhost:6379/0'

1.3开始使用Celery啦  

安装celery模块

$pipinstallcelery

创建一个celeryapplication用来定义你的任务列表

创建一个任务文件就叫task.py!!!

fromceleryimportCelery
importsubprocess

#注意下方的‘tasks’可以是任意名称并不是必须保持任务脚本名一样
app=Celery('tasks',
broker='redis://192.168.14.234',
backend='redis://192.168.14.234')

@app.task
defadd(x,y):
print("running...",x,y)
returnx+y

@app.task
defrun_cmd(cmd):
cmd_obj=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)

启动CeleryWorker来开始监听并执行任务

$celery-Ataskworker--loglevel=info

注意:这里的‘task’要和你的任务文件名一致!!!

操作的时候,如果需要重启worker可以按两次Ctrl+C。

调用任务

再打开一个终端,进行命令行模式,调用任务 

>>>fromtasksimportadd
>>>add.delay(4,4)

看你的worker终端会显示收到一个任务,此时你想看任务结果的话,需要在调用任务时 赋值个变量

>>>result=add.delay(4,4)

The
ready()
methodreturnswhetherthetaskhasfinishedprocessingornot:

用来检测任务结果是否实行完毕,即任务执行结果是否准备好。

>>>result.ready()
False

Youcanwaitfortheresulttocomplete,butthisisrarelyusedsinceitturnstheasynchronouscallintoasynchronousone:

#get参数可以设置超时时间,如下,如果设置为1,则表示1秒之内没有收到结果,就会报接收超时。

>>>result.get(timeout=1)
8

Incasethetaskraisedanexception,
get()
willre-raisetheexception,butyoucanoverridethisbyspecifyingthe
propagate
argument:

#propagate=False默认值为True,设置为False作用:可以对异常信息进行格式化输出。

>>>result.get(propagate=False)

Ifthetaskraisedanexceptionyoucanalsogainaccesstotheoriginaltraceback:

#traceback用于追踪错误信息,用于调试的时候,定位错误信息位置。

>>>result.traceback

上述实例练习截图:(调用’df-h’命令查看磁盘空间)





二、在项目中如何使用celery 
可以把celery配置成一个应用,实现配置文件和任务文件的分离。

目录格式如下:

proj/__init__.py
/celery.py
/tasks.py

proj/celery.py内容

下方的broker和backend设置,根据自己的环境写,可以用RabbitMQ,amqp,Redis等;

broker='amqp://',

backend='amqp://',

from__future__importabsolute_import,unicode_literals
fromceleryimportCelery

app=Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])

#Optionalconfiguration,seetheapplicationuserguide.
app.conf.update(
result_expires=3600,
)

if__name__=='__main__':
app.start()

proj/tasks.py中的内容

from__future__importabsolute_import,unicode_literals
from.celeryimportapp

@app.task
defadd(x,y):
returnx+y
@app.task
defmul(x,y):
returnx*y
@app.task
defxsum(numbers):
returnsum(numbers)

启动worker(注意:这里启动的任务名称,要用项目名:proj,而不是上边的单独任务文件名!!!)

$celery-Aprojworker-linfo

输出:


--------------celery@jessonPCv4.0.2(latentcall)
----****-----
---*****--Linux-3.19.0-25-generic-x86_64-with-Ubuntu-14.04-trusty2017-02-2212:55:17
--*-****---
-**----------[config]
-**----------.>app:proj:0x7fb1183dbdd8
-**----------.>transport:redis://192.168.14.234:6379//
-**----------.>results:redis://192.168.14.234/
-***---*---.>concurrency:1(prefork)
--*******----.>taskevents:OFF(enable-Etomonitortasksinthisworker)
---*****-----
--------------[queues]
.>celeryexchange=celery(direct)key=celery

后台启动worker

Inthebackground
Inproductionyou’llwanttoruntheworkerinthebackground,thisisdescribedindetailinthedaemonizationtutorial.

Thedaemonizationscriptsusesthecelerymulticommandtostartoneormoreworkersinthebackground:

#开启后台多个任务进程

$celerymultistartw1-Aproj-linfo
celerymultiv4.0.0(latentcall)
>Startingnodes...
>w1.halcyon.local:OK

Youcanrestartittoo:

#restart重启相关的任务进程

$celerymultirestartw1-Aproj-linfo
celerymultiv4.0.0(latentcall)
>Stoppingnodes...
>w1.halcyon.local:TERM->64024
>Waitingfor1node.....
>w1.halcyon.local:OK
>Restartingnodew1.halcyon.local:OK
celerymultiv4.0.0(latentcall)
>Stoppingnodes...
>w1.halcyon.local:TERM->64052

orstopit:#stop终止相关的任务进程

$celerymultistopw1#后边的参数可以省略-Aproj-linfo

The
stop
commandisasynchronoussoitwon’twaitfortheworkertoshutdown.You’llprobablywanttousethe
stopwait
commandinstead,thisensuresallcurrentlyexecutingtasksiscompletedbeforeexiting:

$celerymultistopwaitw1-Aproj-linfo

三、Celery定时任务
celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行,这个定时任务模块叫celerybeat。

写一个脚本:periodic_task.py

fromceleryimportCelery
fromcelery.schedulesimportcrontab

app=Celery()

@app.on_after_configure.connect
defsetup_periodic_tasks(sender,**kwargs):
#Callstest('hello')every10seconds.
sender.add_periodic_task(10.0,test.s('hello'),name='addevery10')

#Callstest('world')every30seconds
sender.add_periodic_task(30.0,test.s('world'),expires=10)

#ExecuteseveryMondaymorningat7:30a.m.
sender.add_periodic_task(
crontab(hour=7,minute=30,day_of_week=1),
test.s('HappyMondays!'),
)

@app.task
deftest(arg):
print(arg)

add_periodic_task会添加一条定时任务

上面是通过调用函数添加定时任务,也可以像写配置文件一样的形式添加,下面是每30s执行的任务。

app.conf.beat_schedule={
'add-every-30-seconds':{
'task':'tasks.add',
'schedule':30.0,
'args':(16,16)
},
}
app.conf.timezone='UTC'

任务添加好了,需要让celery单独启动一个进程来定时发起这些任务,注意,这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划,每发现有任务需要执行了,就发起一个任务调用消息,交给celeryworker去执行。

启动任务调度器celerybeat

$celery-Aperiodic_taskbeat

输出likebelow

celerybeatv4.0.2(latentcall)isstarting.
__-...__-_
LocalTime->2017-02-0818:39:31
Configuration->
.broker->redis://localhost:6379//
.loader->celery.loaders.app.AppLoader
.scheduler->celery.beat.PersistentScheduler
.db->celerybeat-schedule
.logfile->[stderr]@%WARNING
.maxinterval->5.00minutes(300s)

此时还差一步,就是还需要启动一个worker,负责执行celerybeat发起的任务。

启动celeryworker来执行任务

$celery-Aperiodic_taskworker

--------------celery@Alexs-MacBook-Pro.localv4.0.2(latentcall)
----****-----
---*****--Darwin-15.6.0-x86_64-i386-64bit2017-02-0818:42:08
--*-****---
-**----------[config]
-**----------.>app:tasks:0x104d420b8
-**----------.>transport:redis://localhost:6379//
-**----------.>results:redis://localhost/
-***---*---.>concurrency:8(prefork)
--*******----.>taskevents:OFF(enable-Etomonitortasksinthisworker)
---*****-----
--------------[queues]
.>celeryexchange=celery(direct)key=celery


OK,此时观察worker的输出,是不是每隔一小会,就会执行一次定时任务呢!
注意:Beatneedstostorethelastruntimesofthetasksinalocaldatabasefile(namedcelerybeat-schedulebydefault),soitneedsaccesstowriteinthecurrentdirectory,oralternativelyyoucanspecifyacustomlocationforthisfile:

$celery-Aperiodic_taskbeat-s/home/celery/var/run/celerybeat-schedule

更复杂的定时配置  

上面的定时任务比较简单,只是每多少s执行一个任务,但如果你想要每周一三五的早上8点给你发邮件怎么办呢?哈,其实也简单,用crontab功能,跟linux自带的crontab功能是一样的,可以个性化定制任务执行时间:

linuxcrontabhttp://www.cnblogs.com/peida/archive/2013/01/08/2850483.html
fromcelery.schedulesimportcrontab

app.conf.beat_schedule={
#ExecuteseveryMondaymorningat7:30a.m.
'add-every-monday-morning':{
'task':'tasks.add',
'schedule':crontab(hour=7,minute=30,day_of_week=1),
'args':(16,16),
},
}

上面的这条意思是每周1的早上7.30执行tasks.add任务

还有更多定时配置方式如下:

ExampleMeaning
crontab()
Executeeveryminute.
crontab(minute=0,hour=0)
Executedailyatmidnight.
crontab(minute=0,hour='*/3')
Executeeverythreehours:midnight,3am,6am,9am,noon,3pm,6pm,9pm.
crontab(minute=0,

hour='0,3,6,9,12,15,18,21')
Sameasprevious.
crontab(minute='*/15')
Executeevery15minutes.
crontab(day_of_week='sunday')
Executeeveryminute(!)atSundays.
crontab(minute='*',

hour='*',
day_of_week='sun')
Sameasprevious.
crontab(minute='*/10',

hour='3,17,22',
day_of_week='thu,fri')
Executeeverytenminutes,butonlybetween3-4am,5-6pm,and10-11pmonThursdaysorFridays.
crontab(minute=0,hour='*/2,*/3')
Executeeveryevenhour,andeveryhourdivisiblebythree.Thismeans:ateveryhourexcept:1am,5am,7am,11am,1pm,5pm,7pm,11pm
crontab(minute=0,hour='*/5')
Executehourdivisibleby5.Thismeansthatitistriggeredat3pm,not5pm(since3pmequalsthe24-hourclockvalueof“15”,whichisdivisibleby5).
crontab(minute=0,hour='*/3,8-17')
Executeeveryhourdivisibleby3,andeveryhourduringofficehours(8am-5pm).
crontab(0,0,day_of_month='2')
Executeontheseconddayofeverymonth.
crontab(0,0,

day_of_month='2-30/3')
Executeoneveryevennumberedday.
crontab(0,0,

day_of_month='1-7,15-21')
Executeonthefirstandthirdweeksofthemonth.
crontab(0,0,day_of_month='11',

month_of_year='5')
ExecuteontheeleventhofMayeveryyear.
crontab(0,0,

month_of_year='*/3')
Executeonthefirstmonthofeveryquarter.
上面能满足你绝大多数定时任务需求了,甚至还能根据潮起潮落来配置定时任务,具体看http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html#solar-schedules  

四、最佳实践之与django结合
django可以轻松跟celery结合实现异步任务,只需简单配置即可
IfyouhaveamodernDjangoprojectlayoutlike:
-proj/
-proj/__init__.py
-proj/settings.py
-proj/urls.py
-manage.py








django+celery配置注意事项

说生产环境中不建议django项目的settings文件开启debug模式,因为这样可能会导致服务器内存泄露。

PS:内存泄露和内存溢出的区别





修改建议:将djangosettings中的DEBUG参数设置为False



内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐
章节导航