• 如何做到百万数据半小时跑批结束


    什么是跑批

    跑批就是应用程序定时对数据的批量处理。

    跑批有以下特性:

    • 大数据量:批量任务一般伴随着大量的数据处理

    • 自动化:要求制定时间或频率自动运行

    • 性能:要求在指定时间内完成批处理任务

    • 健壮性:针对于异常数据,不可导致程序崩溃

    • 可靠性:针对于异常数据,我们后续可跟踪

    数据准备

    CREATE TABLE `test` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `date_time` datetime DEFAULT NULL COMMENT '时间',
      `str1` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3099998 DEFAULT CHARSET=utf8mb4
    
    -- 添加数据存储过程
    delimiter $$
    create procedure insert_test()
    begin
    declare n int default 1;
    while n< 3000000
    do 
    insert into test(date_time,str1) values(concat( CONCAT(FLOOR(2023 + (RAND() * 1)),'-',LPAD(FLOOR(10 + (RAND() * 2)),2,0),'-',LPAD(FLOOR(1 + (RAND() * 25)),2,0))),n);
    set n = n+1;
    end while;
    end
    
    
    call insert_test();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    跑批需要考虑哪些问题

    深度分页

    MySQL limit 深分页 会变慢。

    -- 0.016s
    select id,str1 from test where date_time> '2020-09-19' limit 0,10;
    -- 17.147s
    select id,str1 from test where date_time> '2020-09-19' limit 2000000,10;
    
    • 1
    • 2
    • 3
    • 4

    limit 的偏移量越大,执行时间越长。limit a, b会查询前a+b条数据,然后丢弃前a条数据,select * 会查询所有的列,会有回表操作。

    针对上面的问题,我们需要的操作时尽量减少无效的回表策略,limit a,b,直接获取a+1到a+b条数据的id,再根据这些id查询数据这样就减少了回表的操作。

    可以使用子查询优化SQL,先查出id,在分页。

    -- 0.656s
    select id,str1 FROM test where id >= (select a.id from test a where a.date_time >= '2020-09-19' limit 2000000,1) LIMIT 10;
    
    
    • 1
    • 2
    • 3

    sql优化这里不做过多赘述。

    批量处理

    跑批可能会涉及到数据准备的过程,边循环跑批数据边去查找所需的数据,一方面for嵌套的循环处理,时间复杂度通常是随着你的 for 个数上升的,例如:

      // 调用数据库查询需跑批数据
      List<BizDo> bizDoList = this.list(businessDate);
      // for 循环处理数据
      for(BizDo ba : bizDoList) {
        // 业务处理逻辑.. 省略
        
        // 查询账户数据
        List<BizAccountDo> bizAccountDoList = this.listGetBizAccount(ba.getbizUserId());
        for (BizAccountDo bic : bizAccountDoList){
          // 账户处理逻辑.. 省略
        }
        ... // 后续还会嵌套 for 循环
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这种情况可以采用批量处理,例如可以userId放在集合中,再去批量查询,这样可以提升效率。

    List bizUserIdList  = bizApplyDoList.parallelStream().map(BizApplyDo::getbizUserId()).collect(Collectors.toList());
    // 批量进行账户查询
    List bizAccountDoList = this.listGetBizAccount(bizUserIdList);
    
    • 1
    • 2
    • 3

    同样对于插入也可以采用批量处理。

    分片处理

    在生产环境中,都是采用集群部署,如果一个跑批任务只跑在一个机器上,那效率肯定很低,我们可以利用 xxl-job**「分片广播」** 和 「动态分片」 功能。

    执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

    “分片广播” 以执行器为维度进行分片,支持动态扩容执行器集群从而动态增加分片数量,协同进行业务处理;在进行大数据量业务操作时可显著提升任务处理能力和速度。

    “分片广播” 和普通任务开发流程一致,不同之处在于可以获取分片参数,获取分片参数进行分片业务处理。

    • Java语言任务获取分片参数方式:BEAN、GLUE模式(Java)

      // 可参考Sample示例执行器中的示例任务"ShardingJobHandler"了解试用 int shardIndex = XxlJobHelper.getShardIndex();int shardTotal = XxlJobHelper.getShardTotal();
      
      • 1

    分片参数属性说明:

    index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;total:总分片数,执行器集群的总机器数量;
    
    • 1

    该特性适用场景如:

    • 1、分片任务场景:10个执行器的集群来处理10w条数据,每台机器只需要处理1w条数据,耗时降低10倍;
    • 2、广播任务场景:广播执行器机器运行shell脚本、广播集群节点进行缓存更新等

    /**
     * 分片广播进行100W用户重置
     * @param param
     */
    @XxlJob(value = "shardingJob")
    public void shardingJob(String param){
    
        // 获取当前节点的index 与 总节点数
        int shardIndex = XxlJobHelper.getShardIndex();
        int shardTotal = XxlJobHelper.getShardTotal();
        log.info("当前节点的index = {}, 总结点数 = {}", shardIndex, shardTotal);
    
        List<Integer> userIds = this.getUserIds();
        //这里只是给出参考,具体要结合实际
        userIds.stream().forEach(id ->{
            if(id % shardTotal == shardIndex){
                
                //todo 业务
            }
        });
    }
    
    /**
     * 模拟用户id
     * @return
     */
    private List<Integer> getUserIds() {
        List<Integer> userIds = new ArrayList<>();
        for(int i = 0; i < 100 ; i++){
            userIds.add(i + 1);
        }
        return userIds;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    线程安全

    在进行跑批时,一般会采用多线程的方式进行处理,因此要考虑线程安全的问题,比如使用线程安全的容器,使用JUC包下的工具类。

    事务

    事务粒度要尽可能的小,选择合适的事务范围,要根据业务选择合适的事务传播属性。

    1、这些操作自身是无法回滚的,这就会导致数据的不一致。可能RPC调用成功了,但是本地事务回滚了,可是PRC调用无法回滚了。

    2、在事务中有远程调用,就会拉长整个事务。那么久会导致本事务的数据库连接一直被占用,那么如果类似操作过多,就会导致数据库连接池耗尽或者单个链接超时

    异常处理

    要保证程序的健壮性,做好异常处理,不能因为一处报错,导致整个任务执行失败,对于异常的数据可以跳过,不影响其他数据的正常执行。

  • 相关阅读:
    LeetCode 数据结构与算法:最大子数组和
    flink-sql所有表连接器
    【three.js】坐标辅助器和轨道控制器
    web自动化测试进阶篇03 ———自动化并发测试应用
    【正点原子STM32连载】 第五十七章 DSP FFT实验(Julia分形)实验 摘自【正点原子】APM32F407最小系统板使用指南
    IPWorks Zip Delphi 流式压缩组件
    Linux安装所需软件
    华为数通方向HCIP-DataCom H12-831题库(单选题:81-100)
    Chiplet技术与汽车芯片(一)
    ThreadPoolExecutor源码解析
  • 原文地址:https://blog.csdn.net/qq_30823993/article/details/134437099