• 在Go项目中二次封装Kafka客户端功能


    1.摘要

    在上一章节中,我利用Docker快速搭建了一个Kafka服务,并测试成功Kafka生产者和消费者功能,本章内容尝试在Go项目中对Kafka服务进行封装调用, 实现从Kafka自动接收消息并消费。

    在本文中使用了Kafka的一个高性能开源库Sarama, Sarama是一个遵循MIT许可协议的Apache Kafka Go客户端库, 该开源库地址为:GitHub - IBM/sarama: Sarama is a Go library for Apache Kafka.

    2.功能结构组织

    为了能在项目中快速使用, 我在项目目录中专门新建了一个名为kafka的文件夹,在该文件夹下新建了四个文件,分别为:

    1. kafka (目录)
    2. |
    3. ----- consumer.go (消费者方法实现)
    4. |
    5. ----- producer.go (生产者方法实现)
    6. |
    7. ----- kafka.go (定义接口)
    8. |
    9. ----- kafka_test.go (单元功能测试)

    为方便项目使用,在此基础上做了二次封装。

    3.消费者实现

    第一步首先定义了一个结构体, 里面包含了Kafka的主机、topic、接收通道和消费者对象信息:

    1. type KafkaConsumer struct {
    2. Hosts string // Kafka主机IP:端口,例如:192.168.201.206:9092
    3. Ctopic string // topic名称
    4. Kchan chan string // 接收信息通道
    5. Consumer sarama.Consumer // 消费者对象
    6. }

    接下来是消费者初始化函数:

    1. func (k *KafkaConsumer) kafkaInit() {
    2. // 定义配置选项
    3. config := sarama.NewConfig()
    4. config.Consumer.Return.Errors = true
    5. config.Version = sarama.V0_10_2_0
    6. // 初始化一个消费对象
    7. consumer, err := sarama.NewConsumer(k.Hosts, config)
    8. if err != nil {
    9. err = errors.New("NewConsumer错误,原因:" + err.Error())
    10. fmt.Println(err.Error())
    11. return
    12. }
    13. // 获取所有Topic
    14. topics, err := consumer.Topics()
    15. if err != nil {
    16. fmt.Println(err.Error())
    17. return
    18. }
    19. // 判断是否有自定义的Topic
    20. var topicsName = ""
    21. for _, e := range topics {
    22. if e == k.Ctopic {
    23. topicsName = e
    24. break
    25. }
    26. }
    27. // 没有自定义的Topic则报错
    28. if topicsName == "" {
    29. err = errors.New("找不到topics内容")
    30. fmt.Println(err.Error())
    31. return
    32. }
    33. // 将消费对象保存到结构体以备后面使用
    34. k.Consumer = consumer
    35. }

    在上面的初始化函数中, 首先初始化一个消费对象, 然后获取所有的Topic名称,并判断了在这些Topic名称中是否有我自定义的名称,获取成功后则将消费对象保存到我们绑定的结构体中。

    接下来是消费监控函数实现,代码如下:

    1. func (k *KafkaConsumer) kafkaProcess() {
    2. var wg sync.WaitGroup
    3. // 遍历指定Topic分区持续监控消息
    4. Partitions, _ := k.Consumer.Partitions(k.Ctopic)
    5. for _, subPartitions := range Partitions {
    6. pc, err := k.Consumer.ConsumePartition(k.Ctopic, subPartitions, sarama.OffsetNewest)
    7. if err != nil {
    8. continue
    9. }
    10. wg.Add(1)
    11. go func() {
    12. defer wg.Done()
    13. // 这里进入另一个函数可以过滤消息内容
    14. k.processPartition(pc)
    15. }()
    16. }
    17. wg.Wait()
    18. }

    函数processPartition()的实现代码如下:

    1. func (k *KafkaConsumer) processPartition(pc sarama.PartitionConsumer) {
    2. defer pc.AsyncClose()
    3. for msg := range pc.Messages() {
    4. // 这里可以过滤不需要的Topic的信息
    5. if strings.Contains(string(msg.Value), "group_state2") {
    6. continue
    7. }
    8. // 这里将获取到的Topic信息发送到通道
    9. k.Kchan <- string(msg.Value)
    10. }
    11. }

    4.生产者实现

    为了跟消费者代码配套,这里也同步实现了生产者代码,主要功能是完成工作后,给指定Topic的生产方返回一个指定消息。

    定义生产者的结构体如下:

    1. type KafkaProducer struct {
    2. hosts string // Kafka主机
    3. sendmsg string // 消费方返回给生产方的消息
    4. ptopic string // Topic
    5. AsyncProducer sarama.AsyncProducer // Kafka生产者接口对象
    6. }

    对应的生产者初始化函数实现如下:

    1. func (k *KafkaProducer) kafkaInit() {
    2. // 定义配置参数
    3. config := sarama.NewConfig()
    4. config.Producer.RequiredAcks = sarama.WaitForAll
    5. config.Producer.Retry.Max = 5
    6. config.Producer.Return.Successes = true
    7. config.Version = sarama.V0_10_2_0
    8. // 初始化一个生产者对象
    9. producer, err := sarama.NewAsyncProducer(k.hosts, config)
    10. if err != nil {
    11. err = errors.New("NewAsyncProducer错误,原因:" + err.Error())
    12. fmt.Println(err.Error())
    13. return
    14. }
    15. // 保存对象到结构体
    16. k.AsyncProducer = producer
    17. }

    给生产者回复信息的函数实现如下:

    1. func (k *KafkaProducer) kafkaProcess() {
    2. msg := &sarama.ProducerMessage{
    3. Topic: k.ptopic,
    4. }
    5. // 信息编码
    6. msg.Value = sarama.ByteEncoder(k.sendmsg)
    7. // 将信息发送给通道
    8. k.AsyncProducer.Input() <- msg
    9. }

    5.接口定义实现

    首先对于生产者和消费者,都有对应的初始化和执行操作,因此定义接口函数如下:

    1. // Kafka方法接口
    2. type IKafkaMethod interface {
    3. kafkaInit() // 初始化方法
    4. kafkaProcess() // 执行方法
    5. }

    为了方便管理接口的赋值操作, 这里定义了一个接口管理方法, 并用Set()函数进行接口类型赋值, Run()函数负责运行对应的成员函数:

    1. // 接口管理结构体
    2. type KafkaManager struct {
    3. kafkaMethod IKafkaMethod // 接口对象
    4. }
    5. // 定义实现Set方法
    6. func (km *KafkaManager) Set(m IKafkaMethod) {
    7. km.kafkaMethod = m // 将指定的方法赋给接口
    8. }
    9. // 定义实现Run方法
    10. func (km *KafkaManager) Run() {
    11. km.kafkaMethod.kafkaInit()
    12. go km.kafkaMethod.kafkaProcess()
    13. }

    最后一部分是供外部调用的函数,首先定义一个结构体,该结构体中保存了Kafka的基础信息和三个对象指针:

    1. type KafkaMessager struct {
    2. KafkaManager *KafkaManager // 接口管理对象指针
    3. KafkaProducer *KafkaProducer // 生产者对象指针
    4. KafkaConsumer *KafkaConsumer // 消费者对象指针
    5. Hosts string // Kafka主机
    6. topic string // topic
    7. }
    8. // 供外部调用初始化的函数,传入Kafka主机IP和Topic,返回操作对象指针,并初始化结构体成员变量
    9. func NewKafkaMessager(hosts, topic string) *KafkaMessager {
    10. km := &KafkaMessager{
    11. KafkaManager: new(KafkaManager),
    12. KafkaProducer: new(KafkaProducer),
    13. KafkaConsumer: new(KafkaConsumer),
    14. Hosts: hosts,
    15. topic: topic,
    16. }
    17. return km
    18. }

    6.功能调用和验证

    在Kafka_test.go文件中,定义一个用于单元测试的函数,格式如下:

    1. func TestKafka(t *testing.T) {
    2. ....
    3. }

    使用单元测试函数的好处是可以单独调试, 专注核心功能本身。

    我使用的编辑器是Goland, 在TestKafka函数前面有个三角形小图标,点击可以选择各种调试选项,如图:

    下面是我模拟用户调用的客户端代码片段:

    1. // 这里选择我自己搭建的Kafka所在服务器,Topic为test123
    2. // 注意:这里的hosts格式是IP:端口的格式,例如:192.168.201.206:9092
    3. hosts := "192.168.201.206:9092"
    4. topic := "test123"
    5. // 调用初始化函数,并将上面的内容作为参数传进去
    6. nkm := NewKafkaMessager(hosts, topic)
    7. // 初始化消费者,当生产者发出消息,消费者自动消费
    8. nkm.KafkaConsumer.Hosts = hosts // 消费者host赋值
    9. nkm.KafkaConsumer.Ctopic = topic // 消费者topic赋值
    10. nkm.KafkaConsumer.Kchan = make(chan string) // 初始化消息通道
    11. nkm.KafkaManager.Set(nkm.KafkaConsumer) // 接口赋值,设置成操作消费者方法
    12. nkm.KafkaManager.Run() // 执行消费者初始化方法
    13. // 监听通道,接收生产客户端发过来的消息
    14. recv := <- nkm.KafkaConsumer.Kchan
    15. fmt.Println(recv) // 打印接收到的消息

    现在我们可以选择直接运行程序了,然后在Kafka的生产者控制台中输入字符:Hello,Goland发送:

    可以看到,我们的程序成功接收到Kafka生产者发送过来的信息。

    --- END --

  • 相关阅读:
    浅谈Rust--学习心得及rust的优势与劣势
    Python进阶学习----一闭三器
    export和source
    2023年2月份CKA考试历程
    java计算机毕业设计小微企业人事管理系统源代码+数据库+系统+lw文档
    C# OpenCvSharp 颜色反转
    Windows 安装 Python 环境&PyCharm
    分享119个ASP.NET源码总有一个是你想要的
    烧写系统镜像
    Apache虚拟主机企业应用
  • 原文地址:https://blog.csdn.net/suntiger/article/details/134041692