目录
1. Celery简介
2. 安装Celery
3. 安装RabbitMQ或Redis
3.1 安装redis(本文将以redis作为broker)
3.2 安装RabbitMQ
4. 第一个Celery程序
5. 第一个Celery工程项目
Celery是由纯python编写的,但是协议可以用任何语言实现。目前,已有Ruby实现的RCelery、Node.js实现的node-celery及一个PHP客户端,语言互通也可以通过using webhooks实现。在使用Celery之前,我们先来了解以下几个概念:
任务队列:简单来说,任务队列就是存放着任务的队列,客户端将要执行的任务的消息放入任务队列中,执行节点worker进行持续监视队列,如果有新任务,就取出来执行该任务。这种机制就像生产者消、费者模型一样,客户端作为生产者,执行节worker点作为消费者,它们之间通过任务队列进行传递,如图:
中间人(broker):Celery用于消息通信,通常使用中间人(broker)在客户端和worker之间传递,这个过程从客户端(生产者)向队列添加消息开始,之后中间人把消息派送给worker(消费者)。官方给出的实现broker的工具如下:
名称 | 状态 | 监视 | 远程控制 |
RabbitMQ | 稳定 | 是 | 是 |
Redis | 稳定 | 是 | 是 |
MongoDB | 实验性 | 是 | 是 |
Beanstalk | 实验性 | 否 | 否 |
AmazonSQS | 实验性 | 否 | 否 |
Zookeeper | 实验性 | 否 | 否 |
DjangoDB | 实验性 | 否 | 否 |
SQLAlchemy | 实验性 | 否 | 否 |
CouchDB | 实验性 | 否 | 否 |
Iron MQ | 第三方 | 否 | 否 |
提示: 在实际的使用中,推荐使用RabbitMQ或者Redis作为broker
Celery的特性:
推荐使用pip安装Celery,方式如下:
pip3 install celery # 或者,该方式安装celery时,捆绑了一组特性依赖:librabbitmq,redis,auth,msgpack pip3 install celery[librabbitmq,redis,auth,msgpack]
以下是可用的捆绑,供使用时做参考:
序列化
celery[auth]:
用于使用auth
安全序列化程序。
celery[msgpack]:
用于使用 msgpack 序列化程序。
celery[yaml]:
用于使用 yaml 序列化程序。
并发
celery[eventlet]:
用于使用eventlet池。
celery[gevent]:
使用gevent池。
传输和后端
celery[librabbitmq]:
用于使用 librabbitmq C 库。
celery[redis]:
使用 Redis 作为消息传输或结果后端。
celery[sqs]:
使用 Amazon SQS 作为消息传输(实验性)。
celery[tblib]:
用于使用该task_remote_tracebacks功能。
celery[memcache]:
使用 Memcached 作为结果后端(使用pylibmc)
celery[pymemcache]:
使用 Memcached 作为结果后端(纯 Python 实现)。
celery[cassandra]:
使用 Apache Cassandra 作为 DataStax 驱动程序的后端。
celery[couchbase]:
使用 Couchbase 作为结果后端。
celery[arangodb]:
使用 ArangoDB 作为结果后端。
celery[elasticsearch]:
使用 Elasticsearch 作为结果后端。
celery[riak]:
使用 Riak 作为结果后端。
celery[dynamodb]:
使用 AWS DynamoDB 作为结果后端。
celery[zookeeper]:
使用 Zookeeper 作为消息传输。
celery[sqlalchemy]:
使用 SQLAlchemy 作为结果后端(支持)。
celery[consul]:
使用 Consul.io 键/值存储作为消息传输或结果后端(实验性)。
celery[django]:
指定 Django 支持可能的最低版本。
使用源代码安装如下:(celery · PyPI)
# 下载源代码文件 wget https://files.pythonhosted.org/packages/66/60/2713f5be1906b81d40f823f4c30f095f7b97b9ccf3627abe1c79b1e2fd15/celery-5.1.2.tar.gz # 解压 tar zxvf celery-5.1.2.tar.gz # 进入目录 cd celery-5.1.2 # 构建 python3 setup.py build # 安装,注意权限,可在前面添加sudo python3 setup.py install
以Ubuntu为例,其他操作系统可参考RabbitMQ官网:Downloading and Installing RabbitMQ — RabbitMQ
在Ubuntu系统安装redis可以使用一下命令
sudo apt-get update sudo apt-get install redis-server
启动redis
redis-server
查看redis是否启动
redis-cli
上面的命令将打开以下终端
redis 127.0.0.1:6379>
其中127.0.0.1是本机ip,6379是redis服务端口号,现在输入ping命令:
redis 127.0.0.1:6379> ping PONG
以上说明redis已经安装成功。以下是通过源码包安装redis。
wget http://download.redis.io/releases/redis-6.0.6.tar.gz tar xzf redis-6.0.6.tar.gz cd redis-6.0.6 make
make命令执行完,在redis-6.0.6/src目录下会出现编译后的Redis服务程序redis-server和启动客户端程序redis-cli
如下命令启动Redis,此命令会一直处于占用状态,我们再重新开一个命令行连接
cd redis-6.0.6/src ./redis-server ../redis.conf
注意:如果redis-server 后面指定配置文件,则会以默认的配置启动redis服务。此处我们是使用的指定的默认redis配置文件。也可以根据需要使用自己的配置文件。
启动redis服务后,显示如下:
以上表示启动成功,可以使用测试客户端程序redis-cli和redis进行交互了,例如:
# 有$ 的一行表示shell命令 $ cd src $ ./redis-cli redis> set foo bar OK redis> get foo "bar"
配置celery的BROKER_URL,redis的默认连接URL如下:
BROKER_URL = 'redis://localhost:6379/0'
这里仍以Ubuntu为例
Centos7.6系统参考:CentOS7.6 安装RabbitMQ_大帅的博客-CSDN博客
首先安装erlang。由于RabbitMQ需要Erlang语言的支持,因此需要先安装Erlang,执行命令:
sudo apt-get install erlang-nox
再安装RabbitMQ
sudo apt-get update sudo apt-get install rabbitmq-server
启动、关闭、重启、状态RabbitMQ服务的命令如下:
# 启动 sudo rabbitmq-server start # 关闭 sudo rabbitmq-server stop # 重启 sudo rabbitmq-server restart # 查看rabbitmq状态 sudo rabbitmqctl status
要使用celery,需要创建一个RabbitMQ用户和虚拟主机,并且允许用户访问改虚拟主机。
# 创建rabbitmq的用户名为myuser,密码为mypassword,请自行设置 sudo rabbitmqctl add_user myuser mypassword # 创建虚拟主机 sudo rabbitmqctl add_vhost myvhost # 设置权限 sudo rabbitmqctl set_permissions -p myvhost myuser ".*" ".*" ".*"
RabbitMQ是默认的中间人的URL位置,生产环境根据实际情况修改即可。
BROKER_URL = 'amqp://guest:guest@localhost:5672'
我们选redis作为broker,首先要修改一下redis的配置文件redis.conf,修改bind=127.0.0.1为bind=0.0.0.0,意思是允许远程访问Redis数据库。修改完毕需要重启一下redis服务。
# sudo apt-get 方式安装重启 service redis-server restart # 源码安装重启 src/redis-server ../redis.conf
启动成功后检查:
[root@python celery_demo]# ps -elf | grep redis 0 S root 38987 110789 0 80 0 - 28203 pipe_w 06:19 pts/2 00:00:00 grep --color=auto redis 4 S root 104561 2276 0 80 0 - 40606 ep_pol 04:00 pts/0 00:00:23 src/redis-server 127.0.0.1:6379
说明已成功启动。
现在来编写一个Celery程序
【示例1】(my_first_celery.py)
# encoding=utf-8 from celery import Celery import time app = Celery( 'tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0' ) @app.task def add(x, y): time.sleep(3) # 模拟耗时操作 res = x + y print(f"x + y = {res}") return res
代码说明:
Celery()的第一个参数为当前模块的名称,只有在 __main__ 模块中定义任务时才会生产名称;第二个参数指定了中间人broker,第三个参数指定了后端存储。实现了一个add函数,该函数模拟了耗时操作,等待3秒,传入两个参数并返回之和,使用app.task来装饰该函数。
接下来我们启动任务执行单元worker。
celery -A my_first_celery worker -l info
命令说明:
-A 表示程序段模块名称,worker表示启动一个执行单元,-l是指-level,表示打印的日志等级,可以使用celery -help命令查看celery命令的帮助文档。
启动成功后显示如下:
如果不想用celery命令启动worker,则可以直接使用文件驱动,修改my_first_celery.py如下所示:
【实例2】使用入口函数启动(my_first_celery.py)
添加了app.start()启动
# encoding=utf-8 from celery import Celery import time app = Celery( 'tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0' ) @app.task def add(x, y): time.sleep(3) # 模拟耗时操作 res = x + y print(f"x + y = {res}") return res if __name__ == '__main__': app.start()
然后再命令中执行python3 my_first_celery.py worker即可,启动后的界面和使用celery命令的结果是一致的。
接下来,编写任务调度程序:start_task.py
from my_first_celery import add # 导入任务函数add import time # delay异步调用,因为add函数里面会等待3秒,这里调用不会阻塞,程序会立即向下执行 result = add.delay(12, 12) # ready方法检查任务是否执行完毕,此处会循环检查 while not result.ready(): print(time.strftime("%H:%M:%S")) time.sleep(1) print(result.get()) # 获取任务返回的结果,也就是两个数相加之和 print(result.successful()) # 判断任务是否成功执行
执行 python3 start_task.py 得到以下结果:
[root@python celery_demo]# python3 start_task.py 06:48:05 06:48:06 06:48:07 24 True
等待了3秒后(有可能会打印4次秒数),任务返回了24,并且成功完成。此时worker界面增加的信息如下:
[2021-09-11 06:48:05,236: INFO/MainProcess] Task my_first_celery.add[41425cd6-63c8-41df-bb74-74fd0c5c7438] received [2021-09-11 06:48:08,242: WARNING/ForkPoolWorker-8] x + y = 24 [2021-09-11 06:48:08,242: WARNING/ForkPoolWorker-8] [2021-09-11 06:48:08,244: INFO/ForkPoolWorker-8] Task my_first_celery.add[41425cd6-63c8-41df-bb74-74fd0c5c7438] succeeded in 3.007353223998507s: 24
启动 41425cd6-63c8-41df-bb74-74fd0c5c7438 是 taskid ,只要指定了backend,根据这id就可以随时去backend查找运行结果。使用方法如下:
>>> from my_first_celery import add >>> taskid='41425cd6-63c8-41df-bb74-74fd0c5c7438' >>> add.AsyncResult(taskid).get() 24 >>>
或者
>>> from celery.result import AsyncResult >>> AsyncResult(taskid).get() 24
明天继续写。。