什么是拦截器?在构建gRPC应用程序时,无论是客户端应用程序,还是服务器端应用程序,在远程方法执行之前或之后,都免不了需要执行一些通用逻辑。在gRPC中,可以使用拦截器的扩展机制来拦截RPC的执行以满足特定的需求,如日志、认证、性能度量指标等。根据所拦截的RPC调用的类型,gRPC拦截器可以分为两类。对于一元RPC,可以使用一元拦截器(unary interceptor);对于流RPC,则可以使用流拦截器(streaming interceptor)。这些拦截器既可以用在gRPC服务器端,也可以用在gRPC客户端。
对于服务器端一元拦截器,需要先实现UnaryServerInterceptor类型的函数(func(ctx context.Context, req interface{], info *UnaryServerInfo, handler UnaryHandler) (resp insterface{], err error)
),并在创建gRPC服务器端时将函数注册进来(grpc.NewServer(grpc.UnaryInterceptor(xxxServerInterceptor))
)。服务器端一元拦截器的实现通常可以分为3个部分:前置处理、调用RPC方法以及后置处理。前置处理阶段是在调用预期的RPC远程方法之前执行。在前置处理阶段,用户可以通过检查传入的参数来获取关于当前RPC的信息,比如RPC上下文、RPC请求和服务器端信息。随后,在调用阶段,需要调用gRPC UnaryHandler来触发RPC方法。在调用RPC之后,就进入后置处理阶段。RPC响应要流经后置处理阶段。在这个阶段中,可以按需处理返回的响应和错误。当后置处理阶段完成之后,需要以拦截器函数返回参数的形式将消息和错误返回。如果不需要后置处理,那么可以直接返回handler调用(handler(ctx, req))。
对于服务器端流拦截器,需要先实现StreamServerInterceptor类型的函数(func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error
),与一元拦截器类似,在前置处理阶段,可以在流RPC进入服务实现之前对其进行拦截。在前置处理阶段之后,则可以调用StreamHandler来完成远程方法的RPC执行,而且通过已实现grpc.ServerStream接口的包装器流接口,可以拦截流RPC的消息。在通过handler(srv, newWrappedStream(ss))方法调用grpc.StreamHandler时,可以将这个包装器结构传递进来。grpc.ServerStream的包装器可以拦截gRPC服务发送或接收到的数据。它实现了SendMsg函数和RecvMsg函数,这两个函数分别会在服务发送和接收RPC流消息的时候被调用。
对于客户端一元拦截器,UnaryClientInterceptor是客户端一元拦截器的类型,函数签名如下(func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) ERROR
)。在前置处理阶段,可以在调用远程方法之前拦截RPC。这里可以通过检查出入的参数来访问关于当前RPC的信息,比如RPC的上下文、方法字符串、要发送的请求以及CallOption配置。借助UnaryInvoker参数,可以调用实际的一元RPC。在后置处理阶段,可以访问RPC的响应结果或错误结果。
对于客户端流拦截器,StreamClientInterceptor是客户端流拦截器的类型,函数签名如下func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
。
etcd 提供了丰富的 metrics、日志、请求行为检查等机制,可记录所有请求的执行耗时及错误码、来源 IP 等,也可控制请求是否允许通过,比如 etcd Learner 节点只允许指定接口和参数的访问,帮助大家定位问题、提高服务可观测性等,而这些特性是怎么非侵入式的实现呢?etcd server 定义了如下的 Service KV 和 Range 方法,启动的时候它会将实现 KV 各方法的对象注册到 gRPC Server,并在其上注册对应的拦截器。拦截器提供了在执行一个请求前后的 hook 能力,除了我们上面提到的 debug 日志、metrics 统计、对 etcd Learner 节点请求接口和参数限制等能力,etcd 还基于它实现了以下特性:
server 收到 client 的 Range RPC 请求后,根据 ServiceName 和 RPC Method 将请求转发到对应的 handler 实现,handler 首先会将上面描述的一系列拦截器串联成一个执行,在拦截器逻辑中,通过调用 KVServer 模块的 Range 接口获取数据。
func _KV_Range_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RangeRequest)
if err := dec(in); err != nil { return nil, err }
if interceptor == nil { return srv.(KVServer).Range(ctx, in) } // 没有interceptor,直接调用Range
info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/etcdserverpb.KV/Range", }
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(KVServer).Range(ctx, req.(*RangeRequest))
}
return interceptor(ctx, in, info, handler) // 调用输入参数interceptor
}
如下函数就是将一元拦截器和流拦截器注册到创建gRPC服务器端的过程。newUnaryInterceptor、newLogUnaryInterceptor和newStreamInterceptor定义在server/etcdserver/api/v3rpc/interceptor.go文件中,下面我们描述一下这些拦截器。
server/etcdserver/api/v3rpc/grpc.go
func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
var opts []grpc.ServerOption
opts = append(opts, grpc.CustomCodec(&codec{}))
if tls != nil {
bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
}
chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ newLogUnaryInterceptor(s), newUnaryInterceptor(s), grpc_prometheus.UnaryServerInterceptor, }
if interceptor != nil { chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor) }
chainStreamInterceptors := []grpc.StreamServerInterceptor{ newStreamInterceptor(s), grpc_prometheus.StreamServerInterceptor, }
if s.Cfg.ExperimentalEnableDistributedTracing {
chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
chainStreamInterceptors = append(chainStreamInterceptors, otelgrpc.StreamServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
}
opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(chainUnaryInterceptors...)))
opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(chainStreamInterceptors...)))
opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
grpcServer := grpc.NewServer(append(opts, gopts...)...) // 在服务器端注册拦截器等
pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
// server should register all the services manually use empty service name for all etcd services' health status, see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
hsrv := health.NewServer()
hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(grpcServer, hsrv)
// set zero values for metrics registered for this grpc server
grpc_prometheus.Register(grpcServer)
return grpcServer
}
newLogUnaryInterceptor函数生成服务器端一元拦截器,它没有前置处理,但是有后置处理逻辑。如果打开debug日志级别或者请求延时超过指定阈值的,则后置处理逻辑会获取统计信息并打印。
func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now()
resp, err := handler(ctx, req) // 调用handler完成一元RPC的正常执行
lg := s.Logger()
if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive
defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp)
}
return resp, err
}
}
UnaryInterceptor主要是对etcd Learner 节点请求接口和参数限制进行限制,检测执行一个操作前集群必须有 Leader。
func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
if !api.IsCapabilityEnabled(api.V3rpcCapability) { return nil, rpctypes.ErrGRPCNotCapable }
if s.IsMemberExist(s.MemberId()) && s.IsLearner() && !isRPCSupportedForLearner(req) { return nil, rpctypes.ErrGRPCNotSupportedForLearner }
md, ok := metadata.FromIncomingContext(ctx)
if ok {
ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
if len(vs) > 0 { ver = vs[0] }
if !utf8.ValidString(ver) { return nil, rpctypes.ErrGRPCInvalidClientAPIVersion }
clientRequests.WithLabelValues("unary", ver).Inc()
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
if s.Leader() == types.ID(raft.None) {return nil, rpctypes.ErrGRPCNoLeader }
}
}
return handler(ctx, req)
}
}
服务器端流拦截器创建协程monitorLeader检测maxNoLeaderCnt election timeout超时后取消掉正在使用的streams流;检测执行一个操作前集群必须有 Leader;创建serverStreamWithCtx,并将其存放到smap.streams字典streams map[grpc.ServerStream]struct{}
中,用于monitorLeader协程Cancel掉流RPC通信。
func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
smap := monitorLeader(s)
return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if !api.IsCapabilityEnabled(api.V3rpcCapability) { return rpctypes.ErrGRPCNotCapable }
if s.IsMemberExist(s.MemberId()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot
return rpctypes.ErrGRPCNotSupportedForLearner
}
md, ok := metadata.FromIncomingContext(ss.Context())
if ok {
ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
if len(vs) > 0 { ver = vs[0] }
if !utf8.ValidString(ver) { return rpctypes.ErrGRPCInvalidClientAPIVersion }
clientRequests.WithLabelValues("stream", ver).Inc()
if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
if s.Leader() == types.ID(raft.None) { return rpctypes.ErrGRPCNoLeader }
ctx := newCancellableContext(ss.Context())
ss = serverStreamWithCtx{ctx: ctx, ServerStream: ss}
smap.mu.Lock()
smap.streams[ss] = struct{}{}
smap.mu.Unlock()
defer func() {
smap.mu.Lock()
delete(smap.streams, ss)
smap.mu.Unlock()
// TODO: investigate whether the reason for cancellation here is useful to know
ctx.Cancel(nil)
}()
}
}
return handler(srv, ss)
}
}
参考文章:gRPC与云原生应用开发 以Go和Java为例