• 第三期:gRPC客户端与服务端连接失败后,是否会有重试机制?


    grpc 版本1.50

    client.go 代码:

    func main() {
     flag.Parse()
     // Set up a connection to the server.
     conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
     if err != nil {
      log.Fatalf("did not connect: %v", err)
     }
     defer conn.Close()
     c := pb.NewGreeterClient(conn)

     // Contact the server and print out its response.
     ctx, cancel := context.WithTimeout(context.Background(), time.Second)
     defer cancel()
     r, err := c.SayHello(ctx, &pb.HelloRequest{Name: *name})
     if err != nil {
      log.Fatalf("could not greet: %v", err)
     }
     log.Printf("Greeting: %s", r.GetMessage())
    }
    • 1

    Dial 源码:

    func Dial(target string, opts ...DialOption) (*ClientConn, error) {
     return DialContext(context.Background(), target, opts...)
    }
    • 1

    DialContext 源码:

    省略次部分代码

    // 首先会创建 ClientConn,初始化相关字段
    func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
     cc := &ClientConn{
      target:            target,
      csMgr:             &connectivityStateManager{},
      conns:             make(map[*addrConn]struct{}),
      dopts:             defaultDialOptions(),
      blockingpicker:    newPickerWrapper(),
      czData:            new(channelzData),
      firstResolveEvent: grpcsync.NewEvent(),
     }

     // 将用户设置的连接参数更新到客户端连接器 ClientConn
     for _, opt := range opts {
      opt.apply(&cc.dopts)
     }

     return cc, nil
    }
    • 1

    connect() 方法:

    func (ac *addrConn) connect() error {
     ac.mu.Lock()
     // if 校验状态
     if ac.state == connectivity.Shutdown {
      ac.mu.Unlock()
      return errConnClosing
     }
     if ac.state != connectivity.Idle {
      ac.mu.Unlock()
      return nil
     }
     ac.updateConnectivityState(connectivity.Connecting, nil)
     ac.mu.Unlock()
     // 主要看这个方法,重试连接
     ac.resetTransport()
     return nil
    }
    • 1

    进入 resetTransport() 源码

    func (ac *addrConn) resetTransport() {
     ac.mu.Lock()
     // 判断状态若为 shutdown,则不再连接直接推出
     if ac.state == connectivity.Shutdown {
      ac.mu.Unlock()
      return
     }

     addrs := ac.addrs
     // 连接失败,需要进行重试的
     // Backoff 是需要等待的时间,ac.backoffIdx表示第几次重试
     backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
     // 计算出本次向gRPC服务尝试建立TCP连接的最长时间
     // 若超过这个时间还是连接不上,则主动断开,等待尝试下次连接
     // This will be the duration that dial gets to finish.
     dialDuration := minConnectTimeout
     if ac.dopts.minConnectTimeout != nil {
      dialDuration = ac.dopts.minConnectTimeout()
     }

     if dialDuration < backoffFor {
      // Give dial more time as we keep failing to connect.
      dialDuration = backoffFor
     }
     connectDeadline := time.Now().Add(dialDuration)

     // 更新结构addrConn状态为connecting
     ac.updateConnectivityState(connectivity.Connecting, nil)
     ac.mu.Unlock()

     // 向服务器连接失败后需要做的逻辑
     if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
      ac.cc.resolveNow(resolver.ResolveNowOptions{})
      // After exhausting all addresses, the addrConn enters
      // TRANSIENT_FAILURE.
      ac.mu.Lock()
      if ac.state == connectivity.Shutdown {
       ac.mu.Unlock()
       return
      }
      ac.updateConnectivityState(connectivity.TransientFailure, err)

      // Backoff.
      b := ac.resetBackoff
      ac.mu.Unlock()

      // 定时的超时间
      timer := time.NewTimer(backoffFor)
      // 1.timer.C如果连接超时,重试的次数+1,继续下一次重试连接,但需要等待一段时间
      // 2. b,直接关闭,将继续进行重新连接
      // 2. ac.ctx.Done,走这里的话,上下文结束,这里不会再次重试了
      select {
      case <-timer.C:
       ac.mu.Lock()
       ac.backoffIdx++
       ac.mu.Unlock()
      case <-b:
       timer.Stop()
      case <-ac.ctx.Done():
       timer.Stop()
       return
      }

      ac.mu.Lock()
      // 状态 != shutdown就更新为空闲状态
      if ac.state != connectivity.Shutdown {
       ac.updateConnectivityState(connectivity.Idle, err)
      }
      ac.mu.Unlock()
      return
     }
     // 连接成功,重新设置backoff为原始值0
     ac.mu.Lock()
     ac.backoffIdx = 0
     ac.mu.Unlock()
    }
    • 1

    如何计算重试连接等待时间?

    进入Backoff方法


    func (bc Exponential) Backoff(retries int) time.Duration {
     if retries == 0 {
      return bc.Config.BaseDelay
     }
     backoff, max := float64(bc.Config.BaseDelay), float64(bc.Config.MaxDelay)
     for backoff < max && retries > 0 {
      // 幂次方
      backoff *= bc.Config.Multiplier
      retries--
     }
     // 不能超过最大延时时间
     if backoff > max {
      backoff = max
     }
     // Randomize backoff delays so that if a cluster of requests start at
     // the same time, they won't operate in lockstep.
     backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
     if backoff < 0 {
      return 0
     }
     return time.Duration(backoff)
    }
    • 1

    总结:

    • 连接失败后,客户端会进行重试连接。
    • 重试次数越多,等待下一次连接时间也会变长,但不能超过MaxDelay值。

    更多Go云原生学习资料,收录于Github:https://github.com/metashops/GoFamily

    本文由 mdnice 多平台发布

  • 相关阅读:
    Python 获取当前容器 CPU 负载和内存
    【编程DIY】一.几个有趣的小程序
    java中一些注解的作用
    Elucidating the Design Space of Diffusion-Based Generative Models 阅读笔记
    数据中台-资产管理、数据安全
    Java之TreeSet
    微信小程序使用路由传参和传对象的方法
    有趣之matlab-烟花
    阿里云域名免费配置HTTPS
    shell 字符串变量
  • 原文地址:https://blog.csdn.net/realize_dream/article/details/126556721