• Netty 学习(十):ChannelPipeline源码说明


    Netty 学习(十):ChannelPipeline源码说明

    作者: Grey

    原文地址:

    博客园:Netty 学习(十):ChannelPipeline源码说明

    CSDN:Netty 学习(十):ChannelPipeline源码说明

    ChannelPipeline可以看作一条流水线,原料(字节流)进来,经过加工,形成一个个Java对象,然后基于这些对象进行处理,最后输出二进制字节流。

    ChannelPipeline 在创建 NioSocketChannel 的时候创建, 其默认实现是 DefaultChannelPipeline

        final AbstractChannelHandlerContext head;
        final AbstractChannelHandlerContext tail;
        protected DefaultChannelPipeline(Channel channel) {
            this.channel = ObjectUtil.checkNotNull(channel, "channel");
            succeededFuture = new SucceededChannelFuture(channel, null);
            voidPromise =  new VoidChannelPromise(channel, true);
    
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
    

    ChannelPipeline 中保存了 Channel 的引用,且其中每个节点都是一个 ChannelHandlerContext 对象。每个 ChannelHandlerContext 节点都保存了执行器(即:ChannelHandler)。

    ChannelPipeline里有两种不同的节点,一种是 ChannelInboundHandler,处理 inbound 事件(例如:读取数据流),还有一种是 ChannelOutboundHandler,处理 Outbound 事件,比如调用writeAndFlush()类方法时,就会调用该 handler。

    添加 handler 的逻辑如下

        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                // 检查是否有重复的 handler
                checkMultiplicity(handler);
                // 创建 节点
                newCtx = newContext(group, filterName(name, handler), handler);
                // 添加节点
                addLast0(newCtx);
    
                // If the registered is false it means that the channel was not registered on an eventLoop yet.
                // In this case we add the context to the pipeline and add a task that will call
                // ChannelHandler.handlerAdded(...) once the channel is registered.
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
    
                EventExecutor executor = newCtx.executor();
                if (!executor.inEventLoop()) {
                    callHandlerAddedInEventLoop(newCtx, executor);
                    return this;
                }
            }
            // 回调用户方法
            callHandlerAdded0(newCtx);
            return this;
        }
    

    如上代码,整个添加过程见注释说明,其实主要就是四步:

    第一步:检查是否有重复的 handler,核心逻辑见

        private static void checkMultiplicity(ChannelHandler handler) {
            if (handler instanceof ChannelHandlerAdapter) {
                ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
                if (!h.isSharable() && h.added) {
                    // 非共享的且添加过,就抛出异常,反之,如果一个 handler 支持共享,就可以无限次被添加到 ChannelPipeline 中
                    throw new ChannelPipelineException(
                            h.getClass().getName() +
                            " is not a @Sharable handler, so can't be added or removed multiple times.");
                }
                h.added = true;
            }
        }
    

    第二步:创建节点,即把 handler 包裹成 ChannelHandlerContext,核心逻辑如下

        private static final FastThreadLocal, String>> nameCaches =
                new FastThreadLocal, String>>() {
            @Override
            protected Map, String> initialValue() {
                return new WeakHashMap, String>();
            }
        };
        private String generateName(ChannelHandler handler) {
            Map, String> cache = nameCaches.get();
            Class handlerType = handler.getClass();
            String name = cache.get(handlerType);
            if (name == null) {
                name = generateName0(handlerType);
                cache.put(handlerType, name);
            }
    
            // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
            // any name conflicts.  Note that we don't cache the names generated here.
            if (context0(name) != null) {
                String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
                for (int i = 1;; i ++) {
                    String newName = baseName + i;
                    if (context0(newName) == null) {
                        name = newName;
                        break;
                    }
                }
            }
            return name;
        }
    

    注:Netty 使用 FastThreadLocal 变量来缓存 Handler 的类和名称的映射关系,在生成 name 的时候,首先看缓存中有没有生成过默认 name,如果没有生成,就调用generateName0()生成默认名称,加入缓存。

    第三步:把 ChannelHandlerContext 作为节点添加到 pipeline 中,核心代码如下

        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    

    其本质就是一个双向链表的插入节点过程,而且,ChannelPipeline 删除 ChannelHandler 的方法,本质就是把这个双向链表的某个节点删掉!

    第四步:回调用户方法,核心代码如下

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
            try {
                ctx.callHandlerAdded();
            } catch (Throwable t) {
                boolean removed = false;
                try {
                    atomicRemoveFromHandlerList(ctx);
                    ctx.callHandlerRemoved();
                    removed = true;
                } catch (Throwable t2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                    }
                }
    
                if (removed) {
                    fireExceptionCaught(new ChannelPipelineException(
                            ctx.handler().getClass().getName() +
                            ".handlerAdded() has thrown an exception; removed.", t));
                } else {
                    fireExceptionCaught(new ChannelPipelineException(
                            ctx.handler().getClass().getName() +
                            ".handlerAdded() has thrown an exception; also failed to remove.", t));
                }
            }
        }
        final void callHandlerRemoved() throws Exception {
            try {
                // Only call handlerRemoved(...) if we called handlerAdded(...) before.
                if (handlerState == ADD_COMPLETE) {
                    handler().handlerRemoved(this);
                }
            } finally {
                // Mark the handler as removed in any case.
                setRemoved();
            }
        }
    

    其中ctx.callHandlerAdded();就是回调用户的handlerAdded方法,然后通过 CAS 方式修改节点的状态为 REMOVE_COMPLETE (说明该节点已经被移除),或者 ADD_COMPLETE (添加完成)。

    完整代码见:hello-netty

    本文所有图例见:processon: Netty学习笔记

    更多内容见:Netty专栏

    参考资料#

    跟闪电侠学 Netty:Netty 即时聊天实战与底层原理

    深度解析Netty源码

  • 相关阅读:
    Unity的脚本的基础语法(2)-Unity中记录时间
    C++的结构体
    来开源吧!发布开源组件到 MavenCentral 仓库超详细攻略
    Linux磁盘暴涨如何解决?
    独立站:最新选品建议
    图文详解机器学习中的超平面、函数间隔、几何间隔
    RedisTemplate操作ZSet的API
    哈希函数、Map-Reduce与Hadoop
    C语言程序——Switch分支选择程序
    Discrete Optimization课程笔记(2)—Constraint Programming约束规划(上)
  • 原文地址:https://www.cnblogs.com/greyzeng/p/16780213.html