• A Study of the gRPC (Java) keepAlive Mechanism


    Conclusions

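    1. gRPC keepAlive is the gRPC framework's application-level mechanism for keeping connections alive. When no business data is flowing over a gRPC connection, it determines whether to send pings (ping/pong) to keep the connection active, so that the server or the operating system does not close the connection after a long idle period.
    2. Both the client and the server support keepAlive. It is disabled by default on the client (keepAliveTime is Long.MAX_VALUE). It is enabled by default on the server with a keepAliveTime of 2 hours, meaning the server pings the client once the connection has gone 2 hours without receiving data: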
    // io.grpc.internal.GrpcUtil
    public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
    3. KeepAlive is managed by the class io.grpc.internal.KeepAliveManager. It tracks the keepAlive state and schedules and executes the ping tasks.

    Client-side KeepAlive

    Entry point

    1. When creating a gRPC connection with the io.grpc framework, you can configure keepAlive, for example:
    NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider)) //
            .usePlaintext() //
            .defaultLoadBalancingPolicy(props.getBalancePolicy()) //
            .maxInboundMessageSize(props.getMaxInboundMessageSize()) //
            .keepAliveTime(1, TimeUnit.MINUTES)       // ping after 1 minute without incoming data
            .keepAliveWithoutCalls(true)              // keep pinging even when no RPC is active
            .keepAliveTimeout(10, TimeUnit.SECONDS)   // give up if the ping ack takes longer than 10 s
            .intercept(channelManager.getInterceptors()); //
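    2. Three of these parameters relate to keepAlive: keepAliveTime, keepAliveTimeout, and keepAliveWithoutCalls. What does each one do?
    • keepAliveTime: how long the connection must go without receiving data before the client sends a ping to the server.
    • keepAliveTimeout: after a ping is sent, how long the client waits for the server's response. If no response arrives in time, the ping has timed out and the connection is considered dead.
    • keepAliveWithoutCalls: whether to keep sending keepAlive pings when there are no active RPCs on the connection. Defaults to false.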
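    To make the interplay of the three parameters concrete, here is a minimal sketch that models only the two decisions they control: whether a ping should go out, and whether an unanswered ping has timed out. The class and method names (ClientKeepAliveParams, shouldSendPing, isDead) are hypothetical and not part of grpc-java. The real logic lives in KeepAliveManager and is timer-driven rather than polled like this.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical model of the client keepAlive parameters (not a grpc-java API).
 * It only answers "should a ping go out now?" and "has the ping timed out?".
 */
class ClientKeepAliveParams {
    final long keepAliveTimeNanos;     // silence required before a ping is sent
    final long keepAliveTimeoutNanos;  // how long to wait for the ping ack
    final boolean keepAliveWithoutCalls;

    ClientKeepAliveParams(long time, TimeUnit timeUnit,
                          long timeout, TimeUnit timeoutUnit,
                          boolean keepAliveWithoutCalls) {
        this.keepAliveTimeNanos = timeUnit.toNanos(time);
        this.keepAliveTimeoutNanos = timeoutUnit.toNanos(timeout);
        this.keepAliveWithoutCalls = keepAliveWithoutCalls;
    }

    /** True if a ping should be sent after nanosSinceLastRead of no incoming data. */
    boolean shouldSendPing(long nanosSinceLastRead, boolean hasActiveCalls) {
        if (!hasActiveCalls && !keepAliveWithoutCalls) {
            return false; // idle connection and keepAliveWithoutCalls=false: no pings
        }
        return nanosSinceLastRead >= keepAliveTimeNanos;
    }

    /** True if the ping went unanswered for keepAliveTimeout, i.e. the connection is dead. */
    boolean isDead(long nanosSincePingSent, boolean ackReceived) {
        return !ackReceived && nanosSincePingSent >= keepAliveTimeoutNanos;
    }
}
```

    With the builder values above (1 minute, 10 seconds, keepAliveWithoutCalls=true), an idle connection is pinged after 60 seconds of silence and treated as dead if the ack has not arrived 10 seconds later. With the client default of Long.MAX_VALUE for keepAliveTime, shouldSendPing effectively never returns true.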

    Brief sequence outline

    Create & Start

    NettyChannelBuilder
    -----> NettyTransportFactory
    ---------> NettyClientTransport
    -------------> KeepAliveManager & NettyClientHandler

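    Responding to events
    When one of the Active, Idle, DataReceived, Started, or Termination events occurs, KeepAliveManager updates its keepAlive state and schedules the ping task accordingly.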
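    This event handling can be viewed as a small state machine. The sketch below is a simplified, single-threaded reconstruction for illustration only. Its state names mirror those inside io.grpc.internal.KeepAliveManager, but you should check them against the grpc-java version you use. Real timers are replaced by explicit onPingTimer()/onPingTimeout() calls, and side effects are only recorded as strings. KeepAliveStateSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified, single-threaded sketch of a keepAlive state machine (hypothetical class).
 * Timers are replaced by explicit onPingTimer()/onPingTimeout() calls, and side effects
 * are only recorded in `actions`.
 */
class KeepAliveStateSketch {
    enum State { IDLE, PING_SCHEDULED, PING_DELAYED, PING_SENT, IDLE_AND_PING_SENT, DISCONNECTED }

    // Keep pinging with no active RPCs (on the client this plays the role of keepAliveWithoutCalls).
    final boolean keepAliveDuringTransportIdle;
    State state = State.IDLE;
    final List<String> actions = new ArrayList<>();

    KeepAliveStateSketch(boolean keepAliveDuringTransportIdle) {
        this.keepAliveDuringTransportIdle = keepAliveDuringTransportIdle;
    }

    /** Started: if idle connections are kept alive, start the ping cycle right away. */
    void onTransportStarted() {
        if (keepAliveDuringTransportIdle) {
            onTransportActive();
        }
    }

    /** Active: an RPC is in flight, so schedule a ping keepAliveTime from now. */
    void onTransportActive() {
        if (state == State.IDLE) {
            state = State.PING_SCHEDULED;
            actions.add("schedule ping");
        } else if (state == State.IDLE_AND_PING_SENT) {
            state = State.PING_SENT;
        }
    }

    /** Idle: the last RPC finished; stop the ping cycle unless idle keepAlive is enabled. */
    void onTransportIdle() {
        if (keepAliveDuringTransportIdle) {
            return;
        }
        if (state == State.PING_SCHEDULED || state == State.PING_DELAYED) {
            state = State.IDLE;
        } else if (state == State.PING_SENT) {
            state = State.IDLE_AND_PING_SENT;
        }
    }

    /** DataReceived: the peer is alive, so postpone the next ping or settle the outstanding one. */
    void onDataReceived() {
        if (state == State.PING_SCHEDULED) {
            state = State.PING_DELAYED; // the timer is left running; re-checked when it fires
        } else if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
            actions.add("cancel timeout");
            if (state == State.IDLE_AND_PING_SENT) {
                state = State.IDLE;
            } else {
                state = State.PING_SCHEDULED;
                actions.add("schedule ping");
            }
        }
    }

    /** The keepAliveTime timer fired. */
    void onPingTimer() {
        if (state == State.PING_SCHEDULED) {
            state = State.PING_SENT;
            actions.add("send ping");
            actions.add("schedule timeout");
        } else if (state == State.PING_DELAYED) {
            state = State.PING_SCHEDULED; // data arrived meanwhile: wait another period
            actions.add("schedule ping");
        }
    }

    /** keepAliveTimeout elapsed without any data arriving (the ping ack included). */
    void onPingTimeout() {
        if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
            state = State.DISCONNECTED;
            actions.add("shutdown transport");
        }
    }

    /** Termination: the transport is gone; cancel everything. */
    void onTransportTermination() {
        if (state != State.DISCONNECTED) {
            state = State.DISCONNECTED;
            actions.add("cancel timers");
        }
    }
}
```

    In this sketch, DataReceived only moves PING_SCHEDULED to PING_DELAYED instead of re-arming the timer. The decision is postponed until the timer fires, which presumably avoids cancelling and rescheduling a timer on every incoming frame.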

    Server-side KeepAlive

    Entry point

    // Key excerpt only; see `NettyServerBuilder` for the full code
    ServerImpl server = new ServerImpl(
        this,
        buildTransportServers(getTracerFactories()),
        Context.ROOT);
    for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
      notifyTarget.notifyOnBuild(server);
    }
    return server;

    // The NettyServer instances are created in buildTransportServers
    List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
    for (SocketAddress listenAddress : listenAddresses) {
      NettyServer transportServer = new NettyServer(
          listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
          workerEventLoopGroupPool, negotiator, streamTracerFactories,
          getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
          maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
          maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
          permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
      transportServers.add(transportServer);
    }

    Brief sequence outline

    Create & Start

    NettyServerBuilder
    ---> NettyServer
    ---------> NettyServerTransport
    -------------> NettyServerHandler
    -----------------> KeepAliveEnforcer

    Connection ready
    Netty calls the handlerAdded method of io.netty.channel.ChannelHandler, which its documentation describes as follows:

    "Gets called after the ChannelHandler was added to the actual context and it's ready to handle events."
    NettyServerHandler (handlerAdded)
    ---> creates the KeepAliveManager object

    Responding to events
    Same as on the client.

    KeepAliveEnforcer

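    The server-side sequence outline above includes a component that exists only on the server: io.grpc.netty.KeepAliveEnforcer.
    This class monitors how often the client sends pings and makes sure the rate stays within a permitted range.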

    package io.grpc.netty;

    import com.google.common.annotations.VisibleForTesting;
    import com.google.common.base.Preconditions;
    import java.util.concurrent.TimeUnit;
    import javax.annotation.CheckReturnValue;

    /** Monitors the client's PING usage to make sure the rate is permitted. */
    class KeepAliveEnforcer {
      @VisibleForTesting
      static final int MAX_PING_STRIKES = 2;
      @VisibleForTesting
      static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

      private final boolean permitWithoutCalls;
      private final long minTimeNanos;
      private final Ticker ticker;
      private final long epoch;

      private long lastValidPingTime;
      private boolean hasOutstandingCalls;
      private int pingStrikes;

      public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
        this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
      }

      @VisibleForTesting
      KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
        Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");

        this.permitWithoutCalls = permitWithoutCalls;
        this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
        this.ticker = ticker;
        this.epoch = ticker.nanoTime();
        lastValidPingTime = epoch;
      }

      /** Returns {@code false} when client is misbehaving and should be disconnected. */
      @CheckReturnValue
      public boolean pingAcceptable() {
        long now = ticker.nanoTime();
        boolean valid;
        if (!hasOutstandingCalls && !permitWithoutCalls) {
          valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
        } else {
          valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
        }
        if (!valid) {
          pingStrikes++;
          return !(pingStrikes > MAX_PING_STRIKES);
        } else {
          lastValidPingTime = now;
          return true;
        }
      }

      /**
       * Reset any counters because PINGs are allowed in response to something sent. Typically called
       * when sending HEADERS and DATA frames.
       */
      public void resetCounters() {
        lastValidPingTime = epoch;
        pingStrikes = 0;
      }

      /** There are outstanding RPCs on the transport. */
      public void onTransportActive() {
        hasOutstandingCalls = true;
      }

      /** There are no outstanding RPCs on the transport. */
      public void onTransportIdle() {
        hasOutstandingCalls = false;
      }

      /**
       * Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
       * to use something like this instead of directly comparing nano times. See {@link
       * System#nanoTime}.
       */
      private static long compareNanos(long time1, long time2) {
        // Possibility of overflow/underflow is on purpose and necessary for correctness
        return time1 - time2;
      }

      @VisibleForTesting
      interface Ticker {
        long nanoTime();
      }

      @VisibleForTesting
      static class SystemTicker implements Ticker {
        public static final SystemTicker INSTANCE = new SystemTicker();

        @Override
        public long nanoTime() {
          return System.nanoTime();
        }
      }
    }
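    1. Start with pingAcceptable, which decides whether to accept a client ping. If there are no outstanding calls and permitWithoutCalls is false, a ping is valid only if at least IMPLICIT_PERMIT_TIME_NANOS (2 hours) have passed since lastValidPingTime. Otherwise at least minTimeNanos must have passed. Each invalid ping adds a strike, and once pingStrikes exceeds MAX_PING_STRIKES the method returns false, meaning the client is misbehaving and should be disconnected. The variables involved:
    • lastValidPingTime is the time of the client's last valid ping. When the connection is established, it equals the creation time of the KeepAliveEnforcer object (epoch). Whenever a client ping is valid, it is set to the time of that ping.
    • hasOutstandingCalls is initially false. It becomes true when the transport becomes active and false when the transport becomes idle. For a blocking gRPC call, for example, the transport becomes active when the call starts and idle when the call completes.
    • permitWithoutCalls is passed in when NettyServer is created (permitKeepAliveWithoutCalls) and defaults to false.
    • IMPLICIT_PERMIT_TIME_NANOS is a constant: 2 hours.
    • minTimeNanos is passed in when NettyServer is created (permitKeepAliveTimeInNanos) and defaults to 5 minutes. It is capped at IMPLICIT_PERMIT_TIME_NANOS.
    • MAX_PING_STRIKES is a constant: 2.
    2. resetCounters is called when the server sends HEADERS or DATA frames, that is, whenever there is gRPC traffic. It resets lastValidPingTime and pingStrikes.
    3. For client keepAlive to be accepted, the server settings must allow it. If the client pings with no calls active (keepAliveWithoutCalls(true)), the server's permitWithoutCalls must be set to true. In addition, the client's keepAliveTime must be >= minTimeNanos. Otherwise the client's pings accumulate strikes, and the server disconnects the client once pingStrikes exceeds MAX_PING_STRIKES.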
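    The strike counting is easiest to follow with a trace. Below is a hypothetical standalone copy of the pingAcceptable()/resetCounters() rules from the listing above, with the Ticker replaced by a manually advanced clock. PingRateTrace and advance are illustrative names, not grpc-java API. The main method uses the server defaults described above (permitWithoutCalls=false, minTime=5 minutes) and a client that pings once a minute while a call is in flight, as with the keepAliveTime(1, TimeUnit.MINUTES) client configuration shown earlier.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical standalone copy of KeepAliveEnforcer's pingAcceptable()/resetCounters()
 * rules, driven by a manual clock instead of System.nanoTime(). Not the grpc-java class.
 */
class PingRateTrace {
    static final int MAX_PING_STRIKES = 2;
    static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);

    final boolean permitWithoutCalls;
    final long minTimeNanos;
    final long epoch = 0L;            // the manual clock starts at 0
    long now = epoch;                 // current time of the manual clock, in nanoseconds
    long lastValidPingTime = epoch;
    boolean hasOutstandingCalls;
    int pingStrikes;

    PingRateTrace(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
        this.permitWithoutCalls = permitWithoutCalls;
        // Same cap as the original: minTime never exceeds the 2h implicit permit time.
        this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
    }

    void advance(long amount, TimeUnit unit) {
        now += unit.toNanos(amount);
    }

    boolean pingAcceptable() {
        // With no calls and permitWithoutCalls=false, only one ping per 2h is valid.
        long window = (!hasOutstandingCalls && !permitWithoutCalls)
                ? IMPLICIT_PERMIT_TIME_NANOS : minTimeNanos;
        if (lastValidPingTime + window - now <= 0) { // same subtraction as compareNanos
            lastValidPingTime = now;
            return true;
        }
        pingStrikes++;
        return pingStrikes <= MAX_PING_STRIKES; // false means "disconnect this client"
    }

    void resetCounters() {
        lastValidPingTime = epoch;
        pingStrikes = 0;
    }

    public static void main(String[] args) {
        PingRateTrace server = new PingRateTrace(false, 5, TimeUnit.MINUTES);
        server.hasOutstandingCalls = true; // an RPC is in flight
        for (int i = 1; i <= 3; i++) {
            server.advance(1, TimeUnit.MINUTES); // client keepAliveTime = 1 minute
            boolean ok = server.pingAcceptable();
            System.out.println("ping " + i + ": acceptable=" + ok + ", strikes=" + server.pingStrikes);
        }
    }
}
```

    Running main prints acceptable=true for the first two pings (strikes 1 and 2) and acceptable=false for the third, which is the point where the real server would disconnect the client. Even the first ping is a strike, because lastValidPingTime starts at the creation time and less than minTime has elapsed since then.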
  • Original article: https://blog.csdn.net/jh035512/article/details/127933489