• Kafka消费者的线程安全问题和多线程实现


    一、消费者的线程安全问题

    与线程安全的 KafkaProducer 不同,KafkaConsumer 是非线程安全的

    KafkaConsumer的每个公用方法在执行操作前都会调用 acquire() 方法,该方法用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出 ConcurrentModifcationException 异常

    acquire() 方法的实现如下:

    private void acquire() {
            long threadId = Thread.currentThread().getId();
            if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
                throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
            refcount.incrementAndGet();
    }
    

    可以看到实质就是通过CAS的方式来获取当前 KafkaConsumer 的使用权,如果获取不到则抛异常

    当执行的线程执行完毕后,就会调用 release() 方法来释放 KafkaConsumer 的使用权:

        private void release() {
            if (refcount.decrementAndGet() == 0)
                currentThread.set(NO_CURRENT_THREAD);
        }
    

    KafkaConsumer 非线程安全不意味着在消费消息的时候只能以单线程的方式执行,如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费造成延迟。因此,可以采用多线程的方式来提高消费者的整体消费能力。

    二、线程封闭

    实现消费者多线程最常见的方式:线程封闭——即为每个线程实例化一个 KafkaConsumer对象

    使用该方式实现,一般所有的消费线程都属于同一个消费者组,一个消费线程可以消费一个或多个分区中的消息,因此并发数也受限于分区的实际个数。(如果消费线程的个数大于分区数,就有部分消费线程一直处于空闲状态)

    这种实现方式的好处是每个线程可以按顺序消费各个分区中的消息;缺点是每个消费线程都要维护一个独立的TCP连接,造成额外的系统开销

    三、多线程处理消息

    消费者吞吐量的瓶颈实际是在处理消息的效率上, 因此为了解决上面的问题,可以使用这样的线程模型:消费者线程专门用来接收消息,接收到消息后采用多线程的方式(线程池)来处理消息。(实际就是 Netty 中的 Reactor 模型)

    这种方式解决了上述的系统开销问题,缺点是对于消息的顺序处理比较困难,需要做额外的开发来保障

    此外,如果需要手动提交,该种方式的实现也更加困难,有可能会有数据丢失的风险

  • 相关阅读:
    【Leetcode】1035. Uncrossed Lines
    分享66个Python管理系统源代码总有一个是你想要的
    kubeadm部署k8s教程(2)---部署
    微服务项目雪崩的解决思路
    pycharm运行不出结果
    MySQL面试八股文(2022最新整理)
    Matlab论文插图绘制模板第122期—函数折线图(fplot)
    Vue(3.3.4)+three.js(0.161.0)实现3D可视化地图
    2022HBCPC 优美的字符串
    webpack原理篇(五十九):loader 的链式调用与执行顺序
  • 原文地址:https://blog.csdn.net/wanger61/article/details/127118085