与线程安全的 KafkaProducer 不同,KafkaConsumer 是非线程安全的
KafkaConsumer的每个公用方法在执行操作前都会调用 acquire() 方法,该方法用来检测当前是否只有一个线程在操作,若有其他线程正在操作则会抛出 ConcurrentModifcationException 异常
acquire() 方法的实现如下:
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
refcount.incrementAndGet();
}
可以看到实质就是通过CAS的方式来获取当前 KafkaConsumer 的使用权,如果获取不到则抛异常
当执行的线程执行完毕后,就会调用 release() 方法来释放 KafkaConsumer 的使用权:
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
KafkaConsumer 非线程安全不意味着在消费消息的时候只能以单线程的方式执行,如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费造成延迟。因此,可以采用多线程的方式来提高消费者的整体消费能力。
实现消费者多线程最常见的方式:线程封闭——即为每个线程实例化一个 KafkaConsumer对象
使用该方式实现,一般所有的消费线程都属于同一个消费者组,一个消费线程可以消费一个或多个分区中的消息,因此并发数也受限于分区的实际个数。(如果消费线程的个数大于分区数,就有部分消费线程一直处于空闲状态)
这种实现方式的好处是每个线程可以按顺序消费各个分区中的消息;缺点是每个消费线程都要维护一个独立的TCP连接,造成额外的系统开销
消费者吞吐量的瓶颈实际是在处理消息的效率上, 因此为了解决上面的问题,可以使用这样的线程模型:消费者线程专门用来接收消息,接收到消息后采用多线程的方式(线程池)来处理消息。(实际就是 Netty 中的 Reactor 模型)
这种方式解决了上述的系统开销问题,缺点是对于消息的顺序处理比较困难,需要做额外的开发来保障
此外,如果需要手动提交,该种方式的实现也更加困难,有可能会有数据丢失的风险