• golang grpc——header和trailer元数据传输


    header和trailer元数据传输

    header和trailer元数据最后会设置到http header。 用于传输除了proto定义以外的额外信息。比如用户身份认证信息,代理信息,访问令牌等。在grpc里统称为metadata.MD

    元数据构建

    元数据数据结构

    // MD is a mapping from metadata keys to values. Users should use the following
    // two convenience functions New and Pairs to generate MD.
    type MD map[string][]string
    
    • 1
    • 2
    • 3
    //grpc 元数据处理
    func getMetadataByMap(mp map[string]string) metadata.MD {
        // 返回值 type MD map[string][]string
    
        // 通过map 初始化元数据,后续要放到上下文中
        md := metadata.New(mp)
        return md
    }
    
    // 根据键值对获取 元数据
    func getMetadataByKV(kv ...string) metadata.MD {
        md := metadata.Pairs(kv...)
        return md
    }
    
    // 元数据附加
    func appendMetadata(md *metadata.MD, k string, kv ...string) {
        md.Append(k, kv...)
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    构造主要方法有如下

    • metadata.New(mp) 根据map构造元数据
    • metadata.Pairs(kv…) 根据键值对构造如下数据
    • md.Append(k, kv…) 根据键值对添加元数据
    元数据使用

    构造好元数据,然后再看客户端和服务端分别如何使用

    客户端发送

    首先将元数据设置到上下文中,方法如下

    • metadata.NewOutgoingContext(ctx, md) 用于发送方设置,该方法会重置ctx中的元数据。IncomingContext则是用于接收
    • metadata.AppendToOutgoingContext(ctx, kv…) 将键值 附加到上下文中
    package client
    
    import (
        "context"
        "google.golang.org/grpc/metadata"
    )
    
    //grpc 元数据处理
    func getMetadataByMap(mp map[string]string) metadata.MD {
        // 返回值 type MD map[string][]string
    
        // 通过map 初始化元数据,后续要放到上下文中
        md := metadata.New(mp)
        return md
    }
    
    // 根据键值对获取 元数据
    func getMetadataByKV(kv ...string) metadata.MD {
        md := metadata.Pairs(kv...)
        return md
    }
    
    // 元数据附加
    func appendMetadata(md *metadata.MD, k string, kv ...string) {
        md.Append(k, kv...)
        
    }
    
    // 元数据设置到上下文中,传递出去的ctx(发送数据) 和接收的ctx(接收数据)
    func getOutgoingContext(ctx context.Context, md metadata.MD) context.Context{
        // OutgoingContext 用于请求发送方,包装数据传递出去
        // IncomingContext 用于请求接收方,用于获取发送方传递的数据
        // Context 通过序列化放到http2里的header里进行传输
        // new 方法会覆盖ctx 原有的元数据,如果不覆盖,则用append
        return metadata.NewOutgoingContext(ctx, md)
    }
    
    // 将数据附加到OutgoingContext
    func appendOutgoingContext(ctx context.Context, kv ...string) context.Context{
        return metadata.AppendToOutgoingContext(ctx, kv...)
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    以一元请求为例,流式通信是一样的,都是设置到上下文中

    func getContext(ctx context.Context) context.Context {
        md := getMetadataByMap(map[string]string{
            "time":time.Now().Format("2006-01-02 15:04:05"),
            "header_data": "true",
            })
        // 将数据写入context上下文 (覆盖的形式)
        ctx = getOutgoingContext(ctx, md)
        // 附加数据
        ctx = appendOutgoingContext(ctx, "token", "zsdfww+", "user", "aka")
    
        // 打印一下元数据
        //md1, _ := metadata.FromOutgoingContext(ctx)
        //fmt.Println(md1)
        return ctx
    }
    
    // CallUnary 一元请求
    func CallUnary(client echo.EchoClient) {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    
        ctx = getContext(ctx)
    
        defer cancel()
        in := &echo.EchoRequest{
            Message: "client send message",
            Time: timestamppb.New(time.Now()),
        }
        res, err := client.UnaryEcho(ctx, in, grpc.Header(&header), grpc.Trailer(&trailer))
        if err != nil {
            log.Fatal(err)
        }
    
        fmt.Printf("client recv: %v\n", res.Message)
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    服务端接收

    服务端接收

    一元请求接收

    func (EchoServer) UnaryEcho(ctx context.Context, in *echo.EchoRequest) (*echo.EchoResponse, error) {
    
       	// 业务
    
        fmt.Printf("server recv :%v\n", in.Message)
    	// 接收客户端元数据
        md, ok:= metadata.FromIncomingContext(ctx)
        if !ok {
            log.Fatal("get metadata error")
        } else {
            log.Printf("get metadata :%v\n", md)
            fmt.Println(md)
        }
        return &echo.EchoResponse{
            Message: "server send message",
        }, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    流式,三种流通信方式都是一样的,下面是服务端流例子

    func (EchoServer) ServerStreamingEcho(in *echo.EchoRequest, stream echo.Echo_ServerStreamingEchoServer) error {
        
        fmt.Printf("server recv :%v\n", in.Message)
    
    	// 接收客户端元数据
        md, ok:= metadata.FromIncomingContext(stream.Context())
        if !ok {
            log.Fatal("get metadata error")
        } else {
            fmt.Println("get metadata :", md)
        }
      
    	buf := make([]byte, 1024)
        for {
            ... 填充buf
            stream.Send(&echo.EchoResponse{
                Message: "server sending files",
                Bytes: buf[:n],
                Time: timestamppb.New(time.Now()),
                Length: int32(n),
            })
            ...
        }
        //服务端流结束 return nil
        return nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    服务端发送

    服务端发送的元数据有header 和 trailer, header位于 grpc服务调用前发送,trailer位于grpc服务结束后发送

    对于一元请求来说

    func (EchoServer) UnaryEcho(ctx context.Context, in *echo.EchoRequest) (*echo.EchoResponse, error) {
    
        // 响应请求,发送元数据
        header, trailer := getMetadata()
        // 发送头部元数据
        grpc.SendHeader(ctx, header)
        // 发送尾部元数据
        defer func() {
            grpc.SetTrailer(ctx, trailer)
        }()
    
        .....
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    对于流通信来说,以服务端流为例

    func (EchoServer) ServerStreamingEcho(in *echo.EchoRequest, stream echo.Echo_ServerStreamingEchoServer) error {
        // 响应请求,发送元数据
        header, trailer := getMetadata()
        // 发送头部元数据,服务端开始调用时填充的数据
        err := stream.SendHeader(header)
        if err != nil {
            log.Println("send header error")
        }
        // 发送尾部元数据,服务端调用结束收填充的数据
        defer stream.SetTrailer(trailer)
        
        ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    客户端接收

    客户端就接收来自服务端的header和trailer

    一元请求

    // CallUnary 一元请求
    func CallUnary(client echo.EchoClient) {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    
        ctx = getContext(ctx)
    
        defer cancel()
        in := &echo.EchoRequest{
            Message: "client send message",
            Time: timestamppb.New(time.Now()),
        }
    
        // 响应的头部元数据和尾部元数据
        var header, trailer metadata.MD
    
        res, err := client.UnaryEcho(ctx, in, grpc.Header(&header), grpc.Trailer(&trailer))
        if err != nil {
            log.Fatal(err)
        }
    
        fmt.Printf("client recv: %v\n", res.Message)
    
        fmt.Println("Unary echo header:", header)
        fmt.Println("Unary echo trailer:",trailer)
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    流通信,以服务端流为例

    func CallServerStream(client echo.EchoClient) {
    
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    
        ctx = getContext(ctx)
        defer cancel()
        in := &echo.EchoRequest{
            Message: "client send message",
            Time: timestamppb.New(time.Now()),
        }
    
        stream, err := client.ServerStreamingEcho(ctx, in)
        if err != nil {
            log.Fatal(err)
        }
        // 获取头部元数据
        header, _:= stream.Header()
    
        fmt.Println("CallServerStream recv header", header)
    
        // 尝试获取尾部元数据
        trailer := stream.Trailer()
        // 打印没有数据,因为还没调用完
        fmt.Println("try get CallServerStream recv trailer", trailer)
    
    	...
    
        for {
            res, err := stream.Recv()
            ....
        }
        stream.CloseSend()
    
        // 获取尾部元数据
        trailer = stream.Trailer()
        fmt.Println("CallServerStream recv trailer", trailer)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    PS: 在grpc结束前获取尾部元数据是获取不到的。

    echo.proto文件

    syntax = "proto3";
    option go_package = "grpc/echo";
    import  "google/protobuf/timestamp.proto";
    package grpc.echo;
    
    message EchoRequest {
      string message = 1;
      bytes bytes = 2;
      int32 length = 3;
      google.protobuf.Timestamp time = 4;
    
    }
    message EchoResponse {
      string message = 1;
      bytes bytes = 2;
      int32 length = 3;
      google.protobuf.Timestamp time = 4;
    }
    
    service Echo {
      //一元请求
      rpc UnaryEcho(EchoRequest) returns(EchoResponse) {}
      //服务端流
      rpc ServerStreamingEcho(EchoRequest) returns(stream EchoResponse){}
      //客户端流
      rpc ClientStreamingEcho(stream EchoRequest) returns( EchoResponse){}
      //双向流
      rpc BidirectionalStreamingEcho(stream EchoRequest) returns(stream EchoResponse){}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
  • 相关阅读:
    【Day32】LeetCode刷刷刷。[1700. 无法吃午餐的学生数量 ]
    416. 分割等和子集
    2.go-GIN快速入门
    centos8 安装nginx
    QT系列教程(11) TextEdit实现Qt 文本高亮
    SpringSecurity(一)前言以及框架介绍
    mysql 5.7 登录时报:ERROR 1862 (HY000): Your password has expired
    frp内网穿透服务
    Spring 6【p命名空间和c命名空间】(五)-全面详解(学习总结---从入门到深化)
    wangeditor富文本编辑器使用(详细)
  • 原文地址:https://blog.csdn.net/qq_43058348/article/details/133356126