• Window 安装 Kafka ,使用GO开发操作


    Window 安装 Kafka ,使用GO开发操作

    Kafka是使用scala开发,支持多语言客户端(java、go等)
    最开始是由LinkedIn开发,之后成为Apache的顶级项目。
    是一个分布式、分区化、可扩展、高可靠、可复制提交的日志服务。
    而且scale out :无需停机即可扩展机器
    持久化是通过将数据持久化到磁盘以及replication防止数据丢失。

    Kafka的特点:
    是分布式,其所有的构件borker(服务器集群)、producer(消息生产)、consumer(消息消费者)都可以是分布式的。
    在消息的生产者可以用一个标识topic来区别,可进行分区,每一个分区都是一个顺序、不可变的消息队列,并且可以持续的添加。

    常用的场景:监控、消息队列、站点的用户活动追踪、流处理、日志聚合、持久化日志

    Kafka中包含以下基础概念:

    1. Topic(话题):用于区分不同类型信息的类别名称。有producer指定。
    2. producer(生产者):讲消息发布到Kafka特定的Topic的对象(过程),消息的入口
    3. Consumers(消费者):订阅并处理特定的Topic中的消息的对象(过程),消息的出口
    4. Broker(服务集群):已发布的消息保存在一组服务器中,集群中的每一个服务都是一个代理(broker),消费者可以订阅一个或多个话题,并从broker拉数据,从而消费这些已发布的消息。
    5. Partition(分区):topic物理上的分组,一个topic可以分为多个partition,每一个partition是一个有序的队列,每条消息都会被分配一个有序的id(offset)
    6. message:消息 是通信的基本单位,买个producer可以向一个topic(主题)发布一些消息。

    选择partition的原则:

    如果topic有多个partition,pruducer 要怎么知道把该数据发往哪一个partition呢

    1. partition再写入的时候可以指定需要写入的partition,如有指定就写入对应的partition
    2. 如没指定partition,但是设置了数据的key会根据key的值hash出一个partition、
    3. 如没指定partition又没设置key,则会采用轮询方式,即每次去一小段时间的数据写入某个partition,下一小段的时间写入下一个partition。

    安装Kafka
    kafka环境基于zookeeper,zookeeper环境基于JAVA-JDK。需要安装Java-JDK

    请自行安装:配置Java环境变量

    下载

    下载地址:http://kafka.apache.org/downloads 下载kafka_2.12-2.3.1.tgz。

    安装

    将下载好的压缩包解压到本地即可。
    在这里插入图片描述

    配置

        1.打开config目录下的server.properties文件
        2.修改log.dirs=E:\\kafkalogs
        3.打开config目录下的zookeeper.properties文件
        4.修改dataDir=E:\\kafka\\zookeeper
    
    • 1
    • 2
    • 3
    • 4
    启动

    需要以管理员的身份启动。
    在Kafka目录(F:\kafka\kafka_2.12-3.2.1)

    先执行:bin\windows\zookeeper-server-start.bat config\zookeeper.properties
    
    再执行:bin\windows\kafka-server-start.bat config\server.properties
    
    • 1
    • 2
    • 3

    在这里插入图片描述

    两个服务都成功则不关闭命令页面的

    创建生产者产生消息
    .\bin\windows\kafka-console-producer.bat –broker-list localhost:9092 –topic test
    
    • 1
    创建消费者接收消息,不关闭页面
    .\bin\windows\kafka-console-consumer.bat –bootstrap-server localhost:9092 –topic test –from-beginning
    
    • 1

    在这里插入图片描述

    go 操作Kafka

    Go语言中连接kafka使用第三方库: github.com/Shopify/sarama。

    下载及安装

        go get github.com/Shopify/sarama
    
    • 1

    连接Kafka进行生产者和消费者,并实现发送短信或指定的内容(producer.go的sarama.StringEncoder(“指定内容”))

    consumer.go

    package main
    
    import (
    	"fmt"
    	"github.com/Shopify/sarama"
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    //连接kafka消费消息
    func main() {
    	consumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, nil)
    	if err != nil {
    		fmt.Printf("fail to start consumer, err:%v\n", err)
    		return
    	}
    	// 根据topic获取所有的分区列表
    	partitionList, err := consumer.Partitions("web_log") // 根据topic取到所有的分区
    	if err != nil {
    		fmt.Printf("根据topic获取所有的分区列表失败:err%v\n", err)
    		return
    	}
    
    	fmt.Println(partitionList)
    
    	for partition := range partitionList { // 遍历所有的分区
    		// 针对每个分区创建一个对应的分区消费者
    		pc, err := consumer.ConsumePartition("web_log", int32(partition), sarama.OffsetNewest)
    		if err != nil {
    			fmt.Printf("消费者消费指定分区失败:partition(分区) %d,err:%v\n", partition, err)
    			return
    		}
    		defer pc.AsyncClose()
    		wg.Add(1)
    		// 异步从每个分区消费信息
    		go func(sarama.PartitionConsumer) {
    			for msg := range pc.Messages() {
    				fmt.Printf("Partition:%d Offset:%d Key:%v Value:%s", msg.Partition, msg.Offset, msg.Key, msg.Value)
    			}
    		}(pc)
    	}
    	wg.Wait()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    producer.go

    //连接kafka发送消息
    // 基于sarama第三方库开发的kafka producer
    
    func main() {
    	config := sarama.NewConfig()
    	config.Producer.RequiredAcks = sarama.WaitForAll          // 发送完数据需要leader和follow都确认
    	config.Producer.Partitioner = sarama.NewRandomPartitioner // 新选出一个partition
    	config.Producer.Return.Successes = true                   // 成功交付的消息将在success channel返回
    
    	// 构造一个消息
    	msg := &sarama.ProducerMessage{}
    	msg.Topic = "web_log"
    	msg.Value = sarama.StringEncoder("【柠檬地拥抱】您正在注册账号,验证码是:666666,五分钟内有效,请及时使用哦。")
    	// 连接kafka
    	client, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, config)
    	if err != nil {
    		fmt.Println("创建生产者失败, err:", err)
    		return
    	}
    	defer client.Close()
    	// 发送消息
    	pid, offset, err := client.SendMessage(msg)
    	if err != nil {
    		fmt.Println("send msg failed, err:", err)
    		return
    	}
    	fmt.Printf("pid:%v offset:%v\n", pid, offset)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • 相关阅读:
    Kubernetes2
    WPF入门教程系列二十四——DataGrid使用示例(1)
    java旅游信息分享网站计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
    如何高效管理多个 Outlook 邮箱?
    【毕业设计】基于Stm32的智能疫情防控门禁系统 - 单片机 嵌入式 物联网
    Godot 4 教程《勇者传说》依赖注入 学习笔记(0):环境配置
    线程应用实例
    windows mysql5.7 开启binlog日志
    open-webui与ollama的部署最后完整之命令
    MFC Windows 程序设计[140]之多样消息对话框属性页(附源码)
  • 原文地址:https://blog.csdn.net/qq_18108159/article/details/126913322