- // io.grpc.internal.GrpcUtil
- public static final long DEFAULT_SERVER_KEEPALIVE_TIME_NANOS = TimeUnit.HOURS.toNanos(2L);
io.grpc.internal.KeepAliveManager
, 用于管理KeepAlive状态,ping任务调度与执行.- NettyChannelBuilder builder = NettyChannelBuilder.forTarget(String.format("grpc://%s", provider)) //
- .usePlaintext() //
- .defaultLoadBalancingPolicy(props.getBalancePolicy()) //
- .maxInboundMessageSize(props.getMaxInboundMessageSize()) //
- .keepAliveTime(1,TimeUnit.MINUTES)
- .keepAliveWithoutCalls(true)
- .keepAliveTimeout(10,TimeUnit.SECONDS)
- .intercept(channelManager.getInterceptors()); //
keepAliveTime
,keepAliveTimeout
,keepAliveWithoutCalls
。这三个变量有什么作用呢?Create & Start
- NettyChannelBuilder
- -----> NettyTransportFactory
- ---------> NettyClientTransport
- -------------> KeepAliveManager & NettyClientHandler
响应各种事件
当Active、Idle、DataReceived、Started、Termination事件发生时,更改KeepAlive状态,调度发送ping任务。
- // 只截取关键代码,详细代码请看`NettyServerBuilder`
- ServerImpl server = new ServerImpl(
- this,
- buildTransportServers(getTracerFactories()),
- Context.ROOT);
- for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
- notifyTarget.notifyOnBuild(server);
- }
- return server;
-
- // 在buildTransportServers方法中创建NettyServer
- List<NettyServer> transportServers = new ArrayList<>(listenAddresses.size());
- for (SocketAddress listenAddress : listenAddresses) {
- NettyServer transportServer = new NettyServer(
- listenAddress, resolvedChannelType, channelOptions, bossEventLoopGroupPool,
- workerEventLoopGroupPool, negotiator, streamTracerFactories,
- getTransportTracerFactory(), maxConcurrentCallsPerConnection, flowControlWindow,
- maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
- maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
- permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, getChannelz());
- transportServers.add(transportServer);
- }
-
Create & Start
- NettyServerBuilder
- ---> NettyServer
- ---------> NettyServerTransport
- -------------> NettyServerHandler
- -----------------> KeepAliveEnforcer
连接准备就绪
调用 io.netty.channel.ChannelHandler的handlerAdded
方法,关于此方法的描述:
- Gets called after the ChannelHandler was added to the actual context and it's ready to handle events.
- NettyServerHandler(handlerAdded)
- ---> 创建KeepAliveManager对象
响应各种事件
同Client
在上面Server端的简要时序图中,可以看见,server端有一个特有的io.grpc.netty.KeepAliveEnforcer
类
此类的作用是监控clinet ping的频率,以确保其在一个合理范围内。
- package io.grpc.netty;
-
- import com.google.common.annotations.VisibleForTesting;
- import com.google.common.base.Preconditions;
- import java.util.concurrent.TimeUnit;
- import javax.annotation.CheckReturnValue;
-
- /** Monitors the client's PING usage to make sure the rate is permitted. */
- class KeepAliveEnforcer {
- @VisibleForTesting
- static final int MAX_PING_STRIKES = 2;
- @VisibleForTesting
- static final long IMPLICIT_PERMIT_TIME_NANOS = TimeUnit.HOURS.toNanos(2);
-
- private final boolean permitWithoutCalls;
- private final long minTimeNanos;
- private final Ticker ticker;
- private final long epoch;
-
- private long lastValidPingTime;
- private boolean hasOutstandingCalls;
- private int pingStrikes;
-
- public KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit) {
- this(permitWithoutCalls, minTime, unit, SystemTicker.INSTANCE);
- }
-
- @VisibleForTesting
- KeepAliveEnforcer(boolean permitWithoutCalls, long minTime, TimeUnit unit, Ticker ticker) {
- Preconditions.checkArgument(minTime >= 0, "minTime must be non-negative");
-
- this.permitWithoutCalls = permitWithoutCalls;
- this.minTimeNanos = Math.min(unit.toNanos(minTime), IMPLICIT_PERMIT_TIME_NANOS);
- this.ticker = ticker;
- this.epoch = ticker.nanoTime();
- lastValidPingTime = epoch;
- }
-
- /** Returns {@code false} when client is misbehaving and should be disconnected. */
- @CheckReturnValue
- public boolean pingAcceptable() {
- long now = ticker.nanoTime();
- boolean valid;
- if (!hasOutstandingCalls && !permitWithoutCalls) {
- valid = compareNanos(lastValidPingTime + IMPLICIT_PERMIT_TIME_NANOS, now) <= 0;
- } else {
- valid = compareNanos(lastValidPingTime + minTimeNanos, now) <= 0;
- }
- if (!valid) {
- pingStrikes++;
- return !(pingStrikes > MAX_PING_STRIKES);
- } else {
- lastValidPingTime = now;
- return true;
- }
- }
-
- /**
- * Reset any counters because PINGs are allowed in response to something sent. Typically called
- * when sending HEADERS and DATA frames.
- */
- public void resetCounters() {
- lastValidPingTime = epoch;
- pingStrikes = 0;
- }
-
- /** There are outstanding RPCs on the transport. */
- public void onTransportActive() {
- hasOutstandingCalls = true;
- }
-
- /** There are no outstanding RPCs on the transport. */
- public void onTransportIdle() {
- hasOutstandingCalls = false;
- }
-
- /**
- * Positive when time1 is greater; negative when time2 is greater; 0 when equal. It is important
- * to use something like this instead of directly comparing nano times. See {@link
- * System#nanoTime}.
- */
- private static long compareNanos(long time1, long time2) {
- // Possibility of overflow/underflow is on purpose and necessary for correctness
- return time1 - time2;
- }
-
- @VisibleForTesting
- interface Ticker {
- long nanoTime();
- }
-
- @VisibleForTesting
- static class SystemTicker implements Ticker {
- public static final SystemTicker INSTANCE = new SystemTicker();
-
- @Override
- public long nanoTime() {
- return System.nanoTime();
- }
- }
- }
-
pingAcceptable
方法,此方法是判断是否接受client ping。lastValidPingTime
是上次client valid ping的时间, 连接建立时此时间等于KeepAliveEnforcer对象创建的时间。当client ping有效时,其等于当时ping的时间hasOutstandingCalls
其初始值为false,当连接activie时,其值为true,当连接idle时,其值为false。如果grpc调用为阻塞时调用,则调用时连接变为active,调用完成,连接变为idle.permitWithoutCalls
其值是创建NettyServer时传入,默认为false.IMPLICIT_PERMIT_TIME_NANOS
其值为常量,2hminTimeNanos
其值是创建NettyServer时传入,默认为5min.MAX_PING_STRIKES
其值为常量2resetCounters
方法是当grpc当中有数据时会被调用,即有grpc调用时lastValidPingTime和pingStrikes会被重置。permitWithoutCalls
值需要设置为true,而且cient keepAliveTime需要>=minTimeNanos