• Docker部署kafka|Go操作实践


    前言

    写作本文的背景是由于字节的暑期青训营中,某个项目要求编写一个简易的流处理引擎(flink),开发语言不限,推荐Java,本着好奇心的驱使,我打算使用Go语言进行部分尝试。

    既然是流处理引擎,那么首先需要有流式的数据源,一般而言,flink会配合从kafka中获取数据流,先不考虑后续编写引擎的部分,本文将着重于kafka的部署,并且后半段将给出使用Go语言编写kafka生产者消费者

    如果你只是希望完成kafka的部署,而不想局限于Go语言,只需要着重阅读文章的前半部分,后文的Go语言操作部分可以给你提供一些思路,你只需要找寻适合语言如Javakafka client库去完成生产者消费者的编写即可。

    部署kafka

    docker前置知识

    下文的实践需要你拥有基本的docker操作能力,如果未曾掌握docker知识点,推荐阅读这两篇文章:

    docker | jenkins 实现自动化部署项目,后端躺着把运维的钱挣了!(上)

    docker | jenkins 自动化CI/CD,后端躺着把运维的钱挣了!(下)

    docker-compose

    编写docker-compose.yml,通过docker容器部署单节点kafka

    version: '3'
    services:
        zookeeper: 
            image: wurstmeister/zookeeper:3.4.6 
            volumes: 
                - ./zookeeper_data:/opt/zookeeper-3.4.6/data 
            container_name: zookeeper 
            ports: 
                - "10002:2181" 
                - "10003:2182" 
            restart: always
    
        kafka: 
            image: wurstmeister/kafka 
            container_name: kafka_01 
            depends_on: 
                - zookeeper 
            ports: 
                - "10004:9092" 
            volumes: 
                - ./kafka_log:/kafka 
            environment: 
                - KAFKA_BROKER_NO=0 
                - KAFKA_BROKER_ID=0 
                - KAFKA_LISTENERS=PLAINTEXT://kafka_01:9092                     # kafka tcp 侦听的ip
                - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://服务器ip:10004        # kafka broker侦听的ip
                - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT 
                - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
                - KAFKA_HEAP_OPTS=-Xmx512M -Xms16M
            restart: always
        # kafka集群管理面板
        kafka_manager: 
            image: sheepkiller/kafka-manager 
            ports: 
                - "10005:9000" 
            environment: 
                - ZK_HOSTS=zookeeper:2181 
            depends_on: 
                - zookeeper 
                - kafka 
            restart: always
    

    后台运行

    docker-compose up -d
    

    docker ps命令查看容器是否启动成功

    通过上述docker-compose.yml部署会运行三个容器,选择进入kafka容器

    docker exec -it kafka容器id /bin/bash
    # 进入kafka目录
    cd /opt/kafka_2.13-2.8.1/
    

    在容器内创建topictopic是kafka中数据管理的基本单位,或者说集合,每一个topic可以管理多个partition,编码操作时:你可以往对应kafka服务器ip+port+topic+partition去发送和读取数据。

    bin/kafka-topics.sh --create --zookeeper 服务器ip:2181 --replication-factor 1 -partitions 1 --topic test
    

    业务编写

    Go语言中连接kafka使用第三方库: github.com/Shopify/sarama

    go get github.com/segmentio/kafka-go
    

    sarama库的简易操作可以参照文档(消费者的编写文档中有坑):文档地址

    如下使用kafka client库进行编码所涉及的API操作比较简单,流程上或许不够规范,请酌情参考。

    producer

    文档中生产者只发送了一条数据后就会关闭,这里我改成了每秒钟发送一次。

    consumer

    文档中消费者虽然开启了Go协程(类比于Java的线程)去读取kafka的数据,但是由于主程序执行顺序执行完毕后,子协程也会终止,导致子协程还没有读取成功/打印数据,整个程序就已经关闭运行了。

    因此我做了一些改动,在子协程退出之前,保持主程序不会退出(使用Go语言的WaitGroup),如果简单粗暴在main函数末尾设置一个很长的程序sleep时间,也是可以实现打印输出的。

    生产&消费

    确保kafka容器正常运行,kafka服务器防火墙端口正常开放,运行消费者程序,运行生产者程序。这个生产者每秒向kafka发送一条测试数据:this is a test log,你也可以添加上程序运行时间进行测试。

    事实上被客户端消费后的数据并没有马上从kafka删除,这里不多做介绍,各位自行了解~

    小结

    本文讲解了使用docker-compose部署单节点kafka的流程,后续通过修改docker-compose.yml的内容也可以实现kafka集群的部署,并且,在较新版本的kafka中,集群的部署可以脱离zookeeper,但是经过了解,由于功能并不完善,这里还是选择了基于zookeeper的部署。

  • 相关阅读:
    嵌入式Qt 实现用户界面与业务逻辑分离
    【SemiDrive源码分析】【驱动BringUp】41 - LCM 驱动 backlight 背光控制原理分析
    MySQL基础学习笔记
    内网渗透(八)横向移不动
    BP神经网络参数设置总结
    “蔚来杯“2022牛客暑期多校训练营7,签到题CFGJ
    【离散数学】代数结构
    Troubleshooting 专题 - 问正确的问题 得到正确的答案
    Dopamine-PEG-NH2,NH2-PEG-DOP,氨基聚乙二醇多巴胺,材料改性用科研试剂
    计算机毕业设计 基于微信小程序的学习资料销售平台的设计与实现 Java实战项目 附源码+文档+视频讲解
  • 原文地址:https://www.cnblogs.com/YLTFY1998/p/16550406.html