Django集成Celery实现异步任务
# 环境配置
本文参考自Celery官方文档 (opens new window)
# Python包
Django==3.1.7
celery==5.0.5
django-celery-results==2.0.1
# 消息代理
RabbitMQ==3.8.14
1
2
3
4
5
6
2
3
4
5
6
# Step1 新建django项目
我的django项目结构如下
(venv) w@w-Vulcan-Series:~/PycharmProjects/beer_server$ tree -I "venv|migrations|__pycache__" -L 5
.
├── apps
│ ├── config
│ │ ├── admin.py
│ │ ├── apps.py
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── serializers.py
│ │ ├── tests.py
│ │ ├── urls.py
│ │ └── views.py
│ ├── __init__.py
│ ├── project
│ │ ├── admin.py
│ │ ├── apps.py
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── serializers.py
│ │ ├── tests.py
│ │ ├── urls.py
│ │ └── views.py
│ ├── testcase
│ │ ├── admin.py
│ │ ├── apps.py
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── serializers.py
│ │ ├── tasks.py
│ │ ├── tests.py
│ │ ├── urls.py
│ │ └── views.py
│ ├── testsuite
│ │ ├── admin.py
│ │ ├── apps.py
│ │ ├── __init__.py
│ │ ├── models.py
│ │ ├── serializers.py
│ │ ├── tests.py
│ │ ├── urls.py
│ │ └── views.py
│ └── user
│ ├── admin.py
│ ├── apps.py
│ ├── __init__.py
│ ├── models.py
│ ├── serializers.py
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── beer_server
│ ├── asgi.py
│ ├── celery.py
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── LICENSE
├── manage.py
├── README.md
├── requirements.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# Step2 安装消息代理服务RabbitMQ
我这里为了演示使用docker启动一个RabbitMQ容器
docker run -d --name ramq -p 5672:5672 -p 15672:15672 rabbitmq:management && docker ps
# 开启rabbitmq的mqtt协议插件(端口1883)
# docker run -d --name ramq -p 5672:5672 -p 15672:15672 -p 1883:1883 -v /home/w/rabbitmq:/var/lib/rabbitmq rabbitmq:management && docker ps
1
2
3
2
3
容器启动后,默认已经开启了RabbitMQ
的Web
管理界面服务。
# 进入容器开启RabbitMQ的Web管理界面服务
可选步骤,如果未开启Web服务则需要执行这一步骤
# 进入ramq容器
docker exec -it ramq /bin/bash
# 开启web管理服务
rabbitmq-plugins enable rabbitmq_management
1
2
3
4
2
3
4
此时访问http://127.0.0.1:15672/#/
就可以看到RabbitMQ的登录页面了,账号和密码默认都是guest
,登录后就可以看到如下页面了
# Step3 pip3安装Celery包
pip3 install -i https://pypi.douban.com/simple celery
1
# Step4 在项目settings.py同级目录下新建celery.py文件,文件内容如下
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'beer_server.settings')
app = Celery('beer_server')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# Step5 在项目settings.py同级目录下的__init__.py文件内写入如下内容
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
1
2
3
4
5
2
3
4
5
# Step6 新建一个测试任务模块:apps目录下的testcase目录下新建tasks.py文件(eg:apps/testcase/tasks.py),内容如下
from celery import shared_task
from testcase.models import TestCase
@shared_task
def add(x, y):
return x + y
@shared_task
def count_testcases():
return TestCase.objects.count()
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# Step7 安装django-celery-results包,使用Django ORM作为结果后端
# 安装django_celery_results
pip3 install -i https://pypi.douban.com/simple django-celery-results
1
# 在settings.py文件中的INSTALLED_APPS列表中添加django_celery_results
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'django_celery_results',
]
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 进行数据迁移
python3 manage.py migrate django_celery_results
1
# 然后在settings.py文件中添加如下内容
# Celery配置选项
# 配置时区,使用与django项目相同的时区设置
CELERY_TIMEZONE = TIME_ZONE
# 异步任务运行结果使用django自带的ORM来存储
CELERY_RESULT_BACKEND = 'django-db'
1
2
3
4
5
2
3
4
5
# Step8 启动Celery任务队列服务
(venv) w@w-Vulcan-Series:~/PycharmProjects/beer_server$ celery -A beer_server worker -l INFO
-------------- celery@w-Vulcan-Series v5.0.5 (singularity)
--- ***** -----
-- ******* ---- Linux-5.4.0-60-generic-x86_64-with-glibc2.29 2021-04-26 23:21:07
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: beer_server:0x7f62b67dcfa0
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results:
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. testcase.tasks.add
. testcase.tasks.count_testcases
[2021-04-26 23:21:07,357: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2021-04-26 23:21:07,367: INFO/MainProcess] mingle: searching for neighbors
[2021-04-26 23:21:08,394: INFO/MainProcess] mingle: all alone
[2021-04-26 23:21:08,410: WARNING/MainProcess] /home/w/PycharmProjects/beer_server/venv/lib/python3.8/site-packages/celery/fixups/django.py:203: UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2021-04-26 23:21:08,411: INFO/MainProcess] celery@w-Vulcan-Series ready.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Step9 使用django shell对Celery服务进行测试
(venv) w@w-Vulcan-Series:~/PycharmProjects/beer_server$ python3 manage.py shell
Python 3.8.5 (default, Jan 27 2021, 15:41:15)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from testcase.tasks import count_testcases, add
>>> print(count_testcases.delay())
57bdf093-a6ff-4846-acd9-cacc83a3f513
>>> print(add.delay(10, 25))
f8125941-2887-4590-9e63-02f61e353690
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# Celery服务运行的日志如下,可以看到已经正常接收到这两个异步任务,并执行完成
[2021-04-26 23:21:08,411: INFO/MainProcess] celery@w-Vulcan-Series ready.
[2021-04-26 23:23:44,528: INFO/MainProcess] Received task: testcase.tasks.count_testcases[57bdf093-a6ff-4846-acd9-cacc83a3f513]
[2021-04-26 23:23:44,555: INFO/ForkPoolWorker-8] Task testcase.tasks.count_testcases[57bdf093-a6ff-4846-acd9-cacc83a3f513] succeeded in 0.025395925000339048s: 2
[2021-04-26 23:23:57,057: INFO/MainProcess] Received task: testcase.tasks.add[f8125941-2887-4590-9e63-02f61e353690]
[2021-04-26 23:23:57,075: INFO/ForkPoolWorker-8] Task testcase.tasks.add[f8125941-2887-4590-9e63-02f61e353690] succeeded in 0.017471159000706393s: 35
1
2
3
4
5
2
3
4
5
# 查看django_celery_results_taskresult表,可以看到运行成功的异步任务的执行记录
# end!
编辑 (opens new window)
上次更新: 2023/03/03, 15:32:27
- 01
- Python实现对字符串的加解密02-25
- 02
- Python3对大文件中指定字符进行排序再写入到新的文件10-24
- 03
- Ubuntu下配置adb环境连接Android设备进行调试08-17