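Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages, and it provides operations teams with the tools needed to maintain such a system. It is a task queue focused on real-time processing that also supports task scheduling.
It can be used in scenarios such as:
Celery plays the roles of both producer and consumer, so first let's look at what the producer-consumer pattern is.
The pattern also needs a buffer that sits between the producer and the consumer as an intermediary. The producer puts data into the buffer and the consumer takes data out of it, as shown in the figure below: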
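The pattern just described can be sketched with nothing but the standard library. This is a minimal illustration, not how Celery is implemented: queue.Queue stands in for the buffer, and the names SENTINEL, producer, consumer, and run_demo are hypothetical.

```python
import queue
import threading

SENTINEL = None  # hypothetical marker that tells the consumer to stop


def producer(buffer, items):
    """Put each item into the shared buffer, then signal completion."""
    for item in items:
        buffer.put(item)
    buffer.put(SENTINEL)


def consumer(buffer, results):
    """Take items out of the buffer until the sentinel arrives."""
    while True:
        item = buffer.get()
        if item is SENTINEL:
            break
        results.append(item * 2)  # stand-in for real work


def run_demo():
    buffer = queue.Queue(maxsize=2)  # the buffer between the two sides
    results = []
    threads = [
        threading.Thread(target=producer, args=(buffer, [1, 2, 3])),
        threading.Thread(target=consumer, args=(buffer, results)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


if __name__ == "__main__":
    print(run_demo())
```

The producer never calls the consumer directly; the two sides only share the buffer, which is the decoupling the pattern is meant to provide.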
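Next, a few questions need answering: who produces the data (Task), who is the middleman (Broker), who consumes the data (Worker), and where does the result go once the data has been consumed (backend)?
The figure below makes this clear.
Celery's 5 roles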
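As a rough illustration of how the roles named in the questions above fit together, the toy below mimics a task, a broker, a worker, and a backend in a single process. It is only a sketch under simplifying assumptions: the broker is an in-memory queue, the backend is a dict, and every name here (registry, task, send_task, worker, run_demo) is hypothetical rather than Celery's API.

```python
import queue
import threading
import uuid

broker = queue.Queue()  # Broker: holds task messages
backend = {}            # Backend: maps task_id -> result
registry = {}           # registered task functions, looked up by name


def task(func):
    """Register a function so the worker can find it by name."""
    registry[func.__name__] = func
    return func


@task
def add(x, y):
    return x + y


def send_task(name, *args):
    """Producer side: enqueue a message and return its id."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, name, args))
    return task_id


def worker():
    """Worker: consume messages until a None message arrives."""
    while True:
        message = broker.get()
        if message is None:
            break
        task_id, name, args = message
        backend[task_id] = registry[name](*args)


def run_demo():
    t = threading.Thread(target=worker)
    t.start()
    task_id = send_task("add", 10, 15)
    broker.put(None)  # ask the worker to stop
    t.join()
    return backend[task_id]


if __name__ == "__main__":
    print(run_demo())
```

The caller only ever holds a task id; the value is read back from the backend, which mirrors the split between broker and backend described above.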
Celery does not provide a queue service itself; Redis or RabbitMQ is recommended for that. So a broker such as Redis has to be installed first.
docker pull redis:latest
docker run -itd --name redis-test -p 6379:6379 redis
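The command above sets no password. To set one, use the following (Docker options such as --restart go before the image name; everything after the image name is passed to redis-server):
docker run -itd --name myredis --restart=always -p 6379:6379 redis --requirepass "123456" --appendonly yes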
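Once Redis requires a password, the address that clients connect with has to carry it. Celery's Redis transport accepts URLs of the form redis://:password@hostname:port/db_number. The helper below is a minimal sketch for building such a URL; redis_url is a hypothetical name, not part of Celery, and percent-encoding the password is an assumption intended to keep characters such as @ or / from breaking the URL.

```python
from urllib.parse import quote


def redis_url(host, port=6379, password=None, db=0):
    """Build a redis:// URL, percent-encoding the password if present."""
    auth = ""
    if password:
        auth = ":" + quote(password, safe="") + "@"
    return "redis://{}{}:{}/{}".format(auth, host, port, db)


if __name__ == "__main__":
    # 127.0.0.1 is a placeholder host; 123456 is the password set above
    print(redis_url("127.0.0.1", password="123456"))
```

The resulting string can be used wherever a broker or backend address is expected.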
Install the dependencies with pip
pip install celery==3.1.26.post2
pip install redis==2.10.6
Start with the simplest possible demo: create a tasks.py file and decorate the task function with the @shared_task decorator.
from celery import Celery
from celery import shared_task

# Instantiate the app and set the broker address
app = Celery('tasks', broker='redis://ip:6379')


@shared_task
def add(x, y):
    return x + y
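Change to the directory that contains tasks.py and start a worker. The -A option names the Celery app; here that is the tasks module (tasks.py).
worker is the role that executes tasks, and the trailing --loglevel=info sets the log level to info.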
celery -A tasks worker --loglevel=info
Output
D:\demo\demo\aaa>celery -A tasks worker --loglevel=info
[2021-10-19 09:12:01,168: WARNING/MainProcess] e:\python36\lib\site-packages\celery\apps\worker.py:161: CDeprecationWarning:
Starting from version 3.2 Celery will refuse to accept pickle by default.

The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.

If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::

    CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

You must only enable the serializers that you will actually use.

  warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))

 -------------- celery@DESKTOP-HJ487C8 v3.1.26.post2 (Cipater)
---- **** -----
--- * *** * -- Windows-10-10.0.17134-SP0
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x2148e024c50
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . tasks.add

[2021-10-19 09:12:01,276: INFO/MainProcess] Connected to redis://localhost:6379//
[2021-10-19 09:12:01,365: INFO/MainProcess] mingle: searching for neighbors
[2021-10-19 09:12:02,761: INFO/MainProcess] mingle: all alone
[2021-10-19 09:12:03,313: WARNING/MainProcess] celery@DESKTOP-HJ487C8 ready.
The startup log lists the registered task under [tasks]:
[tasks]
  . tasks.add
The line "Connected to redis" shows that the connection to the broker succeeded.
The task now exists, but when does it actually get triggered? It has to be triggered from code, so continue from the code above:
@shared_task
def add(x, y):
    return x + y


if __name__ == '__main__':
    # Trigger the task
    res = add.delay(10, 15)
    print(res)
    print(type(res))  # AsyncResult
Output
7492f49b-6735-46fb-a16d-9ec24bd31e56
<class 'celery.result.AsyncResult'>
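Calling .delay() on the add task triggers it once and returns an AsyncResult instance; the outcome of the task is then available through that AsyncResult object.
While it runs, check the worker log: it shows "Received task" once the task has arrived. Each task is given a UUID as its task_id, so ids are effectively never repeated.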
[2021-10-19 09:24:14,356: INFO/MainProcess] Received task: tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98]
[2021-10-19 09:24:14,395: INFO/MainProcess] Task tasks.add[885a79ba-c87c-49f7-a23f-0824142c3c98] succeeded in 0.046999999998661224s: 25
The worker automatically picks up tasks pushed to the broker and executes them; the log shows the outcome "succeeded".
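The task ids in the log have the shape of random (version 4) UUIDs. Assuming that is how they are generated, which the text above does not confirm, the standard library produces ids of the same shape. The names new_task_id and make_ids are hypothetical.

```python
import uuid


def new_task_id():
    """Return a random version-4 UUID as a 36-character string."""
    return str(uuid.uuid4())


def make_ids(n):
    """Generate n ids; collisions are possible in theory but vanishingly unlikely."""
    return [new_task_id() for _ in range(n)]


if __name__ == "__main__":
    for task_id in make_ids(3):
        print(task_id)
```

Because the ids are random rather than sequential, any process can create one without coordinating with the others.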
After .delay() triggers a task, the returned AsyncResult can be used to look up the task's state, id, and result.
D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> res = add.delay(10, 15)
>>> res.task_id
'6a7c8e10-7192-4865-9108-3e98596b9d37'
>>>
>>> res.status
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "E:\python36\lib\site-packages\celery\result.py", line 394, in state
    return self._get_task_meta()['status']
  File "E:\python36\lib\site-packages\celery\result.py", line 339, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "E:\python36\lib\site-packages\celery\backends\base.py", line 307, in get_task_meta
    meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
>>>
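A task_id is generated once the task is sent, but querying the task's status raises AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'.
This happens because task results have to be stored somewhere, namely the backend, but the app was instantiated earlier with only a broker address and no backend address to receive the results.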
from celery import Celery
from celery import shared_task

# The backend receives the execution results
app = Celery('tasks',
             broker='redis://ip:6379',
             backend='redis://ip:6379')


@shared_task
def add(x, y):
    return x + y
After changing the configuration, be sure to restart the worker.
celery -A tasks worker --loglevel=info
In the [config] section of the startup log, the results entry now shows that the backend is configured.
D:\demo\demo\aaa>python
Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 27 2018, 03:37:03) [MSC v.1900 64 bit (AMD64)] on win32
Type "help", "copyright", "credits" or "license" for more information.
>>> from tasks import add
>>> res = add.delay(10,15)
>>> res.task_id
'5ce249c9-a15b-426a-949b-d1b94bf9f8fa'
>>>
>>> res.status
'SUCCESS'
>>>
>>> res.get()
25
Some commonly used attributes
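As a rough illustration, the helper below gathers the attributes used in the session above (the id, the state, and get()) together with ready() and successful(), which AsyncResult also provides. It is a sketch rather than the author's code: summarize and FakeResult are hypothetical names, and FakeResult is only a stand-in so the example runs without a worker or Redis.

```python
def summarize(res, timeout=1.0):
    """Collect common fields from an AsyncResult-like object."""
    info = {
        "id": res.id,            # same value as res.task_id
        "state": res.state,      # same value as res.status
        "ready": res.ready(),    # True once the task has finished
    }
    if info["ready"] and res.successful():
        # get() returns the task's return value
        info["value"] = res.get(timeout=timeout)
    return info


class FakeResult:
    """Hypothetical stand-in that mimics a finished, successful task."""
    id = "5ce249c9-a15b-426a-949b-d1b94bf9f8fa"
    state = "SUCCESS"

    def ready(self):
        return True

    def successful(self):
        return True

    def get(self, timeout=None):
        return 25


if __name__ == "__main__":
    print(summarize(FakeResult()))
```

With a real worker and backend running, the object returned by add.delay(10, 15) could be passed to summarize in place of FakeResult.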
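Triggering a task gives you a task_id, but you would not normally keep querying the status until the result arrives; more likely you come back some time later to look at the outcome.
So, given a known task_id, how do you query the state and the result? Use the AsyncResult class.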
from celery.result import AsyncResult

res = AsyncResult(id='5ce249c9-a15b-426a-949b-d1b94bf9f8fa')
print(res.state)  # 'SUCCESS'
print(res.get())  # 25
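If the task may not have finished yet when you come back to it, one option is to poll ready() with a deadline instead of blocking indefinitely. The sketch below assumes only that the object offers id, ready(), and get(), as AsyncResult does; wait_for and FakeResult are hypothetical names, and FakeResult exists only so the example runs without a worker.

```python
import time


def wait_for(res, timeout=10.0, interval=0.5):
    """Poll res.ready() until it is true or the timeout elapses.

    Returns the task's value, or raises TimeoutError.
    """
    deadline = time.monotonic() + timeout
    while not res.ready():
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "task {} not finished within {}s".format(res.id, timeout))
        time.sleep(interval)
    return res.get(timeout=interval)


class FakeResult:
    """Hypothetical stand-in: becomes ready after `after` ready() calls."""

    def __init__(self, value, after):
        self.id = "fake-id"
        self._value = value
        self._remaining = after

    def ready(self):
        if self._remaining > 0:
            self._remaining -= 1
            return False
        return True

    def get(self, timeout=None):
        return self._value


if __name__ == "__main__":
    print(wait_for(FakeResult(25, after=3), timeout=1.0, interval=0.01))
```

In a real project the object would come from AsyncResult with the saved task_id, and the app that has the backend configured needs to be loaded first, for example by importing tasks.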
For use together with Django, see this earlier post: https://www.cnblogs.com/yoyoketang/p/15422804.html
Further tutorial: https://blog.csdn.net/u010339879/article/details/97691231
Further tutorial: https://www.pianshen.com/article/2176289575/