Django集成Celery实现异步任务

什么是Celery

我之前在前面简单的介绍过什么是Celery:分布式任务队列-Celery 、感兴趣的小伙伴可以去看看;今天我们来看看Django如何通过Celery实现异步任务。异步任务在很多业务场景都有使用到、比如我们在后台发送邮件的时候、发送邮件的时间会比较长一点;还有在后台去做作业处理、做大量计算甚至做深度学习以及海量数据的处理都会用到异步任务;在开始之前我们先把Celery的环境跑起来。

Celery官方文档:https://docs.celeryproject.org/en/latest/

Celery Github地址:https://github.com/celery/celery

我们参考上面的官方文档、首先把Celery安装一下;安装完成之后我们需要去安装各种依赖的包、比如我们这里我们需要用到redis、auth鉴权、msgpack等依赖、都需要进行安装;当然Celery还支持很多的依赖、感兴趣的小伙伴可以自行去查看官方文档。

# 安装Celery
pip install -U Celery

# 安装Celery的依赖
pip install "celery[librabbitmq]"
# 这里的消息代理我们不需要librabbitmq、我们以redis为例;当然你也可以选择其他的消息代理
pip install "celery[redis,auth,msgpack]"

安装完成以后我们就可以在项目中来创建一个Celery的任务了、我们在项目的根目录下面创建一个celery目录;在celery目录里面创建一个 tasks.py 的 Celery APP、tasks.py里面的内容如下:

from celery import Celery

# Celery的APP名称叫做tasks、传入backend存储(异步处理的结果存储在backend里面)、broker是任务处理的代理(其实里面也是一个消息队列)
app = Celery('tasks', backend='redis://172.16.200.110', broker='redis://172.16.200.110')
# 注:这里的backend可以是数据库、可以是Redis、也可以是RPC的消息队列等。

# 这里我们定义一个异步任务task
@app.task
def add(x, y):
    return x + y

Celery任务创建完成以后我们通过下面的命令把Celery任务启动起来:

celery -A tasks worker --loglevel=info
celery --app tasks worker --loglevel=info

注:这里我们需要进入上面创建的celery目录来执行上面的命令;-A 是指 APP 的意思(上面的两个命令是等同的)。

image-20210120210241039

从上面的截图我们可以看到Celery服务已经正常启动了、transport 是 redis;results结果也在redis里面。现在我们去celery目录里面添加一个运行任务的脚本 run_task.py 、具体代码如下:

# 导入add方法
from tasks import add

# 执行add调用delay方法
result = add.delay(4, 4)
# 打印task运行结果(是否已经执行完成)
print('Is task Ready: %s' % result.ready())

# 从结果里面取运行结果
run_result = result.get(timeout=1)
print('Task Result: %s' % run_result)

注:这里任务提交以后会有一个ready的状态、如果一秒钟执行完成以后就直接取到结果;如果超过一秒钟就会超时、如果超时就返回一个超时状态。

我们通过下面的命令来运行上面的脚本(进入项目虚拟环境并进入celery目录下面执行):

python run_task.py

image-20210120211408184

命令执行完成以后我们可以在控制台看到、celery接收到了一个新的任务并成功执行完成。其实我们在前面通过backend把异步任务处理结果存在redis里面、我们也可以去redis里面查看存储的结果。当然、这里我们还有更简单的方法可以查看celery的存储结果。celery官方建议我们通过监控来查看任务状态、当然官方也提供了很多监控方案、比如:命令行、Flower等等。这里我们采用 Flower 来进行监控、Flower 可以看到任务的执行进展、任务的执行历史、执行结果;还可以做远程控制等。我们先通过下面的命令来安装 Flower :

# 使用pip安装Flower
pip install flower

# 运行flower命令将启动您可以访问的Web服务器
celery -A proj flower

# 默认端口为http:// localhost:5555,但是您可以使用–port参数更改此 端口
celery -A proj flower --port=5555

# 代理URL也可以通过以下--broker参数传递
celery flower --broker=redis://172.16.200.110:6379/0

# 我们使用一条命令执行上面的过程
celery -A tasks flower --broker=redis://172.16.200.110:6379/0

image-20210120225447300

启动Flower之后我们使用 http://127.0.0.1:5555 链接去访问Flower,我们可以看到此时Celery已经正常启动、但是一个任务都没有;如下图:

image-20210120225617396

我们再次运行一下上面的 run_tasks.py 任务、但是执行完成以后Celery总是报错、故障代码为:ValueError: not enough values to unpack (expected 3, got 0)

