• Netty 学习(四):ChannelHandler 的事件传播和生命周期


    Netty 学习(四):ChannelHandler 的事件传播和生命周期

    作者: Grey

    原文地址:

    博客园:Netty 学习(四):ChannelHandler 的事件传播和生命周期

    CSDN:Netty 学习(四):ChannelHandler 的事件传播和生命周期

    ChannelHandler 的事件传播

    在通信客户端和服务端,处理的流程大致有如下步骤

    输入---> 解码 ---> 根据不同的消息指令解析数据包 ---> 编码 ---> 输出
    
    • 1

    在『根据不同的消息指令解析数据包』这个步骤中,经常需要用if-else来判断不同的指令类型并进行解析。逻辑一旦复杂,就会让代码变的极为臃肿,难以维护。

    Netty 中的 Pipeline 和 ChannelHandler 就是用来解决这个问题,它通过责任链设计模式来组织代码逻辑,并且能够支持逻辑的动态添加和删除。

    在 Netty 框架中,一个连接对应一个 Channel,这个 Channel 的所有处理逻辑都在 ChannelPipeline 的对象里,ChannelPipeline 是双向链表结构,它和 Channel 之间是一对一的关系。这个双向链表每个节点都是一个 ChannelHandlerContext 对象,这个对象可以获得和 Channel 相关的所有上下文信息。

    示例图如下

    image

    ChannelHandler 包括两个子接口:ChannelInboundHandler 和 ChannelOutboundHandler,分别用于处理读数据和写数据的逻辑。

    我们可以写一个示例来说明 ChannelHandler 的事件传播顺序(包含 ChannelInboundHandler 和 ChannelOutboundHandler)

    在服务端配置如下

          ch.pipeline().addLast(new InHandlerA());
          ch.pipeline().addLast(new InHandlerB());
          ch.pipeline().addLast(new InHandlerC());
          ch.pipeline().addLast(new OutHandlerA());
          ch.pipeline().addLast(new OutHandlerB());
          ch.pipeline().addLast(new OutHandlerC());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    其中 InHandlerA 代码如下(InHandlerB 和 InHandlerC 类似)

    package snippet.chat.server.inbound;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * @author Grey
     * @date 2022/9/19
     * @since
     */
    public class InHandlerA extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("in-A:" + msg);
            super.channelRead(ctx, msg);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    OutHandlerA 代码如下(OutHandlerB 和 OutHandlerC 类似)

    package snippet.chat.server.outbound;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPromise;
    
    /**
     * @author Grey
     * @date 2022/9/19
     * @since
     */
    public class OutHandlerA extends ChannelOutboundHandlerAdapter {
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("out-A:" + msg);
            super.write(ctx, msg, promise);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    运行服务端和客户端,使用客户端向服务端发送一些数据,可以看到如下日志

    in-A:PooledUnsafeDirectByteBuf(ridx: 0, widx: 108, cap: 2048)
    in-B:PooledUnsafeDirectByteBuf(ridx: 0, widx: 108, cap: 2048)
    in-C:PooledUnsafeDirectByteBuf(ridx: 0, widx: 108, cap: 2048)
    ......
    out-C:PooledUnsafeDirectByteBuf(ridx: 0, widx: 39, cap: 256)
    out-B:PooledUnsafeDirectByteBuf(ridx: 0, widx: 39, cap: 256)
    out-A:PooledUnsafeDirectByteBuf(ridx: 0, widx: 39, cap: 256)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    由此可以知:inboundHandler 的添加顺序和执行顺序一致,而 outboundHandler 的添加顺序和执行顺序相反。 如下图示例

    image

    ChannelHandler 的生命周期

    可以用代码来说明 ChannelHandler 的生命周期,我们基于 ChannelInboundHandlerAdapter,定义了一个 LifeCycleTestHandler,完整代码如下

    package snippet.chat.client;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    
    /**
     * @author Grey
     * @date 2022/9/19
     * @since
     */
    public class LifeCycleTestHandler extends ChannelInboundHandlerAdapter {
        // 这个回调方法表示当前Channel的所有逻辑处理已经和某个NIO线程建立了绑定关系,接收新的连接,然后创建一个线程来处理这个连接的读写,只不过在Netty里使用了线程池的方式,
        // 只需要从线程池里去抓一个线程绑定在这个Channel上即可。这里的NIO线程通常指NioEventLoop
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel 绑定到线程(NioEventLoop):channelRegistered()");
            super.channelRegistered(ctx);
        }
    
        // 这个回调表明与这个连接对应的NIO线程移除了对这个连接的处理。
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel 取消线程(NioEventLoop)的绑定:channelUnregistered()");
            super.channelUnregistered(ctx);
        }
    
        // 当Channel的所有业务逻辑链准备完毕(即Channel的Pipeline中已经添加完所有的Handler),
    // 以及绑定好一个NIO线程之后,这个连接才真正被激活,接下来就会回调到此方法。
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel 准备就绪:channelActive()");
            super.channelActive(ctx);
        }
    
        // 这个连接在TCP层面已经不再是ESTABLISH状态了。
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel 被关闭:channelInactive()");
            super.channelInactive(ctx);
        }
    
        // 客户端向服务端发送数据,每次都会回调此方法,表示有数据可读。
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channel 有数据可读:channelRead()");
            super.channelRead(ctx, msg);
        }
    
        // 服务端每读完一次完整的数据,都回调该方法,表示数据读取完毕。
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channel 某次数据读完:channelReadComplete()");
            super.channelReadComplete(ctx);
        }
    
        // 表示在当前Channel中,已经成功添加了一个Handler处理器。
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("逻辑处理器被添加:handlerAdded()");
            super.handlerAdded(ctx);
        }
    
        // 我们给这个连接添加的所有业务逻辑处理器都被移除。
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println("逻辑处理器被移除:handlerRemoved()");
            super.handlerRemoved(ctx);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    我们在服务端添加这个 Handler,然后启动服务端和客户端,可以看到服务台首先输出如下日志

    逻辑处理器被添加:handlerAdded()
    channel 绑定到线程(NioEventLoop):channelRegistered()
    channel 准备就绪:channelActive()
    channel 有数据可读:channelRead()
    Mon Sep 19 22:49:49 CST 2022: 收到客户端登录请求……
    Mon Sep 19 22:49:49 CST 2022: 登录成功!
    channel 某次数据读完:channelReadComplete()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    由日志可以看到,ChannelHandler 执行顺序为:

    handlerAdded()->channelRegistered()->channelActive()->channelRead()->channelReadComplete()

    关闭客户端,保持服务端不关闭,在服务端此时触发了 Channel 的关闭,打印日志如下

    channel 被关闭:channelInactive()
    channel 取消线程(NioEventLoop)的绑定:channelUnregistered()
    逻辑处理器被移除:handlerRemoved()
    
    • 1
    • 2
    • 3

    如上述日志可知,ChannelHandler 的执行顺序是

    channelInactive()->channelUnregistered()->handlerRemoved()

    整个 ChannelHandler 的生命周期如下图所示

    image

    图例

    本文所有图例见:processon: Netty学习笔记

    代码

    hello-netty

    更多内容见:Netty专栏

    参考资料

    跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

    深度解析Netty源码

  • 相关阅读:
    Flink DataStream 处理函数 ProcessFunction 和 KeyedProcessFunction
    UE5射击游戏案例蓝图篇(一)
    odoo17前端js框架的演化
    Delegate介绍
    springboot实现无数据库启动
    C++&QT day6
    Servlet 常见的API
    【数据结构-二叉树 九】【树的子结构】:树的子结构
    有10个数字要求分别用选择法和冒泡法从大到小输出 java 数组
    队列题目:用栈实现队列
  • 原文地址:https://blog.csdn.net/hotonyhui/article/details/126943967