• Tencent mini project - [Metrics Monitoring Service Refactoring] 2023-08-22


    Completed today

    Project value and key difficulties in ~50 words

    Project value

    Integrate the metrics monitoring components into the project, compare them (including their companion tooling) in terms of functionality and performance, and give recommendations for slimming down the monitoring service.

    Top 3 difficulties

    1. Reduce the resource cost of the monitoring service, with performance optimization in mind
    2. How to substantiate our claims about the differences and trade-offs between the monitoring services
    3. Make the monitoring service transparent to callers while keeping the code extensible

    Director's reply

    A small and elegant monitoring service

    What makes it small? What makes it elegant? The key to being small and elegant is your grasp of the core of this service, its innermost essence: knowing how to subtract well. Read more articles on doing subtraction; the hard part is to first think it through thick, then build it thin.

    Watermill

    Replace sarama.SyncProducer -> kafka-client.Producer

    • Testing showed the WriteKafka span was slow: the Producer inside Watermill-kafka's Publisher is sarama.SyncProducer, and synchronous writes take long, while sarama.AsyncProducer's API is not intuitive; the requirements also call for switching the underlying library to kafka-go

    • Compared with the original Kafka write path, the Producer in kafka-client (built on kafka-go) supports both synchronous and asynchronous message writes, so the Publisher implementation was changed accordingly

    • profile/internal/watermill/watermillkafka/marshaler.go

      package watermillkafka
      
      import (
      	"github.com/Shopify/sarama"
      	"github.com/ThreeDotsLabs/watermill/message"
      	"github.com/pkg/errors"
      	"github.com/segmentio/kafka-go"
      )
      
      // UUIDHeaderKey is the Kafka header that carries the Watermill message UUID.
      const UUIDHeaderKey = "_watermill_message_uuid"
      
      // HeaderKey marks the metadata entry whose value becomes the Kafka message key.
      const HeaderKey = "_key"
      
      // Marshaler marshals Watermill's message to Kafka message.
      type Marshaler interface {
      	Marshal(topic string, msg *message.Message) (*kafka.Message, error)
      }
      
      // Unmarshaler unmarshals Kafka's message to Watermill's message.
      type Unmarshaler interface {
      	Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
      }
      
      type MarshalerUnmarshaler interface {
      	Marshaler
      	Unmarshaler
      }
      
      type DefaultMarshaler struct{}
      
      func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*kafka.Message, error) {
      	if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
      		return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
      	}
      
      	headers := []kafka.Header{{
      		Key:   UUIDHeaderKey,
      		Value: []byte(msg.UUID),
      	}}
      	var msgKey string
      	for key, value := range msg.Metadata {
      		if key == HeaderKey {
      			msgKey = value
      		} else {
      			headers = append(headers, kafka.Header{
      				Key:   key,
      				Value: []byte(value),
      			})
      		}
      	}
      
      	return &kafka.Message{
      		Topic:   topic,
      		Key:     []byte(msgKey),
      		Value:   msg.Payload,
      		Headers: headers,
      	}, nil
      }
      
      func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error) {
      	var messageID string
      	metadata := make(message.Metadata, len(kafkaMsg.Headers))
      
      	for _, header := range kafkaMsg.Headers {
      		if string(header.Key) == UUIDHeaderKey {
      			messageID = string(header.Value)
      		} else {
      			metadata.Set(string(header.Key), string(header.Value))
      		}
      	}
      
      	msg := message.NewMessage(messageID, kafkaMsg.Value)
      	msg.Metadata = metadata
      
      	return msg, nil
      }
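
    • As a quick illustration of the marshaler's contract, the sketch below (assumed to live in the same watermillkafka package; topic, key, and payload values are made up) shows how the `_key` metadata entry becomes the Kafka message key while the message UUID and all other metadata become headers:

      package watermillkafka
      
      import (
      	"fmt"
      
      	"github.com/ThreeDotsLabs/watermill"
      	"github.com/ThreeDotsLabs/watermill/message"
      )
      
      // ExampleDefaultMarshaler_Marshal is an illustrative sketch, not project source.
      func ExampleDefaultMarshaler_Marshal() {
      	msg := message.NewMessage(watermill.NewUUID(), []byte(`{"event":"page_view"}`))
      	msg.Metadata.Set(HeaderKey, "user-42")   // "_key" -> kafka.Message.Key
      	msg.Metadata.Set("category", "pageview") // other metadata -> kafka.Message.Headers
      
      	kafkaMsg, err := DefaultMarshaler{}.Marshal("profile-events", msg)
      	if err != nil {
      		panic(err)
      	}
      	fmt.Println(string(kafkaMsg.Key), len(kafkaMsg.Headers)) // user-42 2 (UUID header + "category")
      }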
      
    • profile/internal/watermill/watermillkafka/publisher.go

    package watermillkafka
    
    import (
    	"context"
    	kc "github.com/Kevinello/kafka-client"
    	"github.com/ThreeDotsLabs/watermill"
    	"github.com/ThreeDotsLabs/watermill/message"
    	"github.com/pkg/errors"
    	"go.uber.org/zap"
    	"profile/internal/connector"
    	"profile/internal/log"
    	"profile/internal/watermill/model"
    )
    
    type Publisher struct {
    	config   PublisherConfig
    	producer *kc.Producer
    	logger   watermill.LoggerAdapter
    
    	closed bool
    }
    
    // NewPublisher creates a new Kafka Publisher.
    func NewPublisher(
    	config PublisherConfig,
    	logger watermill.LoggerAdapter,
    ) (*Publisher, error) {
    	if err := config.Validate(); err != nil {
    		return nil, err
    	}
    
    	if logger == nil {
    		logger = watermill.NopLogger{}
    	}
    
    	producer, err := kc.NewProducer(context.Background(), config.KcProducerConfig)
    	if err != nil {
    		return nil, errors.Wrap(err, "cannot create Kafka producer")
    	}
    
    	return &Publisher{
    		config:   config,
    		producer: producer,
    		logger:   logger,
    	}, nil
    }
    
    type PublisherConfig struct {
    	// Kafka brokers list.
    	Brokers []string
    
    	// Marshaler is used to marshal messages from Watermill format into Kafka format.
    	Marshaler Marshaler
    
    	// KcProducerConfig configuration object used to create new instances of Producer
    	KcProducerConfig kc.ProducerConfig
    }
    
    func (c PublisherConfig) Validate() error {
    	if len(c.Brokers) == 0 {
    		return errors.New("missing brokers")
    	}
    	if c.Marshaler == nil {
    		return errors.New("missing marshaler")
    	}
    	return c.KcProducerConfig.Validate()
    }
    
    // Publish publishes messages to Kafka.
    //
    // Publish is blocking and waits for an ack from Kafka.
    // When delivery of any message fails, the function is interrupted.
    func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
    	if p.closed {
    		return errors.New("publisher closed")
    	}
    
    	logFields := make(watermill.LogFields, 2)
    	logFields["topic"] = topic
    
    	for _, msg := range msgs {
    		logFields["message_uuid"] = msg.UUID
    		p.logger.Trace("Sending message to Kafka", logFields)
    
    		kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg)
    		if err != nil {
    			return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
    		}
    
    		// The upstream consumer stores tracing data (root span, WriteKafka span, event)
    		// in the message context under the "data" key; pull it out so the spans can be
    		// closed once the write completes.
    		data := msg.Context().Value("data").(*model.ConsumeCtxData)
    		err = p.producer.WriteMessages(msg.Context(), *kafkaMsg)
    		if err != nil {
    			log.Logger.ErrorContext(msg.Context(), "send message to kafka error", zap.Error(err))
    			data.WriteKafkaSpan.End()
    			data.RootSpan.End()
    			return errors.Wrapf(err, "cannot produce message %s", msg.UUID)
    		}
    		data.WriteKafkaSpan.End()
    		log.Logger.Info("[WriteKafka] write kafka success",
    			zap.String("topic", connector.GetTopic(data.Event.Category)),
    			zap.String("id", data.Event.ID), zap.Any("msg", data.Event),
    			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
    
    		p.logger.Trace("Message sent to Kafka", logFields)
    		data.RootSpan.End()
    	}
    
    	return nil
    }
    
    func (p *Publisher) Close() error {
    	if p.closed {
    		return nil
    	}
    	p.closed = true
    
    	if err := p.producer.Close(); err != nil {
    		return errors.Wrap(err, "cannot close Kafka producer")
    	}
    
    	return nil
    }
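
    • A rough wiring sketch for the new Publisher (the broker address is an assumption, and kc.ProducerConfig is left at its zero value; the real fields come from the Kevinello/kafka-client library and must pass its Validate):

      package watermillkafka
      
      import (
      	kc "github.com/Kevinello/kafka-client"
      	"github.com/ThreeDotsLabs/watermill"
      )
      
      // newProfilePublisher is an illustrative sketch, not project source.
      func newProfilePublisher() (*Publisher, error) {
      	return NewPublisher(PublisherConfig{
      		Brokers:          []string{"localhost:9092"}, // assumed broker address
      		Marshaler:        DefaultMarshaler{},
      		KcProducerConfig: kc.ProducerConfig{}, // fill in per kafka-client docs; zero value may not validate
      	}, watermill.NewStdLogger(false, false))
      }

    Note that Publish expects each message's context to carry a *model.ConsumeCtxData value under the "data" key (root span, WriteKafka span, event), so messages must flow in through the upstream consumer pipeline rather than be published ad hoc.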
    
    • Testing shows a large performance improvement over the previous implementation (we also compared environments, docker vs. local, and configurations, synchronous vs. asynchronous writes); the docker environment with asynchronous writes enabled is the most efficient. The sync/async switch itself is sketched below the screenshots.

    [Benchmark screenshots: write performance in docker vs. local environments with synchronous vs. asynchronous writes]
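
    • At the kafka-go layer, the synchronous/asynchronous switch in this comparison is the Writer's Async flag. A minimal standalone sketch (broker address and topic are assumptions; it bypasses the project's kafka-client wrapper):

      package main
      
      import (
      	"context"
      
      	"github.com/segmentio/kafka-go"
      )
      
      func main() {
      	w := &kafka.Writer{
      		Addr:     kafka.TCP("localhost:9092"), // assumed broker address
      		Topic:    "profile-events",            // assumed topic
      		Balancer: &kafka.Hash{},
      		// Async=false: WriteMessages blocks until the broker acks (the slow path measured above).
      		// Async=true:  WriteMessages enqueues and returns immediately; delivery errors are
      		// reported via the Writer's Completion callback instead of the return value.
      		Async: true,
      	}
      	defer w.Close()
      
      	_ = w.WriteMessages(context.Background(), kafka.Message{
      		Key:   []byte("user-42"),
      		Value: []byte(`{"event":"page_view"}`),
      	})
      }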

    To do tomorrow

    1. Benchmark: watermill vs. baserunner
  • Original post: https://blog.csdn.net/xzx18822942899/article/details/132913940