image-20210120230442693

这里我们需要先去安装一个扩展插件:eventlet、然后在启动 worker 的时候加上一个参数 -P eventlet、如下:

pip install eventlet -i https://pypi.douban.com/simple
celery -A tasks worker -l info -P eventlet

然后我们重新启动Celery、然后再次执行 run_tasks.py 任务、可以看到任务已经可以正常执行、我们去Flower里面查看一下Celery的执行情况:

image-20210120230849301

我们可以看到两个任务、一个任务执行失败、就是我们之前报错的那个任务;还有一个任务执行成功就是后来执行的任务。我们还可以点击UUID下面的链接查看Celery的详细信息:

image-20210120230944459

Django集成 Celery

在开始之前我们先来看看我们的项目目录、如下:

D:.
│  db.sqlite3
│  manage.py
│  test data.csv
│
├─chpa_data
│  │  admin.py
│  │  apps.py
│  │  dingtalk.py
│  │  middleware.py
│  │  models.py
│  │  tasks.py
│  │  urls.py
│  │  views.py
│  │  __init__.py
│  │
│  ├─migrations
│  │  │  0001_initial.py
│  │  │  0002_shirtsales.py
│  │  │  0003_auto_20210107_2020.py
│  │  │  __init__.py
│
├─datasite
│  │  asgi.py
│  │  celery.py
│  │  settings.py
│  │  urls.py
│  │  wsgi.py
│  │  __init__.py
│
├─logs
│      admin.info.log
│      performance.info.log
│
├─templates
│      base.html
│      home.html
│      pie-chart.html
│      __init__.py

Celery4.0版本以后可以支持Django1.8及以上版本、Django集成Celery不需要再安装额外的Package;首先我们需要去settings.py的同级目录下创建一个celery.py的脚本文件、并插入如下内容:

# coding:utf-8
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# 指定Django默认配置文件、这里我们把Celery相关配置文件放在Django项目的settings.py里面
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'datasite.settings')

# 创建Celery实例;这里建议指定broker、不指定broker容易出现错误
app = Celery('datasite', broker='redis://172.16.200.110:6379/0')

# 指定从django的settings.py里读取celery配置
app.config_from_object('django.conf:settings')

# 自动从所有已注册的django app中加载任务
app.autodiscover_tasks()


# 用于测试的异步任务
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

然后我们需要把Celery实例导入到settings.py同级目录下的__init__.py文件里面、这样可以保证在应用启动的时候可以加载Celery的环境、内容如下:

# proj/proj/__init__.py:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from __future__ import absolute_import

from .celery import app as celery_app

__all__ = ('celery_app',)

