• Netty网络框架学习笔记-15(ChannelPipeline 调度 handler分析)


    Netty网络框架学习笔记-15(ChannelPipeline 调度 handler分析_2020.06.25)

    前言:

    当一个请求进来的时候,ChannelPipeline 是如何调用内部的这些 handler 链中的处理器的呢?

    是如何将处理结果选择是否传递给下一个处理器的呢?

    调度分析

    DefaultChannelPipeline 分析

    相关入站事件

    首先,当一个请求进来的时候,会第一个调用 DefaultChannelPipeline 的 相关方法,如果是入站事件,这些方法由 fire 开头, 表示开始管道的流动。让后面的 handler 继续处理

    	@Override
        public final ChannelPipeline fireChannelRegistered() {
            AbstractChannelHandlerContext.invokeChannelRegistered(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelUnregistered() {
            AbstractChannelHandlerContext.invokeChannelUnregistered(head);
            return this;
        }
    
    	@Override
        public final ChannelPipeline fireChannelActive() {
            AbstractChannelHandlerContext.invokeChannelActive(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelInactive() {
            AbstractChannelHandlerContext.invokeChannelInactive(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireExceptionCaught(Throwable cause) {
            AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireUserEventTriggered(Object event) {
            AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelReadComplete() {
            AbstractChannelHandlerContext.invokeChannelReadComplete(head);
            return this;
        }
    
        @Override
        public final ChannelPipeline fireChannelWritabilityChanged() {
            AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
            return this;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    可以看出, 这些方法都是 inbound 的方法, 也就是入站事件, 都是传入了 head 头部处理器最先开始进行处理, 然后在一步步传递下去给其他处理器, 这些静态方法则会调用 head ChannelInboundInvoker 接口的方法,再然后调用 handler 的真正方

    相关出站事件

    final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler

    		 @Override
            public void bind(
                    ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
                unsafe.bind(localAddress, promise);
            }
    
            @Override
            public void connect(
                    ChannelHandlerContext ctx,
                    SocketAddress remoteAddress, SocketAddress localAddress,
                    ChannelPromise promise) {
                unsafe.connect(remoteAddress, localAddress, promise);
            }
    
            @Override
            public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.disconnect(promise);
            }
    
            @Override
            public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.close(promise);
            }
    
            @Override
            public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) {
                unsafe.deregister(promise);
            }
    
            @Override
            public void read(ChannelHandlerContext ctx) {
                unsafe.beginRead();
            }
    
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                unsafe.write(msg, promise);
            }
    
            @Override
            public void flush(ChannelHandlerContext ctx) {
                unsafe.flush();
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    这些都是出站的实现,但是调用的是 outbound 类型的 head handler 来进行处理,因为这些都是 outbound 事件。

    出站是 tail 开始,入站从 head 开始。因为出站是从内部向外面写,从 tail 开始,能够让前面的 handler 进行处理,防止 handler 被遗漏,比如编码。反之,入站当然是从 head 往内部输入,让后面的 handler 能够处理这些输入的数据。比如解码。因此虽然 head 也实现了 outbound 接口,但不是从 head 开始执行出站任务

    入站-入口 (debug开始)

    AbstractNioByteChannel # read() => pipeline.fireChannelRead(byteBuf)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bJxhVmCa-1656147004519)(https://s1.ax1x.com/2022/06/25/jkKrjA.png)]

    出站-入口 (debug开始)

    AbstractChannelHandlerContext # writeAndFlush(Object msg)

    jk8zvD.png

    总结:

    Context 包装 handler,多个 Context 在 pipeline 中形成了双向链表,入站方向叫 inbound,由 head 节点开始,出站方法叫 outbound ,由 tail 节点开始。

    而节点中间的传递通过 AbstractChannelHandlerContext 类内部的 fire 系列方法,找到当前节点的下一个节点 , 不断的循环传播。是一个过滤器形式完成对 handler 的调度

    1

  • 相关阅读:
    14届蓝桥青少STEMA-C++组10月评测
    进入软件行业的几点建议
    Android基础篇 Android 数据存储与性能
    02-文本属性
    我做数画ai绘画教程日赚过千,良心分享给想兼职赚钱的人
    Android学习之路(19) ListView详解
    Ubuntu18.04 安装完成后的开发配置
    C# | Chaikin算法 —— 计算折线对应的平滑曲线坐标点
    1.6 IntelliJ IDEA开发工具
    超级实用的程序员接单平台,看完少走几年弯路,强推第一个!
  • 原文地址:https://blog.csdn.net/weixin_44600430/article/details/125461412