• Opentelemetry SDK的简单用法


    Opentelemetry SDK的简单用法

    概述

    Opentelemetry trace的简单架构图如下,客户端和服务端都需要启动一个traceProvider,主要用于将trace数据传输到registry(如jaeger、opencensus等)。client和server通过context将整个链路串起来。

    traceProvider会周期性的将数据推送到Registry,默认是5s

    func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
       ...
       o := BatchSpanProcessorOptions{
          BatchTimeout:       time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
          ExportTimeout:      time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
          MaxQueueSize:       maxQueueSize,
          MaxExportBatchSize: maxExportBatchSize,
       }
       ...
    }

    下面是官方提供的SDK,它实现了opentelemetry的API,也是操作opentelemetry所使用的基本库:

    tracesdk "go.opentelemetry.io/otel/sdk/trace"

    创建 TracerProvider

    要使用trace,首先要创建一个TracerProvider,定义exporter以及相关属性。

    使用 全局TracerProvider

    参数表示应用名称或代码库名称

    var tracer = otel.Tracer("app_or_package_name")

    创建TracerProvider

    下面展示了使用 Jaeger 作为exporter的tracerProvider,其中包含两个概念: exporter 和resource。前者为发送遥测数据的目的地,如jaeger、zepkin、opencensus等;后者通常用于添加非临时的底层元数据信息,如主机名,实例ID等。

    // tracerProvider returns an OpenTelemetry TracerProvider configured to use
    // the Jaeger exporter that will send spans to the provided url. The returned
    // TracerProvider will also use a Resource configured with all the information
    // about the application.
    func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
    	// Create the Jaeger exporter
    	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
    	if err != nil {
    		return nil, err
    	}
    	tp := tracesdk.NewTracerProvider(
    		// Always be sure to batch in production.
    		tracesdk.WithBatcher(exp),
    		// Record information about this application in a Resource.
    		tracesdk.WithResource(resource.NewWithAttributes(
    			semconv.SchemaURL,
    			semconv.ServiceNameKey.String(service),
    			attribute.String("environment", environment),
    			attribute.Int64("ID", id),
    		)),
    	)
    	return tp, nil
    }

    可以使用如下方式创建resource, semconv 包可以为资源属性提供规范化的名称。

    // newResource returns a resource describing this application.
    func newResource() *resource.Resource {
    	r, _ := resource.Merge(
    		resource.Default(),
    		resource.NewWithAttributes(
    			semconv.SchemaURL,
    			semconv.ServiceNameKey.String("fib"),
    			semconv.ServiceVersionKey.String("v0.1.0"),
    			attribute.String("environment", "demo"),
    		),
    	)
    	return r
    }

    注册tracerProvider

    如果使用自定义的tracerProvider,需要将其注册为全局tracerProvider:

    tp, err := tracerProvider("http://localhost:14268/api/traces")
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	// Register our TracerProvider as the global so any imported
    	// instrumentation in the future will default to using it.
    	otel.SetTracerProvider(tp)

    启动tracerProvider

    tr := tp.Tracer("component-main")
    
    	ctx, span := tr.Start(ctx, "foo")
    	defer span.End()

    关闭tracerProvider

    当程序退出前,需要关闭tracerProvider,执行数据清理工作:

    ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()
    
        // Cleanly shutdown and flush telemetry when the application exits.
    	defer func(ctx context.Context) {
    		// Do not make the application hang when it is shutdown.
    		ctx, cancel = context.WithTimeout(ctx, time.Second*5)
    		defer cancel()
    		if err := tp.Shutdown(ctx); err != nil {
    			log.Fatal(err)
    		}
    	}(ctx)

    span的简单用法

    tracer会创建span,为了创建span,需要一个 context.Context 实例。该 context 通常来自于请求对象,或已经存在的父span。Go的 context 用于保存活动的span,当span启用后,就可以操作创建好的span以及其包含的已修改的上下文。当span结束后,其将成为不可变状态。

    下面为从请求中获取span:

    func httpHandler(w http.ResponseWriter, r *http.Request) {
    	ctx, span := tracer.Start(r.Context(), "hello-span")
    	defer span.End()
    
    	// do some work to track with hello-span
    }

    获取当前span

    // This context needs contain the active span you plan to extract.
    ctx := context.TODO()
    span := trace.SpanFromContext(ctx)
    
    // Do something with the current span, optionally calling `span.End()` if you want it to en

    创建嵌套的span

    下面将 childSpan 嵌套在了 parentSpan 中,表示串行执行:

    func parentFunction(ctx context.Context) {
    	ctx, parentSpan := tracer.Start(ctx, "parent")
    	defer parentSpan.End()
    
    	// call the child function and start a nested span in there
    	childFunction(ctx)
    
    	// do more work - when this function ends, parentSpan will complete.
    }
    
    func childFunction(ctx context.Context) {
    	// Create a span to track `childFunction()` - this is a nested span whose parent is `parentSpan`
    	ctx, childSpan := tracer.Start(ctx, "child")
    	defer childSpan.End()
    
    	// do work here, when this function returns, childSpan will complete.
    }

    设置span相关的信息

    添加属性

    属性是一组key/value元数据,用于聚合、过滤以及对traces进行分组。

    // setting attributes at creation...
    ctx, span = tracer.Start(ctx, "attributesAtCreation", trace.WithAttributes(attribute.String("hello", "world")))
    // ... and after creation
    span.SetAttributes(attribute.Bool("isTrue", true), attribute.String("stringAttr", "hi!"))

    可以使用如下方式预设置属性,然后再添加到span中:

    var myKey = attribute.Key("myCoolAttribute")
    span.SetAttributes(myKey.String("a value"))

    注:trace的属性并不是随便定义的,它有一些特定的约束,参见 官方约定 以及 uptrace总结的约束

    添加事件

    事件为可读的消息,表示在span的生命周期中"发生了某些事情"。例如,假设某个函数需要获取锁来访问互斥的资源时,可以在两个节点创建事件,一个是尝试访问资源时,另一个是获取到锁时。如:

    span.AddEvent("Acquiring lock")
    mutex.Lock()
    span.AddEvent("Got lock, doing work...")
    // do stuff
    span.AddEvent("Unlocking")
    mutex.Unlock()

    事件的一个有用的特点是,它们的时间戳显示为从span开始的偏移量(即事件发生的真实时间)。

    事件也可以配置属性:

    span.AddEvent("Cancelled wait due to external signal", trace.WithAttributes(attribute.Int("pid", 4328), attribute.String("signal", "SIGHUP")))

    设置span状态

    通常用于表示操作是否有异常。默认状态为 Unset ,可以手动将其设置为 Ok ,但通常没必要这么做。

    result, err := operationThatCouldFail()
    if err != nil {
    	span.SetStatus(codes.Error, "operationThatCouldFail failed")
    }

    记录错误

    用于记录错误日志或调用栈等信息。强烈建议在使用 RecordError 的同时,通过 SetStatus 将span状态设置为 Error :

    result, err := operationThatCouldFail()
    if err != nil {
    	span.SetStatus(codes.Error, "operationThatCouldFail failed")
    	span.RecordError(err)
    }

    完整代码

    下面是对本地的一个函数 bar 生成trace信息:

    func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
    	// Create the Jaeger exporter
    	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
    	if err != nil {
    		return nil, err
    	}
    	tp := tracesdk.NewTracerProvider(
    		// Always be sure to batch in production.
    		tracesdk.WithBatcher(exp),
    		// Record information about this application in a Resource.
    		tracesdk.WithResource(resource.NewWithAttributes(
    			semconv.SchemaURL,
    			semconv.ServiceNameKey.String(service),
    			attribute.String("environment", environment),
    			attribute.Int64("ID", id),
    		)),
    	)
    	return tp, nil
    }
    
    func main() {
    	tp, err := tracerProvider("http://localhost:14268/api/traces")
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	// Register our TracerProvider as the global so any imported
    	// instrumentation in the future will default to using it.
    	otel.SetTracerProvider(tp)
    
    	ctx, cancel := context.WithCancel(context.Background())
    	defer cancel()
    
    	// Cleanly shutdown and flush telemetry when the application exits.
    	defer func(ctx context.Context) {
    		// Do not make the application hang when it is shutdown.
    		ctx, cancel = context.WithTimeout(ctx, time.Second*5)
    		defer cancel()
    		if err := tp.Shutdown(ctx); err != nil {
    			log.Fatal(err)
    		}
    	}(ctx)
    
    	tr := tp.Tracer("component-main")
    
    	ctx, span := tr.Start(ctx, "foo")
    	defer span.End()
    
    	bar(ctx)
    }
    
    func bar(ctx context.Context) {
    	// Use the global TracerProvider.
    	tr := otel.Tracer("component-bar")
    	_, span := tr.Start(ctx, "bar")
    	span.SetAttributes(attribute.Key("testset").String("value"))
    	defer span.End()
    
    	// Do bar...
    }

    Trace context的跨服务传播

    为了跨服务传播Trace context需要注册一个propagator ,通常在创建注册TracerProvider之后执行。

    func initTracer() (*sdktrace.TracerProvider, error) {
    	// Create stdout exporter to be able to retrieve
    	// the collected spans.
    	exporter, err := stdout.New(stdout.WithPrettyPrint())
    	if err != nil {
    		return nil, err
    	}
    
    	// For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
    	// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
    	tp := sdktrace.NewTracerProvider(
    		sdktrace.WithSampler(sdktrace.AlwaysSample()),
    		sdktrace.WithBatcher(exporter),
    	)
    	otel.SetTracerProvider(tp)
    	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
    	return tp, err
    }

    如上注册了两种propagator :TraceContext和Baggage,因此可以使用这两种数据结构传播上下文。

    TraceContext

    下面是 gorilla/mux 的服务端代码,通过 trace.SpanFromContext(r.Context()) 从请求的context构建span,当然也可以通过 tracer.Start(c.Context(), "getUser", oteltrace.WithAttributes(attribute.String("id", id))) 这种方式启动一个新的span:

    func TestPropagationWithCustomPropagators(t *testing.T) {
    	prop := propagation.TraceContext{}
    
    	r := httptest.NewRequest("GET", "/user/123", nil)
    	w := httptest.NewRecorder()
    
    	ctx := trace.ContextWithRemoteSpanContext(context.Background(), sc)
    	prop.Inject(ctx, propagation.HeaderCarrier(r.Header))
    
    	var called bool
    	router := mux.NewRouter()
    	router.Use(Middleware("foobar", WithPropagators(prop)))
    	router.HandleFunc("/user/{id}", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
    		called = true
    		span := trace.SpanFromContext(r.Context())
            defer span.End()
    		assert.Equal(t, sc, span.SpanContext())
    		w.WriteHeader(http.StatusOK)
    	}))
    
    	router.ServeHTTP(w, r)
    	assert.True(t, called, "failed to run test")
    }

    baggage

    下面是使用baggage的客户端和服务端代码,需要注意的是,客户端需要使用 otelhttp 。

    客户端代码:

    package main
    
    import (
    	"context"
    	"flag"
    	"fmt"
    	"io/ioutil"
    	"log"
    	"net/http"
    	"time"
    
    	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    
    	"go.opentelemetry.io/otel"
    	"go.opentelemetry.io/otel/baggage"
    	stdout "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
    	"go.opentelemetry.io/otel/propagation"
    	sdktrace "go.opentelemetry.io/otel/sdk/trace"
    	semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
    	"go.opentelemetry.io/otel/trace"
    )
    
    func initTracer() (*sdktrace.TracerProvider, error) {
    	// Create stdout exporter to be able to retrieve
    	// the collected spans.
    	exporter, err := stdout.New(stdout.WithPrettyPrint())
    	if err != nil {
    		return nil, err
    	}
    
    	// For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
    	// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
    	tp := sdktrace.NewTracerProvider(
    		sdktrace.WithSampler(sdktrace.AlwaysSample()),
    		sdktrace.WithBatcher(exporter),
    	)
    	otel.SetTracerProvider(tp)
    	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
    	return tp, err
    }
    
    func main() {
    	tp, err := initTracer()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer func() {
    		if err := tp.Shutdown(context.Background()); err != nil {
    			log.Printf("Error shutting down tracer provider: %v", err)
    		}
    	}()
    	url := flag.String("server", "http://localhost:7777/hello", "server url")
    	flag.Parse()
    
    	client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
    
    	bag, _ := baggage.Parse("username=donuts")
    	ctx := baggage.ContextWithBaggage(context.Background(), bag)
    
    	var body []byte
    
    	tr := otel.Tracer("example/client")
    	err = func(ctx context.Context) error {
    		ctx, span := tr.Start(ctx, "say hello", trace.WithAttributes(semconv.PeerServiceKey.String("ExampleService")))
    		defer span.End()
    		req, _ := http.NewRequestWithContext(ctx, "GET", *url, nil)
    
    		fmt.Printf("Sending request...\n")
    		res, err := client.Do(req)
    		if err != nil {
    			panic(err)
    		}
    		body, err = ioutil.ReadAll(res.Body)
    		_ = res.Body.Close()
    
    		return err
    	}(ctx)
    
    	if err != nil {
    		log.Fatal(err)
    	}
    
    	fmt.Printf("Response Received: %s\n\n\n", body)
    	fmt.Printf("Waiting for few seconds to export spans ...\n\n")
    	time.Sleep(10 * time.Second)
    	fmt.Printf("Inspect traces on stdout\n")
    }

    服务端代码:

    package main
    
    import (
    	"context"
    	"io"
    	"log"
    	"net/http"
    
    	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    
    	"go.opentelemetry.io/otel"
    	"go.opentelemetry.io/otel/attribute"
    	"go.opentelemetry.io/otel/baggage"
    	stdout "go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
    	"go.opentelemetry.io/otel/propagation"
    	"go.opentelemetry.io/otel/sdk/resource"
    	sdktrace "go.opentelemetry.io/otel/sdk/trace"
    	semconv "go.opentelemetry.io/otel/semconv/v1.10.0"
    	"go.opentelemetry.io/otel/trace"
    )
    
    func initTracer() (*sdktrace.TracerProvider, error) {
    	// Create stdout exporter to be able to retrieve
    	// the collected spans.
    	exporter, err := stdout.New(stdout.WithPrettyPrint())
    	if err != nil {
    		return nil, err
    	}
    
    	// For the demonstration, use sdktrace.AlwaysSample sampler to sample all traces.
    	// In a production application, use sdktrace.ProbabilitySampler with a desired probability.
    	tp := sdktrace.NewTracerProvider(
    		sdktrace.WithSampler(sdktrace.AlwaysSample()),
    		sdktrace.WithBatcher(exporter),
    		sdktrace.WithResource(resource.NewWithAttributes(semconv.SchemaURL, semconv.ServiceNameKey.String("ExampleService"))),
    	)
    	otel.SetTracerProvider(tp)
    	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
    	return tp, err
    }
    
    func main() {
    	tp, err := initTracer()
    	if err != nil {
    		log.Fatal(err)
    	}
    	defer func() {
    		if err := tp.Shutdown(context.Background()); err != nil {
    			log.Printf("Error shutting down tracer provider: %v", err)
    		}
    	}()
    
    	uk := attribute.Key("username")
    
    	helloHandler := func(w http.ResponseWriter, req *http.Request) {
    		ctx := req.Context()
    		span := trace.SpanFromContext(ctx) // span为Hello
            defer span.End()
    		bag := baggage.FromContext(ctx)
    		span.AddEvent("handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
    
    		_, _ = io.WriteString(w, "Hello, world!\n")
    	}
    
        // otelhttp.NewHandler会在处理请求的同时创建一个名为Hello的span
    	otelHandler := otelhttp.NewHandler(http.HandlerFunc(helloHandler), "Hello")
    
    	http.Handle("/hello", otelHandler)
    	err = http.ListenAndServe(":7777", nil)
    	if err != nil {
    		log.Fatal(err)
    	}
    }

    上述代码生成的链路跟踪如下,client的 HTTP GET 会调用server端的 Hello 。Server的 Hello span是在处理请求时生成的,上述用的是 otelhttp ,其他registry也是类似的处理方式。

    使用如下代码则可以启动两个独立的span,可以表示两个并行的任务:

    helloHandler := func(w http.ResponseWriter, req *http.Request) {
    		ctx := req.Context()
    		ctx, span1 := tracer.Start(ctx, "span1 proecss", trace.WithLinks())
    		defer span1.End()
    		bag := baggage.FromContext(req.Context())
    		span1.SetAttributes(attribute.String("span1", "test1"))
    		span1.AddEvent("span1 handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
    
    		ctx, span2 := tracer.Start(req.Context(), "span2 proecss", trace.WithLinks())
    		defer span2.End()
    		span2.SetAttributes(attribute.String("span2", "test2"))
    		span2.AddEvent("span2 handling this...", trace.WithAttributes(uk.String(bag.Member("username").Value())))
    
    		_, _ = io.WriteString(w, "Hello, world!\n")
    	}

    此外还可以通过 baggage.NewKeyValueProperty("key", "value") 等方式创建baggage。

    注:baggage要遵循 W3C Baggage 规范 。

    支持otel的工具

    官方给出了很多 Registry ,如 Gorilla Mux 、 GORM 、 Gin-gonic 、 gRPC 等。更多可以参见 官方代码库 。

    采样

    provider := sdktrace.NewTracerProvider(
    	sdktrace.WithSampler(sdktrace.AlwaysSample()),
    )
    .5
    

    生产中可以考虑使用 TraceIDRatioBased 和 ParentBased 。

  • 相关阅读:
    FireMonkey 做界面的一个小技巧
    绪论
    【python】Jenkins实现携带commit_log钉钉/企微机器人通知
    工作中常用的5种加密算法
    红外遥控器实验
    java ssm旅游景点景区门票预订及信息发布系统
    【吐槽&脑洞】关于逛B站时偶然体验的弹幕互动游戏魏蜀吴三国争霸游戏的一些思考
    【踩坑记录】创建Go模块-从其他远程模块调用代码的教程
    C#进阶-ASP.NET常用控件总结
    SpringCloud搭建微服务之Gateway网关
  • 原文地址:https://blog.csdn.net/JavaMonsterr/article/details/125412381