• Celery笔记八之数据库操作定时任务


    前面我们介绍定时任务是在 celery.py 中的 app.conf.beat_schedule 定义,这一篇笔记我们介绍一下如何在 Django 系统中的表里来操作这些任务。

    1. 依赖及migrate操作
    2. beat 的启动
    3. 表介绍
    4. 手动操作定时任务

    1、依赖安装及migrate操作

    我们先通过 app.conf.beat_schedule 定义定时任务:

    app.conf.beat_schedule = {
        'add-every-60-seconds': {
            'task': 'blog.tasks.add',
            'schedule': 60,
            'args': (16, 16),
        },
        'schedule_minus': {
            'task': 'blog.tasks.minus',
            'schedule': crontab(minute=5, hour=2),
            'args': (12, 24),
        },
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    如果我们就这样启动 Django 系统,worker 和 beat 服务,系统的定时任务就只有一个,写死在系统里。

    当然,我们也可以使用一些 celery 的函数来手动向系统里添加定时任务,但是我们有一个更好的方法来管理操作这些定时任务,那就是将这些定时任务写入到数据库中,来进行增删改查操作,定制开发。

    将定时任务写入数据库,我们需要进行以下几步操作:

    • 安装依赖
    • INSTALLED_APP添加模块
    • 执行migrate

    安装依赖

    通过 pip 安装一个 django-celery-beat 依赖:

    pip3 install django-celery-beat
    
    • 1

    INSTALLED_APP添加模块

    安装后,要正常使用还需要将其添加到 settings.py 的 INSTALLED_APPS 中:

    # settings.py
    
    INSTALLED_APPS = [
        ...,
        'django_celery_beat',
    ]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    执行migrate

    接下来我们执行 migrate 操作将需要创建的表写入数据库:

    python3 manage.py migrate
    
    • 1

    可以看到如下输出:

    Running migrations:
      Applying django_celery_beat.0001_initial... OK
      Applying django_celery_beat.0002_auto_20161118_0346... OK
      Applying django_celery_beat.0003_auto_20161209_0049... OK
      Applying django_celery_beat.0004_auto_20170221_0000... OK
      Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
      Applying django_celery_beat.0006_auto_20180322_0932... OK
      Applying django_celery_beat.0007_auto_20180521_0826... OK
      Applying django_celery_beat.0008_auto_20180914_1922... OK
      Applying django_celery_beat.0006_auto_20180210_1226... OK
      Applying django_celery_beat.0006_periodictask_priority... OK
      Applying django_celery_beat.0009_periodictask_headers... OK
      Applying django_celery_beat.0010_auto_20190429_0326... OK
      Applying django_celery_beat.0011_auto_20190508_0153... OK
      Applying django_celery_beat.0012_periodictask_expire_seconds... OK
      Applying django_celery_beat.0013_auto_20200609_0727... OK
      Applying django_celery_beat.0014_remove_clockedschedule_enabled... OK
      Applying django_celery_beat.0015_edit_solarschedule_events_choices... OK
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    然后可以看到在 Django 系统对应的数据库里新增了几张表,表的介绍及使用我们在后面再介绍。

    2、beat 的启动

    在启动 beat 前,我们需要对时区进行设置,前面我们介绍过在 Django 和 celery 中都需要设置成北京时间:

    TIME_ZONE = "Asia/Shanghai"
    USE_TZ = False
    
    # celery 时区设置 
    CELERY_TIMEZONE = "Asia/Shanghai"
    CELERY_ENABLE_UTC = False
    DJANGO_CELERY_BEAT_TZ_AWARE = False
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    启动 beat 我们需要添加参数将数据指定存储在数据库中,可以在启动 beat 的时候添加参数:

    celery -A hunter beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
    
    • 1

    也可以通过 app.conf.beat_scheduler 指定值:

    app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
    
    • 1

    然后直接启动 beat:

    celery -A hunter beat -l INFO
    
    • 1

    3、表介绍

    在执行完 migrate 之后系统会多出几张表,都是定时任务相关的表:

    • django_celery_beat_clockedschedule
    • django_celery_beat_crontabschedule
    • django_celery_beat_intervalschedule
    • django_celery_beat_solarschedule
    • django_celery_beat_periodictask
    • django_celery_beat_periodictasks

    其中 django_celery_beat_clockedschedule 和 django_celery_beat_solarschedule 暂时不介绍

    django_celery_beat_crontabschedule

    是我们的周期任务表,比如我们上面定义的:

        'schedule_minus': {
            'task': 'blog.tasks.minus',
            'schedule': crontab(minute=5, hour=2),
            'args': (12, 24),
        },
    
    • 1
    • 2
    • 3
    • 4
    • 5

    执行 celery 的 beat 后,会在该表新增一条数据,表的字段就是我们设置的 crontab() 里的值,包括 minute,hour,day_of_week,day_off_month,month_of_year 和 timezone。

    除了 timezone 字段,前面的字段如何定义和使用上一篇笔记中已经详细介绍过,timezone 字段则是我们在 settings.py 里定义的时区信息。

    django_celery_beat_intervalschedule

    这张表的数据是我们定义的间隔时间任务的表,比如每隔多少秒,多少分钟执行一次。

    该表只有 id, every 和 period 字段,every 表示的是时间的间隔,填写的数字,period 则是单位,可选项有:

    • microseconds:毫秒
    • seconds:秒
    • minutes:分钟
    • hours:小时
    • days:天

    我们在定义间隔任务的时候,除了直接使用数字表示秒之外,还可以使用 datetime.timedelta() 来定义其他时间,比如:

    from datetime import timedelta
    app.conf.beat_schedule = {
        'add-every-60-seconds': {
            'task': 'blog.tasks.add',
            'schedule': timedelta(minutes=5),
            'args': (16, 16),
        },
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    但是当我们启动 beat 的时候,系统在写入数据库的时候还是会自动为我们将其转化为秒数,比如 minutes=5,会给我们加入的数据是:

    every=300, period='seconds'
    
    • 1

    django_celery_beat_periodictask

    这张表其实是对前面几张表的任务的一个汇总,

    • crontab_id,interval_id 等外键字段来判断是属于哪张表的定时/周期任务
    • last_run_at 上次运行时间
    • total_run_count 总运行次数
    • name 表示任务名称
    • task 字段表示任务来源等

    还有参数,队列等信息。

    每一条在 django_celery_beat_crontabschedule 和 django_celery_beat_intervalschedule 表中的数据都必须在该表中有一个汇总的信息记录才可以正常运行。

    也就是说在前面的两张表中可以添加各种任务执行的策略,然后在 django_celery_beat_periodictask 中有一个数据指向该策略,就可以使用该策略进行周期任务的执行。

    其中,name 字段上是有唯一键的,但是 task 可以重复写入,这也就意味着我们可以针对同一个 task 制定不同的定时策略。

    django_celery_beat_periodictasks

    这个表就一条数据,保存的是系统上一次执行任务的时间。

    4、手动操作定时任务

    接下来我们自己定义两个周期任务,一个是 blog.tasks.add 函数,每隔20s运行一次,另一个是 blog.tasks.minus 函数,每天晚上 23点15分执行一次。

    我们首先还是运行 beat 和 worke,然后在 python3 manage.py shell 中执行下面的代码:

    import json
    from django_celery_beat.models import IntervalSchedule, CrontabSchedule, PeriodicTask
    
    twenty_second_interval, _ = IntervalSchedule.objects.get_or_create(every=20, period=IntervalSchedule.SECONDS)
    eleven_clock_crontab, _ = CrontabSchedule.objects.get_or_create(minute=18, hour=23)
    
    PeriodicTask.objects.get_or_create(
        interval_id=twenty_second_interval.id,
        name="twenty_second_interval",
        task="blog.tasks.add",
        args=json.dumps([1, 2]),
    )
    
    PeriodicTask.objects.get_or_create(
        crontab_id=eleven_clock_crontab.id,
        name="eleven_clock_crontab",
        task="blog.tasks.minus",
        args=json.dumps([8, 2]),
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    然后可以看到运行 beat 的 shell 中或者日志文件有输出下面的信息:

    DatabaseScheduler: Schedule changed.
    
    • 1

    其实就是系统监测了 PeriodicTask 表,发现它的数据有变化就会重新更改一次,当 beat 服务启动,系统会去 PeriodicTask 表里获取数据。

    如果这些任务的数据有更改,系统就会检测到然后发出 Schedule changed 的信息。

    我这边测试了 name、enabled、one_off、args 等字段,发现修改后系统都会捕获到任务的变化。

    其中,one_off 字段的含义是该任务仅执行一次。

    本文首发于本人微信公众号,可搜索关注:Hunter后端

  • 相关阅读:
    CentOS 7 使用pugixml 库
    毫米波V2I网络的链路层仿真研究(Matlab代码实现)
    Stable Diffusion 模型:从噪声中生成逼真图像
    【Kingbase8数据库】flowable兼容人大金仓Kingbase8过程
    WebDAV之葫芦儿·派盘+书藏家
    Ninja is required to load C++ extensions in Pycharm
    批量查询谷歌PR权重的方法有哪些?是什么影响着谷歌PR值?
    springboot框架中生成一个md5文件校验类,md5文件校验类必须包括传入的一个key值秘钥,还有上传内容是byte[]类型
    【C++进阶】map和set——下篇(红黑树的学习以及封装map和set)
    Vue 源码解读(8)—— 编译器 之 解析(上)
  • 原文地址:https://blog.csdn.net/weixin_43354181/article/details/126823183