• flask python 设置定时任务 flask 周期性执行任务方案


    flask 通常使用 flask_apscheduler 框架设计定时任务,flask_apscheduler 功能很全面,能按设定的时间规则执行任务,可以持久化到各类数据库(mysql,redis,mongodb),实现对定时任务增、删、改、查等操作。

    安装

    pip3 install flask_apscheduler
    
    • 1

    1、调用方法

    方法一:使用 Config 类配置时间规则

    from flask import Flask
    from flask_apscheduler import APScheduler
    
    class Config(object):
        # 列表类型,如有需要可以定义多个job
        JOBS = [
            {
                'id': 'job_1',                # 一个标识
                'func': '__main__:job1',     # 指定运行的函数
                'args': (1, 2),              # 传入函数的参数
                'trigger': 'interval',       # 指定 定时任务的类型
                'seconds': 5                # 运行的间隔时间
            }
        ]
    
        SCHEDULER_API_ENABLED = True
    
    def job1(a, b):                          # 运行的定时任务的函数
        print(str(a) + ' ' + str(b))
    
    
    if __name__ == '__main__':
        app = Flask(__name__)                 # 实例化flask
        app.config.from_object(Config())      # 为实例化的 flask 引入配置
        scheduler = APScheduler()                  # 实例化 APScheduler
        scheduler.init_app(app)                    # 把任务列表放入 flask
        scheduler.start()                          # 启动任务列表
        app.debug = True
        app.run(host='0.0.0.0',port=8000)          # 启动 flask
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    方法二:使用装饰器

    from flask import Flask
    from flask_apscheduler import APScheduler
    
    
    # 实例化 APScheduler
    scheduler = APScheduler()
    
    @scheduler.task('interval', id='job_1', args=(1,2),seconds=5)
    def job1(a, b):  # 运行的定时任务的函数
        print(str(a) + ' ' + str(b))
    
    
    if __name__ == '__main__':
        app = Flask(__name__)  # 实例化flask
        scheduler.start()  # 启动任务列表
        app.debug=True
        app.run(host='0.0.0.0',port= 8000)  # 启动 flask
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    方法三:通过调用 flask_apscheduler 的 api (推荐)

    from flask import Flask
    from apscheduler.schedulers.background import BackgroundScheduler
    
    # 调度器在后台线程中运行,不会阻塞当前线程
    scheduler = BackgroundScheduler()
    
    def job1(a, b):                          # 运行的定时任务的函数
        print(str(a) + ' ' + str(b))
    
        
    scheduler.add_job(func=job1, args=("1","2"),id="job_1", trigger="interval", seconds=5, replace_existing=False)
    '''
    func:定时任务执行的函数名称。
    args:任务执行函数的位置参数,若无参数可不填
    id:任务id,唯一标识,修改,删除均以任务id作为标识
    trigger:触发器类型,参数可选:date、interval、cron
    replace_existing:将任务持久化至数据库中时,此参数必须添加,值为True。并且id值必须有。不然当程序重新启动时,任务会被重复添加。
    '''
    
    if __name__ == '__main__':
        app = Flask(__name__)  # 实例化flask
        scheduler.start()  # 启动任务列表
        app.debug=True
        app.run(host='0.0.0.0',port= 8000)  # 启动 flask
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    实例对象 scheduler 拥有增、删、改、查等基本用法:

    新增任务:add_job()
    
    • 1
    编辑任务:modify_job()
    
    • 1
    删除任务:remove_job(id)(删除所有任务:remove_all_jobs()
    • 1
    查询任务:get_job(id)(查询所有任务:get_jobs()
    • 1
    暂停任务:pause_job(id)
    
    • 1
    恢复任务:resume_job(id)
    
    • 1
    运行任务:run_job(id)(立即运行,无视任务设置的时间规则)
    
    • 1
  • 相关阅读:
    [免费专栏] Android安全之Xposed插件开发【从零手把手带】教程
    基于Java毕业设计在线商城系统源码+系统+mysql+lw文档+部署软件
    那些一门心思研究自动化测试的人,最后都怎样了?
    图像分类,看我就够啦!
    Redis安装
    受心理学启发,这项眼球追踪生成式模型大幅降低训练成本
    AVM赛道研究:预计2024年渗透率突破50%!下一个破局点在哪儿?
    2.5 C#视觉程序开发实例1----CamManager实现模拟相机采集图片(Form_Vision部分代码)
    Visual Studio常见编译错误记录
    哭了,早知道就不用花钱去学3D建模了️
  • 原文地址:https://blog.csdn.net/weixin_39589455/article/details/133712526