• 多机分布式执行异步任务的实现姿势


    1. 多机分布式执行异步任务的实现姿势

    1.1. 序言

    执行异步任务时, 比如需要处理 10W 个订单, 如果是 PHP, 我们一般会配置一个定时任务, 然后该定时任务就会在单机上执行; 如果是 GO 或者 JAVA, 我们也需要使用相应的策略, 保证该任务只在单机上执行, 比如分布式锁。可能有同学会问, 我直接在多机上执行同一个任务不行么, 我只想说, 你胆子真大, 当多机同时处理一条数据, 你会死的很惨的。
    那我们是否有一种方法, 可以让任务在多机同时执行, 然后又可以避免多机同时处理相同数据的问题呢? 这里给大家介绍一种多机分片的方式, 也是最近在公司 Get 到的新技能。

    1.2. 应用场景

    最近在做异步任务迁移, 要求对 DB 中的订单进行处理, 因为订单的数量非常大, 10W 的数量级是常规状态, 如果只通过一台机器去处理, 执行效率非常低, 所以需要通过多机并发处理。
    对于上述方式, 其实还有另外一种解决方案, 就是单机执行任务, 然后把任务放入消息队列, 再新增一个接口, 用于消费队列中的数据, 然后进行数据处理, 因为接口对应的服务是集群部署, 所以执行速度很快, 不过这里在设计方案时, 需要考虑消息重复消费, 多机可能同时处理单条消息, 网络异常导致消息未得到处理等问题, 具体解决方案, 欢迎大家线下和我讨论哈。

    1.3. 多机分片

    什么是多机分片呢? 说的通俗一点, 就是把数据分成 N 份, 分别给每一台机器执行。比如我们有 1000 条数据, 通过相应策略, 将数据分成 5 份, 每份数据 200 条, 如果我们有 5 台机器, 那么每台机器可以分别处理 200 条数据。
    那么具体是怎么实现?
    为了更好讲解, 我先简单模拟一下场景:

    • DB 包含 20 条数据, DB 主键 ID 为 0、1、2、3 … 19;
    • 有 3 台机器, 每台机器起一个线程跑任务, 共起 3 个线程;
    • 需要将数据分成 10 份, 每份数据有 2 条, 然后分给这 3 个线程。

    1.4. 令牌获取

    将数据分成 10 分, 就有 10 个令牌, 即 number=10, 分别为 0、1、2 … 9, 处理逻辑如下:

    • 每个任务有任务名, 以任务名为 key, 通过 increase=redis.incr(key) 计数, 然后将 increase 值通过 number 取模, 得到令牌 token=increase%number, 第一次执行的 increase=1, 所以 token=1%10=1;
    • 构造令牌 tokenKey=key+token=key1, 然后通过 redis 对 tokenKey 加一个分布式锁, 如果加锁成功, 返回令牌值;
    • 如果加锁失败, 循环执行 increase=redis.incr(key), 此时 increase=2, token=2%10=2, 拿到令牌 tokenKey=key2, 再执行分布式锁, 成功返回, 未成功, 同上依次反复。

    Redis Incr 命令将 key 中储存的数字值增一。如果 key 不存在, 那么 key 的值会先被初始化为 0 , 然后再执行 INCR 操作。

    1.5. 分片获取 DB 数据

    机器的线程拿到令牌后, 就可以去分片获取数据了, 假如 DB 的数据结构如下, 且只有 20 条数据:

    订单号 orderId商品名 productName配送状态 status
    0数据线0
    1键盘0
    2显示器0
    19鼠标0

    下面看一下分片获取数据流程:

    • 当线程拿到的令牌为 token=0, 可以通过 select * from tableName wwhere orderId % 10 = token and status = 0 limit pageSize;(假如 pageSize=100), 因为取模匹配的数据的 orderId=0 和 10, 所以该线程可以拿到 0 和 10 这两条数据, 如果 pageSize=1, 那就只能拿到 0 这条数据, 数据 10 等下次处理时再获取;
    • 拿到分片数据后, 就可以开始对数据进行逻辑处理, 处理完毕后, 需要将 status 置为 1, 避免下次再扫描到该数据。
      留给大家一个问题, 如果有一条数据一直处理失败, 每次获取数据, 都会先获取到这条问题数据, 那么有什么策略可以让这条数据推后执行呢?

    1.6. 实战模拟

    这里需要用到分布式锁, 分布式锁的代码, 已经在文章"Redis 实现分布式锁"中已经说明, 下面先看获取释放令牌的代码:

    const NO_INDEX      = 100
    const REDIS_ALIAS_NAME  = "jointly"
    const TASK_NAME   = "task_name"
    const RANGE_DATA   = int64(10)
    const PAGE_SIZE   = int64(2)
    
    // 分片任务
    type FragmentTask struct {
     RedisLock
    }
    
    // 获取令牌
    func (f *FragmentTask) GetToken(processId int64) (tokenId int64, err error) {
     i := 0
     for {
      increase, err := redis.Incr(REDIS_ALIAS_NAME, TASK_NAME)
      if err != nil {
       return 0, err
      }
      tokenId := increase % RANGE_DATA
      lockKey := TASK_NAME + string(tokenId)
      if f.GetDistributeLock(lockKey, 60) {
       fmt.Printf("Get lock key success, processId:%d, tokenId:%d\n", processId, tokenId)
       return tokenId, nil
      }
      fmt.Printf("Get lock key conflict, processId:%d, tokenId:%d\n", processId, tokenId)
      i++
      if int64(i) >= RANGE_DATA {
       fmt.Printf("Begin a new cycle.\n")
       return NO_INDEX, nil
      }
     }
    }
    
    // 释放令牌锁
    func (f *FragmentTask) ReleaseToken(tokenId int64) bool {
     lockKey := TASK_NAME + string(tokenId)
     ret := f.DelDistributeLock(lockKey)
     if !ret {
      fmt.Printf("Release token failed, tokenId:%d\n", tokenId)
     }
     return ret
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    令牌生成的流程, 前面已经详细讲解, 这里需要注意的是, 我们每次只遍历 rangData 范围, 超过该范围后会退出, 外层其实有个循环, 会重新进入。
    我们再看看通过令牌获取分片数据的逻辑:

    func ( *Order) QueryOrderList(rangeData, tokenId, pageSize int64) (data []OrderData, err error){
     o := orm.NewOrm()
     o.Using("default")
     num, err := o.Raw("SELECT * from "+ "tb_order where status = 0 and order_id % ? = ? limit ?", rangeData, tokenId, pageSize).QueryRows(&data)
     if err != nil {
      return nil, err
     }
     if num > 0 {
     }
     return data, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    下面是单个线程的任务处理流程:

    // 处理任务
    func (f *FragmentTask) DoProcess(processId int64) error {
     order := &db.Order{}
     for {
      tokenId, err := f.GetToken(processId)
      if err != nil {
       fmt.Printf("failed, exist!\n")
       return err
      }
      // 所有的令牌都锁住了, 睡眠以后, 再重新执行
      if tokenId == NO_INDEX {
       fmt.Printf("All token is conflict, sleep for a while.\n")
       time.Sleep(time.Second * 8)
       continue
      }
      orderList, err := order.QueryOrderList(RANGE_DATA, tokenId, PAGE_SIZE)
      if err != nil {
       fmt.Printf("Query order list failed, tokenId:%d, err:%s\n", tokenId, err.Error())
       f.ReleaseToken(tokenId)
       continue
      }
      fmt.Printf("Begin to process, processId:%d, tokenId:%d, print orderList:%v\n", processId, tokenId, orderList)
      // 处理任务, 用 sleep 模拟
      time.Sleep(time.Second * 1)
      // 处理完数据, 更新 DB 记录状态
      for _, orderRecord := range orderList {
       orderRecord.Status = 1
       order.UpdateOrderStatus(&orderRecord)
      }
      f.ReleaseToken(tokenId)
     }
     return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    这个逻辑是不是很清晰, 就是一个 SQL 查询。最后就是多线程处理逻辑, 我们只开了 3 个线程, 模拟 3 台机器(假如每台机器只有一个线程):

    // 测试任务分片
    func FragmentTest(fragmentTask *redis.FragmentTask) {
     // 开启 3 个线程(模拟 3 台机器), 去处理任务
     for i := 0; i <= 2; i ++ {
      go fragmentTask.DoProcess()
     }
     // 避免子线程退出, 主线程睡一会
     time.Sleep(time.Second * 100)
    }
    func main() {
     redisLock := &redis.RedisLock{}
     order := &db.Order{}
     fragmentTask := &redis.FragmentTask{}
     // 初始化资源
     redisLock.IntiRedis()
     order.InitDb()
     // 测试任务分片
     FragmentTest(fragmentTask)
     return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    我们先看看 DB 执行前数据, 初始状态 status 都是 0, 然后 order_id 是主键:

    mysql> select * from tb_order;
    +----+----------+--------------+--------+
    | id | order_id | product_name | status |
    +----+----------+--------------+--------+
    |  1 |        1 | 鼠标 1        |      0 |
    |  2 |        2 | 鼠标 2        |      0 |
    |  3 |        3 | 鼠标 3        |      0 |
    |  4 |        4 | 鼠标 4        |      0 |
    |  5 |        5 | 鼠标 5        |      0 |
    |  6 |        6 | 鼠标 6        |      0 |
    |  7 |        7 | 鼠标 7        |      0 |
    |  8 |        8 | 鼠标 8        |      0 |
    |  9 |        9 | 鼠标 9        |      0 |
    | 10 |       10 | 鼠标 10       |      0 |
    | 11 |       11 | 鼠标 11       |      0 |
    | 12 |       12 | 鼠标 12       |      0 |
    | 13 |       13 | 鼠标 13       |      0 |
    | 14 |       14 | 鼠标 14       |      0 |
    | 15 |       15 | 鼠标 15       |      0 |
    | 16 |       16 | 鼠标 16       |      0 |
    | 17 |       17 | 鼠标 17       |      0 |
    | 18 |       18 | 鼠标 18       |      0 |
    | 19 |       19 | 鼠标 19       |      0 |
    | 20 |       20 | 鼠标 20       |      0 |
    +----+----------+--------------+--------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    直接看执行结果:

    Get lock key success, processId:0, tokenId:1
    Get lock key success, processId:1, tokenId:2
    Get lock key success, processId:2, tokenId:3
    Begin to process, processId:0, tokenId:1, print orderList:[{1 1 鼠标 1 0} {11 11 鼠标 11 0}]
    Begin to process, processId:2, tokenId:3, print orderList:[{3 3 鼠标 3 0} {13 13 鼠标 13 0}]
    Begin to process, processId:1, tokenId:2, print orderList:[{2 2 鼠标 2 0} {12 12 鼠标 12 0}]
    Get lock key success, processId:0, tokenId:4
    Begin to process, processId:0, tokenId:4, print orderList:[{4 4 鼠标 4 0} {14 14 鼠标 14 0}]
    Get lock key success, processId:1, tokenId:5
    Begin to process, processId:1, tokenId:5, print orderList:[{5 5 鼠标 5 0} {15 15 鼠标 15 0}]
    Get lock key success, processId:2, tokenId:6
    Begin to process, processId:2, tokenId:6, print orderList:[{6 6 鼠标 6 0} {16 16 鼠标 16 0}]
    Get lock key success, processId:0, tokenId:7
    Begin to process, processId:0, tokenId:7, print orderList:[{7 7 鼠标 7 0} {17 17 鼠标 17 0}]
    Get lock key success, processId:1, tokenId:8
    Begin to process, processId:1, tokenId:8, print orderList:[{8 8 鼠标 8 0} {18 18 鼠标 18 0}]
    Get lock key success, processId:2, tokenId:9
    Begin to process, processId:2, tokenId:9, print orderList:[{9 9 鼠标 9 0} {19 19 鼠标 19 0}]
    Get lock key success, processId:0, tokenId:0
    Begin to process, processId:0, tokenId:0, print orderList:[{10 10 鼠标 10 0} {20 20 鼠标 20 0}]
    Get lock key success, processId:1, tokenId:1
    Begin to process, processId:1, tokenId:1, print orderList:[]
    Get lock key success, processId:2, tokenId:2
    Begin to process, processId:2, tokenId:2, print orderList:[]
    Get lock key success, processId:0, tokenId:3
    Get lock key success, processId:1, tokenId:4
    Begin to process, processId:1, tokenId:4, print orderList:[]
    Get lock key success, processId:2, tokenId:5
    Begin to process, processId:0, tokenId:3, print orderList:[]
    Begin to process, processId:2, tokenId:5, print orderList:[]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    我们简单分析一下, 我们每次从 DB 获取数据, 设置的 PageSize=2, 所以每个线程每次从 DB 会获取 2 条数据, 比如 tokenId=7 的线程, 会从 DB 拿到 [{7 7 鼠标 7 0} {17 17 鼠标 17 0}] 这两条数据, 这两条数据的 order_id 分别为 7 和 17, 因为我们的是把数据分成了 10 分, 所以是通过 10 取的模, 取模值和 tokenId 相等。通过上面的输出, 我们可以很清晰看到, 线程 0 拿到 0、1、4、7 这 4 个令牌, 线程 1 拿到 2、5、8 这 3 个令牌, 线程 2 拿到 3、6、9 这 3 个令牌, 3 个线程拿到的令牌互不冲突, 最后从 DB 查询的数据也不会冲突。我们把 PageSize 设置为 1, 再看看执行效果:

    Get lock key success, processId:0, tokenId:9
    Get lock key success, processId:1, tokenId:0
    Get lock key success, processId:2, tokenId:1
    Begin to process, processId:0, tokenId:9, print orderList:[{9 9 鼠标 9 0}]
    Begin to process, processId:2, tokenId:1, print orderList:[{1 1 鼠标 1 0}]
    Begin to process, processId:1, tokenId:0, print orderList:[{10 10 鼠标 10 0}]
    Get lock key success, processId:0, tokenId:2
    Begin to process, processId:0, tokenId:2, print orderList:[{2 2 鼠标 2 0}]
    Get lock key success, processId:1, tokenId:3
    Begin to process, processId:1, tokenId:3, print orderList:[{3 3 鼠标 3 0}]
    Get lock key success, processId:2, tokenId:4
    Begin to process, processId:2, tokenId:4, print orderList:[{4 4 鼠标 4 0}]
    Get lock key success, processId:0, tokenId:5
    Begin to process, processId:0, tokenId:5, print orderList:[{5 5 鼠标 5 0}]
    Get lock key success, processId:1, tokenId:6
    Begin to process, processId:1, tokenId:6, print orderList:[{6 6 鼠标 6 0}]
    Get lock key success, processId:2, tokenId:7
    Begin to process, processId:2, tokenId:7, print orderList:[{7 7 鼠标 7 0}]
    Get lock key success, processId:0, tokenId:8
    Get lock key success, processId:1, tokenId:9
    Begin to process, processId:0, tokenId:8, print orderList:[{8 8 鼠标 8 0}]
    Begin to process, processId:1, tokenId:9, print orderList:[{19 19 鼠标 19 0}]
    Get lock key success, processId:2, tokenId:0
    Begin to process, processId:2, tokenId:0, print orderList:[{20 20 鼠标 20 0}]
    Get lock key success, processId:0, tokenId:1
    Get lock key success, processId:1, tokenId:2
    Begin to process, processId:0, tokenId:1, print orderList:[{11 11 鼠标 11 0}]
    Begin to process, processId:1, tokenId:2, print orderList:[{12 12 鼠标 12 0}]
    Get lock key success, processId:2, tokenId:3
    Begin to process, processId:2, tokenId:3, print orderList:[{13 13 鼠标 13 0}]
    Get lock key success, processId:0, tokenId:4
    Get lock key success, processId:1, tokenId:5
    Begin to process, processId:0, tokenId:4, print orderList:[{14 14 鼠标 14 0}]
    Begin to process, processId:1, tokenId:5, print orderList:[{15 15 鼠标 15 0}]
    Get lock key success, processId:2, tokenId:6
    Begin to process, processId:2, tokenId:6, print orderList:[{16 16 鼠标 16 0}]
    Get lock key success, processId:0, tokenId:7
    Get lock key success, processId:1, tokenId:8
    Begin to process, processId:0, tokenId:7, print orderList:[{17 17 鼠标 17 0}]
    Begin to process, processId:1, tokenId:8, print orderList:[{18 18 鼠标 18 0}]
    Get lock key success, processId:2, tokenId:9
    Begin to process, processId:2, tokenId:9, print orderList:[]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    看到这里, 是不是很有意思, 你也可以动手实现一下~~

    1.7. 局限性

    这个多机分片, 并不是所有的异步任务都可以使用这种方式, 只有特点的场景才可以:

    • 一般是对 DB 的数据进行操作, 因为 DB 可以很好兼容这种分片处理方式, 上面的示例就是很好的说明;
    • 每处理完一条 DB 数据, 要求 DB 能记录数据的变更状态, 对于不记录数据处理完成状态的方式, 比如需要对库表中的所有用户发送 Push, 但是是否发送完成, DB 不进行记录, 只是从前往后遍历的方式处理, 该分片目前不能很好支持。(如果非要支持也可以, 只要你能设计一套方案, 将数据切成对应的分片, 保证所有机器执行的数据不重不漏, 也是可以的);
    • 目前的方案只支持永动型任务, 也就是任务需要一直在内存执行, 不能暂停的场景, 当然你也可以设计成可以支持暂停的方式, 比如记录每个令牌对应的数据是否全部执行完, 如果执行完了, 就先暂停一段时间, 然后再启动。

    1.8. 存在的坑

    • 这里面有个坑, 就是你获取分布式锁时, 其实给这个锁设置了超时时间, 如果超时时间过长, 机器挂了, 那么这个分片将会很长时间不会执行, 需要等到锁自动超时; 如果锁的超时时间设置过短, 会导致这个分片的数据没有执行完, 锁被其它线程获取, 会导致同一个分片有 2 个线程执行, 有悖我们的设计。(有没有完美的解决方案呢, 其实是有的, 可以加我微信, 一起讨论这个问题哈~~)

    1.9. 后记

    从限流常用方式, 再到 Redis 分布式锁, 最后是多机执行异步任务, 终于把这块想写的都写完了, 其实对外输出的过程, 也是自己成长的过程。最近在看消息队列和设计模式, 消息队列的理论部分已经写完了, 前后整理了一个月, 感觉头都大了, 所以纯理论的内容我会尽量少些, 多写一些实战方面的内容, 避免自己眼高手低。设计模式的部分, 我打算以实际项目为主, 然后去讲解常用设计模式的实现姿势。

  • 相关阅读:
    JDK、JRE实用安装教程
    类中静态代码块、实例代码块、构造方法执行次序
    History、Location
    论文阅读:基于隐马尔可夫模型的蛋白质多序列比对方法研究
    python代码是如何执行的?
    Vue.js快速入门:构建现代Web应用
    解决@Data与@Builder冲突的N种策略
    【uniapp关联unicloud,阿里云云服务空间】unicloud本地调试服务启动失败:当前项目未关联unicloud服务空间,请调整后重新运行,已解决
    【Hack The Box】linux练习-- Magic
    【Web漏洞探索】命令注入漏洞
  • 原文地址:https://blog.csdn.net/wan212000/article/details/126177078