• 【记录】celery + mongodb 动态添加、更新定时任务时间


    更新时间

    • 2023-06-12

    一、目录结构

    ├── celery_run.py
    ├── celery_tasks
    │   ├── sync_task.py
    │   ├── task1.py
    │   └── task2.py
    ├── service
    │   └── add_task.py
    ├── database
    │   └── mongodb_class.py
    └── config
        └── config.py
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    功能说明

    1. celery_run.py 用于定义celery应用
    2. celery_tasks 用于存放需要定时执行的任务方法
    3. sync_task.py 用于触发动态添加任务
    4. task1.py, task2.py 被动态添加的任务
    5. service add_task.py 添加任务方法
    6. database mongodb_class.py 被封装的mongodb操作类
    7. config config.py 用于定义celery、mongodb 配置

    二、逻辑

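    (原文此处为流程图,图片已丢失)逻辑流程:celery beat 每 3 分钟触发一次 sync_task(检查任务)→ sync_task 根据业务数据决定要执行的任务(task1 / task2)及其执行间隔 → 调用 add_task.py 中的 TaskScheduler.update:先在 mongodb 的 schedules 集合中按任务名查找旧任务并删除,再写入新的 PeriodicTask → celery beat(MongoScheduler)从 mongodb 读到新任务后,按新的间隔调度 worker 执行 task1 / task2。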

    三、依赖

    1. mongodb 安装

    https://blog.csdn.net/qq_21103471/article/details/124706915

    2. pip 依赖包安装

    pip install celery==5.2.7
    
    pip install celerybeat-mongo==0.2.0
    
    pip install eventlet==0.33.1
    
    pip install mongoengine==0.24.2
    
    pip install pymongo==3.12.0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    四、代码

    1. 配置项

    config.py

    # Celery task databases
    CELERY_DB_NAME_BROKE = 'celery_broke'  # MongoDB database for task storage (broker + beat schedules) - customizable
    CELERY_DB_NAME_BACKEND = 'celery_backend'  # MongoDB database for Celery execution results
    # Database that database/mongodb_class.py reads the schedules collection from.
    # It must be the database the PeriodicTask documents are written to
    # (mongoengine connects to CELERY_DB_NAME_BROKE in service/add_task.py).
    CELERY_DB_NAME = CELERY_DB_NAME_BROKE
    CELERY_COLLECT_NAME = 'schedules'  # collection holding the periodic tasks
    CELERY_DRIVER = 'mongodb'  # URI scheme / storage backend used by Celery
    CELERY_DB_HOST = 'localhost'
    CELERY_DB_PORT = 27017
    CELERY_TASK_DB_URI = f'{CELERY_DRIVER}://{CELERY_DB_HOST}:{CELERY_DB_PORT}'  # server URI (no database), for direct MongoDB access
    CELERY_BROKER = f'{CELERY_TASK_DB_URI}/{CELERY_DB_NAME_BROKE}'  # Celery broker: messages and task ids
    CELERY_BACKEND = f'{CELERY_TASK_DB_URI}/{CELERY_DB_NAME_BACKEND}'  # Celery result backend: execution results
    CELERY_DB_ALIAS = 'celery_alias'  # mongoengine connection alias
    CELERY_TIMEZONE = 'Asia/Shanghai'  # time zone
    CELERY_UTC = True  # value for Celery's enable_utc setting
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2. celery 任务定义(celery_run.py)

    
    from celery import Celery
    from config.config import (
        CELERY_TASK_DB_URI, CELERY_BACKEND, CELERY_BROKER, CELERY_TIMEZONE,
        CELERY_UTC, CELERY_DB_NAME_BROKE
    )
    from service.add_task import TaskScheduler
    
    # Dotted task names; the task modules pass these to @celery_app.task(name=...).
    task_path = 'celery_tasks'
    check_task_path = task_path + '.sync_task.sync_time'
    sync_task_path1 = task_path + '.task1.task_a'
    sync_task_path2 = task_path + '.task2.task_b'

    celery_app = Celery('task_name_whatever', backend=CELERY_BACKEND, broker=CELERY_BROKER)

    # Broker/backend URLs, time zone, the MongoDB database holding the beat
    # schedules (mongodb_scheduler_db), and the task modules to import.
    celery_app.conf.update(
        beat_dburi=CELERY_TASK_DB_URI,
        broker_url=CELERY_BROKER,
        result_backend=CELERY_BACKEND,
        timezone=CELERY_TIMEZONE,
        enable_utc=CELERY_UTC,
        mongodb_scheduler_db=CELERY_DB_NAME_BROKE,
        include=[f'{task_path}.{module}' for module in ('sync_task', 'task1', 'task2')],
    )

    # Register (or refresh) the checker task: every 3 minutes it decides whether
    # other tasks need to be added or have their interval changed.
    TaskScheduler.update(
        task_name='sync_task',
        task_path=check_task_path,
        search_name='sync_task',
        period='minutes',
        every=3,
    )
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    3. mongodb 操作封装(database/mongodb_class.py)

    PS:celerybeatmongo PeriodicTask 用的是 mongoengine
    而我自己项目操作数据库用的是pymongo
    这一步记录是为了连贯,可以不用看,只是按照我的业务逻辑重写了一下方法
    其实就是 mongodb 的增删改查

    import pymongo
    from configs.config import CELERY_DB_NAME, CELERY_TASK_DB_URI, CELERY_COLLECT_NAME
    
    
    class MongoDBClass:
        """Run one pymongo operation inside a ``with`` block.

        ``with MongoDBClass(opt, ...) as res`` opens a client, calls the method
        named *opt* (query / insert / update / upsert / delete) and binds its
        return value to ``res``. Errors raised by the operation are printed and
        ``res`` becomes None. The client is closed when the block exits.
        """

        # Operation names __enter__ may dispatch to (replaces eval() over dir()).
        _OPERATIONS = ('query', 'insert', 'update', 'upsert', 'delete')

        def __init__(self, opt, collect_name=CELERY_COLLECT_NAME, value=None, condition=None, order='_id'):
            # Connect to MongoDB and select the target collection.
            self.client = pymongo.MongoClient(CELERY_TASK_DB_URI)
            self.db = self.client[CELERY_DB_NAME]
            self.col = self.db[collect_name]
            self.opt = opt
            self.value = value
            self.condition = condition
            self.order = order  # stored but not used by any operation here

        def query(self):
            """Return the first document matching self.condition, or None."""
            return self.col.find_one(self.condition)

        def insert(self):
            """Insert self.value (a dict, or a list of dicts); return the pymongo result."""
            if isinstance(self.value, list):
                return self.col.insert_many(self.value)
            return self.col.insert_one(self.value)

        def update(self, is_insert=False):
            """``$set`` self.value on the first document matching self.condition.

            :param is_insert: when True, insert a new document if nothing matches.
            :return: the pymongo UpdateResult.
            """
            return self.col.update_one(self.condition, {'$set': self.value}, upsert=is_insert)

        def upsert(self):
            """Update the matching document, inserting it if it does not exist."""
            return self.update(is_insert=True)

        def delete(self):
            """Delete the first document matching self.condition; return the pymongo result."""
            return self.col.delete_one(self.condition)

        def close(self):
            self.client.close()

        def __enter__(self):
            try:
                if self.opt in self._OPERATIONS:
                    return getattr(self, self.opt)()
            except Exception as e:
                # Best-effort helper: report the failure and hand None to the with-body.
                print(f'execute mongodb error, error option:{self.opt}, error info:', e)
            # Unknown operation names or failed operations yield None.
            return None

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.close()
            # An exception raised inside the with-body is printed and then propagated.
            if exc_type is not None:
                print(exc_type, exc_val)
                return False
            return True
    
    
    class MongoDBOperate:
        """Static facade over MongoDBClass: each call opens a connection, runs one operation and closes it."""

        @staticmethod
        def query(collect_name=CELERY_COLLECT_NAME, condition=None):
            """Return the first document in *collect_name* matching *condition* (None if absent or on error)."""
            with MongoDBClass('query', collect_name, condition=condition) as res:
                return res

        @staticmethod
        def is_exist(collect_name=CELERY_COLLECT_NAME):
            """Return True if *collect_name* contains at least one document."""
            # MongoDBClass has no 'find' operation; 'query' without a condition fetches any document.
            with MongoDBClass('query', collect_name) as res:
                return bool(res)

        @staticmethod
        def insert(value, collect_name=CELERY_COLLECT_NAME):
            """Insert *value* into *collect_name*."""
            with MongoDBClass('insert', collect_name, value=value) as res:
                return res

        @staticmethod
        def update(condition, value, collect_name=CELERY_COLLECT_NAME):
            """Update the document matching *condition* with *value*."""
            with MongoDBClass('update', collect_name, value=value, condition=condition) as res:
                return res

        @staticmethod
        def delete(condition, collect_name=CELERY_COLLECT_NAME):
            """Delete the document matching *condition*."""
            with MongoDBClass('delete', collect_name, condition=condition) as res:
                return res

        @staticmethod
        def upsert(condition, value, collect_name=CELERY_COLLECT_NAME):
            """Update the document matching *condition*, inserting it if it does not exist."""
            with MongoDBClass('upsert', collect_name, value=value, condition=condition) as res:
                return res
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    4. 定义任务添加方法(service/add_task.py)

    from celerybeatmongo.models import PeriodicTask, get_periodic_task_collection
    from database.mongo_class import MongoDBClass
    from configs.config import (
        CELERY_DB_NAME_BROKE, CELERY_DB_HOST, CELERY_DB_PORT, CELERY_DB_ALIAS
    )
    import mongoengine
    
    
    # 重写meta,定义PeriodicTask mongodb 默认连接
    def celery_get_periodic_task_collection():
        """Return the collection name for PeriodicTask documents.

        Delegates to celerybeat-mongo's get_periodic_task_collection(). If that
        raises, the error is printed and "schedules" is returned instead.
        """
        try:
            collection = get_periodic_task_collection()
        except Exception as err:
            print('celery_get_periodic_task_collection err:', err)
            collection = "schedules"
        return collection
    
    
    class SubPeriodicTask(PeriodicTask):
        """PeriodicTask stored through the CELERY_DB_ALIAS mongoengine connection.

        The collection name comes from celery_get_periodic_task_collection(), and
        allow_inheritance stays enabled.
        """

        meta = dict(
            collection=celery_get_periodic_task_collection(),
            allow_inheritance=True,
            db_alias=CELERY_DB_ALIAS,
        )
    
    
    class TaskScheduler:
        """Add, replace and look up celerybeat-mongo interval tasks at runtime."""

        # Interval units accepted by add() / update().
        PERMISSIBLE_PERIODS = ('days', 'hours', 'minutes', 'seconds')

        @staticmethod
        def _check_period(period):
            """Raise ValueError unless *period* is one of PERMISSIBLE_PERIODS."""
            if period not in TaskScheduler.PERMISSIBLE_PERIODS:
                # ValueError subclasses Exception, so existing `except Exception` callers keep working.
                raise ValueError('Invalid period specified')

        @staticmethod
        def _is_unchanged(doc, task_name, task_path, period, every, args=None, kwargs=None):
            """Return True if the stored task *doc* already has exactly these settings.

            NOTE(review): assumes the stored document uses the field names passed
            to SubPeriodicTask in add() (name, task, interval.every,
            interval.period, args, kwargs) - confirm against the collection.
            """
            interval = doc.get('interval')
            if not isinstance(interval, dict):
                return False
            return (
                doc.get('name') == task_name
                and doc.get('task') == task_path
                and interval.get('every') == every
                and interval.get('period') == period
                and list(doc.get('args') or []) == list(args or [])
                and dict(doc.get('kwargs') or {}) == dict(kwargs or {})
            )

        @staticmethod
        def add(task_name: str, task_path: str, period: str, every: int, args=None, kwargs=None):
            """Create an interval PeriodicTask unless one named *task_name* already exists.

            :param task_name: unique task name stored in MongoDB.
            :param task_path: registered Celery task name to run.
            :param period: one of PERMISSIBLE_PERIODS.
            :param every: number of *period* units between runs.
            :param args: positional arguments passed to the task.
            :param kwargs: keyword arguments passed to the task.
            :return: True if the task was created, False if it already existed.
            :raises ValueError: if *period* is not supported.
            """
            TaskScheduler._check_period(period)
            # Check whether the task already exists.
            if TaskScheduler.is_exist(task_name):
                print(f'任务{task_name}已存在')
                return False
            # Create the task.
            mongoengine.connect(
                CELERY_DB_NAME_BROKE, host=CELERY_DB_HOST, port=CELERY_DB_PORT, alias=CELERY_DB_ALIAS)
            try:
                periodic = SubPeriodicTask(
                    name=task_name,
                    task=task_path,
                    interval=SubPeriodicTask.Interval(every=every, period=period),
                    run_immediately=True,
                    enabled=True,
                    args=args,
                    kwargs=kwargs,
                    connection_alias=CELERY_DB_ALIAS,
                )
                periodic.save()
            finally:
                # Release the alias that was connected above; a bare disconnect()
                # only targets mongoengine's 'default' alias.
                mongoengine.disconnect(alias=CELERY_DB_ALIAS)
            return True

        @staticmethod
        def update(task_name: str, task_path: str, search_name: 'str | dict', period: str, every: int, args=None, kwargs=None):
            """Replace the task whose name matches *search_name* with a new definition.

            :param search_name: value for the 'name' filter - an exact name, or a
                MongoDB query value such as ``{'$regex': ...}``.
            :return: False when the matching task already has identical settings
                (it is left untouched, so its schedule is not reset); otherwise
                the result of add().
            :raises ValueError: if *period* is not supported (checked before any
                existing task is deleted).
            """
            TaskScheduler._check_period(period)
            existing = TaskScheduler.query({'name': search_name})
            if existing and isinstance(existing, dict):
                # Re-creating an unchanged task would reset its run state every check cycle.
                if TaskScheduler._is_unchanged(existing, task_name, task_path, period, every, args, kwargs):
                    return False
                task_id = existing.get('_id')
                if task_id:
                    TaskScheduler.delete({'_id': task_id})
            return TaskScheduler.add(task_name, task_path, period, every, args, kwargs)

        @staticmethod
        def query(condition):
            """Return the first task document matching *condition* in the default collection, or None."""
            with MongoDBClass('query', condition=condition) as res:
                return res

        @staticmethod
        def delete(condition):
            """Delete the task document matching *condition* in the default collection."""
            with MongoDBClass('delete', condition=condition) as res:
                return res

        @staticmethod
        def is_exist(task_name):
            """Return True if a task named *task_name* is stored."""
            task = TaskScheduler.query({'name': task_name})
            return bool(task and task.get('_id'))
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    5. 检查方法调用任务添加

    sync_task.py

    
    from celery_run import celery_app, sync_task_path1, sync_task_path2, check_task_path
    from service.add_task import TaskScheduler
    from datetime import datetime
    import re
    
    
    @celery_app.task(name=check_task_path)
    def check_time():
        """Decide which task should be scheduled and (re)register it with its interval.

        Task names follow ``<base>-every-<minutes>-minute``. Any stored task with
        the same base name is found with a regex, whatever its old interval was.
        TaskScheduler.update then replaces it.
        """
        # Placeholder for a DB lookup: test == 1 schedules task1, test == 2 schedules task2.
        test = 1
        base_name = ''
        task_path = ''
        sync_time = 0
        if test == 1:
            # business logic omitted
            base_name = 'task1'
            task_path = sync_task_path1
            sync_time = 20
        elif test == 2:
            # business logic omitted
            base_name = 'task2'
            task_path = sync_task_path2
            sync_time = 15
        # Nothing to (re)schedule.
        if not base_name:
            return
        task_name = f'{base_name}-every-{sync_time}-minute'
        # Build the pattern from the base name (not the suffixed one) and pass it
        # as a real MongoDB $regex query value; a '$regex:/.../' string would only
        # ever be compared as a literal name.
        search_name = {'$regex': rf'^{re.escape(base_name)}-every-\d+-minute$'}
        TaskScheduler.update(
            task_name, task_path, search_name, 'minutes', sync_time,
            ['传参1', '传参2'])
         
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    6. 最终任务方法

    task1.py

    from celery_run import celery_app, sync_task_path1
    
    @celery_app.task(name=sync_task_path1)
    def task_a(arg1, arg2):
        """Job registered as ``sync_task_path1`` and scheduled dynamically by sync_task.

        Receives the two positional args stored on its PeriodicTask.
        """
        # business logic goes here
        pass
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    task2.py

    from celery_run import celery_app, sync_task_path2
    
    @celery_app.task(name=sync_task_path2)
    def task_b(arg1, arg2):
        """Job registered as ``sync_task_path2`` and scheduled dynamically by sync_task.

        Receives the two positional args stored on its PeriodicTask.
        """
        # business logic goes here
        pass
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    7. 执行

    celery -A celery_run.celery_app worker --loglevel=debug -P eventlet -f ./logs/celery.log
    
    celery -A celery_run.celery_app beat -S celerybeatmongo.schedulers.MongoScheduler -l info -f ./logs/celery2.log
    
    • 1
    • 2
    • 3

    五、总结

    一个任务的文件轨迹为:sync_task.py -> add_task.py -> task1.py & task2.py

  • 相关阅读:
    分布式操作系统的必要性及重要性
    图像&视频编辑工具箱MMEditing安装及使用示例(Inpainting)
    组合总和 (递归回溯+剪枝)
    奥威BI系统:做数据可视化大屏,又快又简单
    怎么运用大语言模型的
    WebAssembly入门笔记[4]:利用Global传递全局变量
    Linux内核设计与实现 第三章 进程管理
    NC16884 [NOI2001]食物链
    家宽动态公网IP,使用docker+ddns 实现动态域名解析
    Flink学习6:编程模型
  • 原文地址:https://blog.csdn.net/qq_35567179/article/details/127704085