• kafka基础(2):kafka quickstart


    官网:APACHE KAFKA QUICKSTART
     
    官网提供的quickstart可以让读者快速入门kafka:启动server、创建topic、生产数据到topic、本地启动一个multi-broker 集群,导入导出数据等功能

    Step 1: Download the code

    $ tar -xzf kafka_2.13-3.2.1.tgz
    $ cd kafka_2.13-3.2.1
    
    • 1
    • 2

     

    Step 2: Start the server

    kafka依赖zk,所以需要你启动一个zk server;可以通过运行一个kafka脚本来获取一个单节点的zk实例

    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
    ...
    
    • 1
    • 2
    • 3
    • 4

    现在可以启动kafka server了

    bin/kafka-server-start.sh config/server.properties
    
    INFO Verifying properties (kafka.utils.VerifiableProperties)
    INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
    ..
    
    • 1
    • 2
    • 3
    • 4
    • 5

     

    Step 3: Create a topic

    创建一个分区和副本都=1的topic:“test”
    > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    查看topic的列表
    > bin/kafka-topics.sh --list --bootstrap-server localhost:9092
    test
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    除了手动创建之外,还可以配置broker自动创建主题。 when a non-existent topic is published to.

     

    Step 4: WRITE SOME EVENTS INTO THE TOPIC

    kafka命令行的客户端,可以从文件或标准输入中获取输入,并将其作为消息发送到kafka集群中。
    默认的,每一行作为单独的消息进行发送。

    运行producer,然后键入一些信息到控制台发送到server中。

    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message
    
    • 1
    • 2
    • 3

     

    Step 5: Start a consumer

    命令行下的kafka消费者可以将message输出到标准输出。

    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    This is a message
    This is another message
    
    • 1
    • 2
    • 3

     

    Step 6: Setting up a multi-broker cluster

    在本地机器上:拓展集群到3个节点
    1、在每一个broker中创建一个配置文件

    > cp config/server.properties config/server-1.properties
    > cp config/server.properties config/server-2.properties
    
    • 1
    • 2

    2、编辑这些配置文件

    config/server-1.properties:
        broker.id=1
        listeners=PLAINTEXT://:9093
        log.dirs=/tmp/kafka-logs-1
    
    config/server-2.properties:
        broker.id=2
        listeners=PLAINTEXT://:9094
        log.dirs=/tmp/kafka-logs-2
    
    broker.id:是唯一的,并且是集群中每个节点的永久名字
    需要重写端口和日志目录。因为我们是在一台机器上运行,并且保证brokers之间不会注册到相同的端口,或覆盖其它broker的数据。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    因为已经有了zk和一个开启的node。所以只需要开启两个新的node。

    > bin/kafka-server-start.sh config/server-1.properties &
    ...
    > bin/kafka-server-start.sh config/server-2.properties &
    ...
    
    • 1
    • 2
    • 3
    • 4

    现在创建一个复制因子为3的topic

    > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    
    • 1

    查看

    > bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic my-replicated-topic
    Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
    
    第一行概要了所有的分区,每一个分区都会有一行描述的信息。这里只有一个分区所以只有一行。
    leader节点:负责所有给定分区的读写,每一个节点都可能以随机的方式当选为分区的leader
    replicas:复制该分区日志的节点列表。
    isr:一组同步副本。是副本列表的子集:哪些处于活动状态的节点,用于选取leader。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

     

    Step 7: Use Kafka Connect to import/export data ing

    IMPORT/EXPORT YOUR DATA AS STREAMS OF EVENTS WITH KAFKA CONNECT

    Kafka Connect是kafka数据导入导出的工具。它是一个可拓展工具,可以自定义逻辑去和外部系统进行交互。

    我们看一个Kafka Connect从一个文件导入到kafka topic,并从kafka topic中导出数据到一个文件:
     

    1.准备测试文件

    > echo -e "foo\nbar" > test.txt
    
    • 1

     
    2. 启动两个以独立模式运行的连接器。

    > bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
    
    
    我们提供三个配置文件作为参数。
    第一个配置文件总是Kafka连接过程的配置,包含常见的配置,如Kafka要连接的broker和数据的序列化格式。
    其余的配置文件每个都指定要创建的连接器。这些文件包括唯一的连接器名称、要实例化的连接器类和连接器所需的任何其他配置。
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

     
     

  • 相关阅读:
    使用java8对Map通过key进行排序
    【推荐技术】基于协同过滤的网络信息推荐技术matlab仿真
    RabbitMQ之Exchange(交换机)属性及备用交换机解读
    Element Plus组件库中的input组件如何点击查看按钮时不可编辑,点击编辑时可编辑使用setup
    2022年全球市场BVM袋阀面罩总体规模、主要生产商、主要地区、产品和应用细分研究报告
    【力扣题解】2155. 分组得分最高的所有下标
    软件项目管理案例教程-韩万江-期末复习
    网课查题公众号如何搭建查题系统
    MySQL无法查看系统默认字符集以及校验规则
    x265参数-early-skip
  • 原文地址:https://blog.csdn.net/hiliang521/article/details/126591419