• Linux Kafka 3.5 KRaft模式集群部署


    这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

    背景

    kafkaKIP-500引入了KRaft替代Zookeeper来实现自我管理元数据

    详细信息可以看原文链接

    KRaft简介

    KRaft是kafka用来取代zookeeper的分布式协调管理组件。

    架构改变

    原先依赖于Zookeeper选举出一个controller
    现在由KRaft集群中自己选举,产生一个controller

    优点

    • Kafka不用再依赖外部框架,能够做到独立运行
    • Kafka集群扩展时不用再受到Zookeeper读写能力的限制

    更多优点和缺点这里暂时不太多讨论主要以部署为主

    部署3节点kafaka集群

    KRaft部署方式支持controllerbroker在同一进程。也支持分开部署
    线上推荐分开部署。这里由于是测试集群,打算controllerbroker在同一进程部署

    记得所有机器90929093端口打开

    下载Kafka

    wget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
    
    • 1

    这里第一次下载报错说证书已过期,添加证书忽略下载

    wget --no-check-certificate https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz
    
    • 1

    发现国内服务器下载国外软件还是非常慢。最终决定找国内镜像。

    • 阿里云Kafka镜像:http://mirrors.aliyun.com/apache/kafka/3.5.0/?spm=a2c6h.25603864.0.0.3c7d126emg02YS

    使用国内镜像下载

    wget http://mirrors.aliyun.com/apache/kafka/3.5.0/kafka_2.13-3.5.0.tgz
    
    • 1

    三台机器都执行

    解压

    tar -xzf kafka_2.13-3.5.0.tgz
    
    • 1

    三台机器都执行

    给集群生成一个UUID

    我们进入到解压的bin目录,我这里是/data/kafka_2.13-3.5.0/bin
    然后执行如下命令

    kafka_2.13-3.0.0/bin/kafka-storage.sh random-uuid
    
    • 1

    单台机器生成即可

    执行完会生产一个字符串,类似这样xgK3spReSO7ijVK4rEbbbQ

    格式化存储路径

    sh kafka-storage.sh format -t xgK3spReSO7ijVK4rEbbbQ  -c ../config/kraft/server.properties
    
    • 1

    三台机器都执行

    修改配置

    我们这里要修改三台机器的server.properties配置
    我这里的路径是在/data/kafka_2.13-3.5.0/config/kraft/server.properties

    • node1
    node.id = 1
    controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
    process.roles = broker,controller
    listeners=PLAINTEXT://192.168.1.1:9092,CONTROLLER://92.168.1.1:9093
    log.dirs=/data/kakfa01/logs
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • node2
    node.id = 2
    controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
    process.roles = broker,controller
    listeners=PLAINTEXT://192.168.1.2:9092,CONTROLLER://92.168.1.2:9093
    log.dirs=/data/kakfa02/logs
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • node3
    node.id = 3
    controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
    process.roles = broker,controller
    listeners=PLAINTEXT://192.168.1.3:9092,CONTROLLER://92.168.1.3:9093
    log.dirs=/data/kakfa03/logs
    
    • 1
    • 2
    • 3
    • 4
    • 5

    启动集群

    export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"&&nohup sh /data/kafka_2.13-3.5.0/bin/kafka-server-start.sh /data/kafka_2.13-3.5.0/config/kraft/server.properties &
    
    • 1

    三台机器都执行

    启动完我们就有了一个三节点的kafka集群

    测试

    创建topic

    sh kafka-topics.sh --create --topic xiaozou --partitions 1 --replication-factor 1 --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
    
    • 1

    查看topic

    sh kafka-topics.sh --list --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092
    
    • 1

    代码测试

    • 生产消息
    public class KafkaProducer {
    
        private static final String TOPIC = "xiaozou";
        private static final String BOOTSTRAP_SERVERS = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092";
    
        public static void main(String[] args) {
            // 生产消息
            produceMessage();
        }
    
        private static void produceMessage() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
            Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
    
            try {
                for (int i = 0; i < 10; i++) {
                    String message = "小奏message " + i;
                    System.out.println("开始发送消息");
                    Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC, message));
                    RecordMetadata recordMetadata = send.get();
                    System.out.println("Produced message: " + message);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 消费消息
    public class KafkaConsumerExample {
    
        private static final String TOPIC_NAME = "xiaozou";
    
        private static final String GROUP_ID = "xiaozou_gid";
    
        private static final String BOOTSTRAP_SERVERS = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092";
    
    
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
            props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
    
    
            // 创建消费者实例
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // 订阅主题
            consumer.subscribe(Collections.singletonList(TOPIC_NAME));
    
            // 消费消息
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("接收到消息:key = " + record.key() + ", value = " + record.value() +
                            ", partition = " + record.partition() + ", offset = " + record.offset());
                    }
                    consumer.commitSync(); // 手动提交偏移量
                }
            } finally {
                consumer.close();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
  • 相关阅读:
    “Sources, Summary” report——“来源,摘要”报告
    算法学习笔记(18): 平衡树(一)
    opencv4第二章
    人工智能(AI)是一种快速发展的技术,其未来发展前景非常广阔。
    IIC通信----基本原理
    Remote Desktop Service (RDS) 远程桌面服务漏洞简介 BlueKeep DejaBlue
    Leetcode3200. 三角形的最大高度
    6.DApp-用Web3实现前端与智能合约的交互
    删除的照片怎么找回?教你5个好方法快速恢复
    docker基础认知(镜像+容器+仓库+客户端与服务器)
  • 原文地址:https://blog.csdn.net/qq_42651904/article/details/132736141