• 高可用环境kafka消息未按顺序消费问题


    目录

    1、背景

    2、问题排查

    3、问题解决


    1、背景

    质检任务是异步执行,正常情况下任务状态扭转是    等待中》运行中》成功(失败)。在质量平台生成任务实例,此时状态是等待中,生成实例之后把具体的任务sql给到大数据平台执行,大数据平台会发运行中、成功、失败状态的kafka消息,正常情况下状态是顺序下发。

    升级部署某个项目,生产环境突然出现很多任务,一直是运行中状态。

    2、问题排查

    (1)怀疑大数据平台,任务没有正常执行完成,所以任务一直是运行中

    1、在yarn平台以及数据库中,都没有发现正在运行中的质量sql任务

    2、排查质量平台服务器日志(kafka消息打印接收消息日志很必要,出问题利于排查),发现这个某个sqlId,正常返回了kafka消息,包括运行中、成功、失败等消息。

    通过上面的排查,大数据平台没问题,正常执行了任务,正常按顺序给质量平台发了kafka消息

    (2)排查质量平台处理kafka消息逻辑

    kafka按顺序返回了状态,质量平台没按顺序消费,看质量平台代码如下。

    @Slf4j
    @Component("dsjAdapterListen")
    public class DsjAdapterListen {

        @Autowired
        ZyslCljgService zyslCljgService;

        /**
         * kafka消费消息,需要配置kafka
         */
        @KafkaListener(groupId = "${spring.kafka.consumer.group-id:dquality}", topics =
                {"status_dquality_" + CommonConstant.ZYJH_CHANNEL})
        public void topicConsumer(String message) {
            StatusInfo statusInfo = JSON.parseObject(message, StatusInfo.class);
            log.warn("==============>KafkaListener:start作业实例结果处理,sqlId:{},zyslId:{},状态:{}", statusInfo.getSqlId(),
                    statusInfo.getTaskId(), statusInfo.getStatus());
            zyslCljgService.procZyslJobData(statusInfo);
        }
    }

    @Slf4j
    @Component("zyslCljgServiceImpl")
    public class ZyslCljgServiceImpl implements ZyslCljgService {

        @Override
        @Async("zyslClThreadPool")
        public void procZyslJobData(StatusInfo statusInfo) {
            try {
                String zyslId = statusInfo.getTaskId();
                String sqlId = statusInfo.getSqlId();
              -dosomething();
                //运行中任务把检核状态更新成运行中
                if (StatusEnum.RUNNING.getCode().equals(statusInfo.getStatus().getCode())) {
                    if (ZyslYxztEnum.WAITING_SUBMIT.getBm().equals(oldGxZysl.getYxzt())
                   || ZyslYxztEnum.WAITING.getBm().equals(oldGxZysl.getYxzt())) {
                      //运行中
                    oldGxZysl.setYxzt(ZyslYxztEnum.RUNNING.getBm());


                        gxZyslMapper.updateById(oldGxZysl);
                    }
                    //运行中状态
                   oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.RUNNING.getBm());
                    gxZyrzMapper.updateById(oldGxZyrzUpdate);
                    return;
                }
                if (StatusEnum.FAILED.getCode().equals(statusInfo.getStatus().getCode())) {
                    log.error("大数据job错误,执行任务失败:{},zyslid:{},参数:{}", statusInfo.getMessage(), zyslId,
                            JSON.toJSONString(statusInfo));
                    //处理失败
                    procFail(statusInfo, oldGxZyrzUpdate);
                }
                if (StatusEnum.FINISH.getCode().equals(statusInfo.getStatus().getCode())) {
                    //正常sql和异常sql都执行完成
                    oldGxZyrzUpdate.setYxzt(ZyslYxztEnum.SUCCESS.getBm());
                }
                // 先更新状态,后处理事件
                 dosomething2();
            } catch (Exception e) {
                log.error("大数据作业实例结果处理报错:{}", e.getMessage());
            }
        }
        
         public void  dosomething2(GxZyrz gxZyrz, SsZyxx zyxx) {
            if (Constants.CODE_SUCCESS.equals(gxZyrz.getYxzt())) {
                  //成功状态查询es、发邮件等
                 dosomething3();
            } else if (Constants.CODE_FAILED.equals(gxZyrz.getYxzt())) {
                gxZyrz.setYcs(0L);
                gxZyrz.setZyl(0L);
                gxZyrz.setSfgj("N");
            }
            gxZyrzMapper.updateById(gxZyrz);
        }

    }

    (1)kafka这个topic只有一个分区,数据质量服务器kafka消息是设置了消费者组,即使是高可用,现场部署多台服务器,也会只有一台服务器会消费这个topic的消息数据,所有排除是因为部署了多台服务器的原因。

    (2)kafka消息被多线程异步处理了

    1、如果任务sql执行成功,kafka返回运行中、执行成功消息

    线程1 - 处理待运行任务

    线程2- 处理成功状态

    通过代码分析,线程1更新任务之前会更新另外一张表状态,假如gxZyslMapper.updateById是2秒时间,   线程2更新状态之前会dosomething3()查询es、查询数据表、发邮件,肯定超过5秒,然后更新sql任务状态。

    结论: 如果sql是执行成功,这种情况,应该不会出现线程2先把任务状态更新成成功,然后线程1把状态更新是运行中。

    2、如果任务sql执行失败,kafka返回运行中、执行失败消息

    线程1 - 处理待运行任务

    线程2- 处理失败状态

    通过代码分析,线程1更新任务之前会更新另外一张表状态,假如update是2秒时间,   线程2直接更新sql任务状态。

    结论: 如果sql是失败成功,这种情况,如果运行中、运行失败状态消息时间建个在2秒内,应该会出现线程2先把任务状态更新成失败,然后线程1把状态更新是运行中。

    代码分析之后,带着结论去现场验证,发现确实是失败状态任务状态被逆写了。

    按这个结论,按理来讲部署的所有现场都会出现问题,为什么只有这个现场有问题呢?

    大数据那边升级了代码,以前执行失败的任务,运行中和运行失败,他们发消息间隔至少耗时在5秒,改了逻辑之后直接失败的任务,发信息间隔在2秒内。这就验证了这个问题

    3、问题解决

    1、运行中状态,先更新sqlId对应的任务状态,然后更新别的数据表状态;2、更新运行中的状态不直接更新,带着状态更新    update zyrz set yxzt = '2' where id = 'xxx' and yxzt not in('0','1')

  • 相关阅读:
    [附源码]计算机毕业设计springboot动物保护协会网站
    Hugging News #0717: 开源大模型榜单更新、音频 Transformers 课程完成发布!
    Java:关于在 Web 开发中使用 Java 的完整指南
    MATLAB算法实战应用案例精讲-【数模应用】梯度下降(GD)(附R语言、Python和MATLAB代码)
    管理器注册类方法和调用类函数方法
    MATLAB实现TopSis优劣解距离法——分析《世界征服者3》将领排名
    C++ VS2015安装教程,下载和安装(下载地址+图解+详细步骤)
    删除表中的数据
    猿创征文|【Linux】Linux中的gdb调试器的使用
    BIOS < UEFI
  • 原文地址:https://blog.csdn.net/s07aser123/article/details/137957048