您的位置:首页 > 其它

flask使用celery定时器

2017-06-15 15:38 387 查看
# -*- coding: utf-8 -*-
from celery import Celery, Task
from celery.utils.log import get_task_logger
from flask import Flask
from datetime import timedelta

app = Flask(__name__)
app.config['CELERY_NAME'] = 'test_celery'
app.config['CELERY_BROKER_URL'] = 'redis://:@localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://:@localhost:6379/0'
app.config['CELERYBEAT_SCHEDULE'] = {
'add-every-10-seconds': {
'task': 'test_celery.long_task',
'schedule': timedelta(seconds=3),
'args': ()
},
}
logger = get_task_logger(__name__)
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)

@celery.task(bind=True)
def long_task(self):
"""启动worker: celery -A test_celery.celery worker -B --loglevel=debug"""
logger.debug('================')
logger.debug('此处写需要定时调用的任务')
logger.debug('================')

# 测试client
class Trigger(Task):
def run(self):
task = long_task.apply_async()
print task

if __name__ == '__main__':
trigger = Trigger()
trigger.run()


命令行启动celery即可

celery -A test_celery.celery worker -B --loglevel=debug
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: