Go教程

django 定时任务第三方库apscheduler

本文主要是介绍django 定时任务第三方库apscheduler,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

1、apscheduler未安装的需要自己安装,安装命令如下:

pip install apscheduler

  

2、apscheduler库有很多不同类型的调度器,其他我也不懂,只知道BlockingScheduler与BackgroundScheduler,这是比较常用的两种
  区别主要在于BlockingScheduler会阻塞主线程的运行,而BackgroundScheduler不会阻塞。所以,我们在不同的情况下,选择不同的调度器:
  BlockingScheduler: 调用start函数后会阻塞当前线程。当调度器是你应用中唯一要运行的东西时使用(下面写的就是这种调度)
  BackgroundScheduler: 调用start后主线程不会阻塞。当你不运行任何其他框架时使用,并希望调度器在你应用的后台执行
  用人话说就是:用BlockingScheduler创建的类,调用start()函数开始运行后,会阻塞(相当于停止)当前已在运行的线程,只执行当前的定时任务,运行到你主动关掉为止
  而BackgroundScheduler创建的类,调用start后主线程不会阻塞,会继续运行下一行代码,如果后面没有死循环什么的,订的时间又比较长,会出现定时还没生效,代码已经运行结束的情况
  参考文章:https://www.cnblogs.com/will-wu/p/14721592.html

 

3、apscheduler的触发器
  apscheduler的触发器有三种:date(日期触发),interval(固定间隔触发),cron(周期触发)

    date表示具体的一次性任务,interval表示循环任务,cron表示定时任务  

  3.1、日期触发:date,特定的时间点触发,最基本的一种调度,作业只会执行一次
    run_date(datetime or str) 任务运行的日期或者时间
    timezone(datetime.tzinfo or str) 指定时区

# 导入BlockingScheduler调度器
from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

# 封装函数待用
def print_hours_now():
    print('小时:%s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

# 创建一个对象
timing_type = BlockingScheduler()

# run_date时间到了之后,开始执行print_hours_now
timing_type.add_job(print_hours_now,'date',run_date='2021-04-30 16:13:00')
# 或这样写
# timing_type.add_job(func=print_hours_now,trigger='date',run_date='2021-04-30 16:13:00')

# 立即执行
timing_type.add_job(print_hours_now,'date')

# 等待触发时间到达,开始执行指定任务
timing_type.start()

  

3.2、interval:固定间隔触发
    year:每隔X年执行一次
    weeks:每隔X周执行一次 weeks=X,范围1-12
    days:每隔X天执行一次,范围1-31
    hours: 每隔几小时执行一次 | hours=0,范围0-23
    minutes: 每隔几分执行一次 | minutes=0,范围0-59
    seconds: 每隔几秒执行一次 | seconds=0,范围0-59
    start_date: 最早执行时间 | start_date=None,范围是年月日的组合字符串
    end_date: 最晚执行时间 | end_date=None,范围是年月日的组合字符串
    timezone: 执行时间区间 | timezone=None,范围是年月日的组合字符串

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

def print_minutes_now():
    print('打印时间:%s' % datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

timing_type = BlockingScheduler()
# 在start_date到end_date时间段内,每隔5s执行一次print_minutes_now;其中,func=这些可以省略,start_date和end_date可以单独使用,
# timing_type.add_job(func=print_minutes_now,trigger='interval',seconds=5,start_date='2021-05-04 11:40',end_date='2021-05-04 11:41:00')
# 在end_date之前,每隔0.2小时运行一次print_minutes_now
timing_type.add_job(func=print_minutes_now,trigger='interval',minutes=2,end_date='2021-05-08 11:41:00')
# 每隔1小时执行一次
timing_type.add_job(func=print_minutes_now,trigger='interval',hours=1)
# 启动调度器
timing_type.start()

  

3.3、cron:周期触发,在特定时间周期性地触发
    cron参数:
      year(int or str) 年,4位数字
      month(int or str) 月(范围1-12)
      day(int or str) 日(范围1-31)
      week(int or str) 周(范围1-53)
      day_of_week(int or str) 周内第几天或者星期几(范围0-6或者mon,tue,wed,thu,fri,stat,sun)
      hour(int or str) 时(0-23)
      minute(int or str) 分(0-59)
      second(int or str) 秒(0-59)
      start_date(datetime or str) 最早开始日期(含)
      end_date(datetime or str) 最晚结束日期(含)
      timezone(datetime.tzinfo or str) 指定时区

    表达式:
      * 所有 通配符。例:minutes=*即每分钟触发
      */a 所有 可被a整除的通配符。
      a-b 所有 范围a-b触发
      a-b/c 所有 范围a-b,且可被c整除时触发
      xth y 日 第几个星期几触发。x为第几个,y为星期几
      last x 日 一个月中,最后个星期几触发
      last 日 一个月最后一天触发
      x,y,z 所有 组合表达式,可以组合确定值或上方的表达式

from apscheduler.schedulers.blocking import BlockingScheduler
from datetime import datetime

def print_time_now():
    print('打印当前时间:',datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

timing_type = BlockingScheduler()
# 每个月每天的17:36,打印当前时间,*代表0-59
# timing_type.add_job(print_time_now,'cron',month='*',day='*',hour=17,minute=36,second='*')
# 每天的每小时16分的零秒开始到59秒执行print_time_now(年月日时不写的时候会默认为所有,秒不写则默认为0)
timing_type.add_job(func=print_time_now, trigger='cron',minute='16',second='0-59')
# 周一到周五6:30执行timing_type.add_job(print_time_now, "cron",day_of_week = "1-5", hour = 6, minute = 30)
# 启动调度器 timing_type.start()

  

示例:

#封装邮件发送方法
from django.core.mail import send_mail


def send_email(email_user):
    send_mail(
        subject='这里是邮件标题',
        message='这里是邮件内容',
        from_email='test@qq.com',  # 发件人
        recipient_list=[email_user],  # 收件人
        #收件人可以直接写,也可以从setting.py中配置中导入
        fail_silently=False
    )
    print("邮件发送成功")
    return "邮件已发送"


#定时任务
from apscheduler.schedulers.background import BackgroundScheduler

#待定时执行的函数
def update_days():
    all_borrow_book = Borrow.objects.filter(borrow_status=2)
    now = datetime.date.today()
    # all_bookname = all_borrow_book.book_id
    for book in all_borrow_book:
        id = book.id
        user_id = book.user_id
        book_id = book.book_id
        print("userid:",user_id)
        email_user = User.objects.get(username=user_id).email
        lib_book = Book.objects.get(book_number=book_id)
        update_book = Borrow.objects.get(id=id)
        return_date = book.return_date
        update_days = return_date.__sub__(now).days
        update_book.surplus_days = update_days
        if update_days < 0:
            update_book.borrow_status = -1
            update_book.save()
            lib_book.book_status = -1
            lib_book.save()
            send_email(email_user)
        else:
            update_book.save()
        print("surplus_days: ",update_days)


#开启定时工作
try:
    # 实例化调度器
    scheduler = BackgroundScheduler()
    # scheduler.add_job(my_job, 'interval', minutes=10)
    scheduler.add_job(update_days,'cron',month='*',day='*',hour=19,minute=38,second=00)
    scheduler.start()
except Exception as e:
    print(e)

  

https://apscheduler.readthedocs.io/en/3.x/userguide.html#code-examples

https://www.cnblogs.com/will-wu/p/14731728.html

 

这篇关于django 定时任务第三方库apscheduler的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!