您的位置:首页 > 运维架构 > Docker

在Docker容器中配置和运行Django + Celery

2018-02-28 22:15 561 查看
阅读完这篇博文后,你将能够使用Django,PostgreSQL,Redis和RabbitMQ配置Celery,然后在Docker容器中运行它们。

今天,你将学习如何建立一个分布式任务处理系统来快速建立原型。你将使用Django,PostgreSQL,Redis和RabbitMQ配置Celery,然后在Docker容器中运行它们。阅读本教程前你需要一些Docker的实践知识,你可以在我以前的帖子中找到这些知识

Django是一个非常著名的Python Web框架,Celery是一个分布式任务队列。您将使用PostgreSQL作为常规数据库来存储作业,RabbitMQ作为消息代理,并将Redis作为任务存储后端。

动机

当你构建一个Web应用程序时,你迟早要实现某种离线任务处理。

例如:

Alice希望将她的猫的照片从.jpg转换为.png,或者从她的.jpg文件集中创建.pdf。 在一个HTTP请求中执行这些任务中的任何一个都需要很长时间才能执行完,并且会给Web服务器带来不必要的负担 - 这意味着我们无法同时处理其他请求。 常见的解决方案是在后台执行任务 - 通常在另一台机器上 - 并轮询结果。


离线任务处理的简单设置可能如下所示:

1. Alice上传图片。
2. Web服务器安排worker的任务。
3. worker获得任务并转换照片。
4. worker创建一些任务的处理结果(在这种情况下,结果是转换后的照片)。
5. Web浏览器查询结果。
6. Web浏览器从服务器获取结果。


这个设置看起来很清晰,但它有一个严重的缺陷 - 它不能很好地扩展。如果Alice有很多猫的图片,一台服务器不足以一次性处理它们呢?或者,如果还有其他一些非常大的工作,并且所有其他工作都被它阻塞?她是否在乎是否所有图片都一次性处理?如果处理在某些时候失败会怎么样?

坦率地说,有一种解决方案不会在每次获得更大图片时都会让机器死机。在Web服务器和worker之间你需要的一些东西:broker。Web服务器将通过与broker进行通信来安排新任务,broker将与worker通信以实际执行这些任务。你可能还想缓冲任务,如果失败则重试,并监视其中有多少任务已处理。

你将不得不为不同优先级的任务创建队列,或者为适合不同类型工作的任务创建队列。

所有这些都可以通过使用Celery–一个开源的分布式任务队列来大大简化。

如何建立Celery

Celery包括:

任务,如你在应用中定义的

将任务路由到worker和队列的broker

做实际工作的worker

存储后端

你可以在这里观看对Celery更深入的介绍或直接跳到Celery的入门指南

你的设置

从标准的Django项目结构开始。它可以使用django-admin创建,通过在shell中运行:

$ django-admin startproject myproject


这创建了一个项目结构:

.
└── myproject
├── manage.py
└── myproject
├── __init__.py
├── settings.py
├── urls.py
└── wsgi.py


在本教程结束时,它看起来像这样:

.
├── Dockerfile
├── docker-compose.yml
├── myproject
│   ├── manage.py
│   └── myproject
│       ├── celeryconf.py
│       ├── __init__.py
│       ├── models.py
│       ├── serializers.py
│       ├── settings.py
│       ├── tasks.py
│       ├── urls.py
│       ├── views.py
│       └── wsgi.py
├── requirements.txt
├── run_celery.sh
└── run_web.sh


创建Docker容器

由于我们正在使用Docker 1.12,因此我们需要一个正确的
Dockerfile
文件来指定如何构建镜像。

自定义容器

Dockerfile

# use base python image with python 2.7
FROM python:2.7

# add requirements.txt to the image
ADD requirements.txt /app/requirements.txt

# set working directory to /app/
WORKDIR /app/

# install python dependencies
RUN pip install -r requirements.txt

