• 快速使用 Kafka


    快速使用 Kafka

    Kafka 依赖 Java 运行时环境,因此在启动 Kafka 或 ZooKeeper 之前,确保你的系统上已经安装了 Java。
    通过以下步骤安装 OpenJDK:

    1. 更新包信息:sudo apt update
    2. 安装 OpenJDK:sudo apt install default-jdk
      这将安装默认版本的 OpenJDK。如果你需要特定版本的 OpenJDK,可以使用相应的软件包名称。
    3. 安装完成后,验证 Java 安装是否成功:java -version
    4. 重新运行 ZooKeeper 或 Kafka 命令。

    步骤1:安装 Kafka

    1. 下载 Kafka: 前往 Apache Kafka 官方网站 下载最新的 Kafka 版本。

    2. 解压文件: 解压下载的 Kafka 压缩包到你选择的目录。

      tar -xzf kafka_2.12-3.6.0.tgz
      
      • 1
      • -x: 表示解压缩。
      • -z: 表示使用 gzip 解压。
      • -f: 后面跟着压缩包的文件名。
    3. 启动 ZooKeeper: Kafka 依赖于 ZooKeeper,因此在启动 Kafka 之前,需要先启动 ZooKeeper。进入 Kafka 解压目录中的 bin 目录,执行以下命令启动 ZooKeeper:

      ./zookeeper-server-start.sh ../config/zookeeper.properties
      
      • 1
    4. 启动 Kafka 服务器: 打开一个新的终端,进入 Kafka 解压目录中的 bin 目录,执行以下命令启动 Kafka 服务器:

      ./kafka-server-start.sh ../config/server.properties
      
      • 1

    步骤2:创建一个主题(Topic)

    在 Kafka 中,数据被发布到主题(Topic)中,消费者订阅这些主题以接收数据。

    1. 创建一个主题: 打开一个新的终端,进入 Kafka 解压目录中的 bin 目录,执行以下命令创建一个名为 my-topic 的主题:
      ./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
      
      • 1
      • --topic my-topic 指定创建的主题的名称为 “my-topic”。
      • --bootstrap-server localhost:9092 指定用于创建主题的 Kafka 集群的引导服务器地址。
      • --partitions 1 指定创建的主题将包含的分区数量。分区是 Kafka 中消息并行处理的单位。在这里,指定的分区数量是1,表示该主题只有一个分区。分区数量的选择通常取决于并行性需求和消息负载的分布。
      • --replication-factor 1 指定每个分区的副本数量。在这里,指定的副本数量是1,表示每个分区只有一个副本。副本用于提高数据的可靠性和容错性。如果有多个副本,数据会在多个 broker 上进行复制。通常,你会希望设置大于1的副本数量,以确保数据的可靠性。

    步骤3:生产者发送消息

    在 Kafka 中,生产者负责将消息发布到主题。

    1. 启动生产者: 打开一个新的终端,进入 Kafka 解压目录中的 bin 目录,执行以下命令启动一个生产者:

      ./kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
      
      • 1
    2. 发送消息: 在生产者终端中,输入一些消息并按回车键发送。

    步骤4:消费者接收消息

    在 Kafka 中,消费者订阅主题以接收生产者发送的消息。

    1. 启动消费者: 打开一个新的终端,进入 Kafka 解压目录中的 bin 目录,执行以下命令启动一个消费者:

      ./kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning
      
      • 1
      • --from-beginning
    2. 接收消息: 在消费者终端中,你将看到生产者发送的消息。

    使用Go语言kafka客户端库-Sarama 进行生产者和消费者的基本示例

    1. 已经安装了 Sarama 库:go get github.com/Shopify/sarama
    2. 在你的Go项目中引入 Sarama 包:import "github.com/Shopify/sarama"

    使用 Sarama 生产者

    package main
    
    import (
    	"log"
    	"os"
    	"os/signal"
    	"github.com/Shopify/sarama"
    )
    
    func main() {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll
    	config.Producer.Retry.Max = 5
    	config.Producer.Return.Successes = true
    
    	// 设置Kafka broker地址,根据你的实际环境修改
    	brokers := []string{"localhost:9092"}
    
    	producer, err := sarama.NewSyncProducer(brokers, config)
    	if err != nil {
    		log.Fatal("Error creating producer: ", err)
    	}
    
    	defer func() {
    		if err := producer.Close(); err != nil {
    			log.Fatal("Error closing producer: ", err)
    		}
    	}()
    
    	// 生产消息
    	message := &sarama.ProducerMessage{
    		Topic: "my-topic",
    		Value: sarama.StringEncoder("Hello, Kafka!"),
    	}
    
    	partition, offset, err := producer.SendMessage(message)
    	if err != nil {
    		log.Fatal("Error sending message: ", err)
    	}
    
    	log.Printf("Produced message to partition %d at offset %d\n", partition, offset)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    使用 Sarama 消费者

    package main
    
    import (
    	"log"
    	"os"
    	"os/signal"
    	"github.com/Shopify/sarama"
    )
    
    func main() {
    	config := sarama.NewConfig()
    	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    	if err != nil {
    		log.Fatal("Error creating consumer: ", err)
    	}
    
    	defer func() {
    		if err := consumer.Close(); err != nil {
    			log.Fatal("Error closing consumer: ", err)
    		}
    	}()
    
    	// 订阅主题
    	topic := "my-topic"
    	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
    	if err != nil {
    		log.Fatal("Error creating partition consumer: ", err)
    	}
    
    	defer func() {
    		if err := partitionConsumer.Close(); err != nil {
    			log.Fatal("Error closing partition consumer: ", err)
    		}
    	}()
    
    	// 处理消息
    	// 创建一个通道 signals 用于接收操作系统发送的信号
    	signals := make(chan os.Signal, 1)
    	// 通过 signal.Notify 将 os.Interrupt 信号(通常由用户按下 Ctrl+C 触发)注册到这个通道。
    	signal.Notify(signals, os.Interrupt)
    
    ConsumerLoop:
    	for {
    		select {
    		case msg := <-partitionConsumer.Messages():
    			log.Printf("Consumed message: %s\n", string(msg.Value))
    		case <-signals:
    			break ConsumerLoop
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
  • 相关阅读:
    绝地求生鼠标宏怎么设置?
    网络安全和隐私保护技术
    HTML5期末考核大作业 基于HTML+CSS+JavaScript仿王者荣耀首页 游戏网站开发 游戏官网设计与实现
    .NET 7 发布的最后一个预览版Preview 7, 下个月发布RC
    通信原理学习笔记2-2:复信号分析(复信号与负频率、Hilbert变换获得正半频谱、单边带SSB调制原理)
    S-3A5001 DPDK性能优化
    Linux编程基础:1~6章实训编程题
    重置Mac电脑的SMC怎么操作,重置SMC方法分享~
    yaml初识
    【必会】BM41 输出二叉树的右视图【中等+】
  • 原文地址:https://blog.csdn.net/trinityleo5/article/details/134495659