• 4.keepalive 与 Idle 监测


    为什么需要 keepalive ?

    假设你开了一个饭店,别人电话来订餐,电话通了后,订餐的说了一堆订餐要求,说着说
    着,对方就不讲话了(可能忘记挂机/出去办事/线路故障等)。
    这个时候你会一直握着电话等么? 不会
    如果不会,那你一般怎么去做?会确认一句“你还在么?”,如果对方没有回复,挂机。这套机制即“keepalive
    在这里插入图片描述

    怎么设计 keepalive ?以 TCP keepalive 为例

    net.ipv4.tcp_keepalive_time = 7200 
    net.ipv4.tcp_keepalive_intvl = 75
    net.ipv4.tcp_keepalive_probes = 9
    
    • 1
    • 2
    • 3

    当启用(默认关闭)keepalive 时,TCP 在连接没有数据通过的7200秒后发送 keepalive 消息,当探测没有确认时,按75秒的重试频率重发,一直发 9 个探测包都没有确认,就认定连接失效。
    所以总耗时一般为:2 小时 11 分钟 (7200 秒 + 75 秒* 9 次)

    为什么还需要应用层 keepalive ?

    TCP 层的 keepalive 默认关闭,且经过路由等中转设备 keepalive 包可能会被丢弃。
    TCP 层的 keepalive 时间太长:默认 > 2 小时,虽然可改,但属于系统参数,改动影响所有应用。

    Idle 监测是什么?

    生活场景:
    假设你开了一个饭店,别人电话来订餐,电话通了后,订餐的说了一堆订餐要求,说着说着,对方就不讲话了。
    你会立马发问:你还在么?
    • 不会
    • 一般你会稍微等待一定的时间,在这个时间内看看对方还会不会说话(Idle 检测),如果还不说,认定对方存在问题(Idle),于是开始发问“你还在么?”(keepalive),或者问都不问干脆直接挂机(关闭连接)。

    Idle 监测,只是负责诊断,诊断后,做出不同的行为,决定 Idle 监测的最终用途:

    1. 发送 keepalive :一般用来配合 keepalive ,减少 keepalive 消息。
    2. 直接关闭连接

    如何在Netty 中开启 TCP keepalive 和 Idle 检测

    开启keepalive:
    Server 端开启 TCP keepalive

    bootstrap.childOption(ChannelOption.SO_KEEPALIVE,true) 
    bootstrap.childOption(NioChannelOption.of(StandardSocketOptions.SO_KEEPALIVE), true)
    提示:.option(ChannelOption.SO_KEEPALIVE,true) 存在但是无效
    
    • 1
    • 2
    • 3

    开启不同的 Idle Check:

    ch.pipeline().addLast(“idleCheckHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS));
    
    • 1

    源码解读

    读 Idle 检测的原理

    public enum IdleState {
        /**
         * No data was received for a while.
         */
        READER_IDLE, 读空闲
        /**
         * No data was sent for a while.
         */
        WRITER_IDLE, 写空闲
        /**
         * No data was either received or sent for a while.
         */
        ALL_IDLE  读 或 写 任一空闲。
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    public class IdleStateEvent {
        public static final IdleStateEvent FIRST_READER_IDLE_STATE_EVENT = new IdleStateEvent(IdleState.READER_IDLE, true);
        public static final IdleStateEvent READER_IDLE_STATE_EVENT = new IdleStateEvent(IdleState.READER_IDLE, false);
        public static final IdleStateEvent FIRST_WRITER_IDLE_STATE_EVENT = new IdleStateEvent(IdleState.WRITER_IDLE, true);
        public static final IdleStateEvent WRITER_IDLE_STATE_EVENT = new IdleStateEvent(IdleState.WRITER_IDLE, false);
        public static final IdleStateEvent FIRST_ALL_IDLE_STATE_EVENT = new IdleStateEvent(IdleState.ALL_IDLE, true);
        public static final IdleStateEvent ALL_IDLE_STATE_EVENT = new IdleStateEvent(IdleState.ALL_IDLE, false);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这个类根据上面的三种状态定义了发生 idle 时触发的 6 个事件,为什么是 6 个不是 3 个呢?对于第一次发生 idle ,抽出了第一次发生的事件,这样就能只对第一次发生 idle 时做处理,后面就可以忽略。

    IdleStateHandler extends ChannelDuplexHandler
    
    • 1

    继承自 ChannelDuplexHandler ,通过重写其 channelRead、write 等方法可以获得 channel 的读写状态并控制 IdleStateHandler 的生命周期。当读或写空闲时,将会触发 IdleStateEvent 事件。

     public IdleStateHandler(
                int readerIdleTimeSeconds,
                int writerIdleTimeSeconds,
                int allIdleTimeSeconds) {
    
            this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
                 TimeUnit.SECONDS);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    三个参数分别代表三种形式

    private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
            if (observeOutput) {
                //正常情况下,false,即写空闲的判断中的写是指写成功,但是实际上,有可能遇到几种情况:
                //(1)写了,但是缓存区满了,写不出去;(2)写了一个大“数据”,写确实在“动”,但是没有完成。
                //所以这个参数,判断是否有“写的意图”,而不是判断“是否写成功”。
    
                // We can take this shortcut if the ChannelPromises that got passed into write()
                // appear to complete. It indicates "change" on message level and we simply assume
                // that there's change happening on byte level. If the user doesn't observe channel
                // writability events then they'll eventually OOME and there's clearly a different
                // problem and idleness is least of their concerns.
                if (lastChangeCheckTimeStamp != lastWriteTime) {
                    lastChangeCheckTimeStamp = lastWriteTime;
    
                    // But this applies only if it's the non-first call.
                    if (!first) {
                        return true;
                    }
                }
    
                Channel channel = ctx.channel();
                Unsafe unsafe = channel.unsafe();
                ChannelOutboundBuffer buf = unsafe.outboundBuffer();
    
                if (buf != null) {
                    int messageHashCode = System.identityHashCode(buf.current());
                    long pendingWriteBytes = buf.totalPendingWriteBytes();
    
                    if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                        lastMessageHashCode = messageHashCode;
                        lastPendingWriteBytes = pendingWriteBytes;
    
                        if (!first) {
                            return true;
                        }
                    }
    
                    long flushProgress = buf.currentProgress();
                    if (flushProgress != lastFlushProgress) {
                        lastFlushProgress = flushProgress;
    
                        if (!first) {
                            return true;
                        }
                    }
                }
            }
    
            return false;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    关于observeOutput参数的解析,默认false

    private void initialize(ChannelHandlerContext ctx) {
            // Avoid the case where destroy() is called before scheduling timeouts.
            // See: https://github.com/netty/netty/issues/143
            switch (state) {
            case 1:
            case 2:
                return;
            }
    
            state = 1;
            initOutputChanged(ctx);
    
            lastReadTime = lastWriteTime = ticksInNanos();
            if (readerIdleTimeNanos > 0) {
                readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                        readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (writerIdleTimeNanos > 0) {
                writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                        writerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
            if (allIdleTimeNanos > 0) {
                allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                        allIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    初始化函数,在注册的时候会被调用, 根据时间初始了三个定时任务

    ReaderIdleTimeoutTask
    private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
    
          ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
              super(ctx);
          }
    
          @Override
          protected void run(ChannelHandlerContext ctx) {
              // 下一次检测的定时任务的时间
              long nextDelay = readerIdleTimeNanos;
              // 不处于正在读取状态
              if (!reading) {
                  // 配置的空闲时间 - 实际空闲时间
                  nextDelay -= ticksInNanos() - lastReadTime;
              }
              // 小于 0 说明读空闲,也就是说空闲时间大于配置的读空闲时间
              if (nextDelay <= 0) {
                  // 重新设置定时任务的空闲时间
                  // Reader is idle - set a new timeout and notify the callback.
                  readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                  // 标志是否是首次空闲
                  boolean first = firstReaderIdleEvent;
                  firstReaderIdleEvent = false;
    
                  try {
                      // 创建读空闲事件
                      IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                      // 通知通道空闲事件
                      channelIdle(ctx, event);
                  } catch (Throwable t) {
                      ctx.fireExceptionCaught(t);
                  }
              } else {
                  // 空闲时间设置为 nextDelay , 按照最后一次读的时间作为开始计数
                  // Read occurred before the timeout - set a new timeout with shorter delay.
                  readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
              }
          }
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    @Override
        protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
            assert evt.state() == IdleState.READER_IDLE;
            readTimedOut(ctx);
        }
    
        /**
         * Is called when a read timeout was detected.
         */
        protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
            if (!closed) {
                // 抛出异常
                ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
                ctx.close();
                closed = true;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    前面说的 IdleStateHandler 作用是判断空闲,并创建了空闲事件,具体的处理逻辑还需要用户自行处理。ReadTimeoutHandler 做了一个简单的处理,就是发生读空闲事件时,就抛出异常。

    WriterIdleTimeoutTask触发写空闲时,是由WriteTimeoutHandler处理的吗
    不是的。WriterIdleTimeoutTask触发写空闲事件和WriteTimeoutHandler处理写超时事件是两个独立的过程。
    WriterIdleTimeoutTaskIdleStateHandler的一部分,它用于检测写操作的空闲状态。当服务器和客户端没有任何写交互,并且超过了给定的时间,WriterIdleTimeoutTask就会触发用户handler的userEventTriggered方法。
    WriteTimeoutHandler则是当执行channel的write方法后会执行延时任务,延时的时间就是设置的超时时间,然后isDone判断write是否执行完毕,如果write没有执行完毕就抛出异常并断开channel的连接。
    这两者的关联在于,它们都是用于处理写操作的超时情况,但是处理方式和触发条件有所不同。WriterIdleTimeoutTask主要是用于检测写操作的空闲状态,而WriteTimeoutHandler则是用于检测写操作的完成状态

  • 相关阅读:
    C++提高:03 STL- 常用容器_2
    【汇编】mov和add指令、确定物理地址的方法、内存分段表示法
    如何在 uniapp 里面使用 pinia 数据持久化 (pinia-plugin-persistedstate)
    【前端知识】Three 学习日志(三)—— 光源对物体表面的影响
    java物业服务信息系统计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
    Apache Poi 操作word,替换字符保留样式问题,runs段落混乱问题。
    借助这个宝藏神器,我成为全栈了
    2022我的前端面试总结
    什么是Generator函数?如何使用yield表达式他是干嘛的?next()方法是做什么的?
    14_自定义ItemDecoration实现qq好友列表分组效果
  • 原文地址:https://blog.csdn.net/anglybaby/article/details/138187260