• Kubernetes HPA:基于 kafka_consumergroup_lag 指标实现 Consumer Pod 水平弹性伸缩


    在这里插入图片描述

    背景介绍

    在实际生产环境中,当请求激增时,kafka 生产者发送的消息数量会远远大于 kafka 消费者的消费能力,从而导致消息堆积和处理延迟。为了避免此种情况,就要求消费者能够感知到 kafka 消息堆积,并通过动态增加或减少自身的副本数,实现动态自适应消费,这就是本文即将介绍的内容,即基于 kafka_consumergroup_lag 指标实现 Consumer Pod 水平弹性伸缩。

    Kubernetes 通过 HPA 实现 Pod 的水平弹性伸缩,默认支持多种类型,包括 Resource、Pods、Object、External、ContainerResource。有关 HPA 的更多官方介绍请参考官方文档:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/,本文不作冗余说明。

    由于 kafka_consumergroup_lag 指标不是从待扩缩容的消费者 Pod 上采集上来的,没有与 K8s 资源对象建立关联关系,因此这里需要使用 External 类型的 HPA。

    整体架构图如下:
    在这里插入图片描述

    实验步骤

    1. 准备 Kubernetes 集群

    如果已经有 Kubernetes 集群了,可以跳过该步骤

    本文使用 Kind 创建一个测试集群, 准备如下配置文件,命名为 kind-cluster.yaml:

    kind: Cluster
    apiVersion: kind.x-k8s.io/v1alpha4
    nodes:
    - role: control-plane
      image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31
    - role: worker
      image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31
    - role: worker
      image: kindest/node:v1.28.0@sha256:b7a4cad12c197af3ba43202d3efe03246b3f0793f162afb40a33c923952d5b31
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    执行如下命令,创建 Kind 集群:

    kind create cluster --config kind-cluster.yaml
    
    • 1

    2. 安装部署 Kafka 组件

    这里使用 Helm 一键安装:

    helm repo add kafka-repo https://helm-charts.itboon.top/kafka
    helm repo update kafka-repo
    helm upgrade --install kafka \
      --namespace kafka \
      --create-namespace \
      --set broker.combinedMode.enabled="true" \
      --set broker.persistence.enabled="false" \
      kafka-repo/kafka
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    注意,此种安装关闭了持久化存储,单实例最小化运行,仅用于测试环境。

    3. 部署 Prometheus 监控组件

    git clone https://github.com/prometheus-operator/kube-prometheus.git
    cd kube-prometheus
    kubectl create -f manifests/setup
    kubectl create -f manifests/
    
    • 1
    • 2
    • 3
    • 4

    通过端口转发,访问 Prometheus 看板:

    kubectl port-forward service/prometheus-k8s 9090:9090 -n monitoring
    
    • 1

    执行上述端口转发命令后,浏览器访问 http://localhost:9090
    在这里插入图片描述

    4. 部署 Kafka Exporter 指标采集器

    准备如下 kafka-exporter.yaml 文件:

    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kafka-exporter
      namespace: monitoring
      labels:
        app: kafka-exporter
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafka-exporter
      template:
        metadata:
          labels:
            app: kafka-exporter
        spec:
          containers:
          - name: kafka-exporter
            image: danielqsj/kafka-exporter:v1.6.0
            imagePullPolicy: IfNotPresent
            args: ["--kafka.server=kafka-headless.kafka:9092"]
            ports:
            - containerPort: 9308
              name: http
    ---
    apiVersion: v1
    kind: Service
    metadata:
      labels:
        app: kafka-exporter
      name: kafka-exporter
      namespace: monitoring
    spec:
      type: ClusterIP
      ports:
      - name: http
        port: 9308
        protocol: TCP
        targetPort: 9308
      selector:
        app: kafka-exporter
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    执行如下命令安装:

    kubectl apply -f kafka-exporter.yaml 
    
    • 1

    5. 配置 Prometheus 采集 Kafka Exporter 的数据

    本文通过创建 ServiceMonitor 实现,准备如下 kafka-service-monitor.yaml 文件:

    ---
    apiVersion: monitoring.coreos.com/v1
    kind: ServiceMonitor
    metadata:
      labels:
        app: kafka-exporter
      name: prometheus-kafka-exporter
      namespace: monitoring
    spec:
      endpoints:
        - honorLabels: true
          interval: 1m
          path: /metrics
          port: http
          scheme: http
          params:
            target:
              - 'kafka-headless.kafka:9092'
          relabelings:
            - sourceLabels: [__param_target]
              targetLabel: instance
      namespaceSelector:
        matchNames:
          - monitoring
      selector:
        matchLabels:
          app: kafka-exporter
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    执行如下命令,创建 ServiceMonitor:

    kubectl apply -f kafka-service-monitor.yaml 
    
    • 1

    6. 创建测试 Topic

    通过 kubectl exec 进入 kafka 容器,执行 bin/kafka-topics.sh 命令创建 Topic:

    kubectl exec -it -n kafka kafka-broker-0 bash
    
    bin/kafka-topics.sh --bootstrap-server kafka-headless.kafka:9092 \
       --create --topic custom-topic \
       --replication-factor 1 \
       --partitions 3
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    7. 部署 kafka 消费者

    准备如下 kafka-consumer.yaml 文件:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: consumer-kafka-go-client
    spec:
      replicas: 1
      selector:
        matchLabels:
          lang: golang
          kafka: consumer
      template:
        metadata:
          labels:
            lang: golang
            kafka: consumer
        spec:
          containers:
          - name: consumer-kafka-go-client
            image: shidaqiu/kafka-client:1.1
            command:
            - ./consumer
            - kafka-headless.kafka:9092
            - custom-topic
            - golang-consumer
            - "100"
            # WaitMs
            - "2000"
            - plaintext
            resources:
              limits:
                cpu: 50m
                memory: 300Mi
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    执行如下命令创建:

    kubectl apply -f kafka-consumer.yaml 
    
    • 1

    8. 部署 kafka 生产者

    准备如下 kafka-producer.yaml 文件:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: producer-kafka-go-client
    spec:
      replicas: 1
      selector:
        matchLabels:
          lang: golang
          kafka: producer
      template:
        metadata:
          labels:
            lang: golang
            kafka: producer
        spec:
          containers:
          - name: producer-kafka-go-client
            image: shidaqiu/kafka-client:1.1
            command:
            - ./producer
            - kafka-headless.kafka:9092
            - custom-topic
            - "100"
            - "10000"
            - none
            - "1000"
            # WaitMs
            - "2000"
            - plaintext
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    执行如下命令创建:

    kubectl apply -f kafka-producer.yaml 
    
    • 1

    9. 验证 Prometheus 采集到了相关指标

    登陆 Prometheus 看板,检查 kafka_consumergroup_lag 指标被成功采集

    在这里插入图片描述

    10. 部署 Prometheus Adaptor

    首先,下载 helm chart 包到本地

    helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
    helm repo update
    helm pull --untar prometheus-community/prometheus-adapter
    
    • 1
    • 2
    • 3

    然后,编辑其 values.yaml 文件,修改参数,包括设置 Prometheus URL 地址,以及增加 kafka_consumergroup_lag 的指标转换规则,示例如下:

    prometheus:
      # Value is templated
      url: http://prometheus-k8s.monitoring
      port: 9090
      path: ""
    
    rules:
      external: 
      - seriesQuery: '{topic!="", __name__=~"kafka_consumergroup_lag"}'
        resources:
          template: <<.Resource>>
        name:
          as: "hpa_kafka_consumergroup_lag"
        metricsQuery: sum(min_over_time(kafka_consumergroup_lag{<< range $key, $value :=.LabelValuesByName >><< if ne $key "namespace" >><< $key >>="<< $value >>",<>><< end >>}[1h])) by (topic,consumergroup)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    最后,使用新配置,部署 Prometheus Adaptor:

    helm upgrade --install prometheus-adaptor . -n monitoring
    
    • 1

    部署完成后,可以通过如下命令验证指标采集和转换无误:

    kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/monitoring/hpa_kafka_consumergroup_lag" | jq .
    
    • 1

    输入类似如下内容,则表示工作正常:

    {
      "kind": "ExternalMetricValueList",
      "apiVersion": "external.metrics.k8s.io/v1beta1",
      "metadata": {},
      "items": [
        {
          "metricName": "hpa_kafka_consumergroup_lag",
          "metricLabels": {
            "consumergroup": "golang-consumer",
            "topic": "custom-topic"
          },
          "timestamp": "2024-03-02T14:26:15Z",
          "value": "0"
        }
      ]
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    11. 配置 HPA 规则

    准备 consumer-hpa.yaml 文件:

    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: consumer-kafka-go-client-hpa
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: consumer-kafka-go-client
      minReplicas: 1
      maxReplicas: 3
      metrics:
      - type: External
        external:
          metric: 
            name: hpa_kafka_consumergroup_lag
            selector:
              matchLabels:
                topic: custom-topic
          target:
            type: Value
            value: "300"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    执行如下命令,应用 HPA:

    kubectl apply -f consumer-hpa.yaml
    
    • 1

    12. 施加压力,观察 Pod 扩容

    通过如下命令扩容 producer 实例数量

    kubectl scale deploy/producer-kafka-go-client --replicas 2
    
    • 1

    可以观察到当 hpa_kafka_consumergroup_lag 超过 300 时,能够触发消费者实例扩容:

    在这里插入图片描述

    本文相关源码:https://github.com/SataQiu/kafka_metrics_hpa

    引用文献:
    • https://medium.com/@roman.noze/kubernetes-pods-autoscaling-with-kafka-metrics-9b7d5ec3c1d3
  • 相关阅读:
    python+selenium实现web自动化(基础入门)
    threejs 粒子系统和材质贴图
    【C++】1067:整数的个数(信息学奥赛)
    一文搞懂如何学习Android内部命令行工具集合
    Win10系统无法登录Xbox live的四种解决方法
    中国这么多 Java 开发者,应该诞生出生态级应用开发框架
    SLAM学了2年还是不会?每一步其实都是脚印
    vulhub struts2 s2-001漏洞复现
    List - Watch 通讯机制
    安卓桌面记事本便签软件哪个好用?
  • 原文地址:https://blog.csdn.net/shida_csdn/article/details/136443676