# create unprivileged user
RUN adduser --disabled-password --gecos '' myuser


我们的python依赖是:

requirements.txt

Django==1.9.8
celery==3.1.20
djangorestframework==3.3.1
psycopg2==2.5.3
redis==2.10.5


我已经冻结了版本的依赖关系(指定了版本号),以确保你将有一个能正常工作的设置。如果你愿意,你可以更新它们中的任何一个版本,但不保证能工作。

选择服务的镜像

现在我们只需要设置RabbitMQ,PostgreSQL和Redis。由于Docker推出了它们的官方库,我尽可能使用官方镜像。但是,这些可能有时会被打破。当发生这种情况时,你将不得不使用别的东西(非官方镜像)。

这里是我为这个项目测试并选择的镜像:

官方的PostgreSQL镜像

官方Redis镜像

官方RabbitMQ镜像

使用docker-compose设置多容器应用程序

现在你将使用docker-compose将你自己的容器与我们在上一节中选择的容器组合起来。

docker-compose.yml

version: '2'

services:
# PostgreSQL database
db:
image: postgres:9.4
hostname: db
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=postgres
ports:
- "5432:5432"

# Redis
redis:
image: redis:2.8.19
hostname: redis

# RabbitMQ
rabbit:
hostname: rabbit
image: rabbitmq:3.6.0
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=mypass
ports:
- "5672:5672"  # we forward this port because it's useful for debugging
- "15672:15672"  # here, we can access rabbitmq management plugin

# Django web server
web:
build:
context: .
dockerfile: Dockerfile
hostname: web
command: ./run_web.sh
volumes:
- .:/app  # mount current directory inside container
ports:
- "8000:8000"
# set up links so that web knows about db, rabbit and redis
links:
- db
- rabbit
- redis
depends_on:
- db

# Celery worker
worker:
build:
context: .
dockerfile: Dockerfile
command: ./run_celery.sh
volumes:
- .:/app
links:
- db
- rabbit
- redis
depends_on:
- rabbit


配置Web服务器和worker

你可能已经注意到,worker和web服务器都会运行一些启动脚本。在这里他们是(确保它们是可执行的):

run_web.sh

#!/bin/sh

# wait for PSQL server to start
sleep 10

cd myproject
# prepare init migration
su -m myuser -c "python manage.py makemigrations myproject"
# migrate db, so we have the latest db schema
su -m myuser -c "python manage.py migrate"
# start development server on public ip interface, on port 8000
su -m myuser -c "python manage.py runserver 0.0.0.0:8000"


run_celery.sh

#!/bin/sh

# wait for RabbitMQ server to start
sleep 10

cd myproject
# run Celery worker for our project myproject with Celery configuration stored in Celeryconf
su -m myuser -c "celery worker -A myproject.celeryconf -Q default -n default@%h"


第一个脚本 - run_web.sh - 将迁移数据库并在端口8000上启动Django开发服务器。

第二个脚本 run_celery.sh 将启动一个Celery worker监听队列默认值。

在这个阶段,这些脚本将无法正常工作,因为我们还没有配置它们。我们的应用程序仍然不知道我们想使用PostgreSQL作为数据库,或者在哪里找到它(在某个容器中)。我们还必须配置Redis和RabbitMQ。

但在开始之前,有一些有用的Celery设置可以使您的系统更好地运行。以下是这个Django应用程序的完整设置。

myproject/settings.py

import os

from kombu import Exchange, Queue

BASE_DIR = os.path.dirname(os.path.dirname(__file__))

# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = 'megg_yej86ln@xao^+)it4e&ueu#!4tl9p1h%2sjr7ey0)m25f'

# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
TEMPLATE_DEBUG = True
ALLOWED_HOSTS = []

# Application definition

INSTALLED_APPS = (
'rest_framework',
'myproject',
'django.contrib.sites',
'django.contrib.staticfiles',

# required by Django 1.9
'django.contrib.auth',
'django.contrib.contenttypes',

)

