Celery -- 分布式任务队列 及实例
2017-02-21 19:56
393 查看
Celery使用场景及实例
Celery介绍和基本使用在项目中如何使用celery启用多个workersCelery定时任务与django结合通过django配置celeryperiodictask
一、Celery介绍和基本使用Celery是一个基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery,举几个实例场景中可用的例子:
你想对100台机器执行一条批量命令,可能会花很长时间,但你不想让你的程序等着结果返回,而是给你返回一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果,在任务执行ing进行时,你可以继续做其它的事情。你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天是客户的生日,就给他发个短信祝福
Celery在执行任务时需要通过一个消息中间件来接收和发送任务消息,以及存储任务结果,一般使用rabbitMQorRedis,后面会讲Celery介绍和基本使用在项目中如何使用celery启用多个workersCelery定时任务与django结合通过django配置celeryperiodictask
一、Celery介绍和基本使用Celery是一个基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用celery,举几个实例场景中可用的例子:
1.1Celery有以下优点:
1.2Celery安装使用Celery的默认broker是RabbitMQ,仅需配置一行就可以
broker_url='amqp://guest:guest@localhost:5672//'
rabbitMQ没装的话请装一下,安装看这里
使用Redis做broker也可以
安装redis组件
$pipinstall-U"celery[redis]"
配置
Configurationiseasy,justconfigurethelocationofyourRedisdatabase:
app.conf.broker_url='redis://localhost:6379/0'
WheretheURLisintheformatof:
redis://:password@hostname:port/db_number
allfieldsaftertheschemeareoptional,andwilldefaultto
localhostonport6379,usingdatabase0.
如果想获取每个任务的执行结果,还需要配置一下把任务结果存在哪
IfyoualsowanttostorethestateandreturnvaluesoftasksinRedis,youshouldconfigurethesesettings:
app.conf.result_backend='redis://localhost:6379/0'
1.3开始使用Celery啦
安装celery模块
$pipinstallcelery
创建一个celeryapplication用来定义你的任务列表
创建一个任务文件就叫task.py!!!
fromceleryimportCelery importsubprocess #注意下方的‘tasks’可以是任意名称并不是必须保持任务脚本名一样 app=Celery('tasks', broker='redis://192.168.14.234', backend='redis://192.168.14.234') @app.task defadd(x,y): print("running...",x,y) returnx+y @app.task defrun_cmd(cmd): cmd_obj=subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE,stderr=subprocess.PIPE)
启动CeleryWorker来开始监听并执行任务
$celery-Ataskworker--loglevel=info
注意:这里的‘task’要和你的任务文件名一致!!!
操作的时候,如果需要重启worker可以按两次Ctrl+C。
调用任务
再打开一个终端,进行命令行模式,调用任务
>>>fromtasksimportadd >>>add.delay(4,4)
看你的worker终端会显示收到一个任务,此时你想看任务结果的话,需要在调用任务时 赋值个变量
>>>result=add.delay(4,4)
The
ready()methodreturnswhetherthetaskhasfinishedprocessingornot:
用来检测任务结果是否实行完毕,即任务执行结果是否准备好。
>>>result.ready() False
Youcanwaitfortheresulttocomplete,butthisisrarelyusedsinceitturnstheasynchronouscallintoasynchronousone:
#get参数可以设置超时时间,如下,如果设置为1,则表示1秒之内没有收到结果,就会报接收超时。
>>>result.get(timeout=1) 8
Incasethetaskraisedanexception,
get()willre-raisetheexception,butyoucanoverridethisbyspecifyingthe
propagateargument:
#propagate=False默认值为True,设置为False作用:可以对异常信息进行格式化输出。
>>>result.get(propagate=False)
Ifthetaskraisedanexceptionyoucanalsogainaccesstotheoriginaltraceback:
#traceback用于追踪错误信息,用于调试的时候,定位错误信息位置。
>>>result.traceback
上述实例练习截图:(调用’df-h’命令查看磁盘空间)
二、在项目中如何使用celery
可以把celery配置成一个应用,实现配置文件和任务文件的分离。
目录格式如下:
proj/__init__.py /celery.py /tasks.py
proj/celery.py内容
下方的broker和backend设置,根据自己的环境写,可以用RabbitMQ,amqp,Redis等;
broker='amqp://',
backend='amqp://',
from__future__importabsolute_import,unicode_literals fromceleryimportCelery app=Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) #Optionalconfiguration,seetheapplicationuserguide. app.conf.update( result_expires=3600, ) if__name__=='__main__': app.start()
proj/tasks.py中的内容
from__future__importabsolute_import,unicode_literals from.celeryimportapp @app.task defadd(x,y): returnx+y @app.task defmul(x,y): returnx*y @app.task defxsum(numbers): returnsum(numbers)
启动worker(注意:这里启动的任务名称,要用项目名:proj,而不是上边的单独任务文件名!!!)
$celery-Aprojworker-linfo
输出:
--------------celery@jessonPCv4.0.2(latentcall)
----****-----
---*****--Linux-3.19.0-25-generic-x86_64-with-Ubuntu-14.04-trusty2017-02-2212:55:17
--*-****---
-**----------[config]
-**----------.>app:proj:0x7fb1183dbdd8
-**----------.>transport:redis://192.168.14.234:6379//
-**----------.>results:redis://192.168.14.234/
-***---*---.>concurrency:1(prefork)
--*******----.>taskevents:OFF(enable-Etomonitortasksinthisworker)
---*****-----
--------------[queues]
.>celeryexchange=celery(direct)key=celery
后台启动worker
Inthebackground
Inproductionyou’llwanttoruntheworkerinthebackground,thisisdescribedindetailinthe
Thedaemonizationscriptsusesthecelerymulticommandtostartoneormoreworkersinthebackground:
#开启后台多个任务进程
$celerymultistartw1-Aproj-linfo
celerymultiv4.0.0(latentcall)
>Startingnodes...
>w1.halcyon.local:OK
Youcanrestartittoo:
#restart重启相关的任务进程
$celerymultirestartw1-Aproj-linfo
celerymultiv4.0.0(latentcall)
>Stoppingnodes...
>w1.halcyon.local:TERM->64024
>Waitingfor1node.....
>w1.halcyon.local:OK
>Restartingnodew1.halcyon.local:OK
celerymultiv4.0.0(latentcall)
>Stoppingnodes...
>w1.halcyon.local:TERM->64052
orstopit:#stop终止相关的任务进程
$celerymultistopw1#后边的参数可以省略-Aproj-linfo
The
stopcommandisasynchronoussoitwon’twaitfortheworkertoshutdown.You’llprobablywanttousethe
stopwaitcommandinstead,thisensuresallcurrentlyexecutingtasksiscompletedbeforeexiting:
$celerymultistopwaitw1-Aproj-linfo
三、Celery定时任务
celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行,这个定时任务模块叫celerybeat。
写一个脚本:periodic_task.py
fromceleryimportCelery
fromcelery.schedulesimportcrontab
app=Celery()
@app.on_after_configure.connect
defsetup_periodic_tasks(sender,**kwargs):
#Callstest('hello')every10seconds.
sender.add_periodic_task(10.0,test.s('hello'),name='addevery10')
#Callstest('world')every30seconds
sender.add_periodic_task(30.0,test.s('world'),expires=10)
#ExecuteseveryMondaymorningat7:30a.m.
sender.add_periodic_task(
crontab(hour=7,minute=30,day_of_week=1),
test.s('HappyMondays!'),
)
@app.task
deftest(arg):
print(arg)
add_periodic_task会添加一条定时任务
上面是通过调用函数添加定时任务,也可以像写配置文件一样的形式添加,下面是每30s执行的任务。
app.conf.beat_schedule={
'add-every-30-seconds':{
'task':'tasks.add',
'schedule':30.0,
'args':(16,16)
},
}
app.conf.timezone='UTC'
任务添加好了,需要让celery单独启动一个进程来定时发起这些任务,注意,这里是发起任务,不是执行,这个进程只会不断的去检查你的任务计划,每发现有任务需要执行了,就发起一个任务调用消息,交给celeryworker去执行。
启动任务调度器celerybeat
$celery-Aperiodic_taskbeat
输出likebelow
celerybeatv4.0.2(latentcall)isstarting.
__-...__-_
LocalTime->2017-02-0818:39:31
Configuration->
.broker->redis://localhost:6379//
.loader->celery.loaders.app.AppLoader
.scheduler->celery.beat.PersistentScheduler
.db->celerybeat-schedule
.logfile->[stderr]@%WARNING
.maxinterval->5.00minutes(300s)
此时还差一步,就是还需要启动一个worker,负责执行celerybeat发起的任务。
启动celeryworker来执行任务
$celery-Aperiodic_taskworker
--------------celery@Alexs-MacBook-Pro.localv4.0.2(latentcall)
----****-----
---*****--Darwin-15.6.0-x86_64-i386-64bit2017-02-0818:42:08
--*-****---
-**----------[config]
-**----------.>app:tasks:0x104d420b8
-**----------.>transport:redis://localhost:6379//
-**----------.>results:redis://localhost/
-***---*---.>concurrency:8(prefork)
--*******----.>taskevents:OFF(enable-Etomonitortasksinthisworker)
---*****-----
--------------[queues]
.>celeryexchange=celery(direct)key=celery
OK,此时观察worker的输出,是不是每隔一小会,就会执行一次定时任务呢!
注意:Beatneedstostorethelastruntimesofthetasksinalocaldatabasefile(namedcelerybeat-schedulebydefault),soitneedsaccesstowriteinthecurrentdirectory,oralternativelyyoucanspecifyacustomlocationforthisfile:
$celery-Aperiodic_taskbeat-s/home/celery/var/run/celerybeat-schedule
更复杂的定时配置
上面的定时任务比较简单,只是每多少s执行一个任务,但如果你想要每周一三五的早上8点给你发邮件怎么办呢?哈,其实也简单,用crontab功能,跟linux自带的crontab功能是一样的,可以个性化定制任务执行时间:
linuxcrontab
fromcelery.schedulesimportcrontab
app.conf.beat_schedule={
#ExecuteseveryMondaymorningat7:30a.m.
'add-every-monday-morning':{
'task':'tasks.add',
'schedule':crontab(hour=7,minute=30,day_of_week=1),
'args':(16,16),
},
}
上面的这条意思是每周1的早上7.30执行tasks.add任务
还有更多定时配置方式如下:
Example | Meaning |
crontab() | Executeeveryminute. |
crontab(minute=0,hour=0) | Executedailyatmidnight. |
crontab(minute=0,hour='*/3') | Executeeverythreehours:midnight,3am,6am,9am,noon,3pm,6pm,9pm. |
crontab(minute=0, hour='0,3,6,9,12,15,18,21') | Sameasprevious. |
crontab(minute='*/15') | Executeevery15minutes. |
crontab(day_of_week='sunday') | Executeeveryminute(!)atSundays. |
crontab(minute='*', hour='*', day_of_week='sun') | Sameasprevious. |
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri') | Executeeverytenminutes,butonlybetween3-4am,5-6pm,and10-11pmonThursdaysorFridays. |
crontab(minute=0,hour='*/2,*/3') | Executeeveryevenhour,andeveryhourdivisiblebythree.Thismeans:ateveryhourexcept:1am,5am,7am,11am,1pm,5pm,7pm,11pm |
crontab(minute=0,hour='*/5') | Executehourdivisibleby5.Thismeansthatitistriggeredat3pm,not5pm(since3pmequalsthe24-hourclockvalueof“15”,whichisdivisibleby5). |
crontab(minute=0,hour='*/3,8-17') | Executeeveryhourdivisibleby3,andeveryhourduringofficehours(8am-5pm). |
crontab(0,0,day_of_month='2') | Executeontheseconddayofeverymonth. |
crontab(0,0, day_of_month='2-30/3') | Executeoneveryevennumberedday. |
crontab(0,0, day_of_month='1-7,15-21') | Executeonthefirstandthirdweeksofthemonth. |
crontab(0,0,day_of_month='11', month_of_year='5') | ExecuteontheeleventhofMayeveryyear. |
crontab(0,0, month_of_year='*/3') | Executeonthefirstmonthofeveryquarter. |
四、最佳实践之与django结合
django可以轻松跟celery结合实现异步任务,只需简单配置即可
IfyouhaveamodernDjangoprojectlayoutlike:
-proj/
-proj/__init__.py
-proj/settings.py
-proj/urls.py
-manage.py
django+celery配置注意事项
说生产环境中不建议django项目的settings文件开启debug模式,因为这样可能会导致服务器内存泄露。
PS:
修改建议:将djangosettings中的DEBUG参数设置为False
相关文章推荐
- Celery 分布式任务队列入门
- celery分布式队列任务
- python Celery 分布式任务队列快速入门
- 分布式任务队列Celery入门与进阶
- day21 git & github + Celery 分布式任务队列
- Celery 分布式任务队列快速入门
- 分布式任务队列 Celery —— 深入 Task
- 分布式任务队列 Celery —— 详解工作流
- Celery 分布式任务队列
- Celery - 分布式任务队列
- 分布式任务队列与任务调度系统Celery进阶——分布式爬虫
- Celery 实现分布式任务队列
- 【Python】分布式任务队列Celery使用参考资料
- Python开发【模块】:Celery 分布式异步消息任务队列
- 爬虫系列20.Celery - 分布式任务队列
- 分布式任务队列-celery
- 分布式任务队列celery用法详解
- 【转】分布式异步任务队列 Celery + rabbitmq (or redis )
- 分布式任务队列 Celery —— 应用基础
- 分布式任务队列 Celery