gRPC元数据用于传输特定RPC调用的额外信息(如身份验证详细信息),其表现形式为键值对列表。其中键是字符串,值通常是字符串,但可以是二进制数据。
键通常由字符串、数字、特殊字符(-,
_,
.)组成,且大小写不敏感。但是不能用grpc-开头,框架内部预留。二进制Key使用-bin结尾。允许客户端向服务端发送元数据,同样也允许服务端向客户端发送元数据。
type MD map[string][]string
可以将元数据看成普通的map结构,key 是字符串,value是字符串数组,所以可以给同一个key添加多个value值。创建元数据有两种方式
使用New函数从map结构中创建
md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})
使用Pairs函数创建,相同的key自动合并为list
md := metadata.Pairs(
"key1", "val1",
"key1", "val1-2", // "key1" will have map value []string{"val1", "val1-2"}
"key2", "val2",
)
注意:所有的key将自动转变为小写,因此 key1 和 kEy1被认为是同一个Key,它们的value将被合并为list
元数据中的key一直是字符串,但是value值可以是字符串或二进制数据。gRPC在创建二进制元数据时,要求key必须以 “-bin”结尾, 对value值进行编码
md := metadata.Pairs(
"key", "string value",
"key-bin", string([]byte{96, 102}), // 使用 base64编码
)
可以从上下文对象context中调用FromIncomingContext方法获取元数据
func (s *server) SomeRPC(ctx context.Context, in *pb.SomeRequest) (*pb.SomeResponse, err) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
有两种方式可以将元数据发送到服务器。建议使用AppendToOutgoingContext方法将key-value键值对添加到上下文。该方法可以与上下文现有的元数据一起使用。如果之前没有元数据,则添加元数据;如果存在元数据,则合并key-value键值对。
// 在上下文中创建 元数据
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// 在元数据中添加新的数据 (例如:在另一个拦截器中)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
// 简单 RPC 调用
response, err := client.SomeRPC(ctx, someRequest)
// 流式 RPC 调用
stream, err := client.SomeStreamingRPC(ctx)
此外,使用使用NewOutgoingContext方法将元数据添加到context上下文,需要注意: 这将替换上下文已经存在的元数据
// 在上下文中创建 元数据
md := metadata.Pairs("k1", "v1", "k1", "v2", "k2", "v3")
ctx := metadata.NewOutgoingContext(context.Background(), md)
// 在元数据中添加新的数据 (例如:在另一个拦截器中)
send, _ := metadata.FromOutgoingContext(ctx)
newMD := metadata.Pairs("k3", "v3")
ctx = metadata.NewOutgoingContext(ctx, metadata.Join(send, newMD))
// 简单RPC调用
response, err := client.SomeRPC(ctx, someRequest)
// 流式RPC调用
stream, err := client.SomeStreamingRPC(ctx)
客户端通过gRPC中 header、trailer 获取元数据。gRPC 在HTTP2协议的基础上构建数据传送通道,通常而言,都会先入为主的认为元数据会存放在HTTP header中,思考一下为什么还需要从trailer获取元数据?
gRPC使用HTTP trailers 由以下两个目的:
使用CallOption中的Header、Trailer函数获取 unary RPC中的元数据
var header, trailer metadata.MD // 定义变量存储 header、trailer 中的元数据
r, err := client.SomeRPC(
ctx,
someRequest,
grpc.Header(&header), // 获取 header 元数据
grpc.Trailer(&trailer), // 获取 trailer 元数据
)
可以使用ClientStream接口中的Header、Trailer函数获取响应流中的元数据。流式RPC调用包含:
stream, err := client.SomeStreamingRPC(ctx)
// retrieve header
header, err := stream.Header()
// retrieve trailer
trailer := stream.Trailer()
在请求-响应RPC调用中,gRPC服务端通过SendHeader、SetTrailer方法发送Header、Trailer元数据,这两个函数都需要上下文context作为参数
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// create and send header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
流式RPC调用,使用ServerStream中的SendHeader 、SetTrailer方法发送header、trailer元数据
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
// create and send header
header := metadata.Pairs("header-key", "val")
stream.SendHeader(header)
// create and set trailer
trailer := metadata.Pairs("trailer-key", "val")
stream.SetTrailer(trailer)
}
gRPC服务端从context上下文中获取客户端发送的元数据。但是请求-响应RPC、流式RPC接收元数据的方式稍有不同
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
func (s *server) SomeStreamingRPC(stream pb.Service_SomeStreamingRPCServer) error {
md, ok := metadata.FromIncomingContext(stream.Context()) // get context from stream
// do something with metadata
}
代码目录
├── README.md
├── client
│ └── main.go
└── server
└── main.go
运行服务端
go run server/main.go
运行客户端
go run client/main.go
客户端核心代码
func unaryCallWithMetadata(c pb.EchoClient, message string) {
fmt.Printf("--- unary ---\n")
// Create metadata and context.
md := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
ctx := metadata.NewOutgoingContext(context.Background(), md)
//注意此种方式,调用 NewOutgoingContext函数会覆盖 timestamp元数据
//newMD := metadata.Pairs("k3", "v3")
//ctx = metadata.NewOutgoingContext(ctx, newMD)
// 定义变量 存储服务端header、trailer 元数据
var header, trailer metadata.MD
r, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: message}, grpc.Header(&header), grpc.Trailer(&trailer))
if err != nil {
log.Fatalf("failed to call UnaryEcho: %v", err)
}
// 判断response header是否返回timestamp元数据
if t, ok := header["timestamp"]; ok {
fmt.Printf("timestamp from header:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Fatal("timestamp expected but doesn't exist in header")
}
// 判断response header是否返回location元数据
if l, ok := header["location"]; ok {
fmt.Printf("location from header:\n")
for i, e := range l {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Fatal("location expected but doesn't exist in header")
}
fmt.Printf("------------------------------- response:\n")
fmt.Printf(" - %s\n", r.Message)
//读取trailer 元数据
if t, ok := trailer["timestamp"]; ok {
fmt.Printf("timestamp from trailer:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
} else {
log.Fatal("timestamp expected but doesn't exist in trailer")
}
}
服务端核心代码
func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) {
fmt.Printf("--- UnaryEcho ---\n")
// 利用defer 最后发送trailer元数据
defer func() {
//设置trailer 元数据
trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
grpc.SetTrailer(ctx, trailer)
}()
// 读取客户端的元数据
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Errorf(codes.DataLoss, "UnaryEcho: failed to get metadata")
}
if t, ok := md["timestamp"]; ok {
fmt.Printf("timestamp from metadata:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
}
// 发送header 元数据
header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
grpc.SendHeader(ctx, header)
fmt.Printf("request received: %v, sending echo\n", in)
return &pb.EchoResponse{Message: in.Message}, nil
}
Wireshark抓包截图
简单的分析下服务端Streaming RPC调用的代码,其他方式(客户端Streaming RPC、双向流模式RPC)大同小异只是调用函数不一致而已
func (s *server) ServerStreamingEcho(in *pb.EchoRequest, stream pb.Echo_ServerStreamingEchoServer) error {
fmt.Printf("--- ServerStreamingEcho ---\n")
// 利用defer 最后发送trailer元数据
defer func() {
trailer := metadata.Pairs("timestamp", time.Now().Format(timestampFormat))
stream.SetTrailer(trailer)
}()
// 读取客户端发送的元数据
md, ok := metadata.FromIncomingContext(stream.Context())
if !ok {
return status.Errorf(codes.DataLoss, "ServerStreamingEcho: failed to get metadata")
}
if t, ok := md["timestamp"]; ok {
fmt.Printf("timestamp from metadata:\n")
for i, e := range t {
fmt.Printf(" %d. %s\n", i, e)
}
}
// 创建返回的header元数据
header := metadata.New(map[string]string{"location": "MTV", "timestamp": time.Now().Format(timestampFormat)})
stream.SendHeader(header)
fmt.Printf("request received: %v\n", in)
// 模拟多次发送
for i := 0; i < streamingCount; i++ {
fmt.Printf("echo message %v\n", in.Message)
err := stream.Send(&pb.EchoResponse{Message: in.Message})
if err != nil {
return err
}
}
return nil
}
测试结果
如上图,代码里里面设置的trailer信息在最后一个stream流中的header 信息中传输给客户端。