• gRPC之API详解


    异常重试

    1. 调用失败的情况

      • 第一种:客户端请求异常,此时未达到服务端
      • 第二种:请求达到服务端,但是服务端并未处理此请求
      • 第三种:服务端处理请求,但是处理结果是失败
    2. 对于第一种和第二种gRPC客户端会自动重试,因为服务端并未处理,所以始终可以重试(透明重试transparent retries),第三种需要手动设置重试策略

      //在客户端设置重试策略( 新建重试配置retry.json)
      1. name 指定需要配置异常重试的RPC方法,service是必填项,method是可选项
      2. retryPolicy 指定重试策略
         2.1 maxAttempts 最大重试次数,指定一次RPC 调用中最多的请求次数,包括第一次请求。必须是大于 1 的整数,对于大于5的值会被视为5。
             如果设置了调用的过期时间,那么到了过期时间,无论重试情况都会返回超时错误DeadlineExceeded  
         2.2 retryableStatusCode 重试状态码,当 RPC 调用返回非 OK 响应,会根据 retryableStatusCode 来判断是否进行重试,
             GRPC并没有提供自定义CODE的功能,所以只能用内置的CODE
         2.3 initialBackoff,maxBackoff,backoffMultiplier 指数退避参数,在进行下一次重试请求前,会计算需要等待的时间。
        			必须指定,并且必须具有大于0。第一次重试间隔是random(0, initialBackoff), 
      				第 n 次的重试间隔为random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff)) 
      
      {
        "methodConfig": [
          {
            "name": [
              {
                "service": "cn.jannal.grpc.facade.dto.HelloService",
                "method": "hello"
              }
            ],
      
            "retryPolicy": {
              "maxAttempts": 5, 
              "initialBackoff": "0.5s",
              "maxBackoff": "30s",
              "backoffMultiplier": 2,
              "retryableStatusCodes": [
                "UNAVAILABLE" 
              ]
            }
          }
        ]
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
    3. 客户端重试配置代码

      @Slf4j
      public class GrpcConsumerRetry {
          public static final String IP = "127.0.0.1";
          public static final int PORT = 8082;
      
          @Test
          public void testRetry() {
              ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                      .defaultServiceConfig(getRetryingServiceConfig())
                      .enableRetry() // 重要,客户端是默认关闭了重试的
                      .usePlaintext()// 启用明文
                      .build();
              // 同步调用
              HelloServiceGrpc.HelloServiceBlockingStub stub
                      = HelloServiceGrpc.newBlockingStub(channel);
      
              HelloResponse helloResponse = stub.hello(HelloRequest.newBuilder()
                      .setFirstName("Jannal")
                      .setLastName("Jan")
                      .build());
              log.info("Response received from server:{}", helloResponse);
              channel.shutdown();
          }
      
          protected static Map<String, ?> getRetryingServiceConfig() {
              return new Gson()
                      .fromJson(new JsonReader(new InputStreamReader(
                                      Objects.requireNonNull(GrpcConsumerRetry.class.getClassLoader().getResourceAsStream(
                                              "retry.json")), StandardCharsets.UTF_8)),
                              Map.class);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32

    对冲策略

    1. 对冲是指在不等待响应的情况主动发送单次调用的多个请求。请求的流程

      • 第一次正常的请求正常发出
      • 在等待固定时间间隔后,没有收到正确的响应,第二个对冲请求会被发出
      • 再等待固定时间间隔后,没有收到任何前面两个请求的正确响应,第三个会被发出
      • 一直重复以上流程直到发出的对冲请求数量达到配置的最大次数
      • 一旦收到正确响应,所有对冲请求都会被取消,响应会被返回给应用层
    2. 注意事项:使用对冲的时候,请求可能会访问到不同的后端(如果设置了负载均衡),所以要求对冲策略应该只用于幂等的操作

    3. 对冲策略配置

      1. name 指定需要配置对冲策略的RPC方法,service是必填项,method是可选项
      2. hedgingPolicy 指定对冲策略
      	2.1 maxAttempts 最大请求次数,指定一次RPC 调用中最多的请求次数,包括第一次请求。 必须是大于1的整数,对于大于5的值会被视为5。
      		如果设置了调用的过期时间,那么到了过期时间,无论重试情况都会返回超时错误DeadlineExceeded
      	2.2 hedgingDelay 等待时间,如果hedgingDelay时间内没有响应,那么直接发送第二次请求,如果指定0S,会立即将maxAttempts个请求发出
      	2.3 nonFatalStatusCodes 当对冲请求接收到nonFatalStatusCodes后,会立即发送下一个对冲请求,不管 hedgingDelay。
      			如果收到其他的状态码,则所有未完成的对冲请求都将被取消,并且将状态码返回给调用者。
      		  本质上,对冲可以看做是收到FatalStatusCodes 前对RPC调用的重试。
      			可选的字段,因为在上一个请求没有响应的时候也会发送对冲请求
      {
        "methodConfig": [
          {
            "name": [
              {
                "service": "cn.jannal.grpc.facade.dto.HelloService",
                "method": "hello"
              }
            ],
            "hedgingPolicy": {
              "maxAttempts": 3,
              "hedgingDelay": "1s",
              "nonFatalStatusCodes":[
                
              ]
            }
          }
        ]
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28

    重试限流

    1. 当客户端的失败和成功比超过某个阈值时,gRPC 会通过禁用这些重试策略来防止由于重试导致服务器过载

    2. 重试限流是根据服务器来设置的,而不是针对方法或者服务。对于每个 server,gRPC 的客户端都维护了一个 token_count 变量,变量初始值为配置的 maxTokens 值,值的范围是 0 - maxToken,每次 RPC 请求都会影响这个 token_count 变量值

      每次失败的 RPC 请求都会对 token_count 减 1
      每次成功的 RPC 请求都会对 token_count 增加 tokenRatio 值
      
      • 1
      • 2
    3. 如果token_count <= (maxTokens / 2),那么后续发出的请求即使失败也不会进行重试了,但是正常的请求还是会发出去,直到这个token_count > (maxTokens / 2) 才又恢复对失败请求的重试。这种策略可以有效的处理长时间故障。tokenRatio介于0~1之间,支持3位小数

    设置超时

    1. 设置超时时间,设置整合stub超时和当前请求的超时

      @Slf4j
      public class GrpcConsumerTimeout {
          public static final String IP = "127.0.0.1";
          public static final int PORT = 8082;
      
          @Test
          public void testTimeout() {
              ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                      .usePlaintext()// 启用明文
                      .build();
              HelloServiceGrpc.HelloServiceBlockingStub stub
                      = HelloServiceGrpc.newBlockingStub(channel)
                      //设置超时时间,这是设置整个stub的deadline
                      .withDeadlineAfter(3, TimeUnit.SECONDS);
      
              HelloResponse helloResponse = stub
                      //设置当前请求的deadline
                      .withDeadlineAfter(3, TimeUnit.SECONDS)
                      .hello(HelloRequest.newBuilder()
                              .setFirstName("Jannal")
                              .setLastName("Jan")
                              .build());
              log.info("Response received from server:{}", helloResponse);
              channel.shutdown();
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27

    反射创建对象

    1. 有时需要将rest动态转换为gRPC的调用,可以使用反射创建请求对象

    2. 第一种通过内省填充JavaBean对象

      @Test
      public void testReflection() throws ClassNotFoundException,
              NoSuchMethodException,
              InvocationTargetException,
              IllegalAccessException,
              InvalidProtocolBufferException {
          //可以通过参数传递过来
          String requestClassName = HelloRequest.class.getName();
      
          //构建请求的Message
          Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader()
                  .loadClass(requestClassName);
          Method method = requestMessageClass.getMethod("newBuilder");
          Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{});
          //填充setter方法
          Descriptors.Descriptor descriptor = builder.getDescriptorForType();
          Descriptors.FieldDescriptor firstNameField = descriptor.findFieldByName("firstName");
          builder.setField(firstNameField, "Tom");
          Descriptors.FieldDescriptor lastNameField = descriptor.findFieldByName("lastName");
          builder.setField(lastNameField, "Jannal");
          Message requestMessage = builder.build();
      
      
          ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                  .usePlaintext()// 启用明文
                  .build();
      
          String serviceGrpcName = HelloServiceGrpc.class.getName();
          String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName();
          Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName);
          Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub);
          //调用newBlockingStub获取一个stub对象
          Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class);
          AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel});
          log.info("{}", stub);
      
          String methodName = "hello";
          //使用stub调用远程方法
          Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class);
          // HellResponse实现了Message接口
          log.info("请求数据:{}", requestMessage);
          Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage);
          log.info("服务端响应数据:{}", helloResponse);
      
          String json = JsonFormat.printer().print(helloResponse);
          log.info("服务端响应数据JSON格式:{}", json);
      
          channel.shutdown();
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
    3. 第二种通过请求的JSON直接填充请求对象

      //添加依赖 protobuf与json之间相互转换  
      compile "com.googlecode.protobuf-java-format:protobuf-java-format"   
        
      @Test
      public void testReflectionJson() throws ClassNotFoundException,
              NoSuchMethodException,
              InvocationTargetException,
              IllegalAccessException,
              InvalidProtocolBufferException {
      
          //json转protobuf
          String json = "{\n" +
                  "\tfirstName:\"zhangsan\",\n" +
                  "\tlastName:\"lisi\"\n" +
                  "}";
          //可以通过参数传递过来
          String requestClassName = HelloRequest.class.getName();
          Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader()
                  .loadClass(requestClassName);
          Method method = requestMessageClass.getMethod("newBuilder");
          Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{});
          JsonFormat.parser().merge(json, builder);
          Message requestMessage = builder.build();
      
      
          ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
                  .usePlaintext()// 启用明文
                  .build();
      
          String serviceGrpcName = HelloServiceGrpc.class.getName();
          String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName();
          Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName);
          Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub);
          //调用newBlockingStub获取一个stub对象
          Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class);
          AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel});
          log.info("{}", stub);
      
          String methodName = "hello";
          //使用stub调用远程方法
          Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class);
          // HellResponse实现了Message接口
          log.info("请求数据:{}", requestMessage);
          Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage);
          log.info("服务端响应数据:{}", helloResponse);
      
          String responseJson = JsonFormat.printer().print(helloResponse);
          log.info("服务端响应数据JSON格式:{}", responseJson);
      
          channel.shutdown();
      }  
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
  • 相关阅读:
    Spring框架(三)Spring注解和获取Bean对象详解
    MySQL 部署:初始化建议关注的参数
    教你轻松理解Go Ticker的用法和实现原理
    【基于FreeRTOS的STM32F103系统】Heap_4内存管理机制程序详解
    基于FPGA的电子万年历设计
    递归下降语法分析程序设计与实现
    深度学习——(11)Knowledge distillation理论
    Redis总结
    flutter系列之:flutter架构什么的,看完这篇文章就全懂了
    [Unity] GPU动画实现(三)——材质合并
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126247994