• Python解决多个服务线程/进程重复运行定时任务的问题


    记录多实例服务定时任务出现运行多次的问题

    问题:web项目运行多个实例时,定时任务会被执行多次的问题

    举例来说
    我使用库APScheduler排定了一个定时任务taskA在每天的晚上9点需要执行一次,我的web服务使用分布式运行了8个实例,于是在每天晚上的9点,我的8个服务都会接收到APScheduler发布的这个定时任务,然后每个服务都会执行一次这个定时任务,这与我预期的只执行1次是不符的,我想要解决这个问题,让定时任务只被执行一次

    设想的解决方式:
    使用redis分布式锁的方式(redis.setnx(key, value))让实例只能运行一次定时任务,从而解决定时任务会被执行多次的问题

    setnx(key, value)方法很简单,就是当key不存在时,setnx会成功,否则会失败

        def setnx(self, name: KeyT, value: EncodableT) -> Awaitable:
            """Set the value of key ``name`` to ``value`` if key doesn't exist"""
            return self.execute_command("SETNX", name, value)
    
    • 1
    • 2
    • 3

    实现方案

    1. setnx(key, value)方法会在key不存在时设置value,当多个线程同时接到排期准备运行同一个任务时,只有第一个线程setnx会成功(返回True),于是第一个setnx成功的线程运行了定时任务,其他线程在setnx时由于key已经存在会失败(返回False),从而让它们跳过定时任务的执行

    2. 仍然存在的问题:定时任务一般会执行多次,在其下一次执行时,setnx相同key的这条记录应该被删除掉,因为这是一次新的任务,否则之后的任务执行都会因setnx时key已存在而失败导致任务无法执行

      i. 第一种方案:在setnx成功的线程1任务执行完成后删除这个key在redis中存储的记录,从而让下一次任务第一次运行时又可以成功setnx(key, value)而执行
      但这种方案存在一定的风险:如果存在线程2因为一些原因阻塞了,在线程1执行完任务才开始接收到运行定时任务的指令,那么线程2会在key被删除后开始尝试setnx,那必然会成功,然后重复了运行任务

      ii. 基于第一种方案的考虑,确定了第二种方案,只需要给每次的定时任务添加唯一标识即可避免第一种方案的问题:设置此次任务运行的唯一key_x,在setnx成功的线程1任务执行完成之后不对这次定时任务的key_x执行删除
      此次定时任务唯一key_x的设置很容易想到的方案是在这次定时任务id上添加运行的排期时间,这样就可以让这一次的定时任务是唯一且可识别了,只要运行了一次其值就永久设置为True,不会在执行第二次(考虑到资源占用,实际应该设置一个较长的过期时间也完全可以避免方案1的风险)

    设置有过期时间的方法应该使用redis.set(key, value, nx=True, ex=10)方法,这里nx=True表名使用命令SETNX,而ex=10则是过期的时间,单位为秒

    第一种方案的可重复运行的小案例:

    # -*- coding: utf-8 -*-
    import asyncio
    import time
    
    import aioredis
    from aioredis import Redis
    
    loop = asyncio.get_event_loop()
    redis_coro = aioredis.Redis()
    
    
    def redis_distributed_lock(cache_key, cache_value="locked"):
        def decorator(func):
            async def wrapper(*args, **kwargs):
                redis_instance = await redis_coro
                # 这里设置了10小时的过期时间,完全可以避免重复运行的风险了
                locked = await redis_instance.set(cache_key, cache_value, nx=True, ex=60 * 60 * 10)
                if locked:  # 第一个线程设置成功值会运行任务,否则不会运行任务
                    print("success")
                    return await func(*args, **kwargs)
                print(f"failed")
    
            return wrapper
    
        return decorator
    
    
    async def ntasks():
        t_time = ["9点", "10点", "11点"]   # 模拟任务在三个时间点被执行
        redis: Redis = await redis_coro
        t_id = 1
    
        async def task_func(tid):
            print(f"{tid=}, executing...")
            return tid
    
        # 为了可重复运行这个示例,先执行删除之前设置的key
        ret = await redis.delete(str(t_id))
    
        for t_t in t_time:
            redis_key = f"{t_id}"    
            # 这里本来预期是直接放到函数头上装饰,但是不方便控制redis_key参数,所以使用了原始的方式装饰task_func
            task_f = redis_distributed_lock(redis_key)(task_func)
            # 假设启动分布式服务8个,会执行8次定时任务,这里创建了8个任务,按照先执行完先返回的顺序处理
            for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], loop=loop):
                res = await f
                # print(f"{res=}")
            print(f"=" * 80)
            time.sleep(5)  # 模拟一个定时任务在多个时间点执行,下一次执行时,时间参数(t_t)会发生变化
            # break
    
    
    if __name__ == '__main__':
        try:
            loop.run_until_complete(ntasks())
        finally:
            loop.stop()
            loop.close()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    运行结果

    success
    tid='1', executing...
    failed
    failed
    failed
    failed
    failed
    failed
    failed
    ================================================================================
    failed
    failed
    failed
    failed
    failed
    failed
    failed
    failed
    ================================================================================
    failed
    failed
    failed
    failed
    failed
    failed
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    只有第一个时间点是按照预期执行,之后的时间点执行都总是失败,因为每个时间点的该任务设置的key都是一样的

    第二种方案的可重复运行的小案例:

    # -*- coding: utf-8 -*-
    import asyncio
    import time
    
    import aioredis
    from aioredis import Redis
    
    loop = asyncio.get_event_loop()
    redis_coro = aioredis.Redis()
    
    
    def redis_distributed_lock(cache_key, cache_value="locked"):
        def decorator(func):
            async def wrapper(*args, **kwargs):
                redis_instance = await redis_coro
                # 这里设置了10小时的过期时间,完全可以避免重复运行的风险了
                locked = await redis_instance.set(cache_key, cache_value, nx=True, ex=60 * 60 * 10)
                if locked:  # 第一个线程设置成功值会运行任务,否则不会运行任务
                    print("success")
                    return await func(*args, **kwargs)
                print(f"failed")
    
            return wrapper
    
        return decorator
    
    
    async def ntasks():
        t_time = ["9点", "10点", "11点"]   # 模拟任务在三个时间点被执行
        redis: Redis = await redis_coro
        t_id = 1
    
        async def task_func(tid):
            print(f"{tid=}, executing...")
            return tid
    
        for t_t in t_time:
            redis_key = f"{t_id}_{t_t}"     # set的key用定时任务的id+时间点来作为此次定时任务的唯一标识
            # 为了可重复运行这个示例,先执行删除之前设置的key
            ret = await redis.delete(redis_key)
            print(f"key deleted? {ret}")
            # 这里本来预期是直接放到函数头上装饰,但是不方便控制redis_key参数,所以使用了原始的方式装饰task_func
            task_f = redis_distributed_lock(redis_key)(task_func)
            # 假设启动分布式服务8个,会执行8次定时任务,这里创建了8个任务,按照先执行完先返回的顺序处理
            for f in asyncio.as_completed([task_f(redis_key) for _ in range(8)], loop=loop):
                res = await f
                # print(f"{res=}")
            print(f"=" * 80)
            time.sleep(5)  # 模拟一个定时任务在多个时间点执行,下一次执行时,时间参数(t_t)会发生变化
            # break
    
    
    if __name__ == '__main__':
        try:
            loop.run_until_complete(ntasks())
        finally:
            loop.stop()
            loop.close()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    输出

    key deleted? 1
    success
    tid='1_9点', executing...
    failed
    failed
    failed
    failed
    failed
    failed
    failed
    ================================================================================
    key deleted? 1
    success
    tid='1_10点', executing...
    failed
    failed
    failed
    failed
    failed
    failed
    failed
    ================================================================================
    key deleted? 1
    success
    tid='1_11点', executing...
    failed
    failed
    failed
    failed
    failed
    failed
    failed
    ================================================================================
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    可以看到task_func任务的每个时间点的执行都只有一次成功,而且不会出现只有第一个时间点执行成功而之后的时间点执行都全是失败的情况

    有用的参考:
    起初总是尝试在协程方法中使用多线程threading.Thread老是碰壁,看了这个回答后读了这篇文章,感觉豁然开朗

  • 相关阅读:
    pip和conda的环境管理,二者到底应该如何使用
    Plato Farm全新玩法,套利ePLATO稳获超高收益
    前端入门学习笔记三十六
    Django的网站项目开发好了,该用何种方案在Centos上部署【答:Gunicorn(uWSGI)+Nginx】
    新的3D地图制图技术改变了全球定位的游戏规则
    从数学角度和编码角度解释 熵、交叉熵、KL散度
    二叉(搜索)树的最近公共祖先 ●●
    ArcGIS QGIS学习一:打开shp、geojson地图变形变扁问题(附最新坐标边界下载全国省市区县乡镇)
    go语言之MongoDB的插入查找更新和删除
    Ts官方文档翻译-Generic范型
  • 原文地址:https://blog.csdn.net/Moelimoe/article/details/126679161