在FastApi框架搭建的WBE系统中如何实现定时任务的管理?
Python中常见的定时任务框架包括Celery、APScheduler和Huey。以下是每个框架的简单对比和示例代码。
1.Celery: 分布式任务队列,适合处理长时间运行的任务。
# 安装celery
# pip install celery
# celery_task.py
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def add(x, y):
return x + y
# 使用
# celery -A celery_task worker -l info
# 执行任务
# result = add.delay(4, 4)
2.APScheduler: 定时任务调度,内置多种触发器。
# 安装apscheduler
# pip install apscheduler
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
print("执行任务...")
scheduler = BlockingScheduler()
scheduler.add_job(my_job, 'interval', seconds=5) # 每5秒执行一次
scheduler.start()
3.Huey: 轻量级的任务队列,使用Redis作为数据存储。
# 安装huey
# pip install huey
from huey import Huey
huey = Huey('my_app')
@huey.task()
def task_a():
print('Task A is running')
# 使用
if __name__ == '__main__':
huey.run()
# 或者在生产环境中使用huey.poll()
Celery适合处理长任务,需要消息队列和分布式的场景;Huey轻量但需要其他Redis做存储。
所以我们选择APScheduler集成到我们的web系统中。
环境配置
pip install apscheduler fastapi[all]
APScheduler的基本组件
APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。
这里有个注意事项,很多博主都没讲的地方,在Web项目中集成APScheduler,调度器不能选择BlockingScheduler,这样会阻塞WEB系统的进程,导致定时框架启动而,web系统无法运行。
不多说直接上干货:
from pytz import timezone
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(30), # 线程池数量
'processpool': ProcessPoolExecutor(3), # 进程池数量
}
job_defaults = {
'coalesce': False, # 默认情况下关闭新的作业
'max_instances': 10 # 设置调度程序将同时运行的特定作业的最大实例数10
}
# scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,
# timezone=timezone('Asia/Shanghai'))
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,timezone=timezone('Asia/Shanghai'))
def A():
print('hello')
scheduler.add_job(A, 'interval', minutes=1)
from fastapi.responses import JSONResponse
@app.get('/list',summary='查询定时任务列表')
def task_list():
'''查询定时任务列表'''
data = [
{'name': i.name, 'id': i.id,
'start_date': i.trigger.start_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.start_date else '',
'interval': str(i.trigger.interval),
'interval_length': i.trigger.interval_length,
'next_run_time': i.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if i.next_run_time else '',
'end_date': i.trigger.end_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.end_date else '',
'status': bool(i.next_run_time),
}
for i in scheduler.get_jobs()]
return JSONResponse(data)
@app.get('/info',summary='查询定时任务详情')
def task_info(id):
'''查询定时任务详情'''
data = {}
if i := scheduler.get_job(id):
data = {'name': i.name, 'id': i.id,
'start_date': i.trigger.start_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.start_date else '',
'interval': str(i.trigger.interval),
'interval_length': i.trigger.interval_length,
'next_run_time': i.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if i.next_run_time else '',
'end_date': i.trigger.end_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.end_date else '',
'status': bool(i.next_run_time),
}
return JSONResponse(data)
@app.get('/stop',summary='停止指定任务')
def task_stop(id):
'''停止指定任务'''
if job := scheduler.get_job(id):
job.pause()
return JSONResponse('ok')
@app.get('/resume',summary='恢复指定停止任务')
def task_resume(id):
'''恢复指定停止任务'''
if job := scheduler.get_job(id):
job.resume()
return JSONResponse('ok')
@app.get('/stop/all',summary='停止所有任务')
def task_stop_all():
'''停止所有任务'''
for i in scheduler.get_jobs():
i.pause()
return JSONResponse('ok')
@app.get('/resume/all',summary='恢复所有停止任务')
def task_resume_all():
'''恢复所有停止任务'''
for i in scheduler.get_jobs():
i.resume()
return JSONResponse('ok')
@app.get('/remove/all',summary='删除所有任务')
def task_remove_all():
'''删除所有任务'''
scheduler.remove_all_jobs()
return JSONResponse('ok')
@app.get('/remove',summary='删除指定任务')
def task_remove(id):
'''删除指定任务'''
if job := scheduler.get_job(id):
job.remove()
return JSONResponse('ok')