主要介绍通过 apscheduler的三种
在特定时间定期运行,相比较Linux crontab 多了 second/year/week(第多少周)/start_date/end_date
主要的参数:
特殊说明
1、linux crontab 中的 week 对应到 apscheduler中是 day_of_week (取值0到6后者mon,tue,wed,thu,fri,sat,sun)
2、配置job的时候,并不是所有时间字段都是必须
3、但是需要知道的是,隐含 大于最小有效值
的字段默认为*
,而较小的字段
默认为其最小值
,除了 week
和 day_of_week
默认为*
举例说明:
# month=6, hour=1 最小有效值字段为 hour; 其等价于
year='*', month=6, day=*, week='*', day_of_week='*', hour=1, minute=0, second=0
# 意思是在每年 6 月每天 0 点 0 分 0 秒运行;
4、字段支持表达式,其中特殊的是 day
, 支持 last
、last x
和 xth y
x
在当月的最后一次,比如 last mon
指 当月的最后一个周一y
在当月的第 x
次,比如 4th mon
指当月的第四个周一以固定的时间间隔运行Job
主要的参数:
某个特定时间仅运行一次的任务,类似于Linux的
at
主要的参数:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Author: Colin
@Github: https://github.com/opscolin
@DateTime: 2022/8/3 11:14 AM
@File: demo_aps.py
@Software: PyCharm
"""
import datetime
import time
import logging
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
# from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# 日志 logger 配置
logging.basicConfig(filemode='a', filename='python-apscheduler.log')
# apscheduler store/executor 配置
job_stores = {
# 'mongo': MongoDBJobStore(),
# 'mongo': {'type': 'mongodb'},
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'),
'db-ops': SQLAlchemyJobStore(
url='mysql+pymysql://xxxx:yyyy@192.168.a.b:3306/dbname?charset=utf8mb4')
}
executors = {
'default': ThreadPoolExecutor(20),
# 'default': {'type': 'threadpool', 'max_workers': 20},
'process_pool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
# 方式一
scheduler = BackgroundScheduler(jobstores=job_stores, executors=executors, job_defaults=job_defaults)
# 方式二
# scheduler = BackgroundScheduler()
# scheduler.configure(jobstores=job_stores, executors=executors, job_defaults=job_defaults, timezone=utc)
# 定义任务Job
# 无参数Job
def job1():
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"No params of job1 at {now}")
# 带参数 Job
def job2(name):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"Job with param {name} at {now}")
# Demo - Cron
scheduler.add_job(job2, trigger='cron', hour=11, minute=45, second=10, args=['Betty'])
# 每 30 秒执行一次
scheduler.add_job(job1, trigger='cron', second='*/20', jobstore='db-kfzops')
# 2022-12-31 之前, 第一季度和第三季度的每个月第三个周一的凌晨1点半执行
scheduler.add_job(job2, trigger='cron', args=['Betty'],
end_date='2022-12-31',
month='1-3,7-9',
day='3rd mon',
hour=1,
minute=30)
# Demo - Interval -> 每 30 秒执行一次
scheduler.add_job(job1, trigger='interval', seconds=30)
# Demo - Date
scheduler.add_job(job2, trigger='date', run_date='2022-08-03 14:20:00', args=['Betty'])
def main():
scheduler._logger = logging
scheduler.start()
while True:
time.sleep(1)
if __name__ == '__main__':
main()
脚本地址:https://gitee.com/colin5063/pylearn2022/blob/master/examples/scripts/demo_apscheduler.py
@register_job()
和 scheduler.add_job()
目前把任务的结果通过 jobstore 写入 sqlite
或者 MySQL
中,job_state
字段都是乱码,即使配置了字符集,也是乱码,目录该问题暂未找到原因。
如果大家有答案,欢迎留言分享
如果觉得文章对你有所帮忙,欢迎点赞收藏,或者可以关注个人公众号 全栈运维
哦,一起学习,一起健身~