APScheduler
全称是Advanced Python Scheduler
是一个 Python 定时任务框架,用于执行周期或者定时任务,APScheduler 支持三种调度任务:固定时间间隔,固定时间点(日期)Linux 下的 Crontab 命令 ,同时支持异步执行、后台执行调度任务。
可以在主程序的运行过程中快速增加新作业或删除旧作业.apscheduler 可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具或 windows 计划任务程序的替换,不仅可以添加、删除定时任务,还可以将任务存储到数据库中、实现任务的持久化
特点:
pip install apscheduler
某一个工作到来时引发的事件,包含调度的逻辑,每一个作业都有它自己的触发器,用于决定哪个作业任务会执行,除了它们初始化配置之外,其完全是无状态的。总的来说就是一个任务应该在什么时候执行
主要是处理作业的运行,它将要执行的作业放在新的线程或者线程池中运行。执行完毕之后,再通知调度器。基于线程池的操作,可以针对不同类型的作业任务,更为高效的使用CPU的计算资源。
保存要调度的任务,其中除了默认的作业存储是把作业保存在内存中,其他的作业存储是将作业保存在数据库中。一个作业的数据将在保存在持久化的作业存储之前,会对作业执行序列化操作,当重新读取作业时,再执行反序列化操作。同时,调度器不能分享同一个作业存储。作业存储支持主流的存储机制:如redis,mongodb,关系型数据库,内存等等。
负责将上面几个组件联系在一起,一般在应用中只有一个调度器,程序开发者不会直接操作触发器、作业存储或执行器,而是利用调度器提供了处理这些合适的接口,作业存储和执行器的配置都是通过在调度器中完成的。
新建一个 schedulers (调度器) 。
添加一个调度任务(job stores)。
运行调度任务。
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 创建执行函数 def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler #创建调度器 sched = BlockingScheduler() # 添加执行函数 sched.add_job(my_job, 'interval', seconds=5, id='my_job_id') # 执行定时调度 sched.start() # 或者采用装饰器的方式 sched = BlockingScheduler() @sched.scheduled_job('interval', seconds=5) def my_job(): print time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) sched.start()
Job作为APScheduler最小执行单位。创建Job时指定执行的函数,函数中所需参数,Job执行时的一些设置信息
func
:Job执行的函数
trigger
:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行
args=None, kwargs=None
id
:指定作业的唯一ID
name
:指定作业的名字
misfire_grace_time
:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
max_instances
:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数来确定是否可执行
next_run_time
:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触发时间
coalesce
:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
executor
:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此job的 执行器,执行job指定的函数
APScheduler 提供 7 种调度器,能够满足我们各种场景的需要
BlockingScheduler : 调度器在当前进程的主线程中运行,调用start函数会阻塞当前线程,不能立即返回。
BackgroundScheduler : 调度器在后台线程中运行,调用start主线程不会阻塞。
AsyncIOScheduler : 适用于 asyncio 模块(一个异步框架)一起使用。
GeventScheduler : 适用于 gevent(高性能的Python并发框架)。
TornadoScheduler : 适用于 Tornado(一个web框架)的IO模型,用 ioloop.add_timeout 完成定时唤醒。
TwistedScheduler : 配合 TwistedExecutor,用 reactor.callLater 完成定时唤醒。
QtScheduler : 适用于 构建 Qt 应用程序,需使用QTimer完成定时唤醒。
导入方式
from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.gevent import GeventScheduler from apscheduler.schedulers.tornado import TornadoScheduler from apscheduler.schedulers.qt import QtScheduler
作用:根据触发器的规则计算出Job的触发时间,然后与当前时间比较确定此Job是否会被执行,总之就是根据trigger规则计算出下一个执行时间。
目前APScheduler 提供三种任务触发器:
date
固定日期出发,它表示特定的时间点触发, 作业任务只会执行一次interval
间隔时间调度, 固定时间间隔触发。interval 间隔调度cron
特定时间周期性地触发,和Linux crontab 格式兼容。它是功能最强大的触发器sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text']) sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
sched.add_job(job_function, 'interval', hours=2)
(int|str) 表示参数既可以是int类型,也可以是str类型 (datetime | str) 表示参数既可以是datetime类型,也可以是str类型 year=(int|str) month (1-12) -(表示取值范围为1-12月) month=(int|str) day of the (1-31) -(表示取值范围为1-31日) day=(int|str) week=(int|str) hour=(int|str) hour (0-23) - (表示取值范围为0-23时) minute=(int|str) minute (0-59) - (表示取值范围为0-59分) second=(int|str) second (0-59) - (表示取值范围为0-59秒) start_date=(datetime|str) end_date=(datetime|str) timezone=(datetime,tzinfo)
字符串参数格式
字符形式 | 类型 | 描述 | |
---|---|---|---|
* | 所有 | 通配符。例:minutes=* 即每分钟触发 | |
*/a | 所有 | 可被a整除的通配符。 | |
a-b | 所有 | 范围a-b触发 | |
a-b/c | 所有 | 范围a-b,且可被c整除时触发 | |
x th y | 日 | 第几个星期几触发。x为第几个,y为星期几 | |
last x | 日 | 一个月中,最后个星期几触发 | |
last | 日 | 一个月最后一天触发 | |
x,y,z | 所有 | 组合表达式,可以组合确定值或上方的表达式 |
# 6-8,11-12月第三个周五 00:00, 01:00, 02:00, 03:00运行 sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3') # 每周一到周五运行 直到2024-05-30 00:00:00 sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'
https://blog.csdn.net/abc_soul/article/details/88875643
作业存储器的选择有两种:一是内存,也是默认的配置;二是数据库。
具体选哪一种看我们的应用程序在崩溃时是否重启整个应用程序,如果重启整个应用程序,那么作业会被重新添加到调度器中,此时简单的选取内存作为作业存储器即简单又高效。
提供了四种存储方式:
执行调度任务的模块。最常用的 executor
有两种:ProcessPoolExecutor
和 ThreadPoolExecutor
Event是APScheduler在进行某些操作时触发相应的事件,用户可以自定义一些函数来监听这些事件, 当触发某些Event时,做一些具体的操作 常见的比如。Job执行异常事件 EVENT_JOB_ERROR。Job执行时间错过事件 EVENT_JOB_MISSED。
import time from apscheduler.schedulers.blocking import BlockingScheduler def job(): print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) if __name__ == '__main__': # 该示例代码生成了一个BlockingScheduler调度器,使用了默认的任务存储MemoryJobStore,以及默认的执行器ThreadPoolExecutor,并且最大线程数为10。 # BlockingScheduler:在进程中运行单个任务,调度器是唯一运行的东西 scheduler = BlockingScheduler() # 采用阻塞的方式 # 采用固定时间间隔(interval)的方式,每隔5秒钟执行一次 scheduler.add_job(job, 'interval', seconds=5) scheduler.start()
import time from apscheduler.schedulers.blocking import BlockingScheduler def job(): print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) if __name__ == '__main__': # BlockingScheduler:在进程中运行单个任务,调度器是唯一运行的东西 scheduler = BlockingScheduler() # 采用阻塞的方式 # 采用date的方式,在特定时间只执行一次 scheduler.add_job(job, 'date', run_date='2018-09-21 15:30:00') scheduler.start()
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 输出时间 def job(): print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # BlockingScheduler scheduler = BlockingScheduler() scheduler.add_job(job, 'cron', day_of_week='*', hour='*', minute='10') scheduler.start()
from apscheduler.schedulers.blocking import BlockingScheduler def everyday_job(): print('Hello World!') sched = BlockingScheduler() #每隔一天 执行一次程序 # sched.add_job(everyday_job, 'interval', days=1) #每天早上十点半和十八点半各执行一次程序 sched.add_job(everyday_job, 'cron', hour='10, 18', minute='30') sched.start()
from apscheduler.schedulers.blocking import BlockingScheduler import datetime def everyday_job(): print(datetime.datetime.now()) print('Hello World!') sched = BlockingScheduler() # 每隔一天 执行一次程序 # sched.add_job(everyday_job, 'interval', days=1) # 每天早上十点半和十八点半各执行一次程序 sched.add_job(everyday_job, 'cron', minute='*/2') sched.start()
参考文献:
https://zhuanlan.zhihu.com/p/95563033