• 配置Kafka消息保留时间


    生产者发送消息给kafka,消息被追加值日志文件并保留一定周期(基于配置)。本文探讨对Kafk主题配置消息保留时间。

    基于时间保留

    通过保留期属性,消息就有了TTL(time to live 生存时间)。到期后,消息被标记为删除,从而释放磁盘空间。对于kafka主题中所有消息具有相同的生存时间,但可以在创建主题之前设置属性,或对已存在的主题在运行时修改属性。

    接下来我们将学习如何通过代理配置属性进行调整,以设置新主题的保留周期,并通过主题级配置在运行时控制它。

    服务器级配置

    Apache Kafka支持服务器级配置保留策略,我们可以通过配置以下三个基于时间的配置属性中的一个来进行优化:

    • log.retention.hours
    • log.retention.minutes
    • log.retention.ms

    注意:Kafka用更高精度值覆盖低精度值,所以log.retention.ms具有最高的优先级。

    查看默认值

    首先让我们检查保留时间的缺省值,在kafka目录下执行下面命令:

    $ grep -i 'log.retention.[hms].*\=' config/server.properties
    log.retention.hours=168
    

    显示默认周期为7天。如果要设置消息保留周期为10分钟,可以通过config/server.properties配置文件的log.retention.minutes 属性进行配置。

    log.retention.minutes=10
    

    配置新主题

    kafka提供了几个Shell脚本用于执行管理任务,利用它们创建工具脚本functions.sh。下面增加两个函数,分别为创建主题、展示配置:

    function create_topic {
        topic_name="$1"
        bin/kafka-topics.sh --create --topic ${topic_name} --if-not-exists \
          --partitions 1 --replication-factor 1 \
          --zookeeper localhost:2181
    }
    
    function describe_topic_config {
        topic_name="$1"
        ./bin/kafka-configs.sh --describe --all \
          --bootstrap-server=0.0.0.0:9092 \
          --topic ${topic_name}
    }
    

    现在创建两个独立脚本,create-topic.sh、get-topic-retention-time.sh:

    bash-5.1# cat create-topic.sh
    #!/bin/bash
    ../functions.sh
    topic_name="$1"
    create_topic "${topic_name}"
    exit $?
    
    bash-5.1# cat get-topic-retention-time.sh
    #!/bin/bash
    ../functions.sh
    topic_name="$1"
    describe_topic_config "${topic_name}" | awk 'BEGIN{IFS="=";IRS=" "} /^[ ]*retention.ms/{print $1}'
    exit $?
    

    简单解释下脚本的特殊符号:

    $?-表示上一个命令执行状态.

    $0-当前脚本的文件名称.

    $#-在脚本中使用参数,如$1,$2分别表示第一个参数和第二参数.

    $$-当前脚本的进程号,就是当前执行脚本的进程ID.

    需要说明的是:describe_topic_config列出给定主题的所有属性配置,因此必须使用awk进行过滤,找出retention.ms property属性值。

    现在可以启动kafka环境并验证retention.ms property属性配置:

    bash-5.1# ./create-topic.sh test-topic
    Created topic test-topic.
    bash-5.1# ./get-topic-retention-time.sh test-topic
    retention.ms=600000
    

    通过脚本创建主题,列出描述,可以看到retention.ms 是 600000 (10分钟)。这是默认值,从之前设置server.properties文件中读出来的。

    主题级配置

    一旦kafka代理已经启动,log.retention.{hours|minutes|ms} 服务器级属性为只读属性。我们获得 retention.ms,但可以通过主题级参数进行调整。我们继续在functions.sh 脚本中增加方法配置主题属性:

    function alter_topic_config {
        topic_name="$1"
        config_name="$2"
        config_value="$3"
        ./bin/kafka-configs.sh --alter \
          --add-config ${config_name}=${config_value} \
          --bootstrap-server=0.0.0.0:9092 \
          --topic ${topic_name}
    }
    

    然后在alter-topic-config.sh 脚本使用它:

    #!/bin/sh
    ../functions.sh
    
    alter_topic_retention_config $1 $2 $3
    exit $?
    

    最后设置test-topic主题保存周期为5分钟,然后查看验证:

    bash-5.1# ./alter-topic-config.sh test-topic retention.ms 300000
    Completed updating config for topic test-topic.
    
    bash-5.1# ./get-topic-retention-time.sh test-topic
    retention.ms=300000
    

    验证

    我们已经配置kafka主题的消息保留周期。现在来验证在超时后消息确实过期。

    生产-消费

    在 functions.sh脚本增加两个produce_message 和 consume_message 函数,其内部分别使用kafka-console-producer.sh 和 kafka-console-consumer.sh,分别用于产生/消费消息:

    function produce_message {
        topic_name="$1"
        message="$2"
        echo "${message}" | ./bin/kafka-console-producer.sh \
        --bootstrap-server=0.0.0.0:9092 \
        --topic ${topic_name}
    }
    
    function consume_message {
        topic_name="$1"
        timeout="$2"
        ./bin/kafka-console-consumer.sh \
        --bootstrap-server=0.0.0.0:9092 \
        --from-beginning \
        --topic ${topic_name} \
        --max-messages 1 \
        --timeout-ms $timeout
    }
    

    我们看到消费总是从头开始读消息,因为我们需要消费者读主题中任何有效的消息。

    下面创建独立的生产者函数:

    bash-5.1# cat producer.sh
    #!/bin/sh
    ../functions.sh
    topic_name="$1"
    message="$2"
    
    produce_message ${topic_name} ${message}
    exit $?
    

    最后创建消费者函数:

    bash-5.1# cat consumer.sh
    #!/bin/sh
    ../functions.sh
    topic_name="$1"
    timeout="$2"
    
    consume_message ${topic_name} $timeout
    exit $?
    

    消息过期

    我们已经准备了工具函数,开始产生单个消息,然后消费两次:

    bash-5.1# ./producer.sh "test-topic-2" "message1"
    bash-5.1# ./consumer.sh test-topic-2 10000
    message1
    Processed a total of 1 messages
    bash-5.1# ./consumer.sh test-topic-2 10000
    message1
    Processed a total of 1 messages
    

    我们看到消费者重复消费所有有效消息。现在引入延迟机制延迟5分钟,然后再次消费消息:

    bash-5.1# sleep 300 && ./consumer.sh test-topic 10000
    [2021-02-06 21:55:00,896] ERROR Error processing message, terminating consumer process:  (kafka.tools.ConsoleConsumer$)
    org.apache.kafka.common.errors.TimeoutException
    Processed a total of 0 messages
    

    与我们期望一致,消费者没有发现任何消息,因为消息已经超过了它的保存周期。

    限制

    在Kafka Broker内部,维护另一个名为log.retention.check.interval.ms的属性,用于决定检查消息是否过期的频率。因此,为了保持保留策略的有效性,必须确保log.retention.check.interval.ms的值低于retention.ms 的属性值。对于任何给定的主题都一样。

    总结

    本文探索了Apache Kafka消息基于时间的保留策略。通过创建简单的shell脚本来简化管理过程,接着我们创建了独立的消费者和生产者,以验证在保留期之后消息的过期场景。

  • 相关阅读:
    Vue和Element UI 路由跳转,侧边导航的路由跳转,侧边栏拖拽
    基于Flask的岗位就业可视化系统(四)
    三维重建(10)之立体匹配算法详解:BM、SGBM
    TypeScript简记(一)
    HTTP四种请求方式,状态码,请求和响应报文
    世界粮食日:宏工科技有对策,赋能食品生产高效可持续发展
    猿创征文|Redis删除策略
    【SQL server速成之路】——身份验证及建立和管理用户账户
    C++中静态成员和非静态成员
    神经网络的英文缩写是啥,神经网络的英文是什么
  • 原文地址:https://blog.csdn.net/neweastsun/article/details/127109802