• go-zero整合Kafka实现消息生产和消费


    go-zero整合Kafka实现消息生产和消费

    本教程基于go-zero微服务入门教程,项目工程结构同上一个教程。
    go-zero微服务入门教程(点击进入)

    本教程主要实现go-zero框架整合单机版Kafka,并暴露接口实现Kafka消息的生产和消费。
    本文源码:https://gitee.com/songfayuan/go-zero-demo (教程源码分支:3.zero整合单机kafka)

    准备工作

    • 如不熟悉go-zero项目的,请先查看上一篇go-zero微服务入门教程
    • 请自行安装好单机版Kafka,建议采用docker安装。

    common工具

    在common目录下创建task/kafkaconf新目录,在kafkaconf目录下创建conf.go文件,内容如下:

    package kafkaconf
    
    type Conf struct {
    	Host        string
    	Brokers     []string
    	Group       string
    	Topic       string
    	Offset      string `json:",options=first|last,default=last"`
    	OffsetId    int64  `json:",default=-1"` //-1时表示不使用该配置
    	Consumers   int    `json:",default=8"`
    	Processors  int    `json:",default=8"`
    	MinBytes    int    `json:",default=10240"`    // 10K
    	MaxBytes    int    `json:",default=10485760"` // 10M
    	Username    string `json:",optional"`
    	Password    string `json:",optional"`
    	ForceCommit bool   `json:",default=true"`
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    rpc新增Kafka配置

    以下操作在rpc模块执行。

    sys.yaml

    sys.yaml配置文件新增Kafka配置信息,如下:

    # Kafka配置
    KafkaConf:
      Host: 192.168.2.204:9092
      Brokers:
        - 192.168.2.204:9092
      Group: "consumer-group-id"
      Topic: kafka-test-topic3
      Consumers: 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    config.go

    config.go文件中新增KafkaConf配置信息,如下:

    KafkaConf kafkaconf.Conf
    
    • 1

    servicecontext.go

    servicecontext.go文件新增Kafka配置信息,完整代码如下:

    package svc
    
    import (
    	"github.com/zeromicro/go-zero/core/stores/redis"
    	"github.com/zeromicro/go-zero/core/stores/sqlx"
    	"go-zero-demo/common/task/kafkaconf"
    	"go-zero-demo/rpc/model/sysmodel"
    	"go-zero-demo/rpc/sys/internal/config"
    )
    
    type ServiceContext struct {
    	Config      config.Config
    	RedisClient *redis.Redis
    
    	KafkaConf *kafkaconf.Conf
    
    	UserModel sysmodel.SysUserModel
    }
    
    func NewServiceContext(c config.Config) *ServiceContext {
    	sqlConn := sqlx.NewMysql(c.Mysql.Datasource)
    
    	conf := redis.RedisConf{
    		Host: c.RedisConf.Host,
    		Type: c.RedisConf.Type,
    		Pass: c.RedisConf.Pass,
    		Tls:  c.RedisConf.Tls,
    	}
    
    	return &ServiceContext{
    		Config:      c,
    		RedisClient: redis.MustNewRedis(conf),
    
    		KafkaConf: &c.KafkaConf,
    
    		UserModel: sysmodel.NewSysUserModel(sqlConn),
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    编写rpc服务

    修改sys.proto文件

    新增Kafka操作请求的配置,如下:

    message KafkaReq{
      string name = 1;
      string nickName = 2;
      string password = 3;
      string email = 4;
    }
    
    message  KafkaResp{
      string name = 1;
      string nickName = 2;
      string password = 3;
      string email = 4;
    }
    
    message Empty {
    }
    
    service Sys{
      //Kafka生产者演示请求
      rpc KafkaProducer(KafkaReq)returns(KafkaResp);
      //Kafka消费者演示请求
      rpc KafkaConsumer(Empty)returns(KafkaResp);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    用goctl生成rpc代码

    生成方法同上篇文章,自行查看。

    编写API Gateway代码

    编写api文件

    kafka.api

    在api目录下创建新目录doc/kafka,在kafka目录下创建kafka.api文件。

    syntax = "v1"
    
    info(
    	title: "Kafka生产消费案例演示"
    	desc: "Kafka生产消费案例演示"
    	author: "songfayuan"
    )
    
    type (
    	ApiKafkaReq {
    		Name     string `json:"name"`
    		NickName string `json:"nickName"`
    		Password string `json:"password"`
    		Email    string `json:"email"`
    	}
    
    	ApiKafkaResp {
    		Code    int64       `json:"code"`
    		Message string      `json:"message"`
    		Data    ApiKafkaReq `json:"data"`
    	}
    )
    
    @server (
    	group : kafka/test
    	prefix : /kafka/test
    )
    
    service admin-api{
    	@doc(
    		summary : "Kafka生产者演示"
    	)
    	@handler KafkaProducer
    	post /kafkaProducer(ApiKafkaReq)returns(ApiKafkaResp)
    
    	@doc (
    		summary :"Kafka消费者演示"
    	)
    	@handler KafkaConsumer
    	get /kafkaConsumer returns(ApiKafkaResp)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    admin.api

    在api/doc/admin.api文件添加配置信息。

    import "kafka/kafka.api"
    
    • 1

    用goctl生成API Gateway代码

    生成方法同上篇文章,自行查看。但是此处要基于admin.api文件去生成代码,如果基于kafka.api生成,则生成的代码只有kafka.api定义的接口代码,其他api文件定义的接口代码不被生成。

    修改API Gateway代码调用rpc服务

    kafkaproducerlogic.go

    修改api/internal/logic/kafka/test/kafkaproducerlogic.go里的KafkaProducer方法,如下:

    func (l *KafkaProducerLogic) KafkaProducer(req *types.ApiKafkaReq) (resp *types.ApiKafkaResp, err error) {
    	producer, err := l.svcCtx.Sys.KafkaProducer(l.ctx, &sysclient.KafkaReq{
    		Name:     req.Name,
    		NickName: req.NickName,
    		Password: req.Password,
    		Email:    req.Email,
    	})
    
    	if err != nil {
    		resJson, _ := json.Marshal(producer)
    		logx.WithContext(l.ctx).Errorf("Kafka生产者演示:操作失败,请求参数param = %s,异常信息errMsg = %s", resJson, err.Error())
    		return nil, rpcerror.New(err)
    	}
    
    	return &types.ApiKafkaResp{
    		Code:    200,
    		Message: "操作成功",
    		Data: types.ApiKafkaReq{
    			Name:     producer.Name,
    			NickName: producer.NickName,
    			Password: producer.Password,
    			Email:    producer.Email,
    		},
    	}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    kafkaconsumerlogic.go

    修改api/internal/logic/kafka/test/kafkaconsumerlogic.go里的KafkaConsumer方法,如下:

    func (l *KafkaConsumerLogic) KafkaConsumer() (resp *types.ApiKafkaResp, err error) {
    	consumer, err := l.svcCtx.Sys.KafkaConsumer(l.ctx, &sysclient.Empty{})
    
    	if err != nil {
    		resJson, _ := json.Marshal(consumer)
    		logx.WithContext(l.ctx).Errorf("Kafka消费者演示:操作失败,请求参数param = %s,异常信息errMsg = %s", resJson, err.Error())
    		return nil, rpcerror.New(err)
    	}
    
    	return &types.ApiKafkaResp{
    		Code:    200,
    		Message: "操作成功",
    		Data:    types.ApiKafkaReq{},
    	}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    修改rpc代码完成消息生产和消费

    kafkaproducerlogic.go

    • 修改rpc/sys/internal/logic/kafkaproducerlogic.go,如下内容:
    // Kafka生产者演示请求
    func (l *KafkaProducerLogic) KafkaProducer(in *sysclient.KafkaReq) (*sysclient.KafkaResp, error) {
    	if in.Name == "" {
    		return nil, errors.New("账号不能为空")
    	}
    	if in.NickName == "" {
    		return nil, errors.New("姓名不能为空")
    	}
    	if in.Email == "" {
    		return nil, errors.New("邮箱不能为空")
    	}
    
    	// 创建一个writer,向topic发送消息
    	w := &kafka.Writer{
    		Addr:         kafka.TCP(l.svcCtx.Config.KafkaConf.Host),
    		Topic:        l.svcCtx.Config.KafkaConf.Topic,
    		Balancer:     &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布
    		RequiredAcks: kafka.RequireAll,    // ack模式
    		Async:        true,
    	}
    
    	// 定义消息内容
    	messages := []string{in.Name, in.NickName, in.Password, in.Email}
    
    	// 循环发送消息
    	for i, msg := range messages {
    		err := w.WriteMessages(context.Background(),
    			kafka.Message{
    				Key:   []byte(fmt.Sprintf("Key-%d", i+1)), // 使用不同的分区键
    				Value: []byte(msg),
    			},
    		)
    		if err != nil {
    			log.Fatalf("Kafka生产者演示请求,向kafka写入数据失败: %v", err)
    		}
    	}
    
    	if err := w.Close(); err != nil {
    		log.Fatal("Kafka生产者演示请求,failed to close writer:", err)
    	}
    
    	return &sysclient.KafkaResp{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    kafkaconsumerlogic.go

    • 修改rpc/sys/internal/logic/kafkaconsumerlogic.go,如下内容:
    // Kafka消费者演示请求
    // 这里演示手动请求触发kafka消费,实际项目中要做成项目启动后就一直监听消费。
    func (l *KafkaConsumerLogic) KafkaConsumer(in *sysclient.Empty) (*sysclient.KafkaResp, error) {
    	// 创建一个reader,指定GroupID,消费消息
    	reader := kafka.NewReader(kafka.ReaderConfig{
    		Brokers:  []string{l.svcCtx.KafkaConf.Host},
    		GroupID:  l.svcCtx.KafkaConf.Group, //指定消费者组ID
    		Topic:    l.svcCtx.KafkaConf.Topic,
    		MaxBytes: 10e6, //10MB
    	})
    
    	//接收消息
    	for {
    		//ReadMessage会自动提交偏移量
    		message, err := reader.ReadMessage(context.Background())
    		if err != nil {
    			break
    		}
    		fmt.Printf("Kafka消费者演示请求:message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
    	}
    
    	//程序退出前关闭Reader
    	if err := reader.Close(); err != nil {
    		log.Fatal("Kafka消费者演示请求:failed to close reader:", err)
    	}
    
    	return &sysclient.KafkaResp{}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    完整调用演示

    最后,在根目录go-zero-demo执行下命令。

    go mod tidy
    
    • 1

    运行rpc服务

    运行方法同上篇文章,自行查看。

    运行api

    运行方法同上篇文章,自行查看。

    api调用

    以下调用采用curl进行,你也可以用postman调用。

    消息生产
     songfayuan@MacBook-Pro  ~  curl -X POST -H "Content-Type: application/json" -d '{"name":"songfayuan","nickName":"宋发元6666","email":"1414@qq.com","password":"123456"}' localhost:8888/kafka/test/kafkaProducer
     
    {"code":200,"message":"操作成功","data":{"name":"","nickName":"","password":"","email":""}}%
    
    • 1
    • 2
    • 3

    此时,查看Kafka相关Topic,即可看到成功生产的数据。

    消息消费
     songfayuan@MacBook-Pro  ~  curl "localhost:8888/kafka/test/kafkaConsumer"
    
    • 1

    此时,即可看到运行日志打印出消费成功的信息。

    附录

    Kafka消息生产和多消费者消费同一个Topic测试案例。

    kafka_demo.go

    package main
    
    import (
    	"context"
    	"errors"
    	"fmt"
    	"github.com/segmentio/kafka-go"
    	"log"
    	"time"
    )
    
    // 演示kafka读写
    func main() {
    	//writeByConn()
    	writeByWriter3()
    	//writeByWriter2()
    	//readByConn()
    	//readByReader()
    	//readByReaderGroup()
    }
    
    // writeByConn基于Conn发送消息
    func writeByConn() {
    	topic := "kafka-test-topic3"
    	partition := 0
    
    	//连接至kafka集群的Leader节点
    	conn, err := kafka.DialLeader(context.Background(), "tcp", "192.168.2.204:9092", topic, partition)
    	if err != nil {
    		log.Fatal("failed to dial leader:", err)
    	}
    
    	//设置发送消息的超时时间
    	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
    
    	//发送消息
    	_, err = conn.WriteMessages(
    		kafka.Message{Value: []byte("one!")},
    		kafka.Message{Value: []byte("two!")},
    		kafka.Message{Value: []byte("three!")},
    	)
    	if err != nil {
    		log.Fatal("failed to write messages:", err)
    	}
    
    	//关闭连接
    	if err := conn.Close(); err != nil {
    		log.Fatal("failed to close writer:", err)
    	}
    }
    
    func writeByWriter() {
    	//创建一个writer,向topic发送消息
    	w := &kafka.Writer{
    		Addr:         kafka.TCP("192.168.2.204:9092"),
    		Topic:        "kafka-test-topic",
    		Balancer:     &kafka.LeastBytes{}, //指定分区的balancer模式为最小字节分布
    		RequiredAcks: kafka.RequireAll,    //ack模式
    		Async:        true,
    	}
    
    	err := w.WriteMessages(context.Background(),
    		kafka.Message{
    			Key:   []byte("Key-A"),
    			Value: []byte("Hello World!"),
    		},
    		kafka.Message{
    			Key:   []byte("Key-B"),
    			Value: []byte("One!"),
    		},
    		kafka.Message{
    			Key:   []byte("Key-C"),
    			Value: []byte("Two!"),
    		},
    	)
    	if err != nil {
    		log.Fatal("failed to write messages:", err)
    	}
    	if err := w.Close(); err != nil {
    		log.Fatal("failed to close writer:", err)
    	}
    }
    
    func writeByWriter3() {
    	// 创建一个writer,向topic发送消息
    	w := &kafka.Writer{
    		Addr:         kafka.TCP("192.168.2.204:9092"),
    		Topic:        "kafka-test-topic3",
    		Balancer:     &kafka.LeastBytes{}, // 指定分区的balancer模式为最小字节分布
    		RequiredAcks: kafka.RequireAll,    // ack模式
    		Async:        true,
    	}
    
    	// 定义消息内容
    	messages := []string{"Hello World!", "One!", "Two!", "song", "fa", "yuan"}
    
    	// 循环发送消息
    	for i, msg := range messages {
    		err := w.WriteMessages(context.Background(),
    			kafka.Message{
    				Key:   []byte(fmt.Sprintf("Key-%d", i+1)), // 使用不同的分区键
    				Value: []byte(msg),
    			},
    		)
    		if err != nil {
    			log.Fatalf("failed to write message: %v", err)
    		}
    	}
    
    	if err := w.Close(); err != nil {
    		log.Fatal("failed to close writer:", err)
    	}
    }
    
    // 创建不存在的topic
    // 如果给Writer配置了AllowAutoTopicCreation:true,那么当发送消息至某个不存在的topic时,则会自动创建topic。
    func writeByWriter2() {
    	writer := kafka.Writer{
    		Addr:                   kafka.TCP("192.168.2.204:9092"),
    		Topic:                  "kafka-test-topic",
    		AllowAutoTopicCreation: true, //自动创建topic
    	}
    
    	messages := []kafka.Message{
    		{
    			Key:   []byte("Key-A"),
    			Value: []byte("Hello World!"),
    		},
    		{
    			Key:   []byte("Key-B"),
    			Value: []byte("One!"),
    		},
    		{
    			Key:   []byte("Key-C"),
    			Value: []byte("Tow!"),
    		},
    	}
    
    	const retries = 3
    	//重试3次
    	for i := 0; i < retries; i++ {
    		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    		defer cancel()
    
    		err := writer.WriteMessages(ctx, messages...)
    		if errors.Is(err, kafka.LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) {
    			time.Sleep(time.Millisecond * 250)
    			continue
    		}
    
    		if err != nil {
    			log.Fatal("unexpected error %v", err)
    		}
    		break
    	}
    
    	//关闭Writer
    	if err := writer.Close(); err != nil {
    		log.Fatal("failed to close writer:", err)
    	}
    }
    
    // readByConn连接到kafka后接收消息
    func readByConn() {
    	//指定要连接的topic和partition
    	topic := "kafka-test-topic"
    	partition := 0
    
    	//连接至kafka的Leader节点
    	conn, err := kafka.DialLeader(context.Background(), "tcp", "192.168.2.204:9092", topic, partition)
    	if err != nil {
    		log.Fatal("failed to dial leader:", err)
    	}
    
    	//设置读取超时时间
    	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
    	//读取一批信息,得到的batch是一系列消息的迭代器
    	batch := conn.ReadBatch(10e3, 1e6) //fetch 10KB min, 1MB max
    
    	//遍历读取消息
    	//b := make([]byte, 10e3) //10KB max per message
    	fmt.Println("******遍历读取消息******")
    	for {
    		//使用batch.Read更高效一些,但是需要根据消息长度选择合适的buffer,如果传入的buffer太小(消息装不下),就会返回io.ErrShortBuffer
    		//n, err := batch.Read(b)
    		//如果不考虑内存分配的效率问题,可以使用batch.ReadMessage读取消息
    		mag, err := batch.ReadMessage()
    		if err != nil {
    			break
    		}
    		//fmt.Println(string(b[:n]))
    		fmt.Println(string(mag.Value))
    	}
    
    	//关闭batch
    	if err := batch.Close(); err != nil {
    		log.Fatal("failed to close batch:", err)
    	}
    
    	//关闭连接
    	if err := conn.Close(); err != nil {
    		log.Fatal("failed to close connection:", err)
    	}
    }
    
    // readByReader通过Reader接收消息
    func readByReader() {
    	//创建Reader
    	reader := kafka.NewReader(kafka.ReaderConfig{
    		Brokers:   []string{"192.168.2.204:9092"},
    		Topic:     "kafka-test-topic",
    		Partition: 0,
    		MaxBytes:  10e6, //10MB
    	})
    	//设置Offset
    	reader.SetOffset(1)
    
    	//接收消息
    	for {
    		message, err := reader.ReadMessage(context.Background())
    		if err != nil {
    			break
    		}
    		fmt.Printf("message at offset %d: %s = %s\n", message.Offset, string(message.Key), string(message.Value))
    	}
    
    	if err := reader.Close(); err != nil {
    		log.Fatal("failed to close reader:", err)
    	}
    }
    
    // 消费者组
    func readByReaderGroup() {
    	// 创建一个reader,指定GroupID,消费消息
    	reader := kafka.NewReader(kafka.ReaderConfig{
    		Brokers:  []string{"192.168.2.204:9092"},
    		GroupID:  "consumer-group-id", //指定消费者组ID
    		Topic:    "kafka-test-topic",
    		MaxBytes: 10e6, //10MB
    	})
    
    	//接收消息
    	for {
    		//ReadMessage会自动提交偏移量
    		message, err := reader.ReadMessage(context.Background())
    		if err != nil {
    			break
    		}
    		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
    	}
    
    	//程序退出前关闭Reader
    	if err := reader.Close(); err != nil {
    		log.Fatal("failed to close reader:", err)
    	}
    }
    
    // 消费者组,手动提交
    func readByReaderGroup2() {
    	// 创建一个reader,指定GroupID,消费消息
    	reader := kafka.NewReader(kafka.ReaderConfig{
    		Brokers:  []string{"192.168.2.204:9092"},
    		GroupID:  "consumer-group-id", //指定消费者组ID
    		Topic:    "kafka-test-topic",
    		MaxBytes: 10e6, //10MB
    	})
    
    	//接收消息
    	ctx := context.Background()
    	for {
    		//获取消息
    		message, err := reader.FetchMessage(ctx)
    		if err != nil {
    			break
    		}
    		//处理消息
    		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
    
    		//显示提交
    		if err := reader.CommitMessages(ctx, message); err != nil {
    			log.Fatal("failed to commit messages:", err)
    		}
    	}
    
    	//程序退出前关闭Reader
    	if err := reader.Close(); err != nil {
    		log.Fatal("failed to close reader:", err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
    • 222
    • 223
    • 224
    • 225
    • 226
    • 227
    • 228
    • 229
    • 230
    • 231
    • 232
    • 233
    • 234
    • 235
    • 236
    • 237
    • 238
    • 239
    • 240
    • 241
    • 242
    • 243
    • 244
    • 245
    • 246
    • 247
    • 248
    • 249
    • 250
    • 251
    • 252
    • 253
    • 254
    • 255
    • 256
    • 257
    • 258
    • 259
    • 260
    • 261
    • 262
    • 263
    • 264
    • 265
    • 266
    • 267
    • 268
    • 269
    • 270
    • 271
    • 272
    • 273
    • 274
    • 275
    • 276
    • 277
    • 278
    • 279
    • 280
    • 281
    • 282
    • 283
    • 284
    • 285
    • 286
    • 287
    • 288
    • 289

    kafka_consumer_demo.go

    package main
    
    import (
    	"context"
    	"fmt"
    	"github.com/segmentio/kafka-go"
    	"log"
    	"sync"
    	"time"
    )
    
    // 多个消费者同时消费同一Topic的数据
    // 一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费
    func main() {
    	// 创建消费者组ID
    	consumerGroupID := "consumer-group-id16"
    
    	// 创建两个消费者
    	consumer1 := createConsumer(consumerGroupID, "Consumer1")
    	consumer2 := createConsumer(consumerGroupID, "Consumer2")
    	consumer3 := createConsumer(consumerGroupID, "Consumer3")
    	consumer4 := createConsumer(consumerGroupID, "consumer4")
    	consumer5 := createConsumer(consumerGroupID, "consumer5")
    
    	// 启动消费者
    	var wg sync.WaitGroup
    	wg.Add(4)
    	go consumeMessages(consumer1, &wg, "Consumer1")
    	go consumeMessages(consumer2, &wg, "Consumer2")
    	go consumeMessages(consumer3, &wg, "consumer3")
    	go consumeMessages(consumer4, &wg, "consumer4")
    	go consumeMessages(consumer5, &wg, "consumer5")
    
    	wg.Wait()
    }
    
    func createConsumer(groupID, consumerName string) *kafka.Reader {
    	return kafka.NewReader(kafka.ReaderConfig{
    		Brokers: []string{"192.168.2.204:9092"},
    		GroupID: groupID,
    		Topic:   "kafka-test-topic3",
    	})
    }
    
    func consumeMessages(reader *kafka.Reader, wg *sync.WaitGroup, consumerName string) {
    	defer wg.Done()
    	for {
    		message, err := reader.ReadMessage(context.Background())
    		if err != nil {
    			break
    		}
    		time.Sleep(1 * time.Second)
    
    		fmt.Printf("[%s] Message at topic/partition/offset %v/%v/%v: %s = %s\n", consumerName, message.Topic, message.Partition, message.Offset, string(message.Key), string(message.Value))
    	}
    	if err := reader.Close(); err != nil {
    		log.Fatalf("[%s] Failed to close reader: %v\n", consumerName, err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
  • 相关阅读:
    移植EasyLogger
    Flutter高仿微信-第52篇-群聊-清空聊天记录
    day13【代码随想录】环形链表II、环形链表、快乐数、各位相加、丑数、丑数||
    优选策略的自适应蚁狮优化算法-附代码
    2020年最新最全的Java面试经历整理(一次性查缺补漏个够)
    【开源免费】使用Spring Boot和Html实现ChatGPT,1:亿还原,将就看
    PHP - Stream扩展 - 学习/实践
    基于蜻蜓优化算法的配电网重构求解(Python代码实现)【IEEE123节点算例】
    华为新设备升级示例
    网络通信深入解析:探索TCP/IP模型
  • 原文地址:https://blog.csdn.net/u011019141/article/details/138157030