MIDDLEWARE_CLASSES = (
)

REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': ('rest_framework.permissions.AllowAny',),
'PAGINATE_BY': 10
}

ROOT_URLCONF = 'myproject.urls'

WSGI_APPLICATION = 'myproject.wsgi.application'

# Localization ant timezone settings

TIME_ZONE = 'UTC'
USE_TZ = True

CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"

LANGUAGE_CODE = 'en-us'
USE_I18N = True
USE_L10N = True

# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.7/howto/static-files/ STATIC_URL = '/static/'

# Database Condocker-composeuration
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': os.environ.get('DB_ENV_DB', 'postgres'),
'USER': os.environ.get('DB_ENV_POSTGRES_USER', 'postgres'),
'PASSWORD': os.environ.get('DB_ENV_POSTGRES_PASSWORD', 'postgres'),
'HOST': os.environ.get('DB_PORT_5432_TCP_ADDR', 'db'),
'PORT': os.environ.get('DB_PORT_5432_TCP_PORT', ''),
},
}

# Redis

REDIS_PORT = 6379
REDIS_DB = 0
REDIS_HOST = os.environ.get('REDIS_PORT_6379_TCP_ADDR', 'redis')

RABBIT_HOSTNAME = os.environ.get('RABBIT_PORT_5672_TCP', 'rabbit')

if RABBIT_HOSTNAME.startswith('tcp://'):
RABBIT_HOSTNAME = RABBIT_HOSTNAME.split('//')[1]

BROKER_URL = os.environ.get('BROKER_URL',
'')
if not BROKER_URL:
BROKER_URL = 'amqp://{user}:{password}@{hostname}/{vhost}/'.format(
user=os.environ.get('RABBIT_ENV_USER', 'admin'),
password=os.environ.get('RABBIT_ENV_RABBITMQ_PASS', 'mypass'),
hostname=RABBIT_HOSTNAME,
vhost=os.environ.get('RABBIT_ENV_VHOST', ''))

# We don't want to have dead connections stored on rabbitmq, so we have to negotiate using heartbeats
BROKER_HEARTBEAT = '?heartbeat=30'
if not BROKER_URL.endswith(BROKER_HEARTBEAT):
BROKER_URL += BROKER_HEARTBEAT

BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 10

# Celery configuration

# configure queues, currently we have only one
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
)

# Sensible settings for celery
CELERY_ALWAYS_EAGER = False
CELERY_ACKS_LATE = True
CELERY_TASK_PUBLISH_RETRY = True
CELERY_DISABLE_RATE_LIMITS = False

# By default we will ignore result
# If you want to see results and try out tasks interactively, change it to False
# Or change this setting on tasks level
CELERY_IGNORE_RESULT = True
CELERY_SEND_TASK_ERROR_EMAILS = False
CELERY_TASK_RESULT_EXPIRES = 600

# Set redis as celery result backend
CELERY_RESULT_BACKEND = 'redis://%s:%d/%d' % (REDIS_HOST, REDIS_PORT, REDIS_DB)
CELERY_REDIS_MAX_CONNECTIONS = 1

# Don't use pickle as serializer, json is much safer
CELERY_TASK_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ['application/json']

CELERYD_HIJACK_ROOT_LOGGER = False
CELERYD_PREFETCH_MULTIPLIER = 1
CELERYD_MAX_TASKS_PER_CHILD = 1000


这些设置将配置Django应用程序,以便它能够发现PostgreSQL数据库,Redis缓存和Celery。

现在,是时候将Celery连接到应用程序。创建一个文件celeryconf.py并粘贴这段代码:

myproject/celeryconf.py

import os

from celery import Celery
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

app = Celery('myproject')

CELERY_TIMEZONE = 'UTC'

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


这应该足以将Celery连接到我们的应用程序,因此run_X脚本将起作用。你可以在这里阅读更多关于Django和Celery的第一步。

