• APScheduler如何设置任务不并发(即第一个任务执行完再执行下一个)?


    1.软件环境

    Windows10 教育版64位
    Python 3.6.3
    APScheduler 3.6.3

    2.问题描述

    Python中定时任务的解决方案,总体来说有四种,分别是:crontabschedulerCeleryAPScheduler,其中:

    1. crontabLinux 的一个定时任务管理工具,在Windows上面有替代品pycron,但Windows不像 Linux那样有很多强大的命令程序,pycron使用起来有局限性定制性不好;
    2. Scheduler太过于简单、复杂一点的定时任务做起来太困难,特别是以月份以上时间单位的定时任务;
    3. Celery依赖的软件比较多,比较耗资源;
    4. APScheduler(Advanced Python Scheduler) 基于 Quartz,可以跨平台而且配置方便,提供了dateintervalcron3种不同的触发器,与Linux上原生的 crontab 格式兼容,可以设置任何高度复杂的定时任务,灵活的要死。

    在此不介绍APScheduler的基本特性,有需要的可以直接去看APScheduler官方文档,我们直接切到主题:

    APScheduler如何设置任务不并发(即第一个任务执行完再执行下一个)?

    APScheduler在多个任务相同时间点同时被触发时,会同时并发执行多个任务,如使用下方的示例代码:

    '''
    ===========================================
      @author:  jayce
      @file:    apscheduler设置任务不并发.py         
      @time:    2022/7/1/001   19:38 
    ===========================================
    '''
    from apscheduler.schedulers.blocking import BlockingScheduler
    import time
    
    
    def job_printer(text):
        '''
        死循环,用来模拟长时间执行的任务
        :param text: 
        :return: 
        '''
        while True:
            time.sleep(2)
            print("job text:{}".format(text))
    
    
    if __name__ == '__main__':
        schedule = BlockingScheduler()
    
        schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!'])
        schedule.add_job(job_printer, "cron", second='*/20', args=['每20秒执行一次!'])
     
        schedule.print_jobs()
        schedule.start()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    可以看到,函数job_printer是一个死循环,用来模拟长时间执行的任务,我们使用add_jobAPScheduler中添加2个job_printer,区别是2个任务的时间间隔为:每10秒执行一次每20秒执行一次
    因为job_printer是一个死循环,相当于job_printer一直没有被执行完,但其实APScheduler在任务没有被执行完的情况下,同时执行多个不同的job_printer

    job text:每10秒执行一次!
    job text:每20秒执行一次!
    job text:每10秒执行一次!
    job text:每20秒执行一次!
    job text:每10秒执行一次!
    job text:每20秒执行一次!
    job text:每10秒执行一次!
    job text:每20秒执行一次!
    job text:每10秒执行一次!
    Execution of job "job_printer (trigger: cron[second='*/10'], next run at: 2022-07-01 20:47:50 CST)" skipped: maximum number of running instances reached (1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    即:
    在这里插入图片描述
    可以看到10秒的job_printer和20秒的job_printer交替被执行,而其实10秒的job_printer其实根本没有执行完。这在CPU或者GPU等硬件设备能够承担负载的情况下,当然是好事,但如果你的硬件不够的话,发生OOM等资源不够的情况,程序就被中断了,导致你的模型训练或业务逻辑失败!
    具体的
    我这边是使用APSchedulerTensorflow进行在线学习(online learning)时,在不同的时间节点下会对模型使用不一样的重训练方式,如有2个定时任务(A:每10秒执行一次,B:每20秒执行一次)和2种重训练方式(XY),当你的显存存在如下情况:

    显存很少只够一个程序进行训练,不能多个程序同时运行,否则会OOM

    那么只能引导程序依次执行,而不能并发执行,等当同一时间内XY同时被触发时,只执行其中1个,另外1个不执行。

    那这个时候又该怎么办呢?
    在这里插入图片描述

    3.解决方法

    通过查阅官方文档,发现可以通过设置执行任务的线程数,来控制只有1个执行器进行任务的执行,进而达到执行完任务X再执行任务Y,具体如下:

    '''
    ===========================================
      @author:  jayce
      @file:    apscheduler设置任务不并发.py         
      @time:    2022/7/1/001   19:38 
    ===========================================
    '''
    from apscheduler.executors.pool import ThreadPoolExecutor
    
    
    if __name__ == '__main__':
        # 为了防止全量和增量并发造成显存溢出,进而训练失败,设置同一时间只能有一个任务运行
        schedule = BlockingScheduler(executors={'default': ThreadPoolExecutor(1)})
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    通过向BlockingScheduler设定最大的ThreadPoolExecutor=1,即可达到我们想要的效果!

    4.结果预览

    job text:每10秒执行一次!
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    Execution of job "job_printer (trigger: cron[second='*/10'], next run at: 2022-07-01 21:17:50 CST)" skipped: maximum number of running instances reached (1)
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    Execution of job "job_printer (trigger: cron[second='*/10'], next run at: 2022-07-01 21:18:00 CST)" skipped: maximum number of running instances reached (1)
    Execution of job "job_printer (trigger: cron[second='*/20'], next run at: 2022-07-01 21:18:00 CST)" skipped: maximum number of running instances reached (1)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    即:
    在这里插入图片描述
    可以看到,一直在执行第1个被触发的任务,相同时间被触发的任务都被skipped了~~
    当然,如果你想要第1个任务执行完时,执行被跳过的任务,可以通过在add_job中设置misfire_grace_time实现!

    FAQ

    1.APScheduler如果某个任务挂掉了,整个定时任务程序会中断吗?还是下次时间继续执行该任务?

    答案是:程序不会中断,到下次执行任务的时间点,还会重新执行。
    具体的,使用如下测试代码:

    '''
    ===========================================
      @author:  jayce
      @file:    apscheduler设置任务不并发.py         
      @time:    2022/7/1/001   19:38 
    ===========================================
    '''
    from apscheduler.schedulers.blocking import BlockingScheduler
    from apscheduler.executors.pool import ThreadPoolExecutor
    import time
    
    
    def exception_maker():
        '''
        异常制造器,用来模拟任务执行被中断
        :return:
        '''
        return 1 / 0
    
    
    def job_printer(text):
        '''
        死循环,用来模拟长时间执行的任务
        :param text:
        :return:
        '''
        while True:
            time.sleep(2)
            print("job text:{}".format(text))
    
    
    if __name__ == '__main__':
        schedule = BlockingScheduler()
    
        schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!'])
        schedule.add_job(exception_maker, "cron", second='*/5')
    
        schedule.print_jobs()
        schedule.start()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    可以看到exception_maker已经失败多次,但是不影响其他任务和它自身的下次执行:

    Job "exception_maker (trigger: cron[second='*/5'], next run at: 2022-07-01 19:53:30 CST)" raised an exception
    Traceback (most recent call last):
      File "C:\Users\Jayce\Anaconda3\envs\tf2.3\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
        retval = job.func(*job.args, **job.kwargs)
      File "E:/Code/Python/demo代码/apscheduler设置任务不并发.py", line 14, in exception_maker
        return 1 / 0
    ZeroDivisionError: division by zero
    Job "exception_maker (trigger: cron[second='*/5'], next run at: 2022-07-01 19:53:35 CST)" raised an exception
    Traceback (most recent call last):
      File "C:\Users\Jayce\Anaconda3\envs\tf2.3\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
        retval = job.func(*job.args, **job.kwargs)
      File "E:/Code/Python/demo代码/apscheduler设置任务不并发.py", line 14, in exception_maker
        return 1 / 0
    ZeroDivisionError: division by zero
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    Job "exception_maker (trigger: cron[second='*/5'], next run at: 2022-07-01 19:53:40 CST)" raised an exception
    Traceback (most recent call last):
      File "C:\Users\Jayce\Anaconda3\envs\tf2.3\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
        retval = job.func(*job.args, **job.kwargs)
      File "E:/Code/Python/demo代码/apscheduler设置任务不并发.py", line 14, in exception_maker
        return 1 / 0
    ZeroDivisionError: division by zero
    job text:每10秒执行一次!
    job text:每10秒执行一次!
    Execution of job "job_printer (trigger: cron[second='*/10'], next run at: 2022-07-01 19:53:40 CST)" skipped: maximum number of running instances reached (1)
    Job "exception_maker (trigger: cron[second='*/5'], next run at: 2022-07-01 19:53:45 CST)" raised an exception
    Traceback (most recent call last):
      File "C:\Users\Jayce\Anaconda3\envs\tf2.3\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
        retval = job.func(*job.args, **job.kwargs)
      File "E:/Code/Python/demo代码/apscheduler设置任务不并发.py", line 14, in exception_maker
        return 1 / 0
    ZeroDivisionError: division by zero
    job text:每10秒执行一次!
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    即:
    在这里插入图片描述


    都看到这里了,还不赶紧点赞评论收藏走一波?

  • 相关阅读:
    【linux】stat文件属性中三个时间的区别(Access time,Modify time,Change time)
    如何理解图神经网络的傅里叶变换和图卷积
    Go 语言内置类型全解析:从布尔到字符串的全维度探究
    pdf生成:wkhtmltopdf
    运维排查篇 | Redis占用内存过高怎么办?
    OSSID: Online Self-Supervised Instance Detection by (And For) Pose Estimation
    数据库查询优化:主从读写分离及常见问题
    接口与抽象类的区别
    07Linux--环境安装
    【艾特淘】手淘搜索新时代来了,开启搜索短视频时代,都是免费流量
  • 原文地址:https://blog.csdn.net/qq_15969343/article/details/125565204