调用失败的情况
对于第一种和第二种gRPC客户端会自动重试,因为服务端并未处理,所以始终可以重试(透明重试transparent retries),第三种需要手动设置重试策略
//在客户端设置重试策略( 新建重试配置retry.json)
1. name 指定需要配置异常重试的RPC方法,service是必填项,method是可选项
2. retryPolicy 指定重试策略
2.1 maxAttempts 最大重试次数,指定一次RPC 调用中最多的请求次数,包括第一次请求。必须是大于 1 的整数,对于大于5的值会被视为5。
如果设置了调用的过期时间,那么到了过期时间,无论重试情况都会返回超时错误DeadlineExceeded
2.2 retryableStatusCode 重试状态码,当 RPC 调用返回非 OK 响应,会根据 retryableStatusCode 来判断是否进行重试,
GRPC并没有提供自定义CODE的功能,所以只能用内置的CODE
2.3 initialBackoff,maxBackoff,backoffMultiplier 指数退避参数,在进行下一次重试请求前,会计算需要等待的时间。
必须指定,并且必须具有大于0。第一次重试间隔是random(0, initialBackoff),
第 n 次的重试间隔为random(0, min( initialBackoff*backoffMultiplier**(n-1) , maxBackoff))
{
"methodConfig": [
{
"name": [
{
"service": "cn.jannal.grpc.facade.dto.HelloService",
"method": "hello"
}
],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE"
]
}
}
]
}
客户端重试配置代码
@Slf4j
public class GrpcConsumerRetry {
public static final String IP = "127.0.0.1";
public static final int PORT = 8082;
@Test
public void testRetry() {
ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
.defaultServiceConfig(getRetryingServiceConfig())
.enableRetry() // 重要,客户端是默认关闭了重试的
.usePlaintext()// 启用明文
.build();
// 同步调用
HelloServiceGrpc.HelloServiceBlockingStub stub
= HelloServiceGrpc.newBlockingStub(channel);
HelloResponse helloResponse = stub.hello(HelloRequest.newBuilder()
.setFirstName("Jannal")
.setLastName("Jan")
.build());
log.info("Response received from server:{}", helloResponse);
channel.shutdown();
}
protected static Map<String, ?> getRetryingServiceConfig() {
return new Gson()
.fromJson(new JsonReader(new InputStreamReader(
Objects.requireNonNull(GrpcConsumerRetry.class.getClassLoader().getResourceAsStream(
"retry.json")), StandardCharsets.UTF_8)),
Map.class);
}
}
对冲是指在不等待响应的情况主动发送单次调用的多个请求。请求的流程
注意事项:使用对冲的时候,请求可能会访问到不同的后端(如果设置了负载均衡),所以要求对冲策略应该只用于幂等的操作
对冲策略配置
1. name 指定需要配置对冲策略的RPC方法,service是必填项,method是可选项
2. hedgingPolicy 指定对冲策略
2.1 maxAttempts 最大请求次数,指定一次RPC 调用中最多的请求次数,包括第一次请求。 必须是大于1的整数,对于大于5的值会被视为5。
如果设置了调用的过期时间,那么到了过期时间,无论重试情况都会返回超时错误DeadlineExceeded
2.2 hedgingDelay 等待时间,如果hedgingDelay时间内没有响应,那么直接发送第二次请求,如果指定0S,会立即将maxAttempts个请求发出
2.3 nonFatalStatusCodes 当对冲请求接收到nonFatalStatusCodes后,会立即发送下一个对冲请求,不管 hedgingDelay。
如果收到其他的状态码,则所有未完成的对冲请求都将被取消,并且将状态码返回给调用者。
本质上,对冲可以看做是收到FatalStatusCodes 前对RPC调用的重试。
可选的字段,因为在上一个请求没有响应的时候也会发送对冲请求
{
"methodConfig": [
{
"name": [
{
"service": "cn.jannal.grpc.facade.dto.HelloService",
"method": "hello"
}
],
"hedgingPolicy": {
"maxAttempts": 3,
"hedgingDelay": "1s",
"nonFatalStatusCodes":[
]
}
}
]
}
当客户端的失败和成功比超过某个阈值时,gRPC 会通过禁用这些重试策略来防止由于重试导致服务器过载
重试限流是根据服务器来设置的,而不是针对方法或者服务。对于每个 server,gRPC 的客户端都维护了一个 token_count 变量,变量初始值为配置的 maxTokens 值,值的范围是 0 - maxToken,每次 RPC 请求都会影响这个 token_count 变量值
每次失败的 RPC 请求都会对 token_count 减 1
每次成功的 RPC 请求都会对 token_count 增加 tokenRatio 值
如果token_count <= (maxTokens / 2)
,那么后续发出的请求即使失败也不会进行重试了,但是正常的请求还是会发出去,直到这个token_count > (maxTokens / 2)
才又恢复对失败请求的重试。这种策略可以有效的处理长时间故障。tokenRatio介于0~1之间,支持3位小数
设置超时时间,设置整合stub超时和当前请求的超时
@Slf4j
public class GrpcConsumerTimeout {
public static final String IP = "127.0.0.1";
public static final int PORT = 8082;
@Test
public void testTimeout() {
ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
.usePlaintext()// 启用明文
.build();
HelloServiceGrpc.HelloServiceBlockingStub stub
= HelloServiceGrpc.newBlockingStub(channel)
//设置超时时间,这是设置整个stub的deadline
.withDeadlineAfter(3, TimeUnit.SECONDS);
HelloResponse helloResponse = stub
//设置当前请求的deadline
.withDeadlineAfter(3, TimeUnit.SECONDS)
.hello(HelloRequest.newBuilder()
.setFirstName("Jannal")
.setLastName("Jan")
.build());
log.info("Response received from server:{}", helloResponse);
channel.shutdown();
}
}
有时需要将rest动态转换为gRPC的调用,可以使用反射创建请求对象
第一种通过内省填充JavaBean对象
@Test
public void testReflection() throws ClassNotFoundException,
NoSuchMethodException,
InvocationTargetException,
IllegalAccessException,
InvalidProtocolBufferException {
//可以通过参数传递过来
String requestClassName = HelloRequest.class.getName();
//构建请求的Message
Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader()
.loadClass(requestClassName);
Method method = requestMessageClass.getMethod("newBuilder");
Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{});
//填充setter方法
Descriptors.Descriptor descriptor = builder.getDescriptorForType();
Descriptors.FieldDescriptor firstNameField = descriptor.findFieldByName("firstName");
builder.setField(firstNameField, "Tom");
Descriptors.FieldDescriptor lastNameField = descriptor.findFieldByName("lastName");
builder.setField(lastNameField, "Jannal");
Message requestMessage = builder.build();
ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
.usePlaintext()// 启用明文
.build();
String serviceGrpcName = HelloServiceGrpc.class.getName();
String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName();
Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName);
Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub);
//调用newBlockingStub获取一个stub对象
Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class);
AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel});
log.info("{}", stub);
String methodName = "hello";
//使用stub调用远程方法
Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class);
// HellResponse实现了Message接口
log.info("请求数据:{}", requestMessage);
Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage);
log.info("服务端响应数据:{}", helloResponse);
String json = JsonFormat.printer().print(helloResponse);
log.info("服务端响应数据JSON格式:{}", json);
channel.shutdown();
}
第二种通过请求的JSON直接填充请求对象
//添加依赖 protobuf与json之间相互转换
compile "com.googlecode.protobuf-java-format:protobuf-java-format"
@Test
public void testReflectionJson() throws ClassNotFoundException,
NoSuchMethodException,
InvocationTargetException,
IllegalAccessException,
InvalidProtocolBufferException {
//json转protobuf
String json = "{\n" +
"\tfirstName:\"zhangsan\",\n" +
"\tlastName:\"lisi\"\n" +
"}";
//可以通过参数传递过来
String requestClassName = HelloRequest.class.getName();
Class<?> requestMessageClass = GrpcConsumerReflection.class.getClassLoader()
.loadClass(requestClassName);
Method method = requestMessageClass.getMethod("newBuilder");
Message.Builder builder = (Message.Builder) method.invoke(null, new Object[]{});
JsonFormat.parser().merge(json, builder);
Message requestMessage = builder.build();
ManagedChannel channel = ManagedChannelBuilder.forAddress(IP, PORT)
.usePlaintext()// 启用明文
.build();
String serviceGrpcName = HelloServiceGrpc.class.getName();
String serviceGrpcNameBlockingStub = HelloServiceGrpc.HelloServiceBlockingStub.class.getName();
Class<?> grpcClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcName);
Class<?> stubClass = GrpcConsumerReflection.class.getClassLoader().loadClass(serviceGrpcNameBlockingStub);
//调用newBlockingStub获取一个stub对象
Method stubMethod = grpcClass.getMethod("newBlockingStub", Channel.class);
AbstractStub<?> stub = (AbstractStub<?>) stubMethod.invoke(null, new Object[]{channel});
log.info("{}", stub);
String methodName = "hello";
//使用stub调用远程方法
Method rpcMethod = stubClass.getMethod(methodName, HelloRequest.class);
// HellResponse实现了Message接口
log.info("请求数据:{}", requestMessage);
Message helloResponse = (Message) rpcMethod.invoke(stub, requestMessage);
log.info("服务端响应数据:{}", helloResponse);
String responseJson = JsonFormat.printer().print(helloResponse);
log.info("服务端响应数据JSON格式:{}", responseJson);
channel.shutdown();
}