• 基于sanic的服务使用celery完成动态修改定时任务


    首先声明一下
    考虑到celery目前和asyncio的不兼容性,协程任务需要转换为非异步的普通方法才能被当做task加入定时,并且celery和asyncio使用可能会带来预想不到的问题,在celery官方第二次承诺的6.0版本融合asyncio之前,需要慎重考虑一下
    如果你的项目是融合了asyncio的项目,而且并不需要像celery文档中描述的那么多的复杂的定时功能,一个轻量级的包APScheduler完全可以满足你的需求,而且兼容asyncio框架

    功能实现介绍

    这是一个基于Sanic服务和Celery定时任务操作的功能,实现的原理大致如下图

    • Server:是我们的sanic服务,负责接收和响应请求,接收任务请求之后会异步非阻塞地将预警的定时任务交给celery处理
    • Beat(Scheduler): 定期触发任务(提前设置好的周期性或定时任务),有可用worker时,任务将会被执行,这里我们的服务使用redis作为Beat Scheduler
    • Queue: 接收的任务的队列,使任务有序的进出,是celery本身实现
    • Worker: 执行任务
    • Result Store(Result backend ):
      存储任务的位置,有需要时可召回任务的结果,但是任务的结果会设置一个过期时间,这里我们的服务使用redis作为Result Store

    运行和使用的示例

    sanic-celery server示例的目录结构
    在这里插入图片描述

    主要关注的内容在celery_app, query和第一层的sanic_server.py和结构,settings.py保存的是项目的根目录

    import os
    import sys
    
    
    CELERY_BASE_DIR = os.path.dirname(os.path.abspath(__file__))
    
    
    sys.path.insert(0, CELERY_BASE_DIR)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    celery

    celery app启动:

    • 创建celery app,并将celery app启动的配置信息加入(配置信息在执行命令行启动celery之前加入都可以)
    • 配置文件的内容,可参考官方文档,这里给出了简单示例的配置内容和说明,注意4.x之后的celery配置变量要用小写的

    在这里插入图片描述

    # -*- coding:utf-8 -*-
    from celery import Celery
    
    from . import config
    app = Celery("app_name")
    app.config_from_object(config)
    
    config.py
    
    broker_url = 'redis://localhost:6379/1'
    result_backend = 'redis://localhost:6379/2'
    redbeat_redis_url = 'redis://localhost:6379/3'
    redbeat_key_prefix = 'roiq_redbeat_key_prefix:'
    # 任务运行结果过期时间,默认一天,传入秒数或者timedelta对象,参考https://docs.celeryq.dev/en/stable/userguide/configuration.html#result-expires
    result_expires = 600
    
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    timezone = 'Asia/Shanghai'
    enable_utc = True
    
    
    
    # (!)所有的tasks都要提前在这里imports
    imports = (
        "query.tasks",
        "send_email.tasks"
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    关于参数的更多详细说明,可参考官方文档

    Beat Scheduler是针对周期性任务和延时任务需求的,非Django的celery默认不支持celery服务运行的时候修改任务状态的,针对我们的业务需求,我们需要在服务运行的时候增加、修改和查看任务,因此引入了支持redis作为beat scheduler的模块redbeat,redbeat的使用参考链接,只需要使用其中的创建、更新和删除等常用操作方法

    参考redbeat入门链接安装好redbeat之后,以redbeat作为celery的beat启动celery,不配置redbeat_redis_url时默认broker也是beat

    celery启动命令

    在windows环境下,beat要和worker、broker分开启动

    指定readbeat作为beat启动celery

    在命令行执行:celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10

    • -A是celery app的位置,这里celery_app的__init__.py中包含celery app
    • beat指定需要启动beat(默认不启动)
    • -S指定beat的Scheduler对象
    • -l是loglevel,打印日志的信息等级,支持info, debug等关键字
    • –max-interval指定beat检查新修改的任务的间隔时间,默认5分钟,这里为了方便调试设置为10秒钟,比较实时地看到结果

    启动worker

    在命令行执行:celery -A worker -l debug -P gevent,为了支持windows上运行,需要先安装gevent(pip install gevent),在linux不需要-P选项

    更多参数和详情可以用celery --help,celery worker --help, celery beat --help查看

    启动celery服务之后,测试celery运行时的修改操作

    redbeat在celery运行时修改任务的操作

    使用redbeat支持在celery运行时修改任务的操作,执行时确保celery的app、worker、beat服务和redis等存储服务都在运行

    定义一个简单的任务:

    query/tasks.py

    # -*- coding:utf-8 -*-import asyncio
    import time
    
    import pandas as pd
    
    from celery_app import app
    
    
    async def countdown_task(a, b):
        """以一个简单的方法代替sql查询的task"""
        await asyncio.sleep(1)
        for i in range(3):
            print(f"-------{i}---------")
            time.sleep(1)
        return a+b
    
    
    @app.task
    def sync_countdown_task(a, b):
        return asyncio.get_running_loop().run_until_complete(countdown_task(a, b))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    由于项目中使用的全都是异步协程方法,需要将协程转换为普通的任务,才能够注册为celery的task

    sanic_server.py

    # -*- coding:utf-8 -*-
    import asyncio
    from datetime import timedelta
    
    from celery.schedules import crontab, schedule
    from redbeat import RedBeatSchedulerEntry
    from sanic import Sanic
    from sanic import response
    
    from celery_app import app as celery_app
    from celery_app.config import redbeat_key_prefix
    from query.tasks import sync_countdown_task
    
    sanic_app = Sanic("sanic_celery")
    
    loop = asyncio.get_event_loop()
    
    
    # 开始定时任务,需要在不重启celery服务的情况下将任务添加到beat
    async def query_task_create(request):
        """
        通过此api创建周期性的查询任务
        """
        tasks = f"query.tasks"            # 任务所在的模块(具体到.py文件)
        sche = schedule(timedelta(seconds=5))
        task_name = sync_countdown_task.__name__
        task = f"{tasks}.{task_name}"
        entry = RedBeatSchedulerEntry(task_name, task, sche, args=(1, 2), app=celery_app)
        print(entry)
        key = entry.key       # key存到数据库...
        entry.save()        
        return response.text(f"schedule2 created..., task key is: {key}")
    
    
    async def schedule_disable(request):
        task_name = sync_countdown_task.__name__
        key = redbeat_key_prefix + task_name        # key 可以
        entry = RedBeatSchedulerEntry.from_key(key, celery_app)
        entry.enabled = False
        entry.save()
        print(entry)
        return response.text("schedule disabled..")
    
    
    async def schedule_enable(request):
        task_name = sync_countdown_task.__name__
        key = redbeat_key_prefix + task_name
        entry = RedBeatSchedulerEntry.from_key(key, celery_app)
        entry.enabled = True
        entry.save()
        print(entry)
        return response.text("schedule enabled..")
    
    
    async def schedule_delete(request):
        task_name = sync_countdown_task.__name__     # 请求时获得(最开始也是用数据库存储和获取)
        task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"
        entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)
        print(entry)
        entry.delete()
        print("删除后的entry: ", entry)
        return response.text(task_name+" deleted")
    
    
    async def schedule_update(request):
        task_name = sync_countdown_task.__name__     # 请求时获得(最开始也是用数据库存储和获取)
        task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"
        # 获取task key
        entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)        # (!)要考虑任务已经删除,key不存在的情况
        print(entry)
        # 修改schedule
        entry.schedule = schedule(timedelta(seconds=3))
        # 修改参数
        entry.args = (3, 4)
        entry.save()
        print(entry)
        return response.text(task_name+" updated")
    
    
    async def schedule_info(request):
        """
    
        """
        task_key = f"{redbeat_key_prefix}{sync_countdown_task.__name__}"
        entry = RedBeatSchedulerEntry.from_key(task_key, app=celery_app)
        return response.text(f"{entry}")
    
    
    # todo: 1.设置result存储的过期时间; 2.添加和更新任务时,考虑key错误但是不会在项目中抛出的问题;
    
    
    sanic_app.add_route(query_task_create, "/create2")
    sanic_app.add_route(schedule_update, "/update")
    sanic_app.add_route(schedule_delete, "/delete")
    sanic_app.add_route(schedule_disable, "/disable")
    sanic_app.add_route(schedule_enable, "/enable")
    sanic_app.add_route(schedule_info, "/info")
    
    
    if __name__ == '__main__':
        sanic_app.run(port=4321)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101

    注:更新和删除等操作的key/task_key的获取,在上线时需要从数据库中存储和获取

    设置定时任务的运作流程

    • 设定celery配置,存放于config.py中(也可以用其他方式存储)
    • 创建app,导入配置的内容
    • 编写好task和server调用的api
    • celery -A celery_app beat -S redbeat.RedBeatScheduler -l debug --max-interval=10类似的命令运行beat,celery -A celery_app worker -l debug -P gevent -E类似的命令运行worker
    • 运行sanic服务
    • 根据api传入的参数使用redbeat.RedBeatSchedulerEntry创建定时任务,使用RedBeatSchedulerEntry.from_key()获取并修改定时任务
    • 根据api用户和产品返回已设定的定时任务列表供用户查看和操作
  • 相关阅读:
    什么浏览器广告少?多御安全浏览器轻体验
    深入理解线段树
    [JS Framework] 当前运行的基座不包含原生插件[XXX],请在manifest中配置该插件,重新制作包括该原生插件的自定义运行基座
    【C++上层应用】7. Web编程*
    普罗米修斯
    【UiBot干货】UiBot屏幕锁屏常见的7个问题
    Python代码中引用已经写好的模块、方法
    SpringBoot电商项目实战Day7 堆
    NET 使用自带 DI 批量注入服务(Service)和 后台服务(BackgroundService)
    opengl环境配置,使用glew和glfw(C++)
  • 原文地址:https://blog.csdn.net/Moelimoe/article/details/125457557