Celery是一个Python任务队列系统,用于处理跨线程或网络节点的工作任务分配。它使异步任务管理变得容易。
您的应用程序只需要将消息推送到像RabbitMQ这样的代理,Celery worker会弹出它们并安排任务执行。
celery 的5个角色
1.django环境v2.1.2
2.安装celery版本
pip install celery==3.1.26.post2
3.安装django-celery包
pip install django-celery==3.3.1
Broker(RabbitMQ) 负责创建任务队列,根据一些路由规则将任务分派到任务队列,然后将任务从任务队列交付给 worker
先使用docker 搭建RabbitMQ 环境,rabbitMQ 镜像仓库地址 https://hub.docker.com/_/rabbitmq找带有 mangement的版本,会带web后台管理界面
下载 3.8.0-management 镜像
docker pull rabbitmq:3.8.0-management
启动容器,设置账号 admin 和密码 123456
docker run -d --name rabbitmq3.8 -p 5672:5672 -p 15672:15672 --hostname myRabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8.0-management
宿主机需开放 5672 和 15672 这 2 个端口,5672 是后端接口访问的端口,15672 是前端 web 管理后台页面地址,输入http://ip:15672
可以访问 web 网站
输入前面设置的账号 admin 和密码 123456 可以直接登录
要在 Django 项目中使用 Celery,您必须首先定义 Celery 库的一个实例(称为“应用程序”)
如果你有一个现代的 Django 项目布局,比如:
- proj/ - manage.py - proj/ - __init__.py - settings.py - urls.py
那么推荐的方法是创建一个新的proj/proj/celery.py模块来定义 Celery 实例:
import os from celery import Celery # Set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') app = Celery('proj') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django apps. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}')
其中debug_task是测试的任务,可以注掉
# @app.task(bind=True) # def debug_task(self): # print('Request: {0!r}'.format(self.request))
上面一段只需改这句,'proj’是自己django项目的app名称
app = Celery('proj')
然后你需要在你的proj/proj/__init__.py
模块中导入这个应用程序。这确保在 Django 启动时加载应用程序,以便@shared_task装饰器(稍后提到)将使用它:
# This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ('celery_app',)
上面这段固定的,不用改
在app下新建tasks.py,必须要是tasks.py文件名称,django会自动查找到app下的该文件
@shared_task def add(x, y): print("task----------1111111111111111111111") return x + y @shared_task def mul(x, y): return x * y
tasks.py可以写任务函数add、mul,让它生效的最直接的方法就是添加app.task 或shared_task 这个装饰器
setting.py添加配置
# RabbitMQ配置BROKER_URL 和backend BROKER_URL = 'amqp://admin:123456@192.168.1.11:5672//' CELERY_RESULT_BACKEND = 'rpc://' CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_ACCEPT_CONTENT=['json'] CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True
views.py创建视图
from .tasks import add, mul def task_demo(request): res = add.delay(10, 20) print(res.task_id) # 返回task_id return JsonResponse({"code": 0, "res": res.task_id})
前面pip已经安装过celery应用了,celery是一个独立的应用,可以启动worker
celery -A MyDjango worker -l info
其中MyDjango是你自己的django项目名称
运行日志
-------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater) ---- **** ----- --- * *** * -- Windows-10-10.0.17134-SP0 -- * - **** --- - ** ---------- [config] - ** ---------- .> app: yoyo:0x1ea1a96e9b0 - ** ---------- .> transport: amqp://admin:**@192.168.1.11:5672// - ** ---------- .> results: rpc:// - *** --- * --- .> concurrency: 4 (prefork) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery [tasks] . yoyo.tasks.add . yoyo.tasks.mul [2021-10-18 22:45:03,155: INFO/MainProcess] Connected to amqp://admin:**@192.168.1.11:5672// [2021-10-18 22:45:03,347: INFO/MainProcess] mingle: searching for neighbors [2021-10-18 22:45:04,897: INFO/MainProcess] mingle: all alone [2021-10-18 22:45:05,406: WARNING/MainProcess] e:\python36\lib\site-packages\celery\fixups\django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never ' [2021-10-18 22:45:05,407: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.
运行的时候,当我们看到" Connected to amqp"说明已经连接成功了!
在django shell交互环境调试运行任务
D:\202107django\MyDjango>python manage.py shell Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32 Type "help", "copyright", "credits" or "license" for more information. (InteractiveConsole) >>> from yoyo.tasks import add,mul >>> from celery.result import AsyncResult >>> >>> res = add.delay(11, 12) >>> res <AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1> >>> res.status 'SUCCESS' >>> >>> res.backend <celery.backends.redis.RedisBackend object at 0x0000015E011C3128> >>> >>> res.task_id 'c5ff83a4-4840-4b36-8869-5ce6081904f1' >>> >>> >>> get_task = AsyncResult(id=res.task_id) >>> get_task <AsyncResult: c5ff83a4-4840-4b36-8869-5ce6081904f1> >>> get_task.result 23 >>>
res.status是查看任务状态
res.task_id 是获取任务的id
res.result 获取任务结果
根据任务的id查询任务的执行结果AsyncResult(id=res.task_id).result获取