• 腾讯mini项目-【指标监控服务重构】2023-08-19


    今日已办

    benchmark

    How can we create a configuration for gobench with -benchmem – IDEs Support (IntelliJ Platform) | JetBrains

    本机进行watermill-benchmark

    1. 使用 apifox 自动化测试上报固定数量的消息

    2. 启动watermill-pub/sub的 benchmark 函数

      // BenchmarkPubSub times one full run of the watermill router with the crash
      // and lag handlers registered.
      //
      // Router construction and handler registration happen with the timer stopped;
      // only router.Run is timed. A watcher goroutine closes the router once
      // kafka.PublishCount reaches 10000 while the router is running, which makes
      // router.Run return and ends the measurement.
      //
      // The loop deliberately executes once instead of b.N times.
      func BenchmarkPubSub(b *testing.B) {
      	for i := 0; i < 1; i++ {
      		b.StopTimer()
      		logger := watermillzap.NewLogger(log.Logger)
      		publisher, subscriber := consumer.NewPubSub(logger)
      		router, err := message.NewRouter(message.RouterConfig{}, logger)
      		if err != nil {
      			log.Logger.Fatal("create router error", zap.Error(err))
      		}
      
      		router.AddPlugin(plugin.SignalsHandler)
      		router.AddMiddleware(
      			middleware.InstantAck,
      			middleware.Recoverer,
      		)
      
      		router.AddMiddleware(consumer.UnpackKafkaMessage, consumer.InitPerformanceEvent, consumer.AnalyzeEvent)
      		router.AddHandler("crash", "to_analyzer__0.PERF_CRASH", subscriber, "solar-dev.PERF_CRASH", publisher, consumer.CrashHandler)
      		router.AddHandler("lag", "to_analyzer__0.PERF_LAG", subscriber, "solar-dev.PERF_LAG", publisher, consumer.LagHandler)
      		go func() {
      			// NOTE(review): this loop spins without yielding and reads
      			// kafka.PublishCount with no visible synchronization — confirm the
      			// counter is safe to read here, and consider polling on a ticker.
      			for {
      				if kafka.PublishCount >= 10000 && router.IsRunning() {
      					// Keep the close error local to this goroutine: assigning to
      					// the captured outer err would race with the assignment from
      					// router.Run below and could overwrite its result.
      					closeErr := router.Close()
      					fmt.Printf("router close err:%v\n", closeErr)
      					break
      				}
      			}
      		}()
      		b.StartTimer()
      		if err = router.Run(context.Background()); err != nil {
      			log.Logger.Error("router run error", zap.Error(err))
      		}
      	}
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34

    单 topic

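    | Event | number | ms | ns/op | B/op | allocs/op |
    | --- | --- | --- | --- | --- | --- |
    | crash | 10 | 698 | 0.01306 | 0 | 0 |
    | crash | 100 | 701 | 0.01373 | 0 | 0 |
    | crash | 1000 | 1143 | 1164375900 | 131129392 | 815484 |
    | crash | 10000 | 6174 | 6160307300 | 1300089088 | 8179332 |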

    双 topic

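    | Event | number | ms | ns/op | B/op | allocs/op |
    | --- | --- | --- | --- | --- | --- |
    | crash & lag | 10 | 689 | 0.01486 | 0 | 0 |
    | crash & lag | 100 | 718 | 0.01438 | 0 | 0 |
    | crash & lag | 1000 | 1661 | 1677269200 | 269949888 | 1797754 |
    | crash & lag | 10000 | 11697 | 11573685900 | 2684430704 | 17945041 |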

    add publisher send message trace & log logic

    1. 在 handler 中将 data 通过 message 的 Context 传递下来
    2. 获取 WriteKafkaSpan 和 RootSpan 手动 End
    3. 打日志

    profile/internal/watermill/pubsub/consumer_stage.go

    // crashHandler JSON-encodes the event attached to msg and returns it wrapped
    // in a new message.
    //
    // A producer-kind span named "crashHandler" is started under data.RootSpanCtx
    // and stored in data.WriteKafkaSpan. On success that span is left open and data
    // is attached to the outgoing message via SetDataInMsg. If encoding fails, the
    // status is set to state.StatusUnmarshalError, the error is reported through
    // handlerErr, both the write span and the root span are ended, and the
    // encoding error is returned.
    //
    // @Author xzx 2023-08-12 15:09:15
    // @Param msg the incoming message whose attached data holds the event
    // @Return []*message.Message a single message carrying the encoded event
    // @Return error the json.Marshal error, if any
    func crashHandler(msg *message.Message) ([]*message.Message, error) {
    	data := GetDataFromMsg(msg)

    	spanCtx, kafkaSpan := otelclient.ConsumerTracer.Start(
    		data.RootSpanCtx,
    		"crashHandler",
    		trace.WithSpanKind(trace.SpanKindProducer),
    	)
    	setSpanAttributes(kafkaSpan, data)
    	data.WriteKafkaSpan = kafkaSpan

    	payload, err := json.Marshal(data.Event)
    	if err != nil {
    		data.Status = state.StatusUnmarshalError
    		handlerErr(spanCtx, "marshal error", err)
    		data.WriteKafkaSpan.End()
    		data.RootSpan.End()
    		return nil, err
    	}

    	// Build a fresh outgoing message rather than reusing the parameter.
    	out := message.NewMessage(data.Event.BackendID, payload)
    	out.Metadata.Set(watermillkafka.HeaderKey, data.Event.ID)
    	log.Logger.Info("[4-crashHandler]",
    		zap.String("topic", connector.GetTopic(data.Event.Category)),
    		zap.String("id", data.Event.ID),
    		zap.String("msg", string(payload)),
    	)

    	SetDataInMsg(out, data)
    	return message.Messages{out}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    profile/internal/watermill/watermillkafka/publisher.go

    // Publish sends every message in msgs to the given Kafka topic, in order.
    //
    // Each message is marshaled with p.config.Marshaler and handed to
    // p.producer.SendMessage; the call blocks until that returns. The first
    // marshal or send failure stops the loop and is returned wrapped with the
    // message UUID, so later messages are not sent. Whether the send succeeds or
    // fails, the message's WriteKafkaSpan and RootSpan are ended.
    //
    // Returns an error immediately if the publisher has been closed.
    func (p *Publisher) Publish(topic string, msgs ...*message.Message) error {
    	if p.closed {
    		return errors.New("publisher closed")
    	}

    	fields := make(watermill.LogFields, 4)
    	fields["topic"] = topic

    	for _, m := range msgs {
    		fields["message_uuid"] = m.UUID
    		p.logger.Trace("Sending message to Kafka", fields)

    		encoded, marshalErr := p.config.Marshaler.Marshal(topic, m)
    		if marshalErr != nil {
    			return errors.Wrapf(marshalErr, "cannot marshal message %s", m.UUID)
    		}

    		// The data attached to the message carries the spans to close once the
    		// send result is known.
    		data := pubsub.GetDataFromMsg(m)
    		partition, offset, sendErr := p.producer.SendMessage(encoded)
    		if sendErr != nil {
    			log.Logger.ErrorContext(m.Context(), "send message to kafka error", zap.Error(sendErr))
    			data.WriteKafkaSpan.End()
    			data.RootSpan.End()
    			return errors.Wrapf(sendErr, "cannot produce message %s", m.UUID)
    		}

    		topicField := zap.String("topic", connector.GetTopic(data.Event.Category))
    		idField := zap.String("id", data.Event.ID)
    		eventField := zap.Any("msg", data.Event)
    		rootSpanField := zap.String("profile_root_span_id", data.RootSpan.SpanContext().SpanID().String())
    		log.Logger.Info("[4-WriteKafka] write kafka success", topicField, idField, eventField, rootSpanField)

    		data.WriteKafkaSpan.End()
    		fields["kafka_partition"] = partition
    		fields["kafka_partition_offset"] = offset

    		p.logger.Trace("Message sent to Kafka", fields)
    		data.RootSpan.End()
    	}

    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    明日待办

    1. ppt 制作
  • 相关阅读:
    【通信原理笔记】【二】随机信号分析——2.2 平稳随机过程
    实验2 Python数字类型实验
    微服务篇-B 深入理解SOA框架(Dubbo)_I 服务注册和发现(学习总结)
    电脑为什么会蓝屏的原因
    力扣240.搜索二维矩阵II
    玩转Mysql系列 - 第24篇:如何正确的使用索引?
    深度解析linux内核模块编译makefile
    JavaWeb_第5章_会话技术_Cookie+Session
    井冈山大学专属中秋月饼
    ue5 小知识点 ue的world type,pie editor game
  • 原文地址:https://blog.csdn.net/xzx18822942899/article/details/132913922