• 记一次线程堵塞(挂起)导致消息队列积压


    1 背景

    A服务作为生产者,每天发送上千万的mq消息,每一个消息包含500个用户ids数据。B服务作为消费者,接受MQ消息并通过http调用第三方请求进行业务处理,消费组启用了rabbitmq的多线程消费组,一个实例并发40个mq消费者线程,每个线程一次获取10个消息进行消费。

    Mq消费者配置如下:

    1. # mq配置
    2. rabbitmq:
    3. connection-timeout: 15000
    4. cache:
    5. channel:
    6. size: 200
    7. # 消息发送到rabbitmq broker cluster需要回调
    8. publisher-confirms: true
    9. # 交换机将消息投递至队列失败时需要回调
    10. publisher-returns: true
    11. listener:
    12. # 手动确认消息已被消费
    13. simple:
    14. acknowledge-mode: manual
    15. # consumer的并发数
    16. concurrency: 40
    17. max-concurrency: 50
    18. # 每个消息者每次取10
    19. prefetch: 10

    Mq挤压消息如下

    2 排查

    2.1 复制rabbitmq挤压消息数据进行模拟复现

    找出rabbitmq挤压的消息,在本地模拟消费,找出没有进行消息确认的原因,通过rabbitmq控制台的Get messages功能

    复制payload的消息进行base64转码,转出来的消息是乱码不完整的,怀疑
    是rabbitmq还结合了其他加密处理,放弃这种排查思路

    2.2 检查报错日志

    rabbitmq的unack消息挤压,那就是消费者没有进行ack确认,怀疑消费者代码有异常导致没能执行到ack的代码。
    查询服务器日志,没发现有报错的日志,梳理业务代码,消费者使用了spring aop around机制进行消息确认,所以不管代码有没有报错,按理说都会手动进行mq消息ack确认

    2.3 检查服务是否宕机

    消费组实例数量符合服务器大小配置,因此服务器应用没有宕机

    2.4 检查java线程

    使用IBM的TMDA工具进行分析线程堆栈,工具下载地址
    TMDA工具下载地址

    TMDA工具简介

    TMDA分析线程堆栈结果如下

    通过分析图,看到大量park线程,确实是符合现状,应用的线程挂起了

    3 分析和解决

    通过stack深度高到底排序,业务代码存在线程等待情况,具体代码CountDownLatch.await

    3.1 结合业务代码分析

    通过上图stack提示,找到关联的业务代码

    伪代码如下:

    1. // new一个CompletableFuture
    2. public CompletableFuture httpCall(String tokenData){
    3. CompletableFuture completableFuture = CompletableFuture.supplyAsync(() -> {
    4. long time = 3000L;
    5. try {
    6. Thread.sleep(time);
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. return Integer.parseInt(tokenData);
    11. });
    12. return completableFuture;
    13. }
    14. httpCall(tokenData).whenCompleteAsync((returnValue, ex)->{
    15. // do business
    16. // ex.getMessage()
    17. // 其中ex对象为空,使用ex.getMessage()报了空指针,导致没能执行如下的countDown
    18. countDownLatch.countDown();
    19. })

    消费者服务通过http调用第三方服务,为了提高并发,使用了多线程,每一组(数十个为一组)http请求批量调用完成后再把请求响应结果异步存入数据库,
    主线程使用了countDownLatch.await进行等待,
    其中whenCompleteAsync方法存在空指针问题,导致没能执行如下的countDown方法。

    这里有人会问, 上面错误日志检查步骤,不是说日志没有空指针异常吗?
    对,子线程报了空指针,因为CompletableFuture执行每次都是new 一个新的CompletableFuture对象,并把结果作为下一个CompletableFuture执行的入参,
    通过伪代码可以发现,执行whenCompleteAsync后,没有新的CompletableFuture方法执行,所以异常没有抛出来,使得排查变得困难

    3.2 解决

    因为存在whenCompleteAsync报错的情况,添加多一个新的异常捕获处理方法,捕获异常也进行countDown的操作。

    代码如下:

    1. httpCall(tokenData).whenCompleteAsync((returnValue, ex)->{
    2. // do business
    3. // ex.getMessage()
    4. // 其中ex对象为空,使用ex.getMessage()报了空指针,导致没能执行如下的countDown
    5. countDownLatch.countDown();
    6. }).exceptionally(e ->{
    7. log.info("exceptionally捕获到异常,tokenData={}, e={}", tokenData, e.getMessage());
    8. countDownLatch.countDown();
    9. return null;
    10. });

    4 结论

    • 熟练CompletableFuture的使用,要看源码的实现(实现原理cas + 多个future采用入stack,每次把前一个future的结果作为参数传入下一个future去执行)

    • 使用多线程需要考虑异常、超时等情况

    • 熟练使用jvm stack分析工具

    5 文章参考

    CompletableFuture流程图

    CompletableFuture参考文章如下

    CompletableFuture 原理浅析

  • 相关阅读:
    RocketMQ入门指南:从零开始学习分布式消息队列技术
    [office] Excel中函数进行计算两个日期参数差值的方法 #职场发展#学习方法#媒体
    裸辞半年,靠着这套Java面试宝典,拿下了腾讯T3
    【毕业季·进击的技术er】自己的选择,跪着也要走
    使用OpenVINO实现飞桨版PGNet推理程序
    Java线程安全
    Mutisim仿真软件使用
    day19每日一考
    【Amazon】云上探索实验室—了解 AI 编程助手 Amazon Codewhisperer
    docker学习(十三)docker安装dejavu
  • 原文地址:https://blog.csdn.net/ffyyhh995511/article/details/132810006