您的位置:首页 > 编程语言 > Python开发

python学习[4]: 用python celery + rabbitMQ搭建并行分布式框架及验证

2017-08-26 10:50 831 查看
任务解耦(分布式并发处理):假设生产者和消费者分别是两个类。如果让生产者直接调用消费者的某个方法,那么生产者对于消费者就会产生依赖(也就是耦合)。将来如果消费者的代码发生变化,可能会影响到生产者。而如果两者都依赖于某个缓冲区,两者之间不直接依赖,耦合也就相应降低了。生产者直接调用消费者的某个方法,还有另一个弊端:由于函数调用是同步的(或者叫阻塞的),在消费者的方法没有返回之前,生产者只好一直等在那边。万一消费者处理数据很慢,生产者就会白白糟蹋大好时光。缓冲区还有另一个好处。如果制造数据的速度时快时慢,缓冲区的好处就体现出来了。当数据制造快的时候,消费者来不及处理,未处理的数据可以暂时存在缓冲区中。等生产者的制造速度慢下来,消费者再慢慢处理掉。

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery, 举几个实例场景中可用的例子:

你想对100台机器执行一条批量命令,可能会花很长时间 ,但你不想让你的程序等着结果返回,而是给你返回 一个任务ID,你过一段时间只需要拿着这个任务id就可以拿到任务执行结果, 在任务执行ing进行时,你可以继续做其它的事情。 

你想做一个定时任务,比如每天检测一下你们所有客户的资料,如果发现今天 是客户的生日,就给他发个短信祝福。

celery支持定时任务,设定好任务的执行时间,celery就会定时自动帮你执行, 这个定时任务模块叫celery beat.

celery则是事件驱动,调用一次发送才有。这是与celery beat两者的区别!

CELERYBEAT_SCHEDULE

官方资料:
http://docs.celeryproject.org/en/master/userguide/periodic-tasks.html
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, 

while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.
//提供:任务队列和任务调度

Task queues are used as a mechanism to distribute work across threads or machines.
//任务队列用于分布式调度的场景

Celery communicates via messages, usually using a broker to mediate between clients and workers 

//Celery通过消息通常是broker来中转clients和workers的消息。

所以,我要启动一个broker进程,一个client进程产生工单(task),N个worker进程收单(执行task)。

Celery的hell world工程:

from celery import Celery

app = Celery('hello', broker='amqp://guest@localhost//')

@app.task

def hello():

    return 'hello world'

A single Celery process can process millions of tasks a minute, with sub-millisecond round-trip latency (using RabbitMQ, librabbitmq, and optimized settings).

一个Celery进程任务一分钟几百万,亚毫秒的往返延迟(使用RabbitMQ,librabbitmq,优化设置)。

It supports

Brokers (RabbitMQ, Redis,Amazon SQS, and more…)

Concurrency (prefork (multiprocessing),Eventlet, gevent solo (single threaded))

Result Stores (AMQP, Redis Memcached,SQLAlchemy, Django ORM Apache Cassandra, Elasticsearch )

Serialization (pickle, json, yaml, msgpack.zlib, bzip2 compression.Cryptographic message signing.)

特点:

Monitoring,Scheduling,Work-flows(Cavas原语),Resource Leak Protection,Time & Rate Limits(worker task),User Components。

Celery is easy to integrate with web frameworks like Django
http://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#django-first-steps
celery与Django集成,其实就是用了它的proj name,__init__和settings.py

下面这句话很有意思。

The uppercase name-space means that all Celery configuration options must be specified in uppercase instead of lowercase, 

and start with CELERY_, so for example the task_always_eager setting becomes CELERY_TASK_ALWAYS_EAGER, and the broker_url setting becomes CELERY_BROKER_URL.

app.config_from_object('django.conf:settings', namespace='CELERY')

这话的意思是celery配置项目必须以大写的CELERY_开头!!

Django project here:
https://github.com/celery/celery/tree/master/examples/django/
Create the Celery database tables by performing a database migrations:
//结果回传

$ python manage.py migrate django_celery_results

django-celery-beat - Database-backed Periodic Tasks with Admin interface.//基于数据库的周期性任务启动

redis://:password@hostname:port/db_number,

BROKER_URL = "redis://localhost:6379/0" //这个没有密码

broker就是消息hub的意思,一定要有消息的publisher / subscriber。

测试中特别要注意。

>>> os.chdir("F:\\xiaobanma\\PycharmProjects\\demo\\proj")

>>> os.getcwd()

'F:\\xiaobanma\\PycharmProjects\\demo\\proj'

>>>

>>> from tasks import add //如果不先切换python shell目录到tasks.py目录,则找不到,no module tasks!

>>> add.delay(4,4)

<AsyncResult: 4e97898b-c41b-426f-a56e-b1032fb2bdcd>

>>> 

#celery -A periodic_task beat -l info //派出工单

#celery -A periodic_task worker -l info //收单执行

# 指定worker_pullorder去处理队列queue_pullorder的任务

python manage.py celery worker -E -l INFO -n worker_pullorder -Q queue_pullorder

# 指定worker_collect去处理队列for_task_collect的任务

python manage.py celery worker -E -l INFO -n worker_distributer -Q queue_distributer

投入到指定的队列用:add.delay(1, 3, queue='queue_add1') 

celery -A myapp beat -l info
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: