• 【Netty 从成神到升仙系列 五】Netty 的责任链真有这么神奇吗?


    • 👏作者简介:大家好,我是爱敲代码的小黄,独角兽企业的Java开发工程师,Java领域新星创作者。
    • 📝个人公众号:爱敲代码的小黄
    • 📕系列专栏:Java设计模式、数据结构和算法
    • 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀
    • 🔥如果感觉博主的文章还不错的话,请👍三连支持👍一下博主哦
    • 🍂博主正在努力完成2022计划中:以梦为马,扬帆起航,2022追梦人

    责任链模式

    1. 什么是责任链模式?

    我们来看下百度百科 责任链模式 的定义:

    责任链模式是一种设计模式。在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链。请求在这个链上传递,直到链上的某一个对象决定处理此请求。发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求,这使得系统可以在不影响客户端的情况下动态地重新组织和分配责任。

    通俗的理解,当我们的业务需要一连串的校验,比如下面的商品上架流程:

    image

    正常情况下,对于这种链式的校验,我们一般采用 **责任链 ** 的方式去进行处理。

    2. 责任链实现 demo

    首先,我们肯定要有一个上下文,这个上下文用来进行链表的执行:

    /**
     * handler上下文,维护链表和负责链表的执行
     */
    public class HandlerChainContext {
    
        HandlerChainContext next; // 持有下一个节点:单链表
    
        AbstractHandler handler;
    
        public HandlerChainContext(AbstractHandler handler) {
            this.handler = handler;
        }
    
        // 真正执行 handler
        void handler(Object arg0) {
            this.handler.doHandler(this, arg0);
        }
    
        // 继续执行下一个
        void runNext(Object arg0) {
            if (this.next != null) {
                this.next.handler(arg0);
            }
        }
    }
    

    处理器的抽象类:

    abstract public class AbstractHandler {
        abstract void doHandler(HandlerChainContext context, Object arg0);
    }
    

    具体的实现类:

    public class Handler1 extends AbstractHandler {
        @Override
        void doHandler(HandlerChainContext handlerChainContext, Object arg0) {
            System.out.println("--首次购买打9折!");
            arg0 = new DecimalFormat("0.00").format(Double.valueOf(arg0.toString()) * 0.9);
            System.out.println("折扣后金额:" + arg0);
            // 继续执行下一个
            handlerChainContext.runNext(arg0);
        }
    }
    

    Pipeline 类:

    public class Pipeline {
        // 持有上下文(可以获得需要的数据,属性)
        public HandlerChainContext context = new HandlerChainContext(new AbstractHandler() {
            @Override
            void doHandler(HandlerChainContext context, Object arg0) {
                System.out.println("折扣前" + arg0);
                context.runNext(arg0);
            }
        });
    
        // 添加责任链
        public void addLast(AbstractHandler handler) {
            HandlerChainContext next = context;
            while (next.next != null) {
                next = next.next;
            }
            next.next = new HandlerChainContext(handler);
        }
    
        // 开始调用
        public void requestProcess(Object arg0) {
            context.handler(arg0);
        }
        
        public static void main(String[] args) {
            Pipeline p = new Pipeline();
            p.addLast(new Handler1());
            p.addLast(new Handler2());
            p.addLast(new Handler3());
    
            p.requestProcess("150");
        }
    
    }
    

    3. Netty的责任链

    我们回顾下 Netty 的责任链使用方法:

    .childHandler(new ChannelInitializer<SocketChannel>() {
        protected void initChannel(SocketChannel ch) throws Exception {
            /*添加到该子channel的pipeline的尾部*/
            ch.pipeline().addLast(serverHandler);
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
            ch.pipeline().addLast(new ChannelOutboundHandlerAdapter());
        }
    });
    

    从这个地方我们可以看到,Netty 为每一个客户端的连接 SocketChannel 都分配了一个独立的 pipeline

    ChannelPipeline支持运行态动态的添加或者删除 ChannelHandler。当业务高峰期需要对系统做拥塞保护时,就可以根据当前的系统时间进行判断,如果处于业务高峰期,则动态地将系统拥塞保护ChannelHandler添加到当前的ChannelPipeline中,当高峰期过去之后,就可以动态删除拥塞保护 ChannelHandler了。

    ChannelPipeline是线程安全的,但是 ChannelHandler 却不是线程安全的。

    我们的 ChannelPipeline 执行是单线程的,因此是安全的,但 ChannelHandler 自己实现的可以利用多线程来跑,但是这种是不安全的。

    至于 ChannelPipeline 是单线程怎么知道的,后面我们会解答

    3.1 Netty 的责任链设计
    3.1.1 责任处理器的接口

    pipeline 中的所有的 handler 的顶级抽象接口,它规定了所有的 handler 统一要有添加,移除,异常捕获的行为。

    public interface ChannelHandler {
        // 添加
        void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    	 // 删除
        void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    }
    

    当然,NettyChannelHandler 做了进一步的划分,分别为 ChannelInboundHandlerAdapter(入栈)ChannelOutboundHandlerAdapter(出栈)

    在这里插入图片描述

    3.1.2 添加删除责任处理器的接口

    ChannelPipeline 中有各种各样的处理 Handler 的接口:

    public interface ChannelPipeline {
    	 // 头部
        ChannelPipeline addFirst(String name, ChannelHandler handler);
    	
        // 尾部
        ChannelPipeline addLast(String name, ChannelHandler handler);
    }
    

    默认的实现是 DefaultChannelPipeline,具体的实现见代码(核心就是双向链表的添加和删除

    3.1.3 上下文

    pipeline中的handler被封装进了上下文中。

    如下, 通过上下文,可以轻松拿到当前节点所属的channel,以及它的线程执行器
    在这里插入图片描述

    3.1.4 责任终止机制
    • 在pipeline中的任意一个节点,只要我们不手动的往下传播下去,这个事件就会终止传播在当前节点
    • 传播使用:super.channelRead(ctx, msg); 直接调用 ChannelHandlerContext 上下文里面的下一个执行 Handler。如下所示:
      在这里插入图片描述
    3.1.5 线程安全与否

    在我们执行过程中,每一个连接服务端的客户端,都会有一个 SocketChannel,而当客户端接入时,我们的 ServerBootstrap 会为每一个 SocketChannel 分配其相应的 pipeline

    当有新的链接进来时,会启动如下:

    // 这个方法会在服务端检查当前有连接事件时,调用 unsafe.read() 方法,创建 SocketChannel
    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        //接受新连接创建SocketChannel
        SocketChannel ch = SocketUtils.accept(javaChannel());
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
        return 0;
    }
    
    /**
     * Creates a new instance.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        // 新的 Pipeline
        pipeline = newChannelPipeline();
    }
    

    我们之前聊过,每一个 SocketChannel,我们都会为其分配一个 EventLoop 来进行执行。

    所以,我们每一个 SocketChannel 中的 Pipeline 在运行时,可以理解成是一个单线程的(线程安全)。

    但我们可以在每个 ChannelHandler 里面进行一个多线程的操作

    比如,客户端传进来的请求:客户端发送1000个正则表达式,需要服务端尽快的返回命中结果

    如果我们单单的使用 EventLoop 来使用,那么整个 Pipeline 中是一个单线程的处理模式,会极大的影响 RT

    于是,我们新起一个 Handler,在 Handler 里面新起一个线程池,来执行 1000Task,如下:

    public class Test {
    
        public static ThreadPoolExecutor service = new ThreadPoolExecutor(
                10, 10, 3, TimeUnit.MINUTES, new LinkedBlockingQueue(100));
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            Random random = new Random();
            List<Future<Boolean>> list = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                Future<Boolean> path = service.submit(new Task(random.nextInt(10)));
                list.add(path);
            }
            for (Future<Boolean> future : list) {
                System.out.println(future.get());
            }
        }
    }
    
    /**
     * 计算当前的正则匹配
     * 这里使用数字代替
     */
    class Task implements Callable {
        int value;
    
        public Task(int value) {
            this.value = value;
        }
    
        @Override
        public Object call() throws Exception {
            if (value <= 5) {
                return true;
            } else {
                return false;
            }
        }
    }
    
    

    这种我们整体线程流转如下:

    在这里插入图片描述

    注意:这种虽然可以提高效率,但必须要保证多线程情况下 ChannelHandlerContext 上下文的线程安全,不然,有可能出现线程不安全的情况发生。

    4. 总结

    总体而言,NettyPipline 设计主要使用了 责任链 的设计模式,通过 headtail 两个指针来对 Handler 进行操作。

    其中,个人感觉比较重要的应该是我们的 服务端(NioServerSocketChannel) 会为每一个 客户端(NIOSocketChannel) 分配一个独一无二的 Pipline,当然,每个 Pipline 里面的 Handler 还是固定的。

    另外,在 Pipline 运行过程中,我们执行完全由 EventLoop 来进行执行,也就是我们通常所说的 EventLoopGroup 线程组里面的线程。

    同样,我们之前也讲过关于 Netty 为每一个通道(NIOSocketChannel)分配 EventLoop 的策略(2次幂等 | 轮询)。

    最后,注意一下 Handler 是固定的,所以我们一般不会在 Hanler 里面做一些线程并发的情况。

    这次的文章就到这里了,后面会更新下 Netty 接受客户端连接的整个过程,再更新一波 Netty 整个的链路图,Netty 的章节应该就可以结束了。

    下一个模块的话,准备更新完之前落下的 Kafka 的源码模块。

  • 相关阅读:
    【FME实战教程】002:FME完美实现CAD数据转shp案例教程(以三调土地利用现状数据为例)
    JAVA要点
    vue3 setup中defineEmits与defineProps的使用案例
    浅析vue中computed,method,watch,watchEffect的区别
    数据结构--七大排序算法(更新ing)
    CC28 买卖股票的最好时机
    几道ctf 命令注入题目的解法
    物联网AI MicroPython学习之语法 sys系统相关
    【leetcode C++】最小栈
    AI黑科技:名片识别革命,一键get轻松
  • 原文地址:https://blog.csdn.net/qq_40915439/article/details/126962616