他的项目上 kafka 使用的是云服务,版本号是 2.12,现有一个生产者和一个消费者,生产十分钟的消息,需要二十分钟才能消费完。
消费者每次拉取1000条消息进行处理,使用的是 newCachedThreadPool 线程池。
–zookeeper: 指定了kafka所连接的zookeeper服务地址
–topic: 指定了所要创建的主题的名称
–partitions: 指定了分区个数
–replication-factor: 指定了副本因子
–create: 创建主题的动作指令
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic test
./kafka-topics.sh 在 kafka 的 bin 文件夹下,下面的命令一样。
./kafka-topics.sh --zookeeper localhost:2181 --list
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group 消费组名
max.poll.records:一次 poll 返回的最大记录数默认是500条。
max.poll.interval.ms:两次 poll 方法最大时间间隔这个参数,默认是300s。
这两个值设置要合理,如果一次拉取的消息处理完的时间,超过了 poll 方法最大时间间隔,会导致偏移量报错,导致重复消费。
这两个值在 springboot 上的配置信息为
spring:
kafka:
consumer:
max-poll-records: 500
properties:
max:
poll:
interval:
ms: 300000
300000 单位是毫秒。
这两个值设置合理的话,是能解决消费慢的问题。
除了设置上面的两个值,还建议他增加消费者,在同一个消费组中,每个消费者的 ID 设置不同即可,相关参数为
spring:
kafka:
consumer:
client-id: client01
group-id: groupName
然后朋友优化了他的代码,取消 newCachedThreadPool 线程池,改为了缓存模式,每次缓存1000条消息一起处理。
听朋友说他已经花了一个月时间了还没有解决,我怀疑他在摸鱼。
我这边就花了两个半上午,干活的空档,查了一些资料。
看来我还是可以的。
不过想起来他那将近两倍于我的工资,嗯嗯嗯嗯嗯,