• 【kafka】十、kafka消费者offset维护


    消费者offset维护

    offset维护

    由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consumer需要实时记录自己消费到了哪个offset,以便恢复后继续消费。

    消费者是按照消费者组来保存offset的,不是按照消费者单独保存的,如果某个消费者挂掉了,按消费者保存就无法获取上之前消费到的offset

    之后,是按消费者组+Topic+Partition来确定唯一的一个offset

    kafka0.9版本之前,consumer默认将offset保存在Zk中,从0.9版本开始,consumer默认将offset保存在kafka一个内置的topic中,该topic为_consumer_offsets

    offset存在zk

    创建一个新topic

    bin/kafka-topics.sh --zookeeper hll4:2181 --create --topic bigdata --partitions 2 --replication-factor 2
    
    • 1

    启动一个生产者

    bin/kafka-console-producer.sh --broker-list hll1:9092 --topic bigdata
    
    • 1

    启动一个消费者

    bin/kafka-console-consumer.sh --zookeeper hll4:2181 --topic bigdata
    
    • 1

    连接Zk观察

    zkCli.sh
    
    # ls /根目录可以看到kafka注册到zk的信息
    [zk: localhost:2181(CONNECTED) 16] ls /
    [cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config]
    
    #consumers就是启动的消费者组,注意这是消费者组
    [zk: localhost:2181(CONNECTED) 17] ls /consumers
    [console-consumer-7153]
    [zk: localhost:2181(CONNECTED) 18] ls /consumers/console-consumer-7153
    [ids, owners, offsets]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    从zk查询到的offsets中就可以看到消费者组存储的offset信息

    通过刚刚启动生产者生产一条消息,消费者也正常消费成功

    image-20220126113342076

    image-20220126113413148

    进入到zk中,查看刚刚的offsets

    [zk: localhost:2181(CONNECTED) 29] ls /consumers/console-consumer-7153/offsets           
    [bigdata] #主题
    [zk: localhost:2181(CONNECTED) 30] ls /consumers/console-consumer-7153/offsets/bigdata
    [0, 1] #这个0和1是分区
    [zk: localhost:2181(CONNECTED) 31] get /consumers/console-consumer-7153/offsets/bigdata/0
    1 #offset
    cZxid = 0x200000042
    ctime = Tue Jan 25 19:13:16 PST 2022
    mZxid = 0x200000042
    mtime = Tue Jan 25 19:13:16 PST 2022
    pZxid = 0x200000042
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x0
    dataLength = 1
    numChildren = 0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    通过get /consumers/console-consumer-7153/offsets/bigdata/0,就可以看到当前消费的offset

    image-20220126113616982

    查看get /consumers/console-consumer-7153/offsets/bigdata/1,offset是0,因为刚只生产了一条消息然后被消费

    image-20220126114251415

    现在通过生产者,再生产一条消息被消费者消息后,再get /consumers/console-consumer-7153/offsets/bigdata/1,变成1了

    注意:由于zk客户端的延迟,需要重新连接zkCli.sh才会观察到更新的数据

    image-20220126114431880

    offset存kafka本地

    修改配置文件consumer.properties

    exclude.internal.topics=false
    
    • 1

    这时消费者服务需要用bootstrap-server来启动:

    bin/kafka-console-consumer.sh --bootstrap-server hll1:9092 --topic bigdata
    
    • 1

    查看offset

    bin/kafka-console-consumer.sh --topic __consumer_offsets --zookeeper hll4:2181 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
    
    • 1

    image-20220126135330636

    重新执行查看offset命令异常的话,需要修改下consumer.properties修改下group.id配置

    Group + Topic + Partition确定唯一的offset

  • 相关阅读:
    QT提示适配器未调用怎么办?
    pb:类定义查找函数
    Flutter高仿微信-第38篇-单聊-转账
    用于加签验签的加解密算法
    Spring-依赖注入补充
    apollo通过域名访问-Portal挂载到nginx/slb后如何设置相对路径?
    Java中关键字packag和import的使用
    Vue组件样式设置,解决样式冲突问题
    多智能体强化学习的主要流程是什么?训练方式跟单智能体有什么不同?
    0基础跟我学python---进阶篇(3)python中的ORM框架(1)
  • 原文地址:https://blog.csdn.net/sinat_33151213/article/details/128044331