新版原生的Celery已经支持Django了,不需要再借助 django-celery和celery-with-redis 这种第三方库了;配置参数名也由大写变成了小写,无需再加CELERY前缀。另外当我们通过 app = Celery(‘myproject’) 创建Celery实例时如果不指定Broker,很容易出现[ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused 错误。

现在我们就可以去系统的 settings.py 配置文件里面把 Celery 的相关配置添加上了、具体如下:

# Celery相关配置
# 配置Celery时区,默认时UTC
if USE_TZ:
    timezone = TIME_ZONE

# Celery配置redis作为broker消息代理
broker_url = 'redis://172.16.200.110:6379/0'

# Celery结果存储到Redis
result_backend = 'redis://172.16.200.110:6379/0'

# 可接受的内容格式
accept_content = ["json"]
# 任务序列化数据格式
task_serializer = "json"
# 结果序列化数据格式
result_serializer = "json"

# 可选参数:给某个任务限流
# task_annotations = {'tasks.my_task': {'rate_limit': '10/s'}}
# 可选参数:给任务设置超时时间。超时立即中止worker
# task_time_limit = 10 * 60
# 可选参数:给任务设置软超时时间,超时抛出Exception
# task_soft_time_limit = 10 * 60
# 可选参数:如果使用django_celery_beat进行定时任务
# beat_scheduler = "django_celery_beat.schedulers:DatabaseScheduler"

注:关于Celery的更多配置内容详见官方文档:https://docs.celeryproject.org/en/stable/userguide/configuration.html

现在我们来通过钉钉实现一个消息发送的功能、在这个功能里面我们把 Celery 集成进去;因为一般涉及到IO及网络操作的地方、往往就是系统的瓶颈;所以我们通过下面的代码把发送消息做成异步任务在 Celery 里面执行;在开始之前我们先把钉钉插件集成进来:

# 安装钉钉插件
pip install DingtalkChatbot -i https://pypi.douban.com/simple

# 在 chpa_data APP 目录下面创建一个dingtalk.py文件、并插入如下内容
# coding=utf-8
from dingtalkchatbot.chatbot import DingtalkChatbot

from django.conf import settings


def send(message, at_mobiles=[]):
    # 引用 settings里面配置的钉钉群消息通知的WebHook地址:
    webhook = settings.DINGTALK_WEB_HOOK

    # 初始化机器人小丁, # 方式一:通常初始化方式
    xiaoding = DingtalkChatbot(webhook)

    # 方式二:勾选“加签”选项时使用(v1.5以上新功能)
    # xiaoding = DingtalkChatbot(webhook, secret=secret)

    # Text消息@所有人
    xiaoding.send_text(msg=('消息通知: %s' % message), at_mobiles=at_mobiles)

# 我们把钉钉Webhook配置信息放到settings.py中
# 钉钉消息相关配置
DINGTALK_WEB_HOOK = 'https://oapi.dingtalk.com/robot/send?access_token=***'

这样我们就把钉钉集成进来了、然后我们去 chpa_data APP 目录下创建一个tasks.py任务文件:

from __future__ import absolute_import, unicode_literals

from celery import shared_task
from .dingtalk import send


@shared_task
def send_message(message):
    send(message)

现在我们就可以去views里面去引用Celery啦、当我们去访问页面的时候、我们通过钉钉发送一个简单的信息如下:

from chpa_data import tasks

def pie_chart(request):
    labels = []
    data = []
    queryset = City.objects.order_by('-population')[:1]
    for city in queryset:
        labels.append(city.name)
        data.append(city.population)
        logger.info('从数据库获取数据!')
    # 通过celery执行一个异步任务发送钉钉消息到钉钉群
    tasks.send_message.delay('从数据库获取数据!')
    return render(request, 'pie-chart.html', {
        'labels': labels,
        'data': data,
    })

注:这里我们只是发送了一个简单的信息、当然我们还可以实现一些复杂的内容;例如我们把访问者IP地址、访问者时间、访问内容等信息发送到钉钉群、感兴趣的小伙伴可以自己去实现一下。

上面的内容配置完成之后我们就可以去终端里面启动 Celery Worker 和 Flower 了:

# Linux下测试
(datasite) D:\NextCloud\EnvsProject\datasite>Celery -A datasite worker -l info

# Windows下测试
(datasite) D:\NextCloud\EnvsProject\datasite>Celery -A datasite worker -l info -P eventlet

# 启动Flower
(datasite) D:\NextCloud\EnvsProject\datasite>flower -A datasite --port=5555
# 从Celery启动
(datasite) D:\NextCloud\EnvsProject\datasite>celery flower -A datasite --address=127.0.0.1 --port=5555

image-20210124211136655

注:如果我们能看到 [tasks] 下所列异步任务清单如debug_task,以及最后一句celery@xxxx ready, 说明 redis 和 celery 都配置好了,可以开始正式工作了。

image-20210124211350066

Flower启动以后我们就可以通过 http://127.0.0.1:5555/ 访问Flower的Dashboard了;我们把项目启动起来、然后去访问 http://127.0.0.1:8080/pie-chart/ 页面;我们可以在Celery的终端里面看到、Celery已经接受到了钉钉信息发送任务并处理成功、耗时3毫秒。

image-20210124211704399

当然、我们也可以去Flower监控页面里面查看任务列表、我们可以看到这里有一个成功的任务进程;点击菜单上的 Tasks 可以看到任务列表:

image-20210124211530565

image-20210124211552332

image-20210124211602638

钉钉群也成功接收到了消息通知、说明Celery已经和Django集成成功了。

image-20210124211637457

推荐文章

3条评论

  1. Great info. Lucky me I found your website by accident (stumbleupon).
    I have saved it for later!

    Here is my blog post: quality treatment

  2. Currently it appears like Movable Type is the preferred blogging
    platform available right now. (from what I’ve read) Is that what you’re using on your blog?

  3. Excellent items from you, man. I have take into accout your
    stuff prior to and you’re just extremely wonderful.

    I actually like what you’ve got here, really
    like what you’re saying and the best way in which
    you say it. You are making it enjoyable and you continue to care
    for to keep it smart. I can not wait to read much more from
    you. That is actually a great website.

发表评论

邮箱地址不会被公开。 必填项已用*标注