• redis - 实现周期性数据无上报检测


    需求背景

    以小时为周期不停地上报事件到事件平台,事件平台如果在连续2个周期 没有检测到上报的事件,就会发送告警给事件的相关责任人.

    问题的难点在于如何检测连续周期内无数据?

    如上图,2 点和 3 点,都没有上报数据,说明连续两个周期存在无数据上报.

    解决方案

    本文采用 redis 的有序集合实现此算法.

    1. 假设

    先做一些假设

    • 事件需要有类型属性,每种类型的事件有唯一一个上报渠道 ID
      ,参数为 data_id,每个上报渠道只有一种事件类型.
    • 上报的每个事件都有一个hash值,参数为 hash_value,相同 hash 值的所有事件当做一条时间序列.连续周期无数据检测针对的是一条时间序列进行处理.

    2. 记录事件的hash 值到 redis 集合

    事件平台每收到一个事件,就将事件的 hash 值hash_value 记录在 redis 的 set 中,示例代码如下

    from django.core.cache import cache as redis_cache
    
    data_id = "9fffdf1c-6feb-11ee-a73d-acde48001122"
    hash_value = "7dcfd8f2-6feb-11ee-a73d-acde48001122"
    
    redis_key = data_id
    redis_client = redis_cache.client.get_client()
    redis_client.sadd(redis_key, hash_value)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3. 记录事件的上报时间到 redis 有序集合

    事件平台每收到一个事件,就将上报时间戳作为 score,记录到 redis 的 sorted set 中,示例代码如下

    from django.core.cache import cache as redis_cache
    
    data_id = "9fffdf1c-6feb-11ee-a73d-acde48001122"
    hash_value = "7dcfd8f2-6feb-11ee-a73d-acde48001122"
    
    # 假设事件的值为
    event = {
      "hash_value": hash_value,
      "data_id": data_id,
      "data": {
        "key1": "value1",
        "key2": "value2"
      }
    }
    
    redis_key = f"{data_id}:{hash_value}"
    score = int(time.time())
    
    redis_client = redis_cache.client.get_client()
    redis_client.zadd(redis_key, {value: score})
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    使用有序集合,就可以构造出一个分数为时间戳的时间序列.

    4. 检测无数据周期

    需要开启一个定时任务,这里假设5 分钟检测一次时间序列.每次检测指定时间范围内事件的数量.

    from datetime import timezone
    from django.core.cache import cache as redis_cache
    from django.utils import timezone
    
    data_id = "9fffdf1c-6feb-11ee-a73d-acde48001122"
    hash_value = "7dcfd8f2-6feb-11ee-a73d-acde48001122"
    
    # 检测周期 1 小时
    interval_seconds = 3600
    # 无数据周期数
    cycle_number = 2
    # 计算无数据检查时间范围
    period_end_time = timezone.now()
    period_start_time = period_end_time - timedelta(seconds=interval_seconds * cycle_number)
    # 计算分数区间
    period_start_score = int(period_start_time.timestamp())
    period_end_score = int(period_end_time.timestamp())
    
    # 获得周期范围内的事件数量
    redis_key = f"{data_id}:{hash_value}"
    redis_client = redis_cache.client.get_client()
    count = redis_client.zcount(redis_key, period_start_score, period_end_score)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    如果count为 0,说明在连续周期内无数据.

    5. 删除历史数据

    由于是周期性事件上报,时间久了 set 和 sorted set 的值就会越来越大,导致 redis 服务器内存告警.所以我们需要定期清理过期的数据.

    from django.core.cache import cache as redis_cache
    
    data_id = "9fffdf1c-6feb-11ee-a73d-acde48001122"
    
    
    redis_client = redis_cache.client.get_client()
    
    # 获取所有的事件
    redis_key = data_id
    hash_value_set = redis_client.smembers(redis_key)
    
    # 指定删除的时间区间
    min_score = ""
    max_score = ""
    
    # 根据指定的时间取件删除有序集合中过期的数据
    for hash_value in hash_value_set:
        redis_key = f"{data_id}:{hash_value}"
        redis_client.zremrangebyscore(redis_key, min_score, max_score)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
  • 相关阅读:
    8Manage PM:通过项目管理信息系统做好进度管控
    mysql查看binlog内容,dump 恢复单表
    MySQL使用CASE WHEN统计SQL语句代替子查询SQL统计,CASE WHEN常用写法,根据不同的条件对数据进行分类、分组和聚合
    MindSponge分子动力学模拟——Constraint约束(2023.09)
    基于springboot+vue水务报修处理系统
    Nacos使用教程(五)——配置管理中心
    深度学习之Python,OpenCV中的卷积
    接口性能测试方案设计方法有哪些?要怎么去写?
    用uniapp开发打包多端应用完整指南
    LibreOJ 137. 最小瓶颈路(加强版) 题解 Kruscal重构树 ST表
  • 原文地址:https://blog.csdn.net/liuyuan_jq/article/details/133964437