Kafka 依赖 Java 运行时环境,因此在启动 Kafka 或 ZooKeeper 之前,确保你的系统上已经安装了 Java。
通过以下步骤安装 OpenJDK:
sudo apt updatesudo apt install default-jdkjava -version下载 Kafka: 前往 Apache Kafka 官方网站 下载最新的 Kafka 版本。
解压文件: 解压下载的 Kafka 压缩包到你选择的目录。
tar -xzf kafka_2.12-3.6.0.tgz
启动 ZooKeeper: Kafka 依赖于 ZooKeeper,因此在启动 Kafka 之前,需要先启动 ZooKeeper。进入 Kafka 解压目录中的 bin 目录,执行以下命令启动 ZooKeeper:
./zookeeper-server-start.sh ../config/zookeeper.properties
启动 Kafka 服务器: 打开一个新的终端,进入 Kafka 解压目录中的 bin 目录,执行以下命令启动 Kafka 服务器:
./kafka-server-start.sh ../config/server.properties
在 Kafka 中,数据被发布到主题(Topic)中,消费者订阅这些主题以接收数据。
./kafka-topics.sh --create --topic my-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
--topic my-topic 指定创建的主题的名称为 “my-topic”。--bootstrap-server localhost:9092 指定用于创建主题的 Kafka 集群的引导服务器地址。--partitions 1 指定创建的主题将包含的分区数量。分区是 Kafka 中消息并行处理的单位。在这里,指定的分区数量是1,表示该主题只有一个分区。分区数量的选择通常取决于并行性需求和消息负载的分布。--replication-factor 1 指定每个分区的副本数量。在这里,指定的副本数量是1,表示每个分区只有一个副本。副本用于提高数据的可靠性和容错性。如果有多个副本,数据会在多个 broker 上进行复制。通常,你会希望设置大于1的副本数量,以确保数据的可靠性。在 Kafka 中,生产者负责将消息发布到主题。
启动生产者: 打开一个新的终端,进入 Kafka 解压目录中的 bin 目录,执行以下命令启动一个生产者:
./kafka-console-producer.sh --topic my-topic --bootstrap-server localhost:9092
发送消息: 在生产者终端中,输入一些消息并按回车键发送。
在 Kafka 中,消费者订阅主题以接收生产者发送的消息。
启动消费者: 打开一个新的终端,进入 Kafka 解压目录中的 bin 目录,执行以下命令启动一个消费者:
./kafka-console-consumer.sh --topic my-topic --bootstrap-server localhost:9092 --from-beginning
--from-beginning接收消息: 在消费者终端中,你将看到生产者发送的消息。
go get github.com/Shopify/saramaimport "github.com/Shopify/sarama"package main
import (
"log"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
// 设置Kafka broker地址,根据你的实际环境修改
brokers := []string{"localhost:9092"}
producer, err := sarama.NewSyncProducer(brokers, config)
if err != nil {
log.Fatal("Error creating producer: ", err)
}
defer func() {
if err := producer.Close(); err != nil {
log.Fatal("Error closing producer: ", err)
}
}()
// 生产消息
message := &sarama.ProducerMessage{
Topic: "my-topic",
Value: sarama.StringEncoder("Hello, Kafka!"),
}
partition, offset, err := producer.SendMessage(message)
if err != nil {
log.Fatal("Error sending message: ", err)
}
log.Printf("Produced message to partition %d at offset %d\n", partition, offset)
}
package main
import (
"log"
"os"
"os/signal"
"github.com/Shopify/sarama"
)
func main() {
config := sarama.NewConfig()
consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
if err != nil {
log.Fatal("Error creating consumer: ", err)
}
defer func() {
if err := consumer.Close(); err != nil {
log.Fatal("Error closing consumer: ", err)
}
}()
// 订阅主题
topic := "my-topic"
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if err != nil {
log.Fatal("Error creating partition consumer: ", err)
}
defer func() {
if err := partitionConsumer.Close(); err != nil {
log.Fatal("Error closing partition consumer: ", err)
}
}()
// 处理消息
// 创建一个通道 signals 用于接收操作系统发送的信号
signals := make(chan os.Signal, 1)
// 通过 signal.Notify 将 os.Interrupt 信号(通常由用户按下 Ctrl+C 触发)注册到这个通道。
signal.Notify(signals, os.Interrupt)
ConsumerLoop:
for {
select {
case msg := <-partitionConsumer.Messages():
log.Printf("Consumed message: %s\n", string(msg.Value))
case <-signals:
break ConsumerLoop
}
}
}