Windows10 教育版64位
Python 3.6.3
APScheduler 3.6.3
Python
中定时任务的解决方案,总体来说有四种,分别是:crontab
、 scheduler
、 Celery
、APScheduler
,其中:
crontab
是 Linux
的一个定时任务管理工具,在Windows
上面有替代品pycron
,但Windows
不像 Linux
那样有很多强大的命令程序,pycron
使用起来有局限性
,定制性
不好;Scheduler
太过于简单、复杂一点的定时任务做起来太困难,特别是以月份
以上时间单位的定时任务;Celery
依赖的软件比较多,比较耗资源;APScheduler(Advanced Python Scheduler)
基于 Quartz
,可以跨平台而且配置方便,提供了date
、interval
、cron
3种不同的触发器,与Linux
上原生的 crontab
格式兼容,可以设置任何高度复杂的定时任务,灵活的要死。在此不介绍APScheduler
的基本特性,有需要的可以直接去看APScheduler官方文档,我们直接切到主题:
APScheduler如何设置任务不并发(即第一个任务执行完再执行下一个)?
APScheduler
在多个任务相同时间点同时被触发时,会同时并发执行多个任务,如使用下方的示例代码:
'''
===========================================
@author: jayce
@file: apscheduler设置任务不并发.py
@time: 2022/7/1/001 19:38
===========================================
'''
from apscheduler.schedulers.blocking import BlockingScheduler
import time
def job_printer(text):
'''
死循环,用来模拟长时间执行的任务
:param text:
:return:
'''
while True:
time.sleep(2)
print("job text:{}".format(text))
if __name__ == '__main__':
schedule = BlockingScheduler()
schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!'])
schedule.add_job(job_printer, "cron", second='*/20', args=['每20秒执行一次!'])
schedule.print_jobs()
schedule.start()
可以看到,函数job_printer
是一个死循环,用来模拟长时间执行的任务,我们使用add_job
向APScheduler
中添加2个job_printer
,区别是2个任务的时间间隔为:每10秒执行一次
和每20秒执行一次
。
因为job_printer
是一个死循环,相当于job_printer
一直没有被执行完,但其实APScheduler
在任务没有被执行完的情况下,同时执行多个不同的job_printer
:
job text:每10秒执行一次!
job text:每20秒执行一次!
job text:每10秒执行一次!
job text:每20秒执行一次!
job text:每10秒执行一次!
job text:每20秒执行一次!
job text:每10秒执行一次!
job text:每20秒执行一次!
job text:每10秒执行一次!
Execution of job "job_printer (trigger: cron[second='*/10'], next run at: 2022-07-01 20:47:50 CST)" skipped: maximum number of running instances reached (1)
即:
可以看到10秒的job_printer
和20秒的job_printer
交替被执行,而其实10秒的job_printer
其实根本没有执行完。这在CPU
或者GPU
等硬件设备能够承担负载的情况下,当然是好事,但如果你的硬件不够的话,发生OOM等资源不够的情况,程序就被中断了,导致你的模型训练或业务逻辑失败!
具体的
:
我这边是使用APScheduler
和Tensorflow
进行在线学习(online learning
)时,在不同的时间节点下会对模型使用不一样的重训练方式,如有2个定时任务(A
:每10
秒执行一次,B
:每20
秒执行一次)和2种重训练方式(X
和Y
),当你的显存存在如下情况:
显存很少只够一个程序进行训练,不能多个程序同时运行,否则会
OOM
;
那么只能引导程序依次执行,而不能并发执行,等当同一时间内X
和Y
同时被触发时,只执行其中1个,另外1个不执行。
那这个时候又该怎么办呢?
通过查阅官方文档,发现可以通过设置执行任务的线程数,来控制只有1个执行器进行任务的执行,进而达到执行完任务X
再执行任务Y
,具体如下:
'''
===========================================
@author: jayce
@file: apscheduler设置任务不并发.py
@time: 2022/7/1/001 19:38
===========================================
'''
from apscheduler.executors.pool import ThreadPoolExecutor
if __name__ == '__main__':
# 为了防止全量和增量并发造成显存溢出,进而训练失败,设置同一时间只能有一个任务运行
schedule = BlockingScheduler(executors={'default': ThreadPoolExecutor(1)})
通过向BlockingScheduler
设定最大的ThreadPoolExecutor=1
,即可达到我们想要的效果!
job text:每10秒执行一次!
job text:每10秒执行一次!
job text:每10秒执行一次!
job text:每10秒执行一次!
job text:每10秒执行一次!
Execution of job "job_printer (trigger: cron[second='*/10'], next run at: 2022-07-01 21:17:50 CST)" skipped: maximum number of running instances reached (1)
job text:每10秒执行一次!
job text:每10秒执行一次!
job text:每10秒执行一次!
job text:每10秒执行一次!
job text:每10秒执行一次!
Execution of job "job_printer (trigger: cron[second='*/10'], next run at: 2022-07-01 21:18:00 CST)" skipped: maximum number of running instances reached (1)
Execution of job "job_printer (trigger: cron[second='*/20'], next run at: 2022-07-01 21:18:00 CST)" skipped: maximum number of running instances reached (1)
即:
可以看到,一直在执行第1个被触发的任务,相同时间被触发的任务都被skipped
了~~
当然,如果你想要第1个任务执行完时,执行被跳过的任务,可以通过在add_job
中设置misfire_grace_time
实现!
APScheduler
如果某个任务挂掉了,整个定时任务程序会中断吗?还是下次时间继续执行该任务?答案是:程序不会中断,到下次执行任务的时间点,还会重新执行。
具体的,使用如下测试代码:
'''
===========================================
@author: jayce
@file: apscheduler设置任务不并发.py
@time: 2022/7/1/001 19:38
===========================================
'''
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
import time
def exception_maker():
'''
异常制造器,用来模拟任务执行被中断
:return:
'''
return 1 / 0
def job_printer(text):
'''
死循环,用来模拟长时间执行的任务
:param text:
:return:
'''
while True:
time.sleep(2)
print("job text:{}".format(text))
if __name__ == '__main__':
schedule = BlockingScheduler()
schedule.add_job(job_printer, "cron", second='*/10', args=['每10秒执行一次!'])
schedule.add_job(exception_maker, "cron", second='*/5')
schedule.print_jobs()
schedule.start()
可以看到exception_maker
已经失败多次,但是不影响其他任务和它自身的下次执行:
Job "exception_maker (trigger: cron[second='*/5'], next run at: 2022-07-01 19:53:30 CST)" raised an exception
Traceback (most recent call last):
File "C:\Users\Jayce\Anaconda3\envs\tf2.3\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "E:/Code/Python/demo代码/apscheduler设置任务不并发.py", line 14, in exception_maker
return 1 / 0
ZeroDivisionError: division by zero
Job "exception_maker (trigger: cron[second='*/5'], next run at: 2022-07-01 19:53:35 CST)" raised an exception
Traceback (most recent call last):
File "C:\Users\Jayce\Anaconda3\envs\tf2.3\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "E:/Code/Python/demo代码/apscheduler设置任务不并发.py", line 14, in exception_maker
return 1 / 0
ZeroDivisionError: division by zero
job text:每10秒执行一次!
job text:每10秒执行一次!
Job "exception_maker (trigger: cron[second='*/5'], next run at: 2022-07-01 19:53:40 CST)" raised an exception
Traceback (most recent call last):
File "C:\Users\Jayce\Anaconda3\envs\tf2.3\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "E:/Code/Python/demo代码/apscheduler设置任务不并发.py", line 14, in exception_maker
return 1 / 0
ZeroDivisionError: division by zero
job text:每10秒执行一次!
job text:每10秒执行一次!
Execution of job "job_printer (trigger: cron[second='*/10'], next run at: 2022-07-01 19:53:40 CST)" skipped: maximum number of running instances reached (1)
Job "exception_maker (trigger: cron[second='*/5'], next run at: 2022-07-01 19:53:45 CST)" raised an exception
Traceback (most recent call last):
File "C:\Users\Jayce\Anaconda3\envs\tf2.3\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "E:/Code/Python/demo代码/apscheduler设置任务不并发.py", line 14, in exception_maker
return 1 / 0
ZeroDivisionError: division by zero
job text:每10秒执行一次!
即:
都看到这里了,还不赶紧点赞
,评论
,收藏
走一波?