您的位置:首页 > 编程语言 > Python开发

Python celery异步框架

2020-12-03 23:07 323 查看

celery

功能描述

   它是一个简单、灵活、可靠的用于处理大量消息的分布式系统。

   功能主要有三个:执行异步任务,执行延迟任务,执行定时任务。

   举个例子,你现在有两个项目、一个项目用于爬取数据,一个项目用于分析数据,如何在数据爬取后将任务交给另一个项目进行分析呢?这种场景下就可以使用

celery
进行处理。

   官网

   英文文档

   一个噩耗消息:

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.

Celery是一个资金较少的项目,因此我们不支持Microsoft Windows。请不要提出与该平台有关的任何问题。

   尽管官方提示不支持

windows
,但是你仍然可以进行使用,这可能需要一些其他模块的辅助。

  

celery
是单独的服务,并不依赖于其他框架,就像
Django
一样你只要安装了它就可以通过自身命令启动服务。

架构介绍

  

celery
架构由三部分组成,分别是消息中间件
message br
56c
oker
,任务执行单元
worker
与任务执行结果存储
task result store
,如下图所示:

  

  

celery
是一个独立运行的服务,内置
socket
,如果想使用它你需要做这几件事情:

  1. 安装celery环境框架,配置broker与backend,启动celery服务
  2. 添加任务到borker,worker就会自动的在后台执行任务
  3. 任务执行完成后,通过backend获取结果

基本使用

安装使用

   安装模块,我装的旧版,新版

5.x
的有些摸不着头脑:

pip3 install celery==4.4.7

   新建一个

python
包,任意名字。

project
├── celery_task  	# celery包
│   ├── __init__.py # 包文件
│   ├── celery.py   # celery连接和配置相关文件,且名字必须叫celery.py
│   └── tasks.py    # 所有任务函数
├── add_task.py  	# 添加任务
└── get_result.py   # 获取结果

   在

celery.py
中配置
borker
backend

from celery import Celery

broker = "redis://127.0.0.1:6379/1"  # broker任务队列
backend = "redis://127.0.0.1:6379/2" # 结构存储,执行完的结果存在这
# 如果有密码:"redis//:password@127.0.0.1:6379/2"

app = Celery(
__name__,   # 取名,随便取
broker=broker,
backend=backend,
include=[
"celery_tasks.task",  # 第一个任务,必须是包名.文件名
]
)

任务书写

   在

tasks.py
中开始书写任务:

from .celery import app
@app.task  # 必须添加该装饰器
def add(x,y):
return x+y

@app.task
def sub(x,y):
return x-y

@app.task
def multi(x,y):
return x*y

任务执行

   在

add_task.py
中开始执行任务,三个任务分别指定三种不同的执行状态:

# 导入定义好的任务
from celery_task import tasks

# 添加异步任务,返回结果。任务号
t1_id = tasks.add.delay(10,20)

# 配置延迟、定时任务的时区为本地,如果延迟任务不生效,则取消本地时区的设置(windows下失效)
from celery_task.celery import app
# 时区
# app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
# app.conf.enable_utc = False

# 添加延迟任务,返回结果。任务号
from datetime import datetime,timedelta
time =  datetime.utcnow() + timedelta(seconds=10)  # 十秒后执行
t2_id = tasks.sub.apply_async(args=(100,50),eta=time)

# 添加定时任务,需要启动定时任务beat服务
from celery.schedules import crontab  # 如果要定义其他的周期日期,导入这个
app.conf.beat_schedule = {
'multi-task': {
'task': 'celery_task.tasks.multi',
'schedule': timedelta(seconds=3),
# 'schedule': crontab(hour=8, day_of_week=1),  # 每周一早八点
'args': (20, 10),
}
}

获取结果

   在

get_result.py
中书写获取结果的代码:

from celery_task.celery import app
from celery.result import AsyncResult

id = 'a9ffd16c-dbe0-44d2-9317-b198b432273c'  # 任务号
if __name__ == '__main__':
async = AsyncResult(id=id, app=app)
if async.successful():
result = async.get()
print(result)
elif async.failed():
print('任务失败')
elif async.status == 'PENDING':
print('任务等待中被执行')
elif async.status == 'RETRY':
print('任务异常后正在重试')
elif async.status == 'STARTED':
print('任务已经开始被执行')

启动服务

   接下来启动服务,首先切换到该包的上级目录中:

# cd project
# Linux
celery worker -A 模块名 -l info
# Windows
需要先安
564
装eventlet模块
pip install eventlet
celery worker -A 包名 -l info -P eventlet

# 如果是定时任务,还需要启动beat服务
celery beat -A 包名 -l info

Django使用

基本使用

   如果在

Django
中要使用
celery
,则需要将
celery
项目建立在
Django
项目的根目录下:

- DjangoProject01
- celery_project
- __init__.py
- celery.py
- django_app_name_task.py
- app01
- djangoproject01

   同时,在任务中还需要导入

Django
环境,一般书写在
celery.py
文件中即可:

import os
import django

from celery import Celery

# 由于celery是独立的项目,所以必须导入django环境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "DjangoProject.settings")
django.setup()

broker = 'redis://127.0.0.1:6379/1'  # broker任务队列
backend = 'redis://127.0.0.1:6379/2'  # 结构存储,执行完的结果存在这

app=Celery(__name__,broker=broker,backend=backend,include=['celery_project.app01_task',])

app.conf.timezone = "Asia/Shanghai"
app.conf.enable_utc = False

from datetime import timedelta
from celery.schedules import crontab
app.conf.beat_schedule = {
'add-task': {
'task': 'celery_project.app01_task.task01',
'schedule': tim
ad8
edelta(hours=4),
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: