通过将指标监控组件接入项目,对比包括其配套工具在功能、性能上的差异、优劣,给出监控服务瘦身的建议
小而美的监控服务
怎么为之小? 怎么为之美? 要小要美的关键点是什么?是你们对于这个服务的核心的把握,最内核的把握,是如何做好减法。多看点做减法的文章,难的地方是,如何先想厚再做薄。
sarama.SyncProducer
-> kafka-client.Producer
测试发现 WriteKafka 的 Span 耗时较久,Watermill-kafka 的 Publisher 的 Producer 是 sarama.SyncProducer
,同步写入耗时较久,而 sarama.AsyncProducer
的 API 不够直观,需求也需要我们更改底层库为 kafka-go
对比原先写入kafka,发现 kafka-client(依赖 kafka-go) 的 Producer 可以支持同步or异步消息写入,故修改 Publisher 的实现
profile/internal/watermill/watermillkafka/marshaler.go
package watermillkafka
import (
"github.com/Shopify/sarama"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"github.com/segmentio/kafka-go"
)
// UUIDHeaderKey is the Kafka header carrying the Watermill message UUID.
// It is reserved: Marshal rejects messages whose metadata already uses it.
const UUIDHeaderKey = "_watermill_message_uuid"
// HeaderKey marks the metadata entry whose value becomes the Kafka message
// key (partitioning key) instead of being emitted as a regular header.
const HeaderKey = "_key"
// Marshaler marshals Watermill's message to Kafka message.
// Implementations must keep the message payload intact and may use
// headers to carry metadata.
type Marshaler interface {
Marshal(topic string, msg *message.Message) (*kafka.Message, error)
}
// Unmarshaler unmarshals Kafka's message to Watermill's message.
// Note the asymmetry with Marshaler: consumption still goes through
// sarama's ConsumerMessage while production uses kafka-go.
type Unmarshaler interface {
Unmarshal(*sarama.ConsumerMessage) (*message.Message, error)
}
// MarshalerUnmarshaler combines both directions of the Watermill<->Kafka
// message conversion.
type MarshalerUnmarshaler interface {
Marshaler
Unmarshaler
}
// DefaultMarshaler is the stock MarshalerUnmarshaler: message UUID travels
// in the UUIDHeaderKey header, the HeaderKey metadata entry becomes the
// Kafka message key, and all other metadata entries become headers.
type DefaultMarshaler struct{}
// Marshal converts a Watermill message into a kafka-go message.
//
// The message UUID is stored in the UUIDHeaderKey header, so that key is
// reserved and an error is returned if the caller's metadata already uses
// it. The metadata entry stored under HeaderKey (if any) becomes the Kafka
// message key; every other metadata entry is copied into a header.
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*kafka.Message, error) {
	if value := msg.Metadata.Get(UUIDHeaderKey); value != "" {
		return nil, errors.Errorf("metadata %s is reserved by watermill for message UUID", UUIDHeaderKey)
	}
	// Pre-size: at most one header per metadata entry plus the UUID header,
	// avoiding repeated slice growth on messages with many metadata keys.
	headers := make([]kafka.Header, 0, len(msg.Metadata)+1)
	headers = append(headers, kafka.Header{
		Key:   UUIDHeaderKey,
		Value: []byte(msg.UUID),
	})
	var msgKey string
	for key, value := range msg.Metadata {
		if key == HeaderKey {
			msgKey = value
			continue
		}
		headers = append(headers, kafka.Header{
			Key:   key,
			Value: []byte(value),
		})
	}
	return &kafka.Message{
		Topic:   topic,
		Key:     []byte(msgKey), // empty metadata -> nil-ish key, Kafka picks the partition
		Value:   msg.Payload,
		Headers: headers,
	}, nil
}
// Unmarshal converts a sarama ConsumerMessage back into a Watermill message.
// The reserved UUIDHeaderKey header becomes the message ID; every other
// header is copied into the message metadata.
func (DefaultMarshaler) Unmarshal(kafkaMsg *sarama.ConsumerMessage) (*message.Message, error) {
	var uuid string
	meta := make(message.Metadata, len(kafkaMsg.Headers))
	for _, h := range kafkaMsg.Headers {
		key := string(h.Key)
		if key == UUIDHeaderKey {
			uuid = string(h.Value)
			continue
		}
		meta.Set(key, string(h.Value))
	}
	out := message.NewMessage(uuid, kafkaMsg.Value)
	out.Metadata = meta
	return out, nil
}
profile/internal/watermill/watermillkafka/publisher.go
package watermillkafka
import (
"context"
kc "github.com/Kevinello/kafka-client"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/pkg/errors"
"go.uber.org/zap"
"profile/internal/connector"
"profile/internal/log"
"profile/internal/watermill/model"
)
// Publisher publishes Watermill messages to Kafka through a kafka-client
// (kafka-go based) producer, replacing the previous sarama.SyncProducer.
type Publisher struct {
// config holds brokers, marshaler and producer configuration.
config PublisherConfig
// producer is the underlying kafka-client producer.
producer *kc.Producer
// logger receives Watermill trace logs.
logger watermill.LoggerAdapter
// closed is read by Publish and written by Close with no synchronization;
// NOTE(review): concurrent Publish/Close is a data race — confirm callers
// serialize shutdown or add a mutex/atomic.Bool.
closed bool
}
// NewPublisher creates a new Kafka Publisher.
//
// The supplied config is validated first; a nil logger falls back to the
// no-op logger. The kafka-client producer is created eagerly so connection
// problems surface here instead of on the first Publish.
func NewPublisher(
	config PublisherConfig,
	logger watermill.LoggerAdapter,
) (*Publisher, error) {
	if err := config.Validate(); err != nil {
		return nil, err
	}
	if logger == nil {
		logger = watermill.NopLogger{}
	}
	prod, err := kc.NewProducer(context.Background(), config.KcProducerConfig)
	if err != nil {
		return nil, errors.Wrap(err, "cannot create Kafka producer")
	}
	pub := &Publisher{
		config:   config,
		producer: prod,
		logger:   logger,
	}
	return pub, nil
}
// PublisherConfig configures a Publisher.
type PublisherConfig struct {
// Kafka brokers list.
Brokers []string
// Marshaler is used to marshal messages from Watermill format into Kafka format.
Marshaler Marshaler
// KcProducerConfig configuration object used to create new instances of Producer
KcProducerConfig kc.ProducerConfig
}
// Validate reports the first configuration problem found: brokers and a
// marshaler are mandatory, and the embedded kafka-client producer config
// must itself be valid.
func (c PublisherConfig) Validate() error {
	switch {
	case len(c.Brokers) == 0:
		return errors.New("missing brokers")
	case c.Marshaler == nil:
		return errors.New("missing marshaler")
	default:
		return c.KcProducerConfig.Validate()
	}
}
// Publish publishes messages to Kafka.
//
// Publish is blocking and waits for an ack from Kafka.
// When delivery of one of the messages fails, the function is interrupted
// and the error is returned.
//
// Each message's context is expected to carry a *model.ConsumeCtxData under
// the string key "data" (tracing spans plus the originating event).
// NOTE(review): a string context key triggers go vet's SA1029-style warning;
// consider an unexported key type in the model package.
func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
	if p.closed {
		return errors.New("publisher closed")
	}
	logFields := make(watermill.LogFields, 2)
	logFields["topic"] = topic
	for _, msg := range msgs {
		logFields["message_uuid"] = msg.UUID
		p.logger.Trace("Sending message to Kafka", logFields)
		kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg)
		if err != nil {
			return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
		}
		// FIX: the original unchecked type assertion panicked when the
		// context value was absent or of another type; fail with an error
		// instead of crashing the publisher.
		data, ok := msg.Context().Value("data").(*model.ConsumeCtxData)
		if !ok {
			return errors.Errorf("message %s context is missing *model.ConsumeCtxData under key %q", msg.UUID, "data")
		}
		err = p.producer.WriteMessages(msg.Context(), *kafkaMsg)
		if err != nil {
			log.Logger.ErrorContext(msg.Context(), "send message to kafka error", zap.Error(err))
			data.WriteKafkaSpan.End()
			data.RootSpan.End()
			return errors.Wrapf(err, "cannot produce message %s", msg.UUID)
		}
		data.WriteKafkaSpan.End()
		log.Logger.Info("[WriteKafka] write kafka success",
			zap.String("topic", connector.GetTopic(data.Event.Category)),
			zap.String("id", data.Event.ID), zap.Any("msg", data.Event),
			zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String()))
		p.logger.Trace("Message sent to Kafka", logFields)
		data.RootSpan.End()
	}
	return nil
}
// Close shuts down the underlying producer. It is idempotent: calls after
// the first are no-ops and return nil.
func (p *Publisher) Close() error {
	if p.closed {
		return nil
	}
	p.closed = true
	// errors.Wrap returns nil when the producer closes cleanly.
	return errors.Wrap(p.producer.Close(), "cannot close Kafka producer")
}