• gRPC 健康检查


    gRPC健康检查

    gRPC提供健康检查机制,允许服务器应用程序发出他们的状态信号给对应的客户端,而不会断开与客户端的连接。例如,当服务器本身已经启动,但是它依赖的另一个服务不可用,该业务场景就可以使用健康检查机制。

    健康检查机制通常结合负载均衡机制配套使用,当检查到后端服务状态异常时,选择正常的Node节点,进行RPC调用,知道异常Node节点正常为止。

    注意: 健康检查机制需要服务名称,所以客户端需要配置服务名称。可以设置空字符串,表示指定主机、端口上所有服务的运行状况都需要被监控。

    健康检查协议

    基于请求-响应式的健康检查协议,客户端需要定期轮询服务器。当集群服务规模不大的时候,这并不是问题。然后当集群规模非常庞大时,大量的客户端发送健康检查请求,那么会占用服务器资源、网络带宽,进一步影响系统正常运行。因此需要将健康检查协议转换为基于流式监控的API。

    需要注意:这里有存在一个细微的缺点,当服务端健康检查代码变得不健康时,可能存在以下情况导致服务无法发送数据:

    • 服务器停止,客户端断开连接
    • 健康检查服务中的问题导致,但实际服务正常运行,客户端不能感知服务端最新状态

    健康检查API

    客户端有两种模式检查服务端的状态:

    • 请求-响应模式 - 客户端不断轮训服务端状态,该方式不优雅

      Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
      
      • 1
    • 监听机制 - 服务端主动推送状态给客户端

      Watch(*HealthCheckRequest, Health_WatchServer) error
      
      • 1

    健康检查的核心接口

    //健康服务API 接口定义.
    type HealthServer interface {
      // 请求服务不可用,请求失败 状态为: NOT_FOUND.
    	Check(context.Context, *HealthCheckRequest) (*HealthCheckResponse, error)
      // 1. 执行watch方法请求服务状态,该方法返回服务当前状态;并且当服务状态改变是主动通知客户端
    	
    	// 2. 如果请求不可用,会返回 “SERVICE_UNKNOWN”,后续当服务状态正常时,推送正常状态给客户端
    	
      // 3. 当客户端接收到 “UNIMPLEMENTED”,表示该服务不支持,不应该发送重试请求
      // 当客户端接收到 其他状态包含 OK, 允许客户端在合适的时机发送重试请求
    	Watch(*HealthCheckRequest, Health_WatchServer) error
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    健康检查状态

    const (
    	HealthCheckResponse_UNKNOWN         HealthCheckResponse_ServingStatus = 0
    	HealthCheckResponse_SERVING         HealthCheckResponse_ServingStatus = 1
    	HealthCheckResponse_NOT_SERVING     HealthCheckResponse_ServingStatus = 2
      // Used only by the Watch method.
    	HealthCheckResponse_SERVICE_UNKNOWN HealthCheckResponse_ServingStatus = 3 
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    客户端行为

    默认情况下禁用客户端检查;服务所有者可以通过配置启动检查机制。即使在服务配置中启用了通道参数,也可以在客户端上使用通道参数来禁用健康检查。

    客户端第一次建立连接,如果已经启用健康检查,会立即调用Watch()方法,channel状态为CONNECTING,直到第一次接收到Response返回。接收到服务端返回的健康检查Response,如果状态为正常,则channel状态改变为 READY。否则channel状态为TRANSIENT_FAILURE。当后端服务从不健康状态转换为健康状态时,子通道的连接状态从TRANSIENT_FAILURE直接转换为READY,其间不会停止CONNECTING。

    调用Watch()方法返回UNIMPLEMENTED状态时,客户端将禁用健康检查,并不会发送重试请求,但是channel状态为 READY,可以正常通信。但是客户端将记录channel事件,同时记录eroor日志。

    调用Watch()方法返回其他状态,channel状态为TRANSIENT_FAILURE,会发送重试请求。为避免集中重试请求造成网络拥堵,客户端在两次重试之间使用指数回退。当客户端在接收到服务端返回的Response是,重置回退状态,立即发送下一次请求。然后重试请求将受指数回退(简单的理解,就是确定重试请求的时间间隔)的影响。当下一次重试开始是,channel状态转换为 CONNECTING

    Channel就绪条件

    由于网络IO读写的异步性,启用健康检查机制后,客服端有可能在接收到服务健康状态之前,已经存在(待运行)RPC调用。此时如果直接调用RPC接口,就会出现一些未知的情况。当第一次建立连接是,该问题可能会影响到更多的RPC。因为可能存在很多RPC排队等待通道连接,这些RPC将会同时发送。

    为了避免上述情况,客户端在channel通道就绪之前,必须等待初始健康检查响应。

    Example 代码

    完整代码

    serviceConfig := grpc.WithDefaultServiceConfig(`{
      "loadBalancingPolicy": "round_robin", //负载均衡策略
      "healthCheckConfig": {
        "serviceName": "" //指定服务名称
      }
    }`)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    服务端代码

    func main() {
    	flag.Parse()
    
    	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
    	if err != nil {
    		log.Fatalf("failed to listen: %v", err)
    	}
    
    	s := grpc.NewServer()
    	//启动健康检查服务
    	healthcheck := health.NewServer()
    	healthgrpc.RegisterHealthServer(s, healthcheck)
    	pb.RegisterEchoServer(s, &echoServer{})
    
    	go func() {
    		// 异步检查依赖并切换状态
    		// 初始化设置为服务正常状态
    		next := healthpb.HealthCheckResponse_SERVING
    		for {
    			//设置服务健康状态
    			healthcheck.SetServingStatus(system, next)
    
    			if next == healthpb.HealthCheckResponse_SERVING {
    				// 暂停休眠后,模拟设置服务状态为不可用
    				next = healthpb.HealthCheckResponse_NOT_SERVING
    			} else {
    				// 恢复服务状态为可用状态
    				next = healthpb.HealthCheckResponse_SERVING
    			}
    			//暂停 模拟数据发送
    			time.Sleep(*sleep)
    		}
    	}()
    
    	if err := s.Serve(lis); err != nil {
    		log.Fatalf("failed to serve: %v", err)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    客户端代码

    • 客户端启动健康检查
    // step 1 定义服务配置
    var serviceConfig = `{
    	"loadBalancingPolicy": "round_robin",
    	"healthCheckConfig": {
    		"serviceName": ""
    	}
    }`
    
    // step2 开启负载均衡策略 并指定健康检查服务名称
    options := []grpc.DialOption{
    		grpc.WithTransportCredentials(insecure.NewCredentials()),
    		grpc.WithBlock(),
    		grpc.WithResolvers(r),
    		grpc.WithDefaultServiceConfig(serviceConfig),
    	}
    
    // step3 这一步非常关键 通过init方法启动客服端检查
    import _ "google.golang.org/grpc/health"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 客户端健康检查核心代码

      • 初始化客户端健康检查

        func init() {
        	fmt.Println("client health check init ..")
        	internal.HealthCheckFunc = clientHealthCheck
        }
        
        • 1
        • 2
        • 3
        • 4
      • 重试间隔时间

        var (
        	backoffStrategy = backoff.DefaultExponential
        	backoffFunc     = func(ctx context.Context, retries int) bool {
        		d := backoffStrategy.Backoff(retries)
        		//通过定时器 指定重试间隔时间
        		timer := time.NewTimer(d)
        		select {
        		case <-timer.C:
        			return true
        		case <-ctx.Done():
        			timer.Stop()
        			return false
        		}
        	}
        )
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
      • 健康检查核心逻辑

        const healthCheckMethod = "/grpc.health.v1.Health/Watch"
        
        func clientHealthCheck(ctx context.Context,
        	newStream func(string) (interface{}, error),
        	setConnectivityState func(connectivity.State, error),
        	service string) error {
        	tryCnt := 0
        
        retryConnection:
        	for {
        		// 连接失败 进行重试
        		// Backs off if the connection has failed in some way without receiving a message in the previous retry.
        		if tryCnt > 0 && !backoffFunc(ctx, tryCnt-1) {
        			return nil
        		}
        		tryCnt++
        
        		if ctx.Err() != nil {
        			return nil
        		}
        		// 设置channel 为 connecting 状态
        		setConnectivityState(connectivity.Connecting, nil)
        		//通过stream 连接流 连接server Watch 方法,完成健康检查数据连接通道
        		rawS, err := newStream(healthCheckMethod)
        		if err != nil {
        			continue retryConnection
        		}
        
        		s, ok := rawS.(grpc.ClientStream)
        		// Ideally, this should never happen. But if it happens, the server is marked as healthy for LBing purposes.
        		if !ok {
        			// channel 设置为 ready 状态 (UNIMPLEMENTED)
        			setConnectivityState(connectivity.Ready, nil)
        			return fmt.Errorf("newStream returned %v (type %T); want grpc.ClientStream", rawS, rawS)
        		}
        
        		// 发送健康检查请求
        		if err = s.SendMsg(&healthpb.HealthCheckRequest{Service: service}); err != nil && err != io.EOF {
        			// Stream should have been closed, so we can safely continue to create a new stream.
        			continue retryConnection
        		}
        		s.CloseSend()
        
        		//检查状态
        		resp := new(healthpb.HealthCheckResponse)
        		for {
        			err = s.RecvMsg(resp)
        
        			// Reports healthy for the LBing purposes if health check is not implemented in the server.
        			if status.Code(err) == codes.Unimplemented {
        				setConnectivityState(connectivity.Ready, nil)
        				return err
        			}
        
        			// Reports unhealthy if server's Watch method gives an error other than UNIMPLEMENTED.
        			if err != nil {
        				setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but received health check RPC error: %v", err))
        				continue retryConnection
        			}
        
        			// As a message has been received, removes the need for backoff for the next retry by resetting the try count.
        			tryCnt = 0
        			if resp.Status == healthpb.HealthCheckResponse_SERVING {
        				setConnectivityState(connectivity.Ready, nil)
        			} else {
        				setConnectivityState(connectivity.TransientFailure, fmt.Errorf("connection active but health check failed. status=%s", resp.Status))
        			}
        		}
        	}
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
        • 12
        • 13
        • 14
        • 15
        • 16
        • 17
        • 18
        • 19
        • 20
        • 21
        • 22
        • 23
        • 24
        • 25
        • 26
        • 27
        • 28
        • 29
        • 30
        • 31
        • 32
        • 33
        • 34
        • 35
        • 36
        • 37
        • 38
        • 39
        • 40
        • 41
        • 42
        • 43
        • 44
        • 45
        • 46
        • 47
        • 48
        • 49
        • 50
        • 51
        • 52
        • 53
        • 54
        • 55
        • 56
        • 57
        • 58
        • 59
        • 60
        • 61
        • 62
        • 63
        • 64
        • 65
        • 66
        • 67
        • 68
        • 69
        • 70

    验证结果

    • 启动服务端

      //开启两个服务端进行,并设置不同的休眠时间
      go run server/main.go -port=50051 -sleep=5s
      go run server/main.go -port=50052 -sleep=10s
      
      • 1
      • 2
      • 3
    • 启动客户端

      go run client/main.go
      
      • 1
    • 结果截图

      在这里插入图片描述

  • 相关阅读:
    Lombok插件介绍、MyBatisPlus分页功能、控制台日志及删除banner
    华为设备链路聚合基础
    我用80行核心JS代码每个月躺着挣一瓶肥宅快乐水
    JUC并发编程——JMM详解(基于狂神说得到学习笔记)
    提交本地项目到GitHub
    2022年全国职业院校技能大赛网络安全A模块安全加固解析
    (Java实习生)每日10道面试题打卡——Java基础知识篇
    辅助治疗帕金森病机器人的可行性研究
    面试系列多线程:核心参数设置多少合适
    VUE父组件向子组件传递值
  • 原文地址:https://blog.csdn.net/u013433591/article/details/127840370