• Netty(6)netty组件:EventLoop、Channel、Future和Promise、Handler和Pipeline


    EventLoop

    EventLoop

    可以理解为:事件循环对象

    EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件

    1. 单线程执行器
    2. 处理 Channel 上源源不断的 io 事件:Channel 上通过selector去监听accept(建立连接)、read(读)、write(可写)等事件,通过EventLoop去处理这些事件

    继承关系,查看源码可知:
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    1. 继承自 java.util.concurrent.ScheduledExecutorService(这个是执行定时任务的线程池), 因此包含了线程池中所有的方法
    2. 继承自 netty 自己的 OrderedEventExecutor(这是一个有序的,进行事件处理的一个执行器)
      • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
      • 提供了 parent 方法来看看自己属于哪个 EventLoopGroup

    EventLoopGroup

    可以理解为:事件循环组
    一般我们不会直接使用EventLoop,而是使用EventLoopGroup

    EventLoopGroup 是一组 EventLoop

    1. Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop
    2. 后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全

    继承自 netty 自己的 EventExecutorGroup
    在这里插入图片描述

    • 实现了 Iterable 接口提供遍历 EventLoop 的能力
    • 另有 next 方法获取集合中下一个 EventLoop

    使用

    获取下一个事件循环对象

    // 1. 创建事件循环组
    /*
    NioEventLoopGroup:
        默认创建的线程个数为:
            1. 首先读取netty自己的性能参数,读取到了就以这个为准备(就是自己指定的数)
            2. 如果上面的没有读取到,会找到当前电脑的的cpu核心数 * 2
            3. 如果指定的为错误(如0),他是会去取范围中间的一个数
        功能最为全面,可以做下面的事情:
            1. 处理IO事件
            2. 处理普通任务
            3. 处理定时任务
    DefaultEventLoopGroup:
        可以做下面的事情:
            1. 处理普通任务
            2. 处理定时任务
     */
     // 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
    EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
    //EventLoopGroup group = new DefaultEventLoopGroup(); // 普通任务,定时任务
    // 2. 获取下一个事件循环对象
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());
    /*
    io.netty.channel.nio.NioEventLoop@548e7350
    io.netty.channel.nio.NioEventLoop@1a968a59
    io.netty.channel.nio.NioEventLoop@548e7350
    io.netty.channel.nio.NioEventLoop@1a968a59
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    执行普通任务

    好处:

    1. 可以进行异步处理
    2. 在accept,事件分发时,会使用这种方法,将接下来的一段代码从一个线程给(转移)到另一个线程执行
    // 1. 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
    EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
    // 2. 获取下一个事件循环对象
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());
    // 3. 执行普通任务
    //group.next().submit()类似
    group.next().execute(() -> {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.debug("ok");
    });
    log.debug("main");
    /*
    io.netty.channel.nio.NioEventLoop@548e7350
    io.netty.channel.nio.NioEventLoop@1a968a59
    io.netty.channel.nio.NioEventLoop@548e7350
    io.netty.channel.nio.NioEventLoop@1a968a59
    23:44:33 [DEBUG] [main] c.i.n.c.TestEventLoop - main
    23:44:34 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    执行定时任务

    用途:在实现keep-alive时,实现任务的保活

    // 1. 内部创建了两个 EventLoop, 每个 EventLoop 维护一个线程
    EventLoopGroup group = new NioEventLoopGroup(2); // io 事件,普通任务,定时任务
    // 2. 获取下一个事件循环对象
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());
    System.out.println(group.next());
    
    // 4. 执行定时任务
    /*
    	scheduleAtFixedRate:执行一个定时任务,以一定的频率去执行
    		参数1:任务对象
    		参数2:初始化延迟时间,设置为1就是1s后执行,设置为0代表立即执行
    		参数3:间隔时间
    		参数4:时间单位
    */
    group.next().scheduleAtFixedRate(() -> {
        log.debug("ok");
    }, 0, 1, TimeUnit.SECONDS);
    
    log.debug("main");
    /*
    io.netty.channel.nio.NioEventLoop@548e7350
    io.netty.channel.nio.NioEventLoop@1a968a59
    io.netty.channel.nio.NioEventLoop@548e7350
    io.netty.channel.nio.NioEventLoop@1a968a59
    23:48:26 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
    23:48:26 [DEBUG] [main] c.i.n.c.TestEventLoop - main
    23:48:27 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
    23:48:28 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
    23:48:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
    23:48:30 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
    23:48:31 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
    23:48:32 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestEventLoop - ok
    
    Process finished with exit code -1
    
    */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    处理IO事件

    服务器端

    在服务器端做了两个细分:

    1. 把EventLoop分为: boss 和 worker,二者处理不同的事件
    2. 创建一个独立的 EventLoopGroup去处理其他的任务
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    
    import java.nio.charset.Charset;
    
    @Slf4j//后面使用的log.debug需要这个注解
    public class EventLoopServer {
        public static void main(String[] args) {
            // 细分2:创建一个独立的 EventLoopGroup
            /*
                如果遇见某一个handler的耗时较长,最好不要使这个handler占用worker的NIO线程,否则就会影响NIO的读写操作
                下面这个EventLoopGroup专门处理耗时较长的操作,而不是NioEventLoopGroup去操作这种耗时较长的事件
                    DefaultEventLoopGroup:不需要处理IO事件,只能处理定时和普通任务
             */
            EventLoopGroup group = new DefaultEventLoopGroup();
            new ServerBootstrap()
                    /*细分1*/
                    // 把EventLoop分为: boss 和 worker
                    // 第一个参数: boss 只负责 ServerSocketChannel 上 accept 事件
                    // 第二个参数: worker 只负责 ServerSocketChannel 上的读写
                    /* 参数1:boss
                            ServerSocketChannel 只会跟一个EventLoop 绑定,更多的ServerSocketChannel不可能有,因为服务器只有一个,
                            其只会占用EventLoopGroup中的一个线程,剩余的也不会创建,这个地方不写参数也是可以的
                       参数2:worker
                            这个根据实际情况去设置,不设置的话就是cpu核心数 * 2 ;
                            下面设置为2,说明以后的worker线程只有两个
                     */
                    .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override//连接建立后调用
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {//自己处理数据
                                @Override// 没有StringDeCode,所以数据是一个ByteBuf类型(字节数据)
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//关注读事件
                                    ByteBuf buf = (ByteBuf) msg;
                                    // 将buf 转为字符串, 最好指定哪一种字节码
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                    //必须使用下面这个方法,否则,消息将会在这个地方断掉
                                    ctx.fireChannelRead(msg); // 将消息传递给下一个handler
                                }
                            }).addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
                                @Override                                         // ByteBuf
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    log.debug(buf.toString(Charset.defaultCharset()));
                                }
                            });
                        }
                    })
                    .bind(8080);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    客户端

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.net.InetSocketAddress;
    
    public class EventLoopClient{
        public static void main(String[] args) throws InterruptedException {
            // 1. 创建启动类,启动客户端
            new Bootstrap()
                // 2. 添加 EventLoop:比如服务器端发送数据过来,客户端的eventLoop就可以从选择器里触发读事件,去进行进一步的处理
                .group(new NioEventLoopGroup())
                // 3. 选择客户端 channel 实现,NioSocketChannel(封装了jdk的NioSocketChannel)
                .channel(NioSocketChannel.class)
                // 4. 添加处理器,ChannelInitializer(连接建立后会被调用,调用后便会执行initChannel方法)
                .handler(new ChannelInitializer<NioSocketChannel>() {
                    @Override // 在连接建立后被调用
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        //客户端将字符串编码成为ByteBuf,服务器端将ByteBuf解码为字符串
                        ch.pipeline().addLast(new StringEncoder());
                    }
                })
                // 5. 连接到服务器
                .connect(new InetSocketAddress("localhost", 8080))
                .sync()//
                .channel();
                
                // 6. 向服务器发送数据,即写数据
                System.out.println(channel);
                System.out.println("");
              
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    运行

    debug输入数据

    按下面的操作,便可以实现debug启动
    在这里插入图片描述

    在这里插入图片描述

    启动多个客户端

    1. 第一步
      在这里插入图片描述

    2. 按如下选择
      在这里插入图片描述

    3. 点击apply–》ok

    以后,直接右键,便可以启动多个客户端
    在这里插入图片描述

    运行结果

    在这里插入图片描述

    总结

    未创建EventLoopGroup去处理其他的任务

    在这里插入图片描述
    一旦建立连接,Channel就会跟一个NioEventLoop绑定,后续的所有请求,都会由同一个EventLoop处理

    创建EventLoopGroup去处理其他的任务

    在这里插入图片描述
    举例说明:

    1. 当第一个channel1到来时,前两个handler线程(head、h1)跟NIO的线程EventLoop1绑定了
    2. 当继续向下运行时,执行到另一个handler(h2),由于这个handler被指派给了DefaultEventLoopGroup里面的EventLoop1,所以这二者便绑定了
    3. 最后一个线程tail,这个还是跟Nio中的线程(EventLoop1)绑定

    handler 执行中如何实现切换

    问题也可以描述为:多个handler之间,使用的是不同的EventLoop,是如何进行线程的切换的

    EventLoop就可以当做是一个个人,多个工人的工作是如何进行交换的

    以下面的图举例:

    在这里插入图片描述

    在这里,将上面的NIOEventLoopGroup简称为Nio线程,DefaultEventLoopGroup简称为普通线程

    核心代码在这个类中:io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
    其关键代码如下

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
        /*
        	next.executor():返回下一个handler的EventLoop
        */
        EventExecutor executor = next.executor();
        
        // 是,直接调用
        /*
        	executor.inEventLoop():当handler中的线程,是否与executor 是同一个线程
        		成立的进入if,不成立进入else
        */
        if (executor.inEventLoop()) {
        	//让下一个handler也是在同一个线程去执行
            next.invokeChannelRead(m);
        } 
        // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
        else {
        	//给另一个线程executor去执行
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    简而言之:
    如果两个handler绑定的是同一个线程,那么就直接调用; 否则,把要调用的代码封装为一个任务对象,由下一一个 handler的线程来调用

    步骤:

    1. channel1中,第一个handler是Nio线程执行的;接下来会调用下一个handler,调用这个handler时会执行两步操作
    2. 操作会执行上面的invokeChannelRead这个方法:在这个方法里面,如果调用next.invokeChannelRead(m);,便会进入下一个handler

    Channel

    channel 的主要作用

    • close() 可以用来关闭 channel

    • closeFuture() 用来处理 channel 的关闭

      当close事件发生后,去进行关闭后的善后处理

      • sync 方法作用是同步等待 channel 关闭

      • 而 addListener 方法是异步等待 channel 关闭

    • pipeline() 方法添加处理器

      当连接建立时,调用触发器时,会在触发器中的initChannel这个方法里去调用channel的pipeline() 方法。作用就是给 channel的pipeline流水线中加入一个个handler处理器

    • write() 方法将数据写入

      将数据写入channel,但是不会立刻将数据通过·网络发出去,会触发一定条件时才会进行发送

    • writeAndFlush() 方法将数据写入并刷出

      可以保证数据立即写入并发出

    ChannelFuture

    注意 connect 方法是异步的,意味着不等连接建立,方法执行就返回了。因此 channelFuture 对象中不能【立刻】获得到正确的 Channel 对象

    connect是异步非阻塞的,主线程(main)发起了调用,真正执行connect(底层的)的是Nio线程,主线程不会被阻塞,将继续向下运行

    只要是带Future、Promise的,一般都是配合异步线程,一起使用的
    有两种方法去进行结果的处理

    1. 程序运行到ChannelFuture.sync()时,便会被阻塞,直到Nio线程连接建立完毕,,谁发起的调用,谁就等结果
    2. 使用andListener(回调对象)方法异步处理结果,将等结果和处理结果的内容都交给另一个线程去处理

    处理结果(第一种方法sync)

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    
    @Slf4j
    public class EventLoopClient {
        public static void main(String[] args) throws InterruptedException {
            // 2. 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override // 在连接建立后被调用
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 1. 连接到服务器
                    // 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
                    .connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后
    
            // 2.1 使用 sync 方法同步处理结果,谁发起的调用,谁就等结果
            channelFuture.sync(); // 阻塞住当前线程,直到nio线程连接建立完毕
            Channel channel = channelFuture.channel();
            log.debug("{}", channel);
            channel.writeAndFlush("hello, world");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    处理结果(第二种方法andListener(回调对象))

    将等结果和处理结果的内容都交给另一个线程去处理

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    
    @Slf4j
    public class EventLoopClient {
        public static void main(String[] args) throws InterruptedException {
            // 2. 带有 Future,Promise 的类型都是和异步方法配套使用,用来处理结果
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override // 在连接建立后被调用
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 1. 连接到服务器
                    // 异步非阻塞, main 发起了调用,真正执行 connect 是 nio 线程
                    .connect(new InetSocketAddress("localhost", 8080)); // 1s 秒后
    
            // 2.2 使用 addListener(回调对象) 方法异步处理结果
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                // 在 nio 线程连接建立好之后,会调用 operationComplete
                /*
                	这里的channelFuture对象与调用时的channelFuture是同一个
                */
                public void operationComplete(ChannelFuture future) throws Exception {
                    Channel channel = future.channel();
                    log.debug("{}", channel);
                    channel.writeAndFlush("hello, world");
                }
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    CloseFuture

    close()方法是一个异步操作,我们希望在close关闭后再去做一些操作
    真正处理关闭的线程是在NioEventLoopGroup这个线程里面进行的

    LoggingHandler

    这个是handler是netty提供的,定义了日志级别,如下所示
    在这里插入图片描述
    这个handler里面的日志级别将被定义为debug级别进行输出
    在这里插入图片描述

    处理关闭sync(同步方式进行关闭)

    在当前线程进行阻塞,当真正调用了channel.close()方法后,sync方法才会继续向下运行

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    import java.util.Scanner;
    
    @Slf4j
    public class CloseFutureClient {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup group = new NioEventLoopGroup();
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override // 在连接建立后被调用
                        protected void initChannel(NioSocketChannel ch) throws Exception {
    //                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost", 8080));
            System.out.println(channelFuture.getClass());
            Channel channel = channelFuture.sync().channel();
            log.debug("{}", channel);
            new Thread(()->{
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String line = scanner.nextLine();
                    if ("q".equals(line)) {
                        channel.close(); // close 异步操作 1s 之后
    //                    log.debug("处理关闭之后的操作"); // 不能在这里善后
                        break;
                    }
                    channel.writeAndFlush(line);
                }
            }, "input").start();
    
            // 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
            ChannelFuture closeFuture = channel.closeFuture();
            
            log.debug("waiting close...");
            closeFuture.sync();//同步,会在当前线程进行阻塞
            log.debug("处理关闭之后的操作");
            
            
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    处理关闭(异步方式进行关闭)

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    import java.util.Scanner;
    
    @Slf4j
    public class CloseFutureClient {
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup group = new NioEventLoopGroup();
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override // 在连接建立后被调用
                        protected void initChannel(NioSocketChannel ch) throws Exception {
    //                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost", 8080));
            System.out.println(channelFuture.getClass());
            Channel channel = channelFuture.sync().channel();
            log.debug("{}", channel);
            new Thread(()->{
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String line = scanner.nextLine();
                    if ("q".equals(line)) {
                        channel.close(); // close 异步操作 1s 之后
    //                    log.debug("处理关闭之后的操作"); // 不能在这里善后
                        break;
                    }
                    channel.writeAndFlush(line);
                }
            }, "input").start();
    
            // 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭
            ChannelFuture closeFuture = channel.closeFuture();
            closeFuture.addListener((ChannelFutureListener) future -> {
                log.debug("处理关闭之后的操作");
                //使NioEventLoopGroup 的线程都关闭掉
                group.shutdownGracefully();
            });
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    Netty为什么要使用异步的方法去处理(其提升的是是什么)

    要点

    • 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势

    • 异步并没有缩短响应时间,反而有所增加

      异步的响应时间其实是增长了,提高的是吞吐量(提高了单位时间的访问的个数)

    • 合理进行任务拆分,也是利用异步的关键

      合理拆分

    提升的是是什么:吞吐量

    吞吐量:单位时间内能够处理请求的速度

    Future & Promise

    这两个异步处理时,经常用到的接口

    netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展(netty中的Future继承了JDK中的Future,netty中的Promise继承了netty中的Future接口)

    • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
    • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
    • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器

    常用的一些方法

    功能/名称jdk Futurenetty FuturePromise
    cancel取消任务--
    isCanceled任务是否取消--
    isDone任务是否完成,不能区分成功失败--
    get获取任务结果,阻塞等待--
    getNow-等待任务结束,获取任务结果非阻塞,还未产生结果时返回 null-
    await-等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断任务是成功还是失败-
    sync-等待任务结束,不去获取结果,如果任务失败,抛出异常-
    isSuccess-判断任务是否成功-
    cause-获取失败信息非阻塞,如果没有失败,返回null-
    addLinstener-添加回调,异步接收结果-
    setSuccess--设置成功结果
    setFailure--设置失败结果

    JDK Future

    
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.*;
    
    @Slf4j
    public class TestJdkFuture {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1. 线程池
            ExecutorService service = Executors.newFixedThreadPool(2);
            // 2. 提交任务
            /*
            Callable:有返回结果
            Runable:没有返回结果
             */
            Future<Integer> future = service.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    log.debug("执行计算");
                    Thread.sleep(1000);
                    return 50;
                }
            });
            // 3. 主线程通过 future 来获取结果
            log.debug("等待结果");
            log.debug("结果是 {}", future.get());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    运行结果

    21:36:05 [DEBUG] [pool-1-thread-1] c.i.n.c.TestJdkFuture - 执行计算
    21:36:05 [DEBUG] [main] c.i.n.c.TestJdkFuture - 等待结果
    21:36:06 [DEBUG] [main] c.i.n.c.TestJdkFuture - 结果是 50
    
    
    • 1
    • 2
    • 3
    • 4

    Netty Future

    
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.EventLoop;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    
    @Slf4j
    public class TestNettyFuture {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            NioEventLoopGroup group = new NioEventLoopGroup();
            EventLoop eventLoop = group.next();
            Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    log.debug("执行计算");
                    Thread.sleep(1000);
                    return 70;
                }
            });
            //同步的方式获取
    //        log.debug("等待结果");
    //        log.debug("结果是 {}", future.get());
            //异步的方式获取
            future.addListener(new GenericFutureListener<Future<? super Integer>>(){
                @Override
                public void operationComplete(Future<? super Integer> future) throws Exception {
                    log.debug("接收结果:{}", future.getNow());
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    运行结果

    21:53:37 [DEBUG] [main] c.i.n.c.TestNettyFuture - 等待结果
    21:53:37 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - 执行计算
    21:53:38 [DEBUG] [main] c.i.n.c.TestNettyFuture - 结果是 70
    21:53:38 [DEBUG] [nioEventLoopGroup-2-1] c.i.n.c.TestNettyFuture - addListener接收结果:70
    
    Process finished with exit code -1
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    Netty Promise

    import io.netty.channel.EventLoop;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.concurrent.DefaultPromise;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.ExecutionException;
    
    @Slf4j
    public class TestNettyPromise {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 1. 准备 EventLoop 对象
            EventLoop eventLoop = new NioEventLoopGroup().next();
            // 2. 可以主动创建 promise;promise就类似于结果容器,存放结果
            DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
            new Thread(() -> {
                // 3. 任意一个线程执行计算,计算完毕后向 promise 填充结果
                log.debug("开始计算...");
                try {
                    int i = 1 / 0;
                    Thread.sleep(1000);
                    //填充正常的结果
                    promise.setSuccess(80);
                } catch (Exception e) {
                    e.printStackTrace();
                    //填充异常的结果
                    promise.setFailure(e);
                }
    
            }).start();
            // 4. 接收结果的线程
            log.debug("等待结果...");
            log.debug("结果是: {}", promise.get());
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    运行结果:

    21:57:52 [DEBUG] [Thread-0] c.i.n.c.TestNettyPromise - 开始计算...
    21:57:52 [DEBUG] [main] c.i.n.c.TestNettyPromise - 等待结果...
    java.lang.ArithmeticException: / by zero
    	at cn.itcast.netty.c3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:22)
    	at java.lang.Thread.run(Thread.java:745)
    Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
    	at io.netty.util.concurrent.AbstractFuture.get(AbstractFuture.java:41)
    	at cn.itcast.netty.c3.TestNettyPromise.main(TestNettyPromise.java:34)
    Caused by: java.lang.ArithmeticException: / by zero
    	at cn.itcast.netty.c3.TestNettyPromise.lambda$main$0(TestNettyPromise.java:22)
    	at java.lang.Thread.run(Thread.java:745)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Handler & Pipeline

    ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline

    • 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要用来读取客户端数据,写回结果
    • 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对写回结果进行加工

      只有向channel里写入数据(write或writeAndFlush方法),才会触发

    打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而后面要讲的 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品

    inbound

    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    
    import java.nio.charset.Charset;
    
    @Slf4j
    public class TestPipeline {
        public static void main(String[] args) {
            new ServerBootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // 1. 通过 channel 拿到 pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            // 2. 添加处理器,下面的构成这种链: head ->  h1 -> h2 ->  h4 -> h3 -> h5 -> h6 -> tail
                            /*
                                netty会自动创建两个handler:head和tail
                                上面的一条链会依次执行
                             */
                            //入栈
                            pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    log.debug("1");
                                    super.channelRead(ctx, msg);
                                }
                            });
                            //入栈
                            pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
                                    log.debug("2");
                                    super.channelRead(ctx, name); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
                                }
                            });
                            //入栈
                            pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    log.debug("3");
                                    //写入操作,只是为了触发下面的出栈处理器
                                    ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                                }
                            });
                            //出栈
                            pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    log.debug("4");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            //出栈
                            pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    log.debug("5");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            //出栈
                            pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    log.debug("6");
                                    super.write(ctx, msg, promise);
                                }
                            });
                        }
                    })
                    .bind(8080);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83

    运行结果
    在这里插入图片描述

    outbound

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import jdk.nashorn.internal.objects.annotations.Constructor;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    
    import java.nio.charset.Charset;
    
    @Slf4j
    public class TestPipeline {
        public static void main(String[] args) {
            new ServerBootstrap()
             .group(new NioEventLoopGroup())
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<NioSocketChannel>() {
                 @Override
                 protected void initChannel(NioSocketChannel ch) throws Exception {
                     // 1. 通过 channel 拿到 pipeline
                     ChannelPipeline pipeline = ch.pipeline();
                     // 2. 添加处理器,下面的构成这种链: head ->  h1 -> h2 ->  h4 -> h3 -> h5 -> h6 -> tail
                     /*
                         netty会自动创建两个handler:head和tail
                         上面的一条链会依次执行
                      */
                     //入栈
                     pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                             log.debug("1");
                             ByteBuf buf = (ByteBuf) msg;
                             String name = buf.toString();
                             super.channelRead(ctx, name);
                         }
                     });
                     //入栈
                     pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
                             log.debug("2");
                             Student student  = new Student(name.toString());
                             super.channelRead(ctx, student); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
                         }
                     });
                     //入栈
                     pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
                         @Override
                         public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                             log.debug("3, 结果{}, class:{}", msg, msg.getClass());
                             ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                             //写入操作,只是为了触发下面的出栈处理器
    //                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                         }
                     });
                     //出栈
                     pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                         @Override
                         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                             log.debug("4");
                             super.write(ctx, msg, promise);
                         }
                     });
                     //出栈
                     pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
                         @Override
                         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                             log.debug("5");
                             super.write(ctx, msg, promise);
                         }
                     });
                     //出栈
                     pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
                         @Override
                         public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                             log.debug("6");
                             super.write(ctx, msg, promise);
                         }
                     });
                 }
             })
             .bind(8080);
        }
        @Data
        @AllArgsConstructor
        static class Student {
            private String name;
    
            public Student(String name) {
                this.name = name;
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    运行结果:
    在这里插入图片描述

    ChannelHandlerContext

    ctx(ChannelHandlerContext)是向前找outbound(出站)
    ch(NioSocketChannel)是向后去找outbound(出站)

    代码如下所示,如果将h4换到h3之前,并在h3里面使用ctx的话,会出下面的结果

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    
    import java.nio.charset.Charset;
    
    @Slf4j
    public class TestPipeline {
        public static void main(String[] args) {
            new ServerBootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // 1. 通过 channel 拿到 pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            // 2. 添加处理器,下面的构成这种链: head ->  h1 -> h2 ->  h4 -> h3 -> h5 -> h6 -> tail
                            /*
                                netty会自动创建两个handler:head和tail
                                上面的一条链会依次执行
                             */
                            //入栈
                            pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    log.debug("1");
                                    super.channelRead(ctx, msg);
                                }
                            });
                            //入栈
                            pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
                                    log.debug("2");
                                    super.channelRead(ctx, name); // 将数据传递给下个 handler,如果不调用,调用链会断开 或者调用 ctx.fireChannelRead(student);
                                }
                            });
                            //出栈
                            pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    log.debug("4");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            //入栈
                            pipeline.addLast("h3", new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    log.debug("3");
                                    //ctx是向前去找出站(outbound)处理器
                                    ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                                    //写入操作,只是为了触发下面的出栈处理器
    //                                ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server...".getBytes()));
                                }
                            });
                            
                            //出栈
                            pipeline.addLast("h5", new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    log.debug("5");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            //出栈
                            pipeline.addLast("h6", new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    log.debug("6");
                                    super.write(ctx, msg, promise);
                                }
                            });
                        }
                    })
                    .bind(8080);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    在这里插入图片描述

    如果h4没有换到h3之前,是只能打印到3的位置的

    总结

    inbound和outbound

    ChannelInboundHandlerAdapter 是按照 addLast 的顺序执行的,而 ChannelOutboundHandlerAdapter 是按照 addLast 的逆序执行的。ChannelPipeline 的实现是一个 ChannelHandlerContext(包装了 ChannelHandler) 组成的双向链表(链表如下:)
    在这里插入图片描述

    • 入站处理器中,ctx.fireChannelRead(msg) 是 调用下一个入站处理器
      • 如果注释掉 1 处代码,则仅会打印 1
      • 如果注释掉 2 处代码,则仅会打印 1 2
    • 3 处的 ctx.channel().write(msg) 会 从尾部开始触发 后续出站处理器的执行
      • 如果注释掉 3 处代码,则仅会打印 1 2 3
    • 类似的,出站处理器中,ctx.write(msg, promise) 的调用也会 触发上一个出站处理器
      • 如果注释掉 6 处代码,则仅会打印 1 2 3 6
    • ctx.channel().write(msg) vs ctx.write(msg)
      • 都是触发出站处理器的执行
      • ctx.channel().write(msg) 从尾部开始查找出站处理器
      • ctx.write(msg) 是从当前节点找上一个出站处理器
      • 3 处的 ctx.channel().write(msg) 如果改为 ctx.write(msg) 仅会打印 1 2 3,因为节点3 之前没有其它出站处理器了
      • 6 处的 ctx.write(msg, promise) 如果改为 ctx.channel().write(msg) 会打印 1 2 3 6 6 6… 因为 ctx.channel().write() 是从尾部开始查找,结果又是节点6 自己

    pipeline 触发的原始流程

    数字代表了处理步骤的先后次序
    在这里插入图片描述
    如果在 in_3 使用的是ctx.write的话,是会向前找out

  • 相关阅读:
    lenovo联想笔记本ThinkPad P16V Gen 1(21FC,21FD)原装出厂Win11系统
    设计模式 | 简单工厂模式
    【Swift 60秒】15 - Creating empty collections
    Unity websocket
    数据库常用的几大范式NF
    【AI Business Model】人工智能的定义 | 了解 AI 的历史 | 简单理解什么是 “图灵测试“
    代码随想录算法训练营第五十天|股票问题专题(2)
    【Android Studio】工程中文件Annotate with Git Blame 不能点击
    android 5.1 BatteryManager深入分析
    操作系统原理-习题汇总
  • 原文地址:https://blog.csdn.net/yyuggjggg/article/details/126376956