1 基本原理

Celery 是一个异步任务队列,能够将耗时操作(如发邮件、处理图片、网络爬虫等)从 Django 主线程中分离出来,由后台的 worker 处理,避免阻塞请求。Celery 作为独立运行的后台进程(Worker),持续监听消息队列(Broker),接收并处理任务,而 Django 主线程用于发布任务。

Celery 是一种独立于 Django 的程序,通常需手动启动。它通过 Redis 等工具与主线程进行通信,并将任务进度和结果写入数据库,方便后续追溯。

基本流如下:

1
2
3
4
5
6
7
8
9
graph TD;
A[用户请求] --> B[Django Backend]
B --> C{是否耗时任务?}
C -->|否| D[直接处理]
C -->|是| E[Celery Task]
E --> F[Redis/RabbitMQ]
F --> G[Celery Workers]
G --> H[数据库]
B --> H

1.1 消息中间件 Broker

Django 和 Celery 通过消息中间件(Broker)进行通信,最常用的选择是 Redis 或 RabbitMQ,因为它们速度快且稳定。如果可以接受安装 Redis(在本地启动只需一条命令),建议直接使用它。当然,还可以考虑其他替代方案。

方案 适合场景 是否推荐
Redis 作为 Broker 正式项目 ✅ 推荐
数据库作为 Broker 简化部署,任务少 ⚠️ 勉强可用
memory:// 开发测试 🧪 临时用
threading / async 极简功能 🔧 替代品

以批处理为例,时序图如下:

1
2
3
4
5
6
7
8

sequenceDiagram
Client->>API: POST /batch-process/
API->>Celery: process_batch_files.delay()
API->>Client: Return task_id
Client->>API: GET /task-status/{task_id}
API->>Client: Return status
Note over Client,API: 重复查询直到任务完成

2 memory 方式实现

为了简化步骤,我先测试了 memory 方法。

2.1 安装 Celery

1
2
$ pip install celery

2.2 加入项目

  • 创建 myproject/my_celery.py 文件,内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject') # 注意改成自己的 myproject 名

app.conf.broker_url = 'memory://'
app.conf.result_backend = 'cache+memory://'

app.autodiscover_tasks()

  • myproject/__init__.py 里加入:
1
2
3
from .my_celery import app as celery_app  
__all__ = ['celery_app']`

2.3 异步功能函数

添加异步功能:在应用的 tasks.py 中定义任务:

1
2
3
4
5
6
7
from celery import shared_task

@shared_task
def add(x, y):
print(f"Adding {x} + {y}")
return x + y

调用异步功能:

1
2
3
4
from myapp.tasks import add

add.delay(2, 3)

注意:由于 memory 维护不佳,建议直接使用 Redis。

3 redis 方式实现

3.1 使用 docker 方式安装 redis

1
2
docker run -d --name redis-dev -p 6379:6379 redis

3.2 安装 redis 包

1
2
pip install redis

3.3 修改代码

替换设置部分:

1
2
3
4
5
6
7
8
9
app.conf.update(
broker_url='redis://localhost:6379/0', # 如果在docker中使用,需要转换成宿主机ip
result_backend='redis://localhost:6379/1',
task_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
enable_utc=True,
)

3.4 启动 celery

一般情况下,需要手动启动 celery 进程

1
2
celery -A your_project worker --loglevel=info