定义任务

Celery在每个Django应用程序的tasks.py文件中查找任务。通常,任务是使用装饰器或通过继承Celery Task Class创建的。

以下是如何使用装饰器创建任务:

@app.task
def power(n):
"""Return 2 to the n'th power"""
return 2 ** n


以下是如何通过继承Celery任务类来创建任务:

class PowerTask(app.Task):
def run(self, n):
"""Return 2 to the n'th power"""
return 2 ** n


两者都很好,适合稍微不同的用例。

myproject/tasks.py

from functools import wraps

from myproject.celeryconf import app
from .models import Job

# decorator to avoid code duplication

def update_job(fn):
"""Decorator that will update Job with result of the function"""

# wraps will make the name and docstring of fn available for introspection
@wraps(fn)
def wrapper(job_id, *args, **kwargs):
job = Job.objects.get(id=job_id)
job.status = 'started'
job.save()
try:
# execute the function fn
result = fn(*args, **kwargs)
job.result = result
job.status = 'finished'
job.save()
except:
job.result = None
job.status = 'failed'
job.save()
return wrapper

# two simple numerical tasks that can be computationally intensive

@app.task
@update_job
def power(n):
"""Return 2 to the n'th power"""
return 2 ** n

@app.task
@update_job
def fib(n):
"""Return the n'th Fibonacci number.
"""
if n < 0:
raise ValueError("Fibonacci numbers are only defined for n >= 0.")
return _fib(n)

def _fib(n):
if n == 0 or n == 1:
return n
else:
return _fib(n - 1) + _fib(n - 2)

# mapping from names to tasks

TASK_MAPPING = {
'power': power,
'fibonacci': fib
}


为调度任务构建API

如果你的系统中有任务,你如何运行它们呢?在本节中,你将创建用于作业调度的用户界面。在后端应用程序中,API将是你的用户接口。我们为你的API 使用Django REST框架。

为了使其尽可能简单,你的应用程序将只有一个模型并且只有一个ViewSet(具有许多HTTP方法的端点)。

*myproject/models.py中创建名为Job的模型:

from django.db import models

class Job(models.Model):
"""Class describing a computational job"""

# currently, available types of job are:
TYPES = (
('fibonacci', 'fibonacci'),
('power', 'power'),
)

# list of statuses that job can have
STATUSES = (
('pending', 'pending'),
('started', 'started'),
('finished', 'finished'),
('failed', 'failed'),
)

type = models.CharField(choices=TYPES, max_length=20)
status = models.CharField(choices=STATUSES, max_length=20)

created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
argument = models.PositiveIntegerField()
result = models.IntegerField(null=True)

def save(self, *args, **kwargs):
"""Save model and if job is in pending state, schedule it"""
super(Job, self).save(*args, **kwargs)
if self.status == 'pending':
from .tasks import TASK_MAPPING
task = TASK_MAPPING[self.type]
task.delay(job_id=self.id, n=self.argument)


然后创建一个序列化器,视图和URL配置来访问它。

myproject/serializers.py

from rest_framework import serializers

from .models import Job

class JobSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = Job


myproject/views.py

from rest_framework import mixins, viewsets

from .models import Job
from .serializers import JobSerializer

class JobViewSet(mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
viewsets.GenericViewSet):
"""
API endpoint that allows jobs to be viewed or created.
"""
queryset = Job.objects.all()
serializer_class = JobSerializer


myproject/urls.py

from django.conf.urls import url, include
from rest_framework import routers

from myproject import views

router = routers.DefaultRouter()
# register job endpoint in the router
router.register(r'jobs', views.JobViewSet)

# Wire up our API using automatic URL routing.
# Additionally, we include login URLs for the browsable API.
urlpatterns = [
url(r'^', include(router.urls)),
url(r'^api-auth/', include('rest_framework.urls', namespace='rest_framework'))
]


