• Golang处理gRPC请求/响应元数据


    前段时间实现内部gRPC框架时,为了实现在服务端拦截器中打印请求及响应的头部信息,便查阅了部分关于元数据的资料。因为中文网络上对于该领域的信息较少,于是在这做了一些简单的总结。

    元数据

    gRPC的元数据(metadata)是基于HTTP/2头部实现的键值对数据,它通常用来实现gRPC的鉴权、链路跟踪以及自定义头部数据等功能。

    gRPC的元数据分为两种类型,分别是HeaderTrailerHeader可以由客户端或服务端发送,它在客户端请求数据或服务器响应数据前发送。Trailer是一种特殊的头部信息,它仅可由服务端发送,且位于发送的数据之后。

    客户端处理

    在gRPC客户端中,无论是一元调用还是流调用,可以比较简单地通过google.golang.org/grpc/metadata包提供的AppendToOutgoingContextNewOutgoingContext方法向请求中加入头部元数据,例如以下几种方式:

    // 通过metadata创建新的context
    md := metadata.Pairs("k1", "v1", "k2", "v2")
    ctx := metadata.NewOutgoingContext(ctx, md)
    
    // 或是向context中添加元数据
    ctx = metadata.AppendToOutgoingContext(ctx, "k3", "v3")
    
    // ... 通过ctx进行RPC调用
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    对于服务端返回的响应中的元数据,一元调用与流调用的处理方式就较为不同。对于一元调用,需要提前定义好用于存储元数据的变量,然后在调用时通过grpc.Headergrpc.Trailer增加调用的选项:

    var header, trailer metadata.MD
    resp, err := cli.UnaryCall(ctx, req, grpc.Header(&header), grpc.Trailer(&trailer))
    
    // 处理header或trailer
    
    • 1
    • 2
    • 3
    • 4

    而对于任意方式的流调用,都可以简单地通过流调用返回流的HeaderTrailer方法获得元数据:

    stream, err := cli.StreamCall(ctx)
    
    header, err := stream.Header()
    trailer, err := stream.Trailer()
    
    • 1
    • 2
    • 3
    • 4

    服务端处理

    对于服务端,请求的元数据需要通过metadata.FromIncomingContext从context中获取:

    // 一元调用
    md, ok := metadata.FromIncomingContext(ctx)
    
    // 流调用
    ctx := stream.Context() // 需要先从流中得到context
    md, ok := metadata.FromIncomingContext(ctx)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    同样,在服务端发送元数据需要根据一元调用与流调用使用不同的方式。对于一元调用,可以通过grpc.SendHeadergrpc.SetHeader以及grpc.SetTrailer方法设置发送的元数据,例如:

    header := metadata.Pairs("header-key", "header-val")
    grpc.SendHeader(ctx, header)
    trailer := metadata.Pairs("trailer-key", "trailer-val")
    grpc.SetTrailer(ctx, trailer)
    
    • 1
    • 2
    • 3
    • 4

    对于上述的SendHeaderSetHeader方法,其区别为SendHeader方法只能调用一次,而SetHeader方法将会对所有调用的元数据进行合并发送。

    对于流调用,服务端发送元数据则是通过流对象中的上述方法:

    header := metadata.Pairs("header-key", "header-val")
    stream.SendHeader(,header)
    trailer := metadata.Pairs("trailer-key", "trailer-val")
    stream.SetTrailer(trailer)
    
    • 1
    • 2
    • 3
    • 4

    服务器拦截器处理

    对于gRPC服务端一元调用及流调用拦截器,请求元数据的读取与响应元数据的发送与上一节中的实现相同,便不再赘述。下面我们将讨论一下在拦截器中更新请求元数据,以及读取响应的元数据。

    一元调用拦截器更新请求元数据

    在服务端拦截器中更新请求的元数据,其实现的方式与客户端发送元数据类似,即需要通过更新后的元数据创建新的context。对于一元调用拦截器,其简单实现如下所示:

    md, ok := metadata.FromIncomingContext(ctx)
    
    md.Append("new-key", "new-value")
    ctx = metadata.NewIncomingContext(ctx, md)
    
    resp, err := handler(ctx, req) // 传递context至handler中
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    一元调用拦截器读取响应元数据

    对于一元调用响应的元数据,gRPC未提供直接访问的方法响应的元数据。为了在拦截器中能读取到响应的元数据,我们可以通过覆盖原始grpc.ServerTransportStream并对设置的元数据进行备份的方式进行实现。

    type WrappedServerTransportStream struct {
      grpc.ServerTransportStream
    
      header  metadata.MD
      trailer metadata.MD
    }
    
    func (s *WrappedServerTransportStream) SendHeader(md metadata.MD) error {
      if err := s.ServerTransportStream.SendHeader(md); err != nil {
        return err
      }
    
      s.header = md
    
      return nil
    }
    
    // 在需要的情况下继续实现下面的几个方法:
    // func (s *WrappedServerTransportStream) SetHeader(metadata.MD) error
    // func (s *WrappedServerTransportStream) SetTrailer(metadata.MD) error
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在定义带有元数据副本的ServerTransportStream实现后,我们需要通过grpc.ServerTransportStreamFromContext获取到一元调用的原始流,在对其进行封装后,调用grpc.NewContextWithServerTransportStream创建新的context。

    stream := grpc.ServerTransportStreamFromContext(ctx)
    wrappedStream := &WrappedServerTransportStream{
      ServerTransportStream: stream,
    }
    ctx = grpc.NewContextWithServerTransportStream(ctx, wrappedStream)
    
    resp, err := handler(ctx, req)
    
    // 通过wrappedStream.header、wrappedStream.trailer读取响应的元数据
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    需要注意,grpc.ServerTransportStream接口是一个实验性的接口,在后续版本中可能会被移除,所以本节中描述的方法在后续版本中可能不再可用。

    流调用拦截器更新请求元数据

    而对于流调用,gRPC没有提供修改其context的方法,为了实现修改流调用请求元数据,就需要实现grpc.ServerStream接口并加入带有修改后元数据的context。以下是一个简单的实现:

    type WrappedStream struct {
      grpc.ServerStream
      ctx context.Context
    }
    
    func (s *WrappedStream) Context() context.Context {
      return s.ctx
    }
    
    func ExampleStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
      md, ok := metadata.FromIncomingContext(ss.Context())
      md.append("new-key", "new-value")
    
      ctx := metadata.NewIncomingContext(ss.Context(), md)
    
      return handler(srv, &WrappedStream{ss, ctx})
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    流调用拦截器读取响应元数据

    与在一元调用拦截器中相同,若需要在流调用拦截器中读取响应的元数据,我们可以实现grpc.ServerStream接口,并在其中保存元数据的副本。例如我们可以在上节的WrappedStream的基础上,对其进行一定修改:

    type WrappedStream struct {
      grpc.ServerStream
    
      header  metadata.MD
      trailer metadata.MD
    }
    
    func (s *WrappedStream) SendHeader(md metadata.MD) error {
      if err := s.ServerStream.SendHeader(md); err != nil {
        return err
      }
    
      s.header = md
    
      return nil
    }
    
    // 继续实现SetHeader、SetTrailer等方法
    
    func ExampleStreamInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
      stream := &WrappedStream{ServerStream: ss}
      err := handler(srv, stream)
    
      // 通过stream.header、stream.trailer读取响应元数据
    
      return err
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    参考资料

  • 相关阅读:
    SpringCloud的新闻资讯项目11 --- 热点文章-实时计算Kafka Stream
    SCM供应链管理系统实施困难及解决方案
    软件测试 app自动化03 toast元素的定位 滑屏操作 触屏操作
    花椰菜的 8 大健康益处,为你一一盘点
    【Python数据科学快速入门系列 | 02】创建ndarray对象的十多种方法
    【STM32基础 CubeMX】外部中断
    临近期末,这些题不来看看吗?(上)
    Jupyternotebook修改默认目录无效No such notebook dir
    使用Navicat对比多环境数据库数据差异和结构差异,以及自动DML和DDL脚本
    reflow-回流 和 repaint-重绘
  • 原文地址:https://blog.csdn.net/ghosind/article/details/136383252