Celery是一个简单、灵活且可靠,处理大量消息的分布式系统,专注于实时处理的异步任务队列,同时也支持任务调度。
既然Celery是一个实时的任务处理系统,那么他必然至少存在三部分,一任务发布者,二任务存储者,三任务处理者,四任务结果存储者,五任务结果消费者。
- 这个任务处理系统是独立出来帮我们完成任务的,那他一定需要一个发布任务的接口对吧。
- 任务发布上去了不是说处理就处理的,肯定需要时间等待,所以这个系统必须先将其存储起来,等到要处理的时候再去拉出来。在这里我们使用redis来存储任务
- 这个处理任务肯定也需要一个处理者来完成这个任务吧
- 处理完了任务是不是还需要返回结果?不然咱咋知道这个任务是否执行完了。在这里我们使用redis来保存任务结果。
- 任务消费者是celery提供给我们拉取任务执行结果的接口
上面可以看出,celery完全属于自己的部分只有两个,任务发布接口,任务处理器,而任务存储器和任务结果存储器都是借助的外部东西
既然他是实时的,那么这五部分应该可以独立运行。确实如此。
下面将对这几部分以简单的代码进行描述
一个最常见的使用案例,用户注册账号需要给用户邮箱确认链接,发送需要时间,用户点击也需要时间,如果采用同步执行的方式,用户在此期间什么也做不了,最常见的方式就是采用异步发送消息,后端服务器发送确认链接,用户可以进行其他操作。
celery_task01.py
import celery import time backend = 'redis://192.168.1.101:6379/1' #异步结果 broker = 'redis://192.168.1.101:6379/2' #消息中间件 # 定义celery对象,task是celery对象的名字,backend是结果存储系统,broker是任务队列存储系统 cel = celery.Celery('task', backend=backend, broker=broker) # 将执行函数封装成一个task执行单元 @cel.task def send_email(name): print('向%s发送邮件。。。' % name) time.sleep(5) print('向%s发送邮件完成' % name) return 'ok' # celery可以封装多个执行单元 @cel.task def send_msg(name): print('向%s发送短信。。。' % name) time.sleep(5) print('向%s发送短信完成' % name) return 'ok'
在这里我们首先定义了一个Celery对象,这个对象就是系统。celery系统封装了系统与任务队列存储器和任务结果存储的交互,我们只需要指定这两个存储器,它内部自会运作。然后用celery系统的装饰器封装了两个任务执行流程。至此还需要我们发布任务和拉取任务结果。
celery_produce.py
from celery_task01 import send_email,send_msg result = send_email.delay("yaowy") print(result.id) result2 = send_email.delay("bai") print(result2.id)
在这里我们必须引入我们设置好的celery系统,工作流程经过装饰以后多了delay方法,可以将参数传入函数中并且封装成一个任务对象,并且返回一个任务id,celery系统内部自会将其放入任务队列存储器。然后开启worker拉取任务并执行,然后将执行结果放入结果存储器。想要获取任务执行结果,还需要一个任务id
from celery_task01 import cel from celery.result import AsyncResult async_result=AsyncResult(id='e726337e-f529-407f-8988-ff373018e77f',app=cel) if async_result.successful(): result=async_result.get() print(result)
首先定义了一个异步拉取结果的对象,这里需要两个东西,一个是任务id,在这里也可以看出,需要先发布任务,并且获取任务id,才能获取结果,第二个是celery系统对象,你既然要拉取数据,肯定需要跟系统打交道。
- 第一步肯定要运行系统,运行系统不能直接运行python文件,而是要用命令。我这里用的是celery5.2.3
命令:
celery --app=celery_task01 worker -l info -P eventlet -E
注意:现在的博客多是用的celery3,命令为celery -A tasks worker --loglevel=info
。这个命令在我这里已经不适用了。
解释:--app指定定义系统的python文件,worker表示开启worker任务处理进程 -l 指定日志打印级别为info,-P指定任务处理模式,这里使用了协程,celery4.x及以上在Windows上运行时必须指定这个,不然会报错。-E指定打开events。
- 第二步,执行任务发布python文件,即celery_produce.py,同时在控制台复制任务id粘贴到celery_consumer.py中。
- 第三步,执行任务结果拉取python文件,即celery_consumer.py。控制台打印了ok。说明成功,一个最简单的celery系统打通了。