• Netty网络框架学习笔记-16(心跳(heartbeat)服务源码分析)


    Netty网络框架学习笔记-16(心跳(heartbeat)服务源码分析_2020.06.25)

    前言:

    Netty 作为一个网络框架,提供了诸多功能,比如编码解码等,Netty 还提供了非常重要的一个服务-----心跳机制 heartbeat。通过心跳检查对方是否有效,这是 RPC 框架中是必不可少的功能。下面我们分析一下 Netty 内部心跳服务源码实现。

    Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 三个 Handler 检测连接的有效性,

    重点分析 IdleStateHandler

    名称描述
    IdleStateHandler如果连接闲置时间过长,则会触发 IdleStateEvent 事件。在 ChannelInboundHandler 中可以覆盖 userEventTriggered(…) 方法来处理 IdleStateEvent。
    ReadTimeoutHandler在指定的时间间隔内没有接收到入站数据则会抛出 ReadTimeoutException 并关闭 Channel。ReadTimeoutException 可以通过覆盖 ChannelHandler 的 exceptionCaught(…) 方法检测到。
    WriteTimeoutHandler在指定的时间间隔内没有接收到入站数据则会抛出 WriteTimeoutException 并关闭 Channel。WriteTimeoutException 可以通过覆盖 ChannelHandler 的 exceptionCaught(…) 方法检测到。

    ReadTimeout 事件和 WriteTimeout 事件都会自动关闭连接,而且,属于异常处理,所以,这里只是介绍以下,我们重点看 IdleStateHandler。

    IdleStateHandler分析

    关键属性

    private final boolean observeOutput;  // 是否考虑出站时较慢的情况。默认值是 false
    private final long readerIdleTimeNanos; // 读事件空闲时间,0 则禁用事件
    private final long writerIdleTimeNanos; // 写事件空闲时间,0 则禁用事件
    private final long allIdleTimeNanos; // 读或写空闲时间,0 则禁用事件
    
    • 1
    • 2
    • 3
    • 4

    初始化一些操作

    @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
                // channelActive() event has been fired already, which means this.channelActive() will
                // not be invoked. We have to initialize here instead.
                initialize(ctx); // 进行初始化
            } else {
                // channelActive() event has not been fired yet.  this.channelActive() will be invoked
                // and initialization will occur there.
            }
        }
    
    private void initialize(ChannelHandlerContext ctx) {
            // Avoid the case where destroy() is called before scheduling timeouts.
            // See: https://github.com/netty/netty/issues/143
            switch (state) {
            case 1:
            case 2:
                return;
            default:
                 break;
            }
    
            state = 1;
            initOutputChanged(ctx);
    		// 获取当前系统纳秒时间
            lastReadTime = lastWriteTime = ticksInNanos();
        	// 读超时 大于0 说明关注了该事件
            if (readerIdleTimeNanos > 0) {
                readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                        readerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
        	// 写超时 大于0 说明关注了该事件
            if (writerIdleTimeNanos > 0) {
                writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                        writerIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
        	// 读或写空闲时间, 大于0 说明关注了该事件
            if (allIdleTimeNanos > 0) {
                allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                        allIdleTimeNanos, TimeUnit.NANOSECONDS);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    schedule 方法

    Future<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        	// 往当前处理器中的执行器添加了定时任务, ReaderIdleTimeoutTask,WriterIdleTimeoutTask,AllIdleTimeoutTask
            return ctx.executor().schedule(task, delay, unit); // 单位都是纳秒
        }
    
    • 1
    • 2
    • 3
    • 4

    三个定时任务类

    他们的父类

    private abstract static class AbstractIdleTask implements Runnable {
    
        private final ChannelHandlerContext ctx;
    
        AbstractIdleTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }
    
        @Override
        public void run() {
            // 当所有的定时任务, 必须是在当前通道是打开状态下, 才真正执行 run
            if (!ctx.channel().isOpen()) {
                return;
            }
    
            run(ctx);
        }
    
        protected abstract void run(ChannelHandlerContext ctx);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    ReaderIdleTimeoutTask

    protected void run(ChannelHandlerContext ctx) {
                // 读取超时时间, 纳秒
                long nextDelay = readerIdleTimeNanos;
                // 当通道阅读时候 这个值是true,  当通道读取完毕后, 这个值是false
                if (!reading) {
                    // 当前系统时间纳秒 减  最后一次读取时间 channelReadComplete(ChannelHandlerContext ctx)赋值
                    nextDelay -= ticksInNanos() - lastReadTime;
                }
    			// 当小于或等于0 说明已经到达超时时间
                if (nextDelay <= 0) {
                    // Reader is idle - set a new timeout and notify the callback.
                    // 设置新的监听超时的定时任务
                    readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstReaderIdleEvent;
                    firstReaderIdleEvent = false;
    
                    try {
                        // 创建对应事件类型,  是否是第一次触发事件
                        IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                        // 触发事件
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Read occurred before the timeout - set a new timeout with shorter delay.
                    // 没有超时情况下, 设置下一次监听触发的定时任务,
                    readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
    
    
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        // 触发(调用)当前处理器的 fireUserEventTriggered 事件
            ctx.fireUserEventTriggered(evt);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    WriterIdleTimeoutTask

    protected void run(ChannelHandlerContext ctx) {
    			// 最后一次写时间  hasOutputChanged() 赋值
                long lastWriteTime = IdleStateHandler.this.lastWriteTime;
        		// writerIdleTimeNanos是设置写超时时间,   当前纳秒 - 最后一次写时间
                long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
        		// 说明超时
                if (nextDelay <= 0) {
                    // Writer is idle - set a new timeout and notify the callback.
                     // 设置新的监听超时的定时任务
                    writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstWriterIdleEvent;
                    firstWriterIdleEvent = false;
    
                    try {
                        // 出站较慢数据的判断
                        if (hasOutputChanged(ctx, first)) {
                            return;
                        }
    					// 创建对应事件类型,  是否是第一次触发事件
                        IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                         // 触发事件
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Write occurred before the timeout - set a new timeout with shorter delay.
                    // 没有超时情况下, 设置下一次监听触发的定时任务,
                    writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    写任务的 run 代码逻辑基本和读任务的逻辑一样,唯一不同的就是有一个针对 出站较慢数据的判断 hasOutputChanged

    AllIdleTimeoutTask

    protected void run(ChannelHandlerContext ctx) {
    			// 读写超时纳秒
                long nextDelay = allIdleTimeNanos;
                if (!reading) {
                    // 当前纳秒时间减去 最后一次写或读 的时间 
                    nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
                }
        		// 若大于 0,说明超时了
                if (nextDelay <= 0) {
                    // Both reader and writer are idle - set a new timeout and
                    // notify the callback.
                     // 设置新的监听超时的定时任务
                    allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);
    
                    boolean first = firstAllIdleEvent;
                    firstAllIdleEvent = false;
    
                    try {
                        // 出站较慢数据的判断
                        if (hasOutputChanged(ctx, first)) {
                            return;
                        }
    					// 创建对应事件类型,  是否是第一次触发事件
                        IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                        // 触发事件
                        channelIdle(ctx, event);
                    } catch (Throwable t) {
                        ctx.fireExceptionCaught(t);
                    }
                } else {
                    // Either read or write occurred before the timeout - set a new
                    // timeout with shorter delay.
                    // 没有超时情况下, 设置下一次监听触发的定时任务,
                    allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    这个监控着所有的事件。当读写事件发生时,都会记录。代码逻辑和写事件的的基本一致:

    这里的时间计算是取读写事件中的最大值来的。然后像写事件一样,判断是否发生了写的慢的情况。

    总结:

    1. IdleStateHandler 可以实现心跳功能,当服务器和客户端没有任何读写交互时,并超过了给定的时间,则会触发用户 handler 的 userEventTriggered 方法。用户可以在这个方法中尝试向对方发送信息,如果发送失败,则关闭连接。
    2. IdleStateHandler 的实现基于 EventLoop 的定时任务,每次读写都会记录一个值,在定时任务运行的时候,通过计算当前时间和设置时间和上次事件发生时间的结果,来判断是否空闲。
    3. 内部有 3 个定时任务,分别对应读事件,写事件,读写事件。通常用户监听读写事件就足够了。
    4. 同时,IdleStateHandler 内部也考虑了一些极端情况:客户端接收缓慢,一次接收数据的速度超过了设置的空闲时间。Netty 通过构造方法中的 observeOutput 属性来决定是否对出站缓冲区的情况进行判断。
    5. 如果出站缓慢,Netty 不认为这是空闲,也就不触发空闲事件。但第一次无论如何也是要触发的。因为第一次无法判断是出站缓慢还是空闲。当然,出站缓慢的话,可能造成 OOM , OOM 比空闲的问题更大。
    6. 所以,当你的应用出现了内存溢出,OOM 之类,并且写空闲极少发生(使用了 observeOutput 为 true),那么就需要注意是不是数据出站速度过慢。
    7. 还有一个注意的地方:就是 ReadTimeoutHandler ,它继承自 IdleStateHandler,当触发读空闲事件的时候,就触发 ctx.fireExceptionCaught 方法,并传入一个 ReadTimeoutException,然后关闭 Socket。
    8. 而 WriteTimeoutHandler 的实现不是基于 IdleStateHandler 的,他的原理是,当调用 write 方法的时候,会创建一个定时任务,任务内容是根据传入的 promise 的完成情况来判断是否超出了写的时间。当定时任务根据指定时间开始运行,发现 promise 的 isDone 方法返回 false,表明还没有写完,说明超时了,则抛出异常。当 write 方法完成后,会打断定时任务。

    1

  • 相关阅读:
    负载均衡四层和七层的区别
    【2012NOIP普及组】T3. 摆花 试题解析
    百面深度学习-图神经网络
    JAVA1.8 jdk安装教程
    mysql- 锁的种类
    7. 吴恩达深度学习--搭建循环神经网络及其应用
    鉴源论坛 · 观模丨浅谈软件测试
    OpenHarmony实战开发-组件复用实践。
    Java api中文在线版
    第四章:Spring七大核心模块Bean、Core、Context
  • 原文地址:https://blog.csdn.net/weixin_44600430/article/details/125462232