K8s - Installing and Deploying a Kafka and ZooKeeper Cluster (with Support for Access from Outside the K8s Cluster)


    This article demonstrates how to deploy a Kafka cluster on Kubernetes so that, once it is running, the Kafka service can be reached both from inside the K8s cluster and from outside it. Clustered services are usually deployed in one of two ways: StatefulSet, or Service & Deployment. Here we use a StatefulSet to build the ZooKeeper cluster and Service & Deployment to build the Kafka cluster.

    I. Create NFS Storage

        The NFS storage provides stable backend storage for Kafka and ZooKeeper, so that when a Kafka or ZooKeeper Pod is restarted after a failure or migrated to another node, it still has access to its original data.

    1. Install NFS

    Here I choose to set up the NFS server on the master node. First, install NFS with the following commands:
    yum -y install nfs-utils
    yum -y install rpcbind

    2. Create Shared Folders

    (1) Run the following commands to create 6 folders:
    mkdir -p /usr/local/k8s/zookeeper/pv{1..3}
    mkdir -p /usr/local/k8s/kafka/pv{1..3}

    (2) Edit the /etc/exports file:

    vim /etc/exports

    (3) Add the following content to it:

    /usr/local/k8s/kafka/pv1 *(rw,sync,no_root_squash)
    /usr/local/k8s/kafka/pv2 *(rw,sync,no_root_squash)
    /usr/local/k8s/kafka/pv3 *(rw,sync,no_root_squash)
    /usr/local/k8s/zookeeper/pv1 *(rw,sync,no_root_squash)
    /usr/local/k8s/zookeeper/pv2 *(rw,sync,no_root_squash)
    /usr/local/k8s/zookeeper/pv3 *(rw,sync,no_root_squash)

    (4) After saving and exiting, run the following commands to restart the services:

    systemctl restart rpcbind
    systemctl restart nfs
    systemctl enable nfs

    If systemctl restart nfs fails with "Failed to restart nfs.service: Unit nfs.service not found.", try the following command instead:

    sudo service nfs-server start

    (5) Run exportfs -v to list all of the shared directories:
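
    For reference, the output should list each exported directory roughly like this (the option list is abbreviated here; the exact defaults shown depend on your NFS version):

    /usr/local/k8s/kafka/pv1          <world>(rw,sync,no_root_squash,...)
    /usr/local/k8s/kafka/pv2          <world>(rw,sync,no_root_squash,...)
    /usr/local/k8s/kafka/pv3          <world>(rw,sync,no_root_squash,...)
    /usr/local/k8s/zookeeper/pv1      <world>(rw,sync,no_root_squash,...)
    /usr/local/k8s/zookeeper/pv2      <world>(rw,sync,no_root_squash,...)
    /usr/local/k8s/zookeeper/pv3      <world>(rw,sync,no_root_squash,...)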

    (6) On each of the other Node machines, install the nfs-utils client with the following command:

    yum -y install nfs-utils

    (7) Then, on the other Nodes, run the following command (the IP is the Master node, i.e. the NFS server) to list the folders shared by the Master node:

    showmount -e 170.106.37.33    # IP of the NFS server
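
    The output should show the export list, roughly:

    Export list for 170.106.37.33:
    /usr/local/k8s/zookeeper/pv3 *
    /usr/local/k8s/zookeeper/pv2 *
    /usr/local/k8s/zookeeper/pv1 *
    /usr/local/k8s/kafka/pv3 *
    /usr/local/k8s/kafka/pv2 *
    /usr/local/k8s/kafka/pv1 *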

     

    II. Create the ZooKeeper Cluster

    1. Create ZooKeeper PVs

    (1) First, create a zookeeper-pv.yaml file with the following content:

    Note: replace 170.106.37.33 with your actual NFS server address.
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: k8s-pv-zk01
      labels:
        app: zk
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
    spec:
      capacity:
        storage: 1Gi
      accessModes:
        - ReadWriteOnce
      nfs:
        server: 170.106.37.33
        path: "/usr/local/k8s/zookeeper/pv1"
      persistentVolumeReclaimPolicy: Recycle
    ---
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: k8s-pv-zk02
      labels:
        app: zk
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
    spec:
      capacity:
        storage: 1Gi
      accessModes:
        - ReadWriteOnce
      nfs:
        server: 170.106.37.33
        path: "/usr/local/k8s/zookeeper/pv2"
      persistentVolumeReclaimPolicy: Recycle
    ---
    apiVersion: v1
    kind: PersistentVolume
    metadata:
      name: k8s-pv-zk03
      labels:
        app: zk
      annotations:
        volume.beta.kubernetes.io/storage-class: "anything"
    spec:
      capacity:
        storage: 1Gi
      accessModes:
        - ReadWriteOnce
      nfs:
        server: 170.106.37.33
        path: "/usr/local/k8s/zookeeper/pv3"
      persistentVolumeReclaimPolicy: Recycle

    (2) Then run the following command to create the PVs:

    kubectl apply -f zookeeper-pv.yaml

    (3) Run the following command to check whether they were created successfully:

    kubectl get pv
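
    Before the ZooKeeper pods claim them, the three PVs should show a STATUS of Available, roughly like this (columns trimmed; the STORAGECLASS shown comes from the "anything" annotation):

    NAME          CAPACITY   ACCESS MODES   RECLAIM POLICY   STATUS      CLAIM   STORAGECLASS   AGE
    k8s-pv-zk01   1Gi        RWO            Recycle          Available           anything       15s
    k8s-pv-zk02   1Gi        RWO            Recycle          Available           anything       15s
    k8s-pv-zk03   1Gi        RWO            Recycle          Available           anything       15s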

    2. Create the ZooKeeper Cluster

    (1) We will build a ZooKeeper cluster with 3 nodes. First, create a zookeeper.yaml file with the following content:
    apiVersion: v1
    kind: Service
    metadata:
      name: zk-hs
      labels:
        app: zk
    spec:
      selector:
        app: zk
      clusterIP: None
      ports:
        - name: server
          port: 2888
        - name: leader-election
          port: 3888
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: zk-cs
      labels:
        app: zk
    spec:
      selector:
        app: zk
      type: NodePort
      ports:
        - name: client
          port: 2181
          nodePort: 31811
    ---
    apiVersion: apps/v1
    kind: StatefulSet
    metadata:
      name: zk
    spec:
      serviceName: "zk-hs"
      replicas: 3 # by default is 1
      selector:
        matchLabels:
          app: zk # has to match .spec.template.metadata.labels
      updateStrategy:
        type: RollingUpdate
      podManagementPolicy: Parallel
      template:
        metadata:
          labels:
            app: zk # has to match .spec.selector.matchLabels
        spec:
          containers:
            - name: zk
              imagePullPolicy: Always
              image: leolee32/kubernetes-library:kubernetes-zookeeper1.0-3.4.10
              ports:
                - containerPort: 2181
                  name: client
                - containerPort: 2888
                  name: server
                - containerPort: 3888
                  name: leader-election
              command:
                - sh
                - -c
                - "start-zookeeper \
            --servers=3 \
            --data_dir=/var/lib/zookeeper/data \
            --data_log_dir=/var/lib/zookeeper/data/log \
            --conf_dir=/opt/zookeeper/conf \
            --client_port=2181 \
            --election_port=3888 \
            --server_port=2888 \
            --tick_time=2000 \
            --init_limit=10 \
            --sync_limit=5 \
            --heap=4G \
            --max_client_cnxns=60 \
            --snap_retain_count=3 \
            --purge_interval=12 \
            --max_session_timeout=40000 \
            --min_session_timeout=4000 \
            --log_level=INFO"
              readinessProbe:
                exec:
                  command:
                    - sh
                    - -c
                    - "zookeeper-ready 2181"
                initialDelaySeconds: 10
                timeoutSeconds: 5
              livenessProbe:
                exec:
                  command:
                    - sh
                    - -c
                    - "zookeeper-ready 2181"
                initialDelaySeconds: 10
                timeoutSeconds: 5
              volumeMounts:
                - name: datadir
                  mountPath: /var/lib/zookeeper
      volumeClaimTemplates:
        - metadata:
            name: datadir
            annotations:
              volume.beta.kubernetes.io/storage-class: "anything"
          spec:
            accessModes: [ "ReadWriteOnce" ]
            resources:
              requests:
                storage: 1Gi

    (2) Then run the following command to create it:

    kubectl apply -f zookeeper.yaml

    (3) Run the following commands to check whether everything was created successfully:

    kubectl get pods
    kubectl get service
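
    To confirm the three ZooKeeper nodes have actually formed a quorum, you can also query each pod's role (a quick check assuming zkServer.sh is on the PATH inside this ZooKeeper image; one pod should report Mode: leader and the other two Mode: follower):

    kubectl exec zk-0 -- zkServer.sh status
    kubectl exec zk-1 -- zkServer.sh status
    kubectl exec zk-2 -- zkServer.sh status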

     

    III. Create the Kafka Cluster

    (1) We will build a Kafka cluster with 3 nodes. First, create a kafka.yaml file with the following content:
    Note:
    • The nfs server address must be changed to your actual NFS server address.
    • status.hostIP is the IP of the host machine, i.e. the Node the Pod is actually scheduled onto (in this article everything runs directly on the Master node). Setting KAFKA_ADVERTISED_HOST_NAME to the host IP is what makes Kafka reachable from outside the K8s cluster.

    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-service-1
      labels:
        app: kafka-service-1
    spec:
      type: NodePort
      ports:
        - port: 9092
          name: kafka-service-1
          targetPort: 9092
          nodePort: 30901
          protocol: TCP
      selector:
        app: kafka-1
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-service-2
      labels:
        app: kafka-service-2
    spec:
      type: NodePort
      ports:
        - port: 9092
          name: kafka-service-2
          targetPort: 9092
          nodePort: 30902
          protocol: TCP
      selector:
        app: kafka-2
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-service-3
      labels:
        app: kafka-service-3
    spec:
      type: NodePort
      ports:
        - port: 9092
          name: kafka-service-3
          targetPort: 9092
          nodePort: 30903
          protocol: TCP
      selector:
        app: kafka-3
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kafka-deployment-1
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafka-1
      template:
        metadata:
          labels:
            app: kafka-1
        spec:
          containers:
            - name: kafka-1
              image: wurstmeister/kafka
              imagePullPolicy: IfNotPresent
              ports:
                - containerPort: 9092
              env:
                - name: KAFKA_ZOOKEEPER_CONNECT
                  value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
                - name: KAFKA_BROKER_ID
                  value: "1"
                - name: KAFKA_CREATE_TOPICS
                  value: mytopic:2:1
                - name: KAFKA_LISTENERS
                  value: PLAINTEXT://0.0.0.0:9092
                - name: KAFKA_ADVERTISED_PORT
                  value: "30901"
                - name: KAFKA_ADVERTISED_HOST_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: status.hostIP
              volumeMounts:
                - name: datadir
                  mountPath: /var/lib/kafka
          volumes:
            - name: datadir
              nfs:
                server: 170.106.37.33
                path: "/usr/local/k8s/kafka/pv1"
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kafka-deployment-2
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafka-2
      template:
        metadata:
          labels:
            app: kafka-2
        spec:
          containers:
            - name: kafka-2
              image: wurstmeister/kafka
              imagePullPolicy: IfNotPresent
              ports:
                - containerPort: 9092
              env:
                - name: KAFKA_ZOOKEEPER_CONNECT
                  value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
                - name: KAFKA_BROKER_ID
                  value: "2"
                - name: KAFKA_LISTENERS
                  value: PLAINTEXT://0.0.0.0:9092
                - name: KAFKA_ADVERTISED_PORT
                  value: "30902"
                - name: KAFKA_ADVERTISED_HOST_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: status.hostIP
              volumeMounts:
                - name: datadir
                  mountPath: /var/lib/kafka
          volumes:
            - name: datadir
              nfs:
                server: 170.106.37.33
                path: "/usr/local/k8s/kafka/pv2"
    ---
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: kafka-deployment-3
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafka-3
      template:
        metadata:
          labels:
            app: kafka-3
        spec:
          containers:
            - name: kafka-3
              image: wurstmeister/kafka
              imagePullPolicy: IfNotPresent
              ports:
                - containerPort: 9092
              env:
                - name: KAFKA_ZOOKEEPER_CONNECT
                  value: zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181
                - name: KAFKA_BROKER_ID
                  value: "3"
                - name: KAFKA_LISTENERS
                  value: PLAINTEXT://0.0.0.0:9092
                - name: KAFKA_ADVERTISED_PORT
                  value: "30903"
                - name: KAFKA_ADVERTISED_HOST_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: status.hostIP
              volumeMounts:
                - name: datadir
                  mountPath: /var/lib/kafka
          volumes:
             - name: datadir
               nfs:
                 server: 170.106.37.33
                 path: "/usr/local/k8s/kafka/pv3"


    (2) Then run the following command to create them:

    kubectl apply -f kafka.yaml

    (3) Run the following commands to check whether everything was created successfully:

    kubectl get pods
    kubectl get service
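
    Optionally, you can check that all three brokers have registered themselves in ZooKeeper by listing /brokers/ids (assuming zkCli.sh is available inside the ZooKeeper container; the expected result is [1, 2, 3]):

    kubectl exec zk-0 -- zkCli.sh ls /brokers/ids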

     

    IV. Testing

    1. Testing from Inside the K8s Cluster

    (1) First, run the following command to enter one of the Kafka containers (replace the Pod name with the one shown by kubectl get pods in your cluster):
    kubectl exec -it kafka-deployment-1-59f87c7cbb-99k46 -- /bin/bash

    (2) Next, run the following command to create a topic named test_topic:

    kafka-topics.sh --create --topic test_topic --zookeeper zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181 --partitions 1 --replication-factor 1
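
    You can optionally confirm the topic was created by describing it:

    kafka-topics.sh --describe --topic test_topic --zookeeper zk-0.zk-hs.default.svc.cluster.local:2181,zk-1.zk-hs.default.svc.cluster.local:2181,zk-2.zk-hs.default.svc.cluster.local:2181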

    (3) After the topic is created, run the following command to start a console producer. Once it is running, you can type messages directly into the console; each line you enter is sent as a separate message.

    kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic

    (4) Open another terminal, connect to the server, enter the container again, and run the following command to start a consumer:

    kafka-console-consumer.sh --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic test_topic

    (5) Switch back to the producer console and send some messages, then watch the consumer print them to see Kafka's basic message handling at work.

     

    2. Testing from Outside the Cluster

        Use a Kafka client tool (such as Offset Explorer) to connect to the Kafka cluster from outside K8s (you can connect either through the ZooKeeper address or through the Kafka broker addresses). The connection should succeed and you should be able to browse the data.
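
    If you prefer the command line, a quick external check (a sketch, assuming a Kafka installation with the console scripts on a machine outside the cluster; replace 170.106.37.33 with the Node IP the brokers advertise) is:

    kafka-console-producer.sh --broker-list 170.106.37.33:30901 --topic test_topic
    kafka-console-consumer.sh --bootstrap-server 170.106.37.33:30901 --topic test_topic --from-beginning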

     

    More test commands for reference:

    Kafka producer and consumer examples (command line)
    1. Create a topic named itcasttopic
    Example:
    kafka-topics.sh --create --topic itcasttopic --partitions 3 --replication-factor 2 --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181

    2. Use hadoop01 as the producer
    Example:
    kafka-console-producer.sh --broker-list kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092 --topic itcasttopic

    3. Use hadoop02 as the consumer
    Example:
    kafka-console-consumer.sh --from-beginning --topic itcasttopic --bootstrap-server kafka-service-1:9092,kafka-service-2:9092,kafka-service-3:9092

    4. List all topics with --list
    Example:
    kafka-topics.sh --list --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181

    5. Delete a topic
    Example:
    kafka-topics.sh --delete --zookeeper 10.0.0.27:2181,10.0.0.103:2181,10.0.0.37:2181 --topic itcasttopic

    6. Stop Kafka
    Example:
    bin/kafka-server-stop.sh config/server.properties

     

     

     

     

     

  • Original article: https://www.cnblogs.com/fengyuanfei/p/17789107.html