• Kafka入门,这一篇就够了(安装,topic,生产者,消费者)



    Kafka的安装

    前提,已安装docker和docker-compose。
    拉取镜像

    docker pull bitnami/zookeeper:latest
    docker pull bitnami/kafka:latest
    
    • 1
    • 2

    docker-compose.yaml如下

    version: '3'
    services:
      zookeeper:
        image: 'bitnami/zookeeper:latest'
        ports:
          - '2181:2181'
        environment:
          - ALLOW_ANONYMOUS_LOGIN=yes
      kafka:
        image: 'bitnami/kafka:latest'
        ports:
          - '9092:9092'
        environment:
          - KAFKA_BROKER_ID=1
          - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
          - KAFKA_CFG_ADVERTISED_LISTENER=PLAINTEXT://127.0.0.1:9092
          - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
          - ALLOW_PLAINTEXT_LISTENER=yes
        depends_on:
          - zookeeper
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    启动命令

    docker-compose up -d
    
    • 1

    截图
    在这里插入图片描述
    之后的相关命令若涉及容器id,请自行更换

    文件与配置

    目录

    docker exec -it a0 ls /opt/bitnami/kafka
    
    • 1

    查看目录命令

    截图
    在这里插入图片描述
    重要目录解释如下:

    • bin: 脚本目录
    • config:配置目录
    • libs:第三方依赖库目录
    • logs:日志

    bin

    重要的shell脚本加粗了,之后会用

    connect-distributed.sh kafka-dump-log.sh kafka-storage.sh
    connect-mirror-maker.sh kafka-features.sh kafka-streams-application-reset.sh
    connect-standalone.sh kafka-get-offsets.sh kafka-topics.sh
    kafka-acls.sh kafka-leader-election.sh kafka-transactions.sh
    kafka-broker-api-versions.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh
    kafka-cluster.sh kafka-metadata-shell.sh kafka-verifiable-producer.sh
    kafka-configs.sh kafka-mirror-maker.sh trogdor.sh
    kafka-console-consumer.sh kafka-producer-perf-test.sh windows
    kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-security-migration.sh
    kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-start.sh
    kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-server-stop.sh
    kafka-delegation-tokens.sh kafka-server-start.sh zookeeper-shell.sh
    kafka-delete-records.sh kafka-server-stop.sh

    config

    connect-console-sink.properties connect-mirror-maker.properties server.properties
    connect-console-source.properties connect-standalone.properties tools-log4j.properties
    connect-distributed.properties consumer.properties trogdor.conf
    connect-file-sink.properties kraft zookeeper.properties
    connect-file-source.properties log4j.properties
    connect-log4j.properties producer.properties

    配置文件

    server.properties

    • broker.id: 唯一id值,通过环境变量设置为了1
    • log.dirs: kafka集群日志目录,默认是log.dirs=/bitnami/kafka/data
    • zookeeper.connect:zookeeper地址端口,格式域名/ip:port,这块是zookeeper:2181,在docker的网络中可以解析为另一容器的ip

    更多配置可以查看参考中Dockerhub链接的Configuration部分
    在这里插入图片描述

    producer.properties

    • bootstrap.servers:kafka的ip:port,这里是localhost:9092
    • compression.type:压缩类型,默认是none, 一共有四种,none, gzip, snappy, lz4, zstd,推荐排序LZ4 > GZIP > Snappy,详见腾讯云压缩算法对比

    consumer.properties

    • group.id:消费者组id,默认为test-consumer-group
    • auto.offset.reset:offset设置,三种latest, earliest, none,看情况设置

    命令行简单使用

    kafka-topics.sh

    对主题topic进行增删改查的工具
    在这里插入图片描述

    常用选项如下:

    • --bootstrap-server:kafka服务器ip:port,必须的
    • --create:创建主题
    • --delete:删除主题
    • --describe:描述主题
    • --list:查看主题列表
    • --alter:修改主题的 partitions等
    • --topic :主题名
    • --topic-id :主题id
    • --partitions :主题的partition

    新增

    命令

    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic lady_killer9
    
    • 1

    截图
    在这里插入图片描述

    查看列表

    命令

    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --list
    
    • 1

    截图
    在这里插入图片描述

    查看详情

    命令

    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --describe --topic lady_killer9
    
    • 1

    截图
    在这里插入图片描述

    修改

    命令
    以修改主题partiion数量为例

    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --alter --topic lady_killer9 --partitions 3
    
    • 1

    截图
    在这里插入图片描述

    删除

    命令

    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --delete --topic lady_killer9
    
    • 1

    截图
    在这里插入图片描述

    kafka-console-producer.sh

    标准输入读数据,发送到Kafka的工具
    在这里插入图片描述
    常用选项如下:

    • --bootstrap-server:kafka服务器ip:port,必须的
    • --topic :Kafka主题,必须的
    • --sync:同步发送
    • --compression-codec [String: compression-codec] :压缩方式,‘none’,‘gzip’, ‘snappy’, ‘lz4’, , ‘zstd’,默认gzip.

    命令

    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic demo
    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-producer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo
    
    • 1
    • 2

    截图
    在这里插入图片描述

    kafka-console-consumer.sh

    在这里插入图片描述
    常用选项如下:

    • --bootstrap-server:kafka服务器ip:port,必须的
    • --topic :Kafka主题,必须的
    • --group :消费者组id
    • --key-deserializer :key反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
    • --value-deserializer :value反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
    • --offset :消费的offset
    • --partition :消费的分区

    命令

    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo
    
    • 1

    截图
    在这里插入图片描述
    命令

    docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo --partition 0 --offset 2
    
    • 1

    截图
    在这里插入图片描述
    上手之后我们再来了解一些概念。

    概念

    集群

    已发布的消息保存在一组服务器中,称为Kafka集群。

    代理broker

    集群中的每一个服务器都是一个代理。

    主题topic

    每条发布到kafka集群的消息都有一个主题,这个主题被称为topic。每个topic都由一个或者多个分区构成。

    分区partition

    topic的partition数量可以在创建时配置,partition数量决定了每个消费者组中并发消费者的最大数量

    分区的原则:

    • 生产者指定了partition,则直接使用
    • 未指定partition但指定了key,通过对key的value进行hash出一个partition
    • partition和key都未指定,使用轮询选出一个partition

    偏移量offset

    任何发布到partition的消息都会被直接追加到partition尾部,每条消息的位置称为offset,offset是一个long型数字,它唯一标记一条消息。消费者可以通过(topic、partition、offset)跟踪记录。

    生产者producer

    push消息到topc的叫生产者,push后可以获得offset。生产者可以指定partition,但不建议这么做。

    消费者组consumer group

    包含多个消费者,有一个 group id,可以订阅topic进行消费。消费偏移以消费者组为单位。

    消费者consumer

    从topic中pull数据,可以指定partition和offset。

    FAQ

    如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?

    Kafka每个partition中的消息在写入是都是有序的,消费时,每个partition只能被每一个group中的消费者消费,因此,topic下只有一个partition时一定有序。

    如何设置分区和消费者数?

    建议分区数与消费者数一致,防止消费不过来。

    参考

    dockerhub-bitnami/kafka
    腾讯云CKafka 压缩算法对比
    python-kafka客户端封装

  • 相关阅读:
    束带机安全使用须知
    【Nginx27】Nginx学习:代理模块(一)基本配置与概念
    StringBuffer类 和StringBuilder类
    全面解读 AWS Private 5G 的革新理念
    队列的各个函数的实现
    hadoop can‘t installation $HADOOP_HOME or $HADOOP_PREFIX must be set
    【猫鼠游戏】一个半径为 1 的圆形水池圆心有一只老鼠,池边有一只猫。
    SpringBoot (profile)以及配置文件的加载顺序
    【04】Istio的pilot流量分发机制
    点云从入门到精通技术详解100篇-基于光谱共焦系统的三维点云数据处理(中)
  • 原文地址:https://blog.csdn.net/lady_killer9/article/details/128753786