• ETCD数据库源码分析——gRPC 拦截器


    什么是拦截器?在构建gRPC应用程序时,无论是客户端应用程序,还是服务器端应用程序,在远程方法执行之前或之后,都免不了需要执行一些通用逻辑。在gRPC中,可以使用拦截器的扩展机制来拦截RPC的执行以满足特定的需求,如日志、认证、性能度量指标等。根据所拦截的RPC调用的类型,gRPC拦截器可以分为两类。对于一元RPC,可以使用一元拦截器(unary interceptor);对于流RPC,则可以使用流拦截器(streaming interceptor)。这些拦截器既可以用在gRPC服务器端,也可以用在gRPC客户端。

    对于服务器端一元拦截器,需要先实现UnaryServerInterceptor类型的函数(func(ctx context.Context, req interface{], info *UnaryServerInfo, handler UnaryHandler) (resp insterface{], err error)),并在创建gRPC服务器端时将函数注册进来(grpc.NewServer(grpc.UnaryInterceptor(xxxServerInterceptor)))。服务器端一元拦截器的实现通常可以分为3个部分:前置处理、调用RPC方法以及后置处理。前置处理阶段是在调用预期的RPC远程方法之前执行。在前置处理阶段,用户可以通过检查传入的参数来获取关于当前RPC的信息,比如RPC上下文、RPC请求和服务器端信息。随后,在调用阶段,需要调用gRPC UnaryHandler来触发RPC方法。在调用RPC之后,就进入后置处理阶段。RPC响应要流经后置处理阶段。在这个阶段中,可以按需处理返回的响应和错误。当后置处理阶段完成之后,需要以拦截器函数返回参数的形式将消息和错误返回。如果不需要后置处理,那么可以直接返回handler调用(handler(ctx, req))。

    对于服务器端流拦截器,需要先实现StreamServerInterceptor类型的函数(func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error),与一元拦截器类似,在前置处理阶段,可以在流RPC进入服务实现之前对其进行拦截。在前置处理阶段之后,则可以调用StreamHandler来完成远程方法的RPC执行,而且通过已实现grpc.ServerStream接口的包装器流接口,可以拦截流RPC的消息。在通过handler(srv, newWrappedStream(ss))方法调用grpc.StreamHandler时,可以将这个包装器结构传递进来。grpc.ServerStream的包装器可以拦截gRPC服务发送或接收到的数据。它实现了SendMsg函数和RecvMsg函数,这两个函数分别会在服务发送和接收RPC流消息的时候被调用。
    在这里插入图片描述
    对于客户端一元拦截器,UnaryClientInterceptor是客户端一元拦截器的类型,函数签名如下(func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) ERROR)。在前置处理阶段,可以在调用远程方法之前拦截RPC。这里可以通过检查出入的参数来访问关于当前RPC的信息,比如RPC的上下文、方法字符串、要发送的请求以及CallOption配置。借助UnaryInvoker参数,可以调用实际的一元RPC。在后置处理阶段,可以访问RPC的响应结果或错误结果。
    对于客户端流拦截器,StreamClientInterceptor是客户端流拦截器的类型,函数签名如下func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
    在这里插入图片描述

    etcd 提供了丰富的 metrics、日志、请求行为检查等机制,可记录所有请求的执行耗时及错误码、来源 IP 等,也可控制请求是否允许通过,比如 etcd Learner 节点只允许指定接口和参数的访问,帮助大家定位问题、提高服务可观测性等,而这些特性是怎么非侵入式的实现呢?etcd server 定义了如下的 Service KV 和 Range 方法,启动的时候它会将实现 KV 各方法的对象注册到 gRPC Server,并在其上注册对应的拦截器。拦截器提供了在执行一个请求前后的 hook 能力,除了我们上面提到的 debug 日志、metrics 统计、对 etcd Learner 节点请求接口和参数限制等能力,etcd 还基于它实现了以下特性:

    • 要求执行一个操作前集群必须有 Leader;
    • 请求延时超过指定阈值的,打印包含来源 IP 的慢查询日志 (3.5 版本)。

    server 收到 client 的 Range RPC 请求后,根据 ServiceName 和 RPC Method 将请求转发到对应的 handler 实现,handler 首先会将上面描述的一系列拦截器串联成一个执行,在拦截器逻辑中,通过调用 KVServer 模块的 Range 接口获取数据。

    func _KV_Range_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    	in := new(RangeRequest)
    	if err := dec(in); err != nil { return nil, err }
    	if interceptor == nil { return srv.(KVServer).Range(ctx, in) } // 没有interceptor,直接调用Range
    	info := &grpc.UnaryServerInfo{ Server: srv, FullMethod: "/etcdserverpb.KV/Range", }
    	handler := func(ctx context.Context, req interface{}) (interface{}, error) {
    		return srv.(KVServer).Range(ctx, req.(*RangeRequest))
    	}
    	return interceptor(ctx, in, info, handler) // 调用输入参数interceptor 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    如下函数就是将一元拦截器和流拦截器注册到创建gRPC服务器端的过程。newUnaryInterceptor、newLogUnaryInterceptor和newStreamInterceptor定义在server/etcdserver/api/v3rpc/interceptor.go文件中,下面我们描述一下这些拦截器。

    server/etcdserver/api/v3rpc/grpc.go
    func Server(s *etcdserver.EtcdServer, tls *tls.Config, interceptor grpc.UnaryServerInterceptor, gopts ...grpc.ServerOption) *grpc.Server {
    	var opts []grpc.ServerOption
    	opts = append(opts, grpc.CustomCodec(&codec{}))
    	if tls != nil {
    		bundle := credentials.NewBundle(credentials.Config{TLSConfig: tls})
    		opts = append(opts, grpc.Creds(bundle.TransportCredentials()))
    	}
    	chainUnaryInterceptors := []grpc.UnaryServerInterceptor{ newLogUnaryInterceptor(s), newUnaryInterceptor(s), grpc_prometheus.UnaryServerInterceptor, }
    	if interceptor != nil { chainUnaryInterceptors = append(chainUnaryInterceptors, interceptor) }
    	chainStreamInterceptors := []grpc.StreamServerInterceptor{ newStreamInterceptor(s), grpc_prometheus.StreamServerInterceptor, }
    
    	if s.Cfg.ExperimentalEnableDistributedTracing {
    		chainUnaryInterceptors = append(chainUnaryInterceptors, otelgrpc.UnaryServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
    		chainStreamInterceptors = append(chainStreamInterceptors, otelgrpc.StreamServerInterceptor(s.Cfg.ExperimentalTracerOptions...))
    	}
    
    	opts = append(opts, grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(chainUnaryInterceptors...)))
    	opts = append(opts, grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(chainStreamInterceptors...)))
    	opts = append(opts, grpc.MaxRecvMsgSize(int(s.Cfg.MaxRequestBytes+grpcOverheadBytes)))
    	opts = append(opts, grpc.MaxSendMsgSize(maxSendBytes))
    	opts = append(opts, grpc.MaxConcurrentStreams(maxStreams))
    	grpcServer := grpc.NewServer(append(opts, gopts...)...)  // 在服务器端注册拦截器等
    	pb.RegisterKVServer(grpcServer, NewQuotaKVServer(s))
    	pb.RegisterWatchServer(grpcServer, NewWatchServer(s))
    	pb.RegisterLeaseServer(grpcServer, NewQuotaLeaseServer(s))
    	pb.RegisterClusterServer(grpcServer, NewClusterServer(s))
    	pb.RegisterAuthServer(grpcServer, NewAuthServer(s))
    	pb.RegisterMaintenanceServer(grpcServer, NewMaintenanceServer(s))
    	// server should register all the services manually use empty service name for all etcd services' health status, see https://github.com/grpc/grpc/blob/master/doc/health-checking.md for more
    	hsrv := health.NewServer()
    	hsrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
    	healthpb.RegisterHealthServer(grpcServer, hsrv)
    	// set zero values for metrics registered for this grpc server
    	grpc_prometheus.Register(grpcServer)
    
    	return grpcServer
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    newLogUnaryInterceptor函数生成服务器端一元拦截器,它没有前置处理,但是有后置处理逻辑。如果打开debug日志级别或者请求延时超过指定阈值的,则后置处理逻辑会获取统计信息并打印。

    func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
    	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    		startTime := time.Now()
    		resp, err := handler(ctx, req) // 调用handler完成一元RPC的正常执行
    		lg := s.Logger()
    		if lg != nil { // acquire stats if debug level is enabled or RequestInfo is expensive
    			defer logUnaryRequestStats(ctx, lg, s.Cfg.WarningUnaryRequestDuration, info, startTime, req, resp)
    		}
    		return resp, err
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    UnaryInterceptor主要是对etcd Learner 节点请求接口和参数限制进行限制,检测执行一个操作前集群必须有 Leader。

    func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
    	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    		if !api.IsCapabilityEnabled(api.V3rpcCapability) { return nil, rpctypes.ErrGRPCNotCapable }
    		if s.IsMemberExist(s.MemberId()) && s.IsLearner() && !isRPCSupportedForLearner(req) { return nil, rpctypes.ErrGRPCNotSupportedForLearner }
    
    		md, ok := metadata.FromIncomingContext(ctx)
    		if ok {
    			ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
    			if len(vs) > 0 { ver = vs[0] }
    			if !utf8.ValidString(ver) { return nil, rpctypes.ErrGRPCInvalidClientAPIVersion }
    			clientRequests.WithLabelValues("unary", ver).Inc()
    
    			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
    				if s.Leader() == types.ID(raft.None) {return nil, rpctypes.ErrGRPCNoLeader }
    			}
    		}
    		return handler(ctx, req)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    服务器端流拦截器创建协程monitorLeader检测maxNoLeaderCnt election timeout超时后取消掉正在使用的streams流;检测执行一个操作前集群必须有 Leader;创建serverStreamWithCtx,并将其存放到smap.streams字典streams map[grpc.ServerStream]struct{}中,用于monitorLeader协程Cancel掉流RPC通信。

    func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
    	smap := monitorLeader(s)
    
    	return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    		if !api.IsCapabilityEnabled(api.V3rpcCapability) { return rpctypes.ErrGRPCNotCapable }
    		if s.IsMemberExist(s.MemberId()) && s.IsLearner() && info.FullMethod != snapshotMethod { // learner does not support stream RPC except Snapshot
    			return rpctypes.ErrGRPCNotSupportedForLearner
    		}
    
    		md, ok := metadata.FromIncomingContext(ss.Context())
    		if ok {
    			ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey)
    			if len(vs) > 0 { ver = vs[0] }
    			if !utf8.ValidString(ver) { return rpctypes.ErrGRPCInvalidClientAPIVersion }
    			clientRequests.WithLabelValues("stream", ver).Inc()
    
    			if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
    				if s.Leader() == types.ID(raft.None) { return rpctypes.ErrGRPCNoLeader }
    				ctx := newCancellableContext(ss.Context())
    				
    				ss = serverStreamWithCtx{ctx: ctx, ServerStream: ss}
    				smap.mu.Lock()
    				smap.streams[ss] = struct{}{}
    				smap.mu.Unlock()
    
    				defer func() {
    					smap.mu.Lock()
    					delete(smap.streams, ss)
    					smap.mu.Unlock()
    					// TODO: investigate whether the reason for cancellation here is useful to know
    					ctx.Cancel(nil)
    				}()
    			}
    		}
    
    		return handler(srv, ss)
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    参考文章:gRPC与云原生应用开发 以Go和Java为例

  • 相关阅读:
    Redis原理篇——数据结构
    WebShell后门检测与WebShell箱子反杀
    基于java+ssm+vue+mysql的网络教学系统
    pycharm爬虫模块(scrapy)基础使用
    IT运维:利用数据分析平台采集Windows event log数据
    数据库系统及应用复习——第二章关系数据库
    分析网上的一篇“浪漫烟花“程序<VS-C++>
    传智教育|如何转行互联网高薪岗位之一的软件测试?(附软件测试学习路线图)
    人脸识别5.1.3- insightface人脸识别模型arcface-Paddle
    C# 使用Newtonsoft.Json解析嵌套json
  • 原文地址:https://blog.csdn.net/asmartkiller/article/details/126061407