• 【fastapi】定时任务管理


    在FastApi框架搭建的WBE系统中如何实现定时任务的管理?
    Python中常见的定时任务框架包括Celery、APScheduler和Huey。以下是每个框架的简单对比和示例代码。
    1.Celery: 分布式任务队列,适合处理长时间运行的任务。

    # 安装celery
    # pip install celery
     
    # celery_task.py
    from celery import Celery
     
    app = Celery('tasks', broker='redis://localhost:6379/0')
     
    @app.task
    def add(x, y):
        return x + y
     
    # 使用
    # celery -A celery_task worker -l info
    # 执行任务
    # result = add.delay(4, 4)
    

    2.APScheduler: 定时任务调度,内置多种触发器。

    # 安装apscheduler
    # pip install apscheduler
     
    from apscheduler.schedulers.blocking import BlockingScheduler
     
    def my_job():
        print("执行任务...")
     
    scheduler = BlockingScheduler()
    scheduler.add_job(my_job, 'interval', seconds=5)  # 每5秒执行一次
    scheduler.start()
    

    3.Huey: 轻量级的任务队列,使用Redis作为数据存储。

    # 安装huey
    # pip install huey
     
    from huey import Huey
     
    huey = Huey('my_app')
     
    @huey.task()
    def task_a():
        print('Task A is running')
     
    # 使用
    if __name__ == '__main__':
        huey.run()
        # 或者在生产环境中使用huey.poll()
    

    Celery适合处理长任务,需要消息队列和分布式的场景;Huey轻量但需要其他Redis做存储。
    所以我们选择APScheduler集成到我们的web系统中。

    环境配置
    pip install apscheduler fastapi[all]

    APScheduler的基本组件
    APScheduler 有四种组件,分别是:调度器(scheduler),作业存储(job store),触发器(trigger),执行器(executor)。

    这里有个注意事项,很多博主都没讲的地方,在Web项目中集成APScheduler,调度器不能选择BlockingScheduler,这样会阻塞WEB系统的进程,导致定时框架启动而,web系统无法运行。

    不多说直接上干货:

    定时框架基础配置

    from pytz import timezone
    from apscheduler.schedulers.background import BackgroundScheduler
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
    
    
    jobstores = {
        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
    }
    
    executors = {
        'default': ThreadPoolExecutor(30),  # 线程池数量
        'processpool': ProcessPoolExecutor(3),  # 进程池数量
    }
    
    job_defaults = {
        'coalesce': False,  # 默认情况下关闭新的作业
        'max_instances': 10  # 设置调度程序将同时运行的特定作业的最大实例数10
    }
    # scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,
    #                               timezone=timezone('Asia/Shanghai'))
    scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults,timezone=timezone('Asia/Shanghai'))
    
    def A():
        print('hello')
    scheduler.add_job(A, 'interval', minutes=1)
    

    管理接口

    from fastapi.responses import JSONResponse
    
    @app.get('/list',summary='查询定时任务列表')
    def task_list():
        '''查询定时任务列表'''
    
        data = [
            {'name': i.name, 'id': i.id,
             'start_date': i.trigger.start_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.start_date else '',
             'interval': str(i.trigger.interval),
             'interval_length': i.trigger.interval_length,
             'next_run_time': i.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if i.next_run_time else '',
             'end_date': i.trigger.end_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.end_date else '',
             'status': bool(i.next_run_time),
             }
            for i in scheduler.get_jobs()]
        return JSONResponse(data)
    
    
    @app.get('/info',summary='查询定时任务详情')
    def task_info(id):
        '''查询定时任务详情'''
        data = {}
        if i := scheduler.get_job(id):
            data = {'name': i.name, 'id': i.id,
                    'start_date': i.trigger.start_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.start_date else '',
                    'interval': str(i.trigger.interval),
                    'interval_length': i.trigger.interval_length,
                    'next_run_time': i.next_run_time.strftime('%Y-%m-%d %H:%M:%S') if i.next_run_time else '',
                    'end_date': i.trigger.end_date.strftime('%Y-%m-%d %H:%M:%S') if i.trigger.end_date else '',
                    'status': bool(i.next_run_time),
                    }
        return JSONResponse(data)
    
    
    @app.get('/stop',summary='停止指定任务')
    def task_stop(id):
        '''停止指定任务'''
        if job := scheduler.get_job(id):
            job.pause()
        return JSONResponse('ok')
    
    
    @app.get('/resume',summary='恢复指定停止任务')
    def task_resume(id):
        '''恢复指定停止任务'''
        if job := scheduler.get_job(id):
            job.resume()
        return JSONResponse('ok')
    
    
    @app.get('/stop/all',summary='停止所有任务')
    def task_stop_all():
        '''停止所有任务'''
        for i in scheduler.get_jobs():
            i.pause()
        return JSONResponse('ok')
    
    
    @app.get('/resume/all',summary='恢复所有停止任务')
    def task_resume_all():
        '''恢复所有停止任务'''
        for i in scheduler.get_jobs():
            i.resume()
        return JSONResponse('ok')
    
    
    @app.get('/remove/all',summary='删除所有任务')
    def task_remove_all():
        '''删除所有任务'''
        scheduler.remove_all_jobs()
        return JSONResponse('ok')
    
    
    @app.get('/remove',summary='删除指定任务')
    def task_remove(id):
        '''删除指定任务'''
        if job := scheduler.get_job(id):
            job.remove()
        return JSONResponse('ok')
    
  • 相关阅读:
    各地区收入差距不平等、基尼系数、省级层面(分城镇和乡村)
    基于VUE + Echarts 实现可视化数据大屏环保可视化
    编译安装oh-my-zsh
    求圆心到点的直线与圆的相交点
    项目人力资源管理
    Web前端:ReactJS为你的应用程序开发带来的10大好处
    操作系统知识点总结——第三章内存管理
    简单而经典:Java中的冒泡排序算法详解
    tomcat里部署多个war,导致配置文件错乱。
    vue监听页面中的某个div的滚动事件,并判断滚动的位置
  • 原文地址:https://blog.csdn.net/qq_42874996/article/details/139843258