• gRPC-go 元数据


    gRPC元数据

    基本定义

    gRPC元数据用于传输特定RPC调用的额外信息(如身份验证详细信息),其表现形式为键值对列表。其中键是字符串,值通常是字符串,但可以是二进制数据。

    键通常由字符串、数字、特殊字符(-, _, .)组成,且大小写不敏感。但是不能用grpc-开头,框架内部预留。二进制Key使用-bin结尾。允许客户端向服务端发送元数据,同样也允许服务端向客户端发送元数据。

    构建元数据

    type MD map[string][]string
    
    • 1

    可以将元数据看成普通的map结构,key 是字符串,value是字符串数组,所以可以给同一个key添加多个value值。创建元数据有两种方式

    • 使用New函数从map结构中创建

      md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})
      
      • 1
    • 使用Pairs函数创建,相同的key自动合并为list

      md := metadata.Pairs(
          "key1", "val1",
          "key1", "val1-2", // "key1" will have map value []string{"val1", "val1-2"}
          "key2", "val2",
      )
      
      • 1
      • 2
      • 3
      • 4
      • 5

    注意:所有的key将自动转变为小写,因此 key1 和 kEy1被认为是同一个Key,它们的value将被合并为list

    二进制元数据

    元数据中的key一直是字符串,但是value值可以是字符串或二进制数据。gRPC在创建二进制元数据时,要求key必须以 “-bin”结尾, 对value值进行编码

    md := metadata.Pairs(
        "key", "string value",
        "key-bin", string([]byte{96, 102}), // 使用 base64编码
    )
    
    • 1
    • 2
    • 3
    • 4

    上下文获取元数据

    可以从上下文对象context中调用FromIncomingContext方法获取元数据

    func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
        md, ok := metadata.FromIncomingContext(ctx)
        // do something with metadata
    }
    
    • 1
    • 2
    • 3
    • 4

    客户端

    发送元数据

    有两种方式可以将元数据发送到服务器。建议使用AppendToOutgoingContext方法将key-value键值对添加到上下文。该方法可以与上下文现有的元数据一起使用。如果之前没有元数据,则添加元数据;如果存在元数据,则合并key-value键值对。

    AppendToOutgoingContext
    // 在上下文中创建 元数据
    ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
    
    // 在元数据中添加新的数据 (例如:在另一个拦截器中)
    ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
    
    // 简单 RPC 调用
    response, err := client.SomeRPC(ctx, someRequest)
    
    // 流式 RPC 调用
    stream, err := client.SomeStreamingRPC(ctx)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    NewOutgoingContext

    此外,使用使用NewOutgoingContext方法将元数据添加到context上下文,需要注意: 这将替换上下文已经存在的元数据

    // 在上下文中创建 元数据
    md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
    ctx := metadata.NewOutgoingContext(context.Background(), md)
    
    // 在元数据中添加新的数据 (例如:在另一个拦截器中)
    send, _ := metadata.FromOutgoingContext(ctx)
    newMD := metadata.Pairs("k3", "v3")
    ctx = metadata.NewOutgoingContext(ctx, metadata.Join(send, newMD))
    
    // 简单RPC调用
    response, err := client.SomeRPC(ctx, someRequest)
    
    // 流式RPC调用
    stream, err := client.SomeStreamingRPC(ctx)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    获取元数据

    客户端通过gRPC中 header、trailer 获取元数据。gRPC 在HTTP2协议的基础上构建数据传送通道,通常而言,都会先入为主的认为元数据会存放在HTTP header中,思考一下为什么还需要从trailer获取元数据?

    gRPC使用HTTP trailers 由以下两个目的:

    • 内容发送后,需要使用 trailer 发送最终的状态
      • RPC调用过程中出现应用程序运行时错误时,状态、状态消息在尾部传送
      • Response响应结束流,在最后trailer frame中使用END_STREAM 标记结束
    • 支持流式RPC用例,流式RPC通常比正常HTTP请求长,HTTP trailer 尾部需要标记请求/响应的处理结果。例如,流式数据处理过程中出现错误,可以在trailer 中发送错误代码,这种情况HTTP Header 是不可预知的,因此Header无法满足此类需求。
    Unary RPC

    使用CallOption中的Header、Trailer函数获取 unary RPC中的元数据

    var header, trailer metadata.MD // 定义变量存储 header、trailer 中的元数据
    r, err := client.SomeRPC(
        ctx,
        someRequest,
        grpc.Header(&header),    // 获取 header 元数据
        grpc.Trailer(&trailer),  // 获取 trailer 元数据
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    Streaming RPC

    可以使用ClientStream接口中的Header、Trailer函数获取响应流中的元数据。流式RPC调用包含:

    • 服务端流式RPC
    • 客户端流式RPC
    • 双向流式RPC调用
    stream, err := client.SomeStreamingRPC(ctx)
    
    // retrieve header
    header, err := stream.Header()
    
    // retrieve trailer
    trailer := stream.Trailer()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    服务端

    发送元数据

    Unary RPC

    在请求-响应RPC调用中,gRPC服务端通过SendHeader、SetTrailer方法发送Header、Trailer元数据,这两个函数都需要上下文context作为参数

    func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
        // create and send header
        header := metadata.Pairs("header-key", "val")
        grpc.SendHeader(ctx, header)
        // create and set trailer
        trailer := metadata.Pairs("trailer-key", "val")
        grpc.SetTrailer(ctx, trailer)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    Streaming RPC

    流式RPC调用,使用ServerStream中的SendHeader 、SetTrailer方法发送header、trailer元数据

    func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
        // create and send header
        header := metadata.Pairs("header-key", "val")
        stream.SendHeader(header)
        // create and set trailer
        trailer := metadata.Pairs("trailer-key", "val")
        stream.SetTrailer(trailer)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    接收元数据

    gRPC服务端从context上下文中获取客户端发送的元数据。但是请求-响应RPC、流式RPC接收元数据的方式稍有不同

    Unary call
    func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
        md, ok := metadata.FromIncomingContext(ctx)
        // do something with metadata
    }
    
    • 1
    • 2
    • 3
    • 4
    Streaming call
    func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
        md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
        // do something with metadata
    }
    
    • 1
    • 2
    • 3
    • 4

    Example 代码

    完整代码

    ├── README.md
    ├── client
    │   └── main.go
    └── server
        └── main.go
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 运行服务端

      go run server/main.go
      
      • 1
    • 运行客户端

      go run client/main.go
      
      • 1

    Unary RPC收发元数据

    • 客户端核心代码

      func unaryCallWithMetadata(c pb.EchoClient, message string) {
      	fmt.Printf("--- unary ---\n")
      	// Create metadata and context.
      	md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
      	ctx := metadata.NewOutgoingContext(context.Background(), md)
      
      	//注意此种方式,调用 NewOutgoingContext函数会覆盖 timestamp元数据
      	//newMD := metadata.Pairs("k3", "v3")
      	//ctx = metadata.NewOutgoingContext(ctx, newMD)
      
      	// 定义变量 存储服务端header、trailer 元数据
      	var header, trailer metadata.MD
      	r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer))
      	if err != nil {
      		log.Fatalf("failed to call UnaryEcho: %v", err)
      	}
      
      	// 判断response header是否返回timestamp元数据
      	if t, ok := header["timestamp"]; ok {
      		fmt.Printf("timestamp from header:\n")
      		for i, e := range t {
      			fmt.Printf(" %d. %s\n", i, e)
      		}
      	} else {
      		log.Fatal("timestamp expected but doesn't exist in header")
      	}
      	// 判断response header是否返回location元数据
      	if l, ok := header["location"]; ok {
      		fmt.Printf("location from header:\n")
      		for i, e := range l {
      			fmt.Printf(" %d. %s\n", i, e)
      		}
      	} else {
      		log.Fatal("location expected but doesn't exist in header")
      	}
      	fmt.Printf("------------------------------- response:\n")
      	fmt.Printf(" - %s\n", r.Message)
      
      	//读取trailer 元数据
      	if t, ok := trailer["timestamp"]; ok {
      		fmt.Printf("timestamp from trailer:\n")
      		for i, e := range t {
      			fmt.Printf(" %d. %s\n", i, e)
      		}
      	} else {
      		log.Fatal("timestamp expected but doesn't exist in trailer")
      	}
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
    • 服务端核心代码

      func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
      	fmt.Printf("--- UnaryEcho ---\n")
      	// 利用defer 最后发送trailer元数据
      	defer func() {
      		//设置trailer 元数据
      		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
      		grpc.SetTrailer(ctx, trailer)
      	}()
      
      	// 读取客户端的元数据
      	md, ok := metadata.FromIncomingContext(ctx)
      	if !ok {
      		return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")
      	}
      	if t, ok := md["timestamp"]; ok {
      		fmt.Printf("timestamp from metadata:\n")
      		for i, e := range t {
      			fmt.Printf(" %d. %s\n", i, e)
      		}
      	}
      
      	// 发送header 元数据
      	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
      	grpc.SendHeader(ctx, header)
      
      	fmt.Printf("request received: %v, sending echo\n", in)
      
      	return &pb.EchoResponse{Message: in.Message}, nil
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
    • Wireshark抓包截图

      在这里插入图片描述

      img

    Streaming RPC收发元数据

    简单的分析下服务端Streaming RPC调用的代码,其他方式(客户端Streaming RPC、双向流模式RPC)大同小异只是调用函数不一致而已

    func (s *server) ServerStreamingEcho(in *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
    	fmt.Printf("--- ServerStreamingEcho ---\n")
    	// 利用defer 最后发送trailer元数据
    	defer func() {
    		trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
    		stream.SetTrailer(trailer)
    	}()
    
    	// 读取客户端发送的元数据
    	md, ok := metadata.FromIncomingContext(stream.Context())
    	if !ok {
    		return status.Errorf(codes.DataLoss, "ServerStreamingEcho: failed to get metadata")
    	}
    	if t, ok := md["timestamp"]; ok {
    		fmt.Printf("timestamp from metadata:\n")
    		for i, e := range t {
    			fmt.Printf(" %d. %s\n", i, e)
    		}
    	}
    
    	// 创建返回的header元数据
    	header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
    	stream.SendHeader(header)
    
    	fmt.Printf("request received: %v\n", in)
    
    	// 模拟多次发送
    	for i := 0; i < streamingCount; i++ {
    		fmt.Printf("echo message %v\n", in.Message)
    		err := stream.Send(&pb.EchoResponse{Message: in.Message})
    		if err != nil {
    			return err
    		}
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 测试结果

      在这里插入图片描述

      如上图,代码里里面设置的trailer信息在最后一个stream流中的header 信息中传输给客户端。

  • 相关阅读:
    技术Leader对下管理的法宝-SMART
    go sync.Map包装过的对象nil值的判断
    一文彻底理解synchronized(通俗易懂的synchronized)
    Power BI 的 各种限制 和 DataFlow模式
    硕士论文章节划分
    记录一下做工厂的打印pdf程序
    springboot基于协同过滤算法的书籍推荐毕业设计源码101555
    【C++学习】string的模拟实现
    Shell
    【C#/VB.NET】 将PDF转为SVG/Image, SVG/Image转PDF
  • 原文地址:https://blog.csdn.net/u013433591/article/details/127836093