application-id设置为从来没用过的,也无法消费最新的消息,但是从另一个角度去思考,如果把就消息都消费完了,那么该
application-id的消费者不就可以消费到最新的消息了吗?是的,该思路是目前比较有效的手段,做法是需要在业务逻辑上兼容处理过去的消息,待kafka streams应用消费完旧消息,后续监听到的消息就为新消息。
<dependencies>
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka_2.13artifactId>
<version>2.8.1version>
dependency>
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka-streamsartifactId>
<version>2.8.1version>
dependency>
dependencies>
虽然消费了旧消息就可以消费新消息,但是在有些场景下,默认的消费配置并不能跟得上生产的速度,或者需要消费很久才能把堆积的消息消费完,因此在一些配置上需要做一些变动,不适用默认配置项。
经实测:更改一些默认配置项可以改变消费速度【接受消息的容量】,从单线程1s3000条消息消费提升至1s30000条左右的消息,由于当时该改动已经符合我的需求,便没有将配置值设置的特别大。
这里进行了简单的配置,其中比较重要的是后续的两个参数
import org.apache.kafka.common.serialization.Serdes;
import