为了完整性,还有myproject/wsgi.py,为项目定义WSGI配置:

import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

from django.core.wsgi import get_wsgi_application
application = get_wsgi_application()


manage.py

#!/usr/bin/env python
import os
import sys

if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")

from django.core.management import execute_from_command_line

execute_from_command_line(sys.argv)


_init_.py留空。

就这样。呃…很多代码。幸运的是,所有的东西都在GitHub上,所以你可以把它分开。

运行设置

由于所有内容都是从Docker Compose运行的,因此在尝试启动应用程序之前,请确保已经安装了Docker和Docker Compose:

$ cd /path/to/myproject/where/is/docker-compose.yml
$ docker-compose build
$ docker-compose up


最后一个命令将启动五个不同的容器,所以开始使用你的API并在此期间与Celery一起愉快玩耍。

访问API

在浏览器中导航到
127.0.0.1:8000
以浏览你的API并安排一些工作。

扩大规模

目前,我们只有每个容器的一个实例。我们可以通过
docker-compose ps
命令获取有关我们的一组容器的信息。

$ docker-compose ps
Name                          Command               State                                        Ports
------------------------------------------------------------------------------------------------------------------------------------------------------
dockerdjangocelery_db_1       /docker-entrypoint.sh postgres   Up      0.0.0.0:5432->5432/tcp
dockerdjangocelery_rabbit_1   /docker-entrypoint.sh rabb ...   Up      0.0.0.0:15672->15672/tcp, 25672/tcp, 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp
dockerdjangocelery_redis_1    /entrypoint.sh redis-server      Up      6379/tcp
dockerdjangocelery_web_1      ./run_web.sh                     Up      0.0.0.0:8000->8000/tcp
dockerdjangocelery_worker_1   ./run_celery.sh                  Up


使用docker-compose扩展容器非常容易。 只需将容器名称和容器数量使用
docker-compose scale
命令即可:

$ docker-compose scale worker=5
Creating and starting dockerdjangocelery_worker_2 ... done
Creating and starting dockerdjangocelery_worker_3 ... done
Creating and starting dockerdjangocelery_worker_4 ... done
Creating and starting dockerdjangocelery_worker_5 ... done


输出表明docker-compose为我们创建了另外四个worker容器。 我们可以再次用
docker-compose ps
命令查看它:

$ docker-compose ps
Name                          Command               State                                        Ports
------------------------------------------------------------------------------------------------------------------------------------------------------
dockerdjangocelery_db_1       /docker-entrypoint.sh postgres   Up      0.0.0.0:5432->5432/tcp
dockerdjangocelery_rabbit_1   /docker-entrypoint.sh rabb ...   Up      0.0.0.0:15672->15672/tcp, 25672/tcp, 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp
dockerdjangocelery_redis_1    /entrypoint.sh redis-server      Up      6379/tcp
dockerdjangocelery_web_1      ./run_web.sh                     Up      0.0.0.0:8000->8000/tcp
dockerdjangocelery_worker_1   ./run_celery.sh                  Up
dockerdjangocelery_worker_2   ./run_celery.sh                  Up
dockerdjangocelery_worker_3   ./run_celery.sh                  Up
dockerdjangocelery_worker_4   ./run_celery.sh                  Up
dockerdjangocelery_worker_5   ./run_celery.sh                  Up


你会看到那里有五位强大的Celery worker。太好了!

总结

恭喜!您只需将Django与Celery结合即可构建分布式异步计算系统。我认为你会同意建立一个API非常容易,甚至更容易扩展它的工作人员!然而,生活对我们来说并不总是那么好,有时我们需要排除故障。

贡献

Justyna Ilczuk撰写的原创文章,由MichałKobus更新。

译自: https://blog.syncano.io/configuring-running-django-celery-docker-containers-pt-1/

github: https://github.com/Syncano/docker-django-celery
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: