• 【Netty 几个组件介绍】


    一、eventLoop

    EventLoop

    EventLoop 本质是一个单线程执行器 (同时维护了一个Selector), 里面有run 方法处理一个或者多个channel 上源源不断的io事件。
    它的继承关系如下:

    • 继承自j.u.c.ScheduleExecutorService 因此包含了线程池中所有的方法
    • 继承自 netty 自己的OrdererEventExecutor
      • 提供了 boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此EventLoop
      • 提供了EventLoopGroup parent() 方法来看自己属于哪个EventLoopGroup

    EventLoopGroup

    EventLoopGroup 是一组EventLoop, Channel 一般会调用
    EventLoopGroup 的register 方法来绑定其中一个 EventLoop, 后续这个Channel 上的 io事件都是由此EventLoop来处理 (保证了io事件处理时的线程安全)

    • 继承自netty 自己的EventExecutorGroup
      • 实现了Iterable 接口提供遍历 EventLoop的能力
      • 另外 next可以获取到下一个 EventLoop

    处理普通与定时任务

    代码:

    package com.xlg.component.netty.eventLoop;
    
    import java.util.concurrent.TimeUnit;
    
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-08
     */
    public class TestEventLoop {
    
        public static void main(String[] args) {
            // 1. 创建事件循环组
            /*
            nthreads == 0 ? Math.max(1, SystemPropertyUtil.getInt(
                    "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2)) : nthreads;
             */
            EventLoopGroup group = new NioEventLoopGroup(2); // io 事件, 普通任务, 定时任务
            //        EventLoopGroup group = new DefaultEventLoopGroup();  // 普通任务, 定时任务
    
            // 2. 获取下一个事件循环对象
            // 1 3 相等 因为底层是循环走的
            System.out.println(group.next());
            System.out.println(group.next());
            System.out.println(group.next());
            System.out.println(group.next());
    
            // 3. 指定普通任务
            //        group.next().submit(() -> System.out.println("ok"));
    
            // 4. 定时任务
            // 立即执行, 并且是1s一次
            group.next().scheduleAtFixedRate(() -> System.out.println("ok"), 0, 1, TimeUnit.SECONDS);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    执行结果:

    io.netty.channel.nio.NioEventLoop@3d8c7aca
    io.netty.channel.nio.NioEventLoop@5ebec15
    io.netty.channel.nio.NioEventLoop@3d8c7aca
    io.netty.channel.nio.NioEventLoop@5ebec15
    ok
    ok
    ok
    ok
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    关闭EventLoopGroup
    优雅的关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup到关闭状态从而拒绝新的任务加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的.

    处理IO任务

    服务器代码

    package com.xlg.component.netty.eventLoop;
    
    import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
    
    import java.nio.charset.StandardCharsets;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-08
     */
    public class EventLoopServer {
    
        private static final Logger logger = LoggerFactory.getLogger(EventLoopServer.class);
    
        public static void main(String[] args) {
           new ServerBootstrap()
                   .group(new NioEventLoopGroup())
                   .channel(NioServerSocketChannel.class)
                   .childHandler(new ChannelInitializer<NioSocketChannel>() {
                       @Override
                       protected void initChannel(NioSocketChannel ch) throws Exception {
                           ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                               @Override
                               public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                   ByteBuf buf = (ByteBuf) msg;
                                   System.out.println(buf.toString(StandardCharsets.UTF_8));
                               }
                           });
                       }
                   }).bind(NETTY_SERVER_PORT);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    客户端代码

    package com.xlg.component.netty.eventLoop;
    
    import static com.xlg.component.nio.TestCommon.LOCAL_HOST;
    import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
    
    import java.net.InetSocketAddress;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-08
     */
    public class EventLoopClient {
        public static void main(String[] args) throws InterruptedException {
             Channel channel = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT))
                    .sync()
                    .channel();
            System.out.println(channel);
            System.out.println("--");
            channel.writeAndFlush("asdfasf");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    分工

    Bootstrap 的group方法可以传入两个 EventLoopGroup参数,分别boss处理accept事件,work处理读写

    new ServerBootstrap()
                	// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                
    				...
    
    • 1
    • 2
    • 3
    • 4
    • 5

    多个客户端分别发送 hello 结果.

    nioEventLoopGroup-3-1 hello1
    nioEventLoopGroup-3-2 hello2
    nioEventLoopGroup-3-1 hello3
    nioEventLoopGroup-3-2 hello4
    nioEventLoopGroup-3-2 hello4
    
    • 1
    • 2
    • 3
    • 4
    • 5

    其实可以看到, 一个EventLoop 可以负责多个Channel,且EventLoop 一旦与Channel绑定,则一直负责该Channel的事件.
    image.png

    增加自定义EventLoopGroup
    问题:
    当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup, 避免同一个NioEventLoop 中的其他Channel在较长时间内都无法得到处理

    package com.xlg.component.netty.eventLoop;
    
    import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
    
    import java.nio.charset.StandardCharsets;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.DefaultEventLoopGroup;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-09
     */
    public class MyServer {
    
        public static void main(String[] args) {
            // 增加自定义的非NioEventLoopGroup
            EventLoopGroup group = new DefaultEventLoopGroup();
    
            new ServerBootstrap()
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        // 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println(
                                            Thread.currentThread().getName() + ": " + buf.toString(StandardCharsets.UTF_8));
    
                                    // 调用下一个handler
                                    ctx.fireChannelRead(msg);
                                }
                            })
                            // 该handler绑定自定义的Group
                            .addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println(Thread.currentThread().getName() + ": " + buf
                                            .toString(StandardCharsets.UTF_8));
                                }
                            });
                        }
                    })
                    .bind(NETTY_SERVER_PORT);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    启动三个客户端发送数据

    nioEventLoopGroup-4-1: asdf
    defaultEventLoopGroup-2-1: asdf
    nioEventLoopGroup-4-2: hel2
    defaultEventLoopGroup-2-2: hel2
    nioEventLoopGroup-4-1: adf333
    defaultEventLoopGroup-2-3: adf333
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    可以看出,客户端与服务器之间的事件,被nioEventLoopGroup 和 defaultEventLoopGroup分别处理
    image.png

    切换的实现
    **不同的EventLoopGroup 切换的实现原理如下: **
    由上面的图可以看出,当handler 中绑定的Group不同时,需要切换Group来执行不同的任务

    io.netty.channel.AbstractChannelHandlerContext
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
     final Object m = next.pipline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        // 获取下一个EventLoop, excutor 即为EventLoopGroup
        EventExecutor executor = next.executor();
        
        // 如果下一个EventLoop 在当前的 EVentLoopGroup 中
        if (executor.inEventLoop()) {
            // 使用当前的 EventLoopGroup 中的 EventLoop 来处理任务
            next.invokeChannelRead(m);
        } else {
            // 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
            executor.executor(new Runnable(){
                public void run() {
                   next.invokeChannelRead(m);
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 如果两个handler 绑定的是同一个 EventLoopGroup, 就直接调用
    • 否则,把要调用的代码封装为一个任务对象,由下一个handler 的EventLoopGroup 来调用.

    二、Channel


    Channel 的常用方法

    • close() 可以用来关闭Channel
    • closeFuture() 用来处理 Channel 的关闭
      • sync 方法作用是同步等待channel 关闭
      • 而 addListener 方法是异步等待 channel 关闭
    • pipline() 方法用于添加处理器
    • write() 方法将数据写入
      • 因为缓冲机制,数据被写入到channel 中以后,不会立即被发送
      • 只有当缓冲满了 或者 调用了flush() 方法后,才会将数据通过channel 发送出去
    • writeAndFlush() 方法将数据写入并立即发送 (刷出)

    ChannelFuture

    获取channel

    同步异步获取建立连接后的 channel 和 发送数据.

    package com.xlg.component.netty.eventLoop;
    
    import static com.xlg.component.nio.TestCommon.LOCAL_HOST;
    import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
    
    import java.net.InetSocketAddress;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-08
     */
    public class EventLoopClient {
        private static final Logger logger = LoggerFactory.getLogger(EventLoopClient.class);
    
        public static void main(String[] args) throws InterruptedException {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 1. 建立链接
                    // 异步非阻塞, main发起了调用, 真正执行的是 nio 线程.NioEventLoopGroup
                    .connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT));
    
            // 2.1 使用sync同步处理结果
            // 阻塞住当前线程, 直到nio线程连接完毕. 如果去掉sync此时, 由于连接未建立完毕, main执行, channel无法给服务器写数据
    //        channelFuture.sync();
    //        Channel channel = channelFuture.channel();
    //        System.out.println(channel);
    //        System.out.println("--");
    //        channel.writeAndFlush("hello world");
    
            // 2.2 异步addListener(回调对象)
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    final Channel channel = future.channel();
                    logger.debug("{}", channel);
                    channel.writeAndFlush("hello world !!");
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    正确关闭

    package com.xlg.component.netty.eventLoop;
    
    import static com.xlg.component.nio.TestCommon.LOCAL_HOST;
    import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
    
    import java.net.InetSocketAddress;
    import java.util.Scanner;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-08
     */
    public class CloseFutureClient {
        private static final Logger logger = LoggerFactory.getLogger(CloseFutureClient.class);
    
        public static void main(String[] args) throws InterruptedException {
            NioEventLoopGroup group = new NioEventLoopGroup();
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 1. 建立链接
                    // 异步非阻塞, main发起了调用, 真正执行的是 nio 线程.NioEventLoopGroup
                    .connect(new InetSocketAddress(LOCAL_HOST, NETTY_SERVER_PORT));
    
            Channel channel = channelFuture.sync().channel();
            new Thread(() -> {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String line = scanner.nextLine();
                    if ("q".equals(line)) {
                        // 还是异步的, 关闭交给另外一个线程处理
                        channel.close();
                        // thread1 输出
    //                    logger.debug("关闭之后的操作!!1");
                        break;
                    }
                    channel.writeAndFlush(line);
                }
            }, "thread1").start();
    //        logger.debug("main end ");
    
            // 获取closeFuture  关闭后的处理 1. 同步 2.异步
            ChannelFuture closeFuture = channel.closeFuture();
    //        logger.debug("waiting close....");
    //        closeFuture.sync();
    //        logger.debug("处理关闭之后的操作...");
    
            closeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    logger.debug("处理关闭之后的操作...");
                    // 优雅的关闭NioEventLoopGroup 里面还有其他线程
                    group.shutdownGracefully();
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78

    提升的是 吞吐量: 单位时间内处理的请求个数
    截屏2022-06-11 13.42.55.png

    三、Future & Promise

    在异步处理时,经常用到这两个接口
    netty 的Future 和 jdk 中Future ,但是是两个接口,属于继承。而Promise 对netty中future进行了扩展

    • jdk Future 只能同步等待任务结束 (成功 或者 失败 ) 才能得到结果
    • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束.
    • netty Promise 不仅有netty future的能力,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
      | 功能/名称 | jdk Future | netty Future | Promise |
      | — | — | — | — |
      | cancel | 取消任务 | - | - |
      | isCanceled | 任务是否取消 | - | - |
      | isDone | 任务是否完成,不能区分成功失败 | - | - |
      | get | 获取任务结果,阻塞等待 | - | - |
      | getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
      | await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
      | sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
      | isSuccess | - | 判断任务是否成功 | - |
      | cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
      | addLinstener | - | 添加回调,异步接收结果 | - |
      | setSuccess | - | - | 设置成功结果 |
      | setFailure | - | - | 设置失败结果 |

    jdk future

    package com.xlg.component.netty.FutureAndPromise;
    
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-11
     */
    public class TestJDKFuture {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //1. 线程池
            ExecutorService executorService = Executors.newFixedThreadPool(2);
            //2. 执行
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("执行计算!!1");
                    Thread.sleep(1000);
                    return 50;
                }
            });
            //3. 获取结果
            System.out.println("等待结果!11");
            System.out.println(future.get());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    netty future

    package com.xlg.component.netty.FutureAndPromise;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.channel.EventLoop;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.concurrent.Future;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-11
     */
    public class TestNettyFuture {
    
        private static final Logger logger = LoggerFactory.getLogger(TestNettyFuture.class);
    
        public static void main(String[] args) {
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            EventLoop loop = group.next();
    
            //2. 执行
            Future<Integer> future = loop.submit(() -> {
                logger.debug("执行计算中!");
                Thread.sleep(1000);
                return 50;
            });
            //3. 获取结果
            logger.debug("等待结果!1!");
            // 立即返回结果
            logger.debug("{}", future.getNow());
            // 同步返回结果 也就是阻塞
    //        logger.debug("{}", future.get());
            // 异步返回结果
            future.addListener(future1 -> logger.debug("接受结果: {}", future1.getNow()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    netty promise

    package com.xlg.component.netty.FutureAndPromise;
    
    import java.util.concurrent.ExecutionException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.channel.EventLoop;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.util.concurrent.DefaultPromise;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-11
     */
    public class TestNettyPromise {
    
        private static final Logger logger = LoggerFactory.getLogger(TestNettyPromise.class);
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            EventLoop loop = group.next();
    
            DefaultPromise<Object> promise = new DefaultPromise<>(loop);
    
            new Thread(() -> {
    
                logger.debug("开始计算!!");
                try {
                    int i = 1 / 0;
                    Thread.sleep(1000);
                    promise.setSuccess(50);
                } catch (Exception e) {
                    promise.setFailure(e);
                }
    
            }).start();
    
            logger.debug("等待结果!!");
    
    //        logger.debug("接受结果: {}", promise.get());
            logger.debug("接受结果: {}", promise.isSuccess());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    四、handler & Pipline


    代码:

    package com.xlg.component.netty.Pipeline;
    
    import static com.xlg.component.nio.TestCommon.NETTY_SERVER_PORT;
    
    import java.nio.charset.StandardCharsets;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.ChannelPromise;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-12
     */
    public class TestPipelineServer {
    
        private static final Logger logger = LoggerFactory.getLogger(TestPipelineServer.class);
    
        public static void main(String[] args) {
            new ServerBootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(
                            new ChannelInitializer<NioSocketChannel>() {
                                @Override
                                protected void initChannel(NioSocketChannel ch) throws Exception {
                                    // socket中 是有pipeline类似一个处理流 双向链表
                                    // 默认有head -> tail  插入的分为入站和出站
                                    // head -> h1 -> h2 -> h3 -> h4 -> tail
                                    ChannelPipeline pipeline = ch.pipeline();
    
                                    // 入站的handler处理完数据, 还可以把处理的数据 msg 给到下一个hander
                                    // 调用后一个handler处理: ctx.fireChannelRead(msg); == super.channelRead(ctx, msg);
                                    pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                                        @Override
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            logger.debug("1");
                                            super.channelRead(ctx, "1 -> 变更数据");
                                        }
                                    });
                                    pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                                        @Override
                                        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                            logger.debug("2, msg = {}", msg);
                                            // 3. 执行write写出数据, 这样出站handler才能执行
                                            ch.writeAndFlush(ctx.alloc().buffer().writeBytes("server..".getBytes(
                                                    StandardCharsets.UTF_8)));
                                            super.channelRead(ctx, msg);
                                        }
                                    });
    
                                    // 出站的顺序是从tail向前的
                                    pipeline.addLast("h3", new ChannelOutboundHandlerAdapter(){
                                        @Override
                                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                                throws Exception {
                                            logger.debug("3");
                                            super.write(ctx, msg, promise);
                                        }
                                    });
                                    pipeline.addLast("h4", new ChannelOutboundHandlerAdapter(){
                                        @Override
                                        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                                                throws Exception {
                                            logger.debug("4");
                                            super.write(ctx, msg, promise);
                                        }
                                    });
                                }
                            })
                    .bind(NETTY_SERVER_PORT);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    运行结果:

    2022-06-12 16:47:31,253 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 47] 1
    2022-06-12 16:47:31,253 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 54] 2, msg = 1 -> 变更数据
    2022-06-12 16:47:31,256 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 75] 4
    2022-06-12 16:47:31,257 DEBUG [nioEventLoopGroup-2-2] c.x.c.n.Pipeline.TestPipelineServer [TestPipelineServer.java : 67] 3
        
    当然因为有LoggerHanlder 在客户端, 有server给客户端发的数据. 
        server...
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • handler 是可以取名的addLast(“h3”, new ChannelOutboundHandlerAdapter(){
    • pipeline是一个双向链表, 默认head -> tail 两个节点
      • 通过ctx.fireChannelRead(msg) 等方法,将当前handler的处理结果传递给下一个handler
    • **入站 (Inbound) ,**从head 向后,直到不是Inbound
    • 出站 (outbound), 从tail向前调用handler, 直到不是outbound,但是需要有触发.
      • channel.writeAndFlush(ctx.alloc().buffer().writeBytes(“server…”.getBytes(
        StandardCharsets.UTF_8))); 从tail向前走找outbound
      • ctx.writeAndFlush(), 从当前handler向前找outbound

    具体的结构

    image.png

    调用顺序

    image.png

    EmbeddedChannel

    embeddedChannel 可以用于测试各个handler, 通过其构造函数按照顺序传入需要测试的handler, 然后调用对应的Inbound 和 Outbound方法即可.

    public class TestEmbeddedChannel {
        public static void main(String[] args) {
            ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("1");
                    super.channelRead(ctx, msg);
                }
            };
    
            ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                    System.out.println("2");
                    super.channelRead(ctx, msg);
                }
            };
    
            ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("3");
                    super.write(ctx, msg, promise);
                }
            };
    
            ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
                @Override
                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                    System.out.println("4");
                    super.write(ctx, msg, promise);
                }
            };
    
            // 用于测试Handler的Channel
            EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
            
            // 执行Inbound操作 
            channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
            // 执行Outbound操作
            channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes(StandardCharsets.UTF_8)));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    五、ByteBuf


    调试工具方法

    public static void log(ByteBuf buffer) {
        int length = buffer.readableBytes();
        int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
        StringBuilder buf = new StringBuilder(rows * 80 * 2)
            .append("read index:").append(buffer.readerIndex())
            .append(" write index:").append(buffer.writerIndex())
            .append(" capacity:").append(buffer.capacity())
            .append(NEWLINE);
        appendPrettyHexDump(buf, buffer);
        System.out.println(buf.toString());
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    该方法可以帮助我们更为详细地查看ByteBuf中的内容

    创建

    package com.xlg.component.netty.ByteBuf;
    
    import java.nio.charset.StandardCharsets;
    
    import org.junit.Test;
    
    import com.xlg.component.ks.utils.ByteBufUtl;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    
    /**
     * @author wangqingwei
     * Created on 2022-06-12
     */
    public class ByteBufTest {
    
        @Test
        public void byteBufStudy(){
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
            // init
            ByteBufUtl.log(buffer);
    
            StringBuilder stringBuilder = new StringBuilder();
            for (int i = 0; i < 300; i++) {
                stringBuilder.append("a");
            }
    
            buffer.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
            ByteBufUtl.log(buffer);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    运行结果

    结果: 
    ead index:0 write index:0 capacity:256
    
    read index:0 write index:300 capacity:512
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5

    通过** ByteBufAllocator.DEFAULT.buffer() ** 创建byteBuf 默认大小 256,使用直接内存创建,也可以指定其大小.
    并且我我们来看,其有扩容机制,这一点是NIO所没有的.
    如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建

    直接内存与堆内存

     @Test
        public void directAndHeapByteBuf(){
            // 默认是以直接内存
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
            System.out.println(buffer.getClass());
    
            // 堆内存创建
            ByteBuf buf = ByteBufAllocator.DEFAULT.heapBuffer(16);
            System.out.println(buf.getClass());
    
            ByteBuf buffer2 = ByteBufAllocator.DEFAULT.directBuffer(16);
            System.out.println(buffer2.getClass());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    **结果: **

    class io.netty.buffer.PooledUnsafeDirectByteBuf   // 池化直接内存
    class io.netty.buffer.PooledUnsafeHeapByteBuf     // 池化堆内存
    class io.netty.buffer.PooledUnsafeDirectByteBuf
    
    • 1
    • 2
    • 3
    • 直接内存创建和销毁的代价昂贵,但读写性能高 (少一次内存复制), 适合配合池化功能一起用
    • 直接内存对GC 压力小,因为这一部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放.

    池化和非池化

    优势: 重用ByteBuf

    • 没有池化,每次都得创建新的byteBuf实例,这个操作对直接内存代价昂贵,就算采用对堆内存,会有GC压力
    • 有了池化,重用池中byteBuf实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
    • 高并发时,池化功能更节约内存,减少内存溢出的问题

    开启:

    -Dio.netty.allocator.type={unpooled|pooled}
    
    • 1
    • 4.1 以后,非android 平台默认启用,android启用非池化
    • 4.1 以前都是非池化

    组成

    ByteBuf 主要有以下几个组成部分

    • 最大容量 与 当前容量
      • 构造ByteBuf时 两个参数, 一个初始容量,一个最大容量。第二个参数默认为integer.MAX_VALUE
      • 当byteBuf 容量无法容纳所有数据时,会进行扩容,但如果超过最大容量,会报出java.lang.IndexOutOfBoundsExecption异常
    • 读写不同:
      • ByteBuffer 只使用position 控制,而ByteBuf 分别由读指针和写指针控制。进行读写操作时,无需进行模式的切换.

    image.png

    写入

    常用方法:

    方法签名含义备注
    writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
    writeByte(int value)写入 byte 值
    writeShort(int value)写入 short 值
    writeInt(int value)写入 int 值Big Endian(大端写入),即 0x250,写入后 00 00 02 50
    writeIntLE(int value)写入 int 值Little Endian(小端写入),即 0x250,写入后 50 02 00 00
    writeLong(long value)写入 long 值
    writeChar(int value)写入 char 值
    writeFloat(float value)写入 float 值
    writeDouble(double value)写入 double 值
    writeBytes(ByteBuf src)写入 netty 的 ByteBuf
    writeBytes(byte[] src)写入 byte[]
    writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
    int writeCharSequence(CharSequence sequence, Charset charset)写入字符串CharSequence为字符串类的父类,第二个参数为对应的字符集

    注意

    • 这些方法的未指明返回值的,其返回值都是byteBuf, 可以链式带哦用来写入不同的数据
    • 网络传输中,默认习惯是 Big Endian, 使用 writeInt(int value)

    使用方法

    public class ByteBufStudy {
        public static void main(String[] args) {
            // 创建ByteBuf
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
            ByteBufUtil.log(buffer);
    
            // 向buffer中写入数据
            buffer.writeBytes(new byte[]{1, 2, 3, 4});
            ByteBufUtil.log(buffer);
    
            buffer.writeInt(5);
            ByteBufUtil.log(buffer);
    
            buffer.writeIntLE(6);
            ByteBufUtil.log(buffer);
    
            buffer.writeLong(7);
            ByteBufUtil.log(buffer);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    运行结果

    read index:0 write index:0 capacity:16
    
    read index:0 write index:4 capacity:16
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 01 02 03 04                                     |....            |
    +--------+-------------------------------------------------+----------------+
    
    read index:0 write index:8 capacity:16
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 01 02 03 04 00 00 00 05                         |........        |
    +--------+-------------------------------------------------+----------------+
    
    read index:0 write index:12 capacity:16
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 01 02 03 04 00 00 00 05 06 00 00 00             |............    |
    +--------+-------------------------------------------------+----------------+
    
    read index:0 write index:20 capacity:20
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
    |00000010| 00 00 00 07                                     |....            |
    +--------+-------------------------------------------------+----------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置

    扩容
    扩容规则:

    • 写入后未超过 512 字节,则选下一个16的整数倍扩容。
      • 例如 写入后12 个字节,则扩容后 capacity是16字节
    • 写入后超过 512字节,则选 2^n
      • 例如写入后 514, 则扩容后 2^10 = 1024 字节.
    • 当前扩容不能超过 maxCapacity

    读取

    截屏2022-06-18 12.22.43.png

    还有方法是采用get开头的一系列方法,这些方法不会改变read index

    回收/释放

    retain& release
    由于Netty 中有堆外内存(直接内存)的 byteBuf 实现,堆外内存最好是手动来释放,而不是等GC垃圾回收。

    • UnpooledHeapByteBuf 使用的是JVM 内存,只需等GC 回收内存即可
    • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
    • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

    Netty 这里使用了引用计数法来控制 回收内存,每个ByteBuf 都实现了 ReferenceCounted 接口

    • 每个 ByteBuf 对象的初始计数为1
    • 调用release 方法计数减1,如果计数为0,ByteBuf内存被回收
    • 调用 retain 方法计数+1,表示调用者没有用完之前,其他handler 即使调用了 release 也不会造成回收
    • 当计数为 0 时,底层内存会被回收,这时即使ByteBuf 对象还在,其各个方法均无法正常使用

    释放规则

    因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在每个 ChannelHandler 中都去调用 release ,就失去了传递性(如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
    基本规则: 谁是最后使用者,谁负责release

    • 起点,对于NIO 实现来讲,在io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
    • 入站 byteBuf 处理规则
      • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
      • 将原始 byteBuf 转换为其他类型的 java 对象,这时ByteBuf 就没有用了,必须release
      • 如果不调用** ctx.fireChannelRead(msg)向后传递,那么也必须release**
      • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
      • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
    • 出站 ByteBuf 处理原则
      • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由HeadContext flush后release
    HeadContext.write 
    
    @Override
            public final void write(Object msg, ChannelPromise promise) {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    // If the outboundBuffer is null we know the channel was closed and so
                    // need to fail the future right away. If it is not null the handling of the rest
                    // will be done in flush0()
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise, newClosedChannelException(initialCloseCause));
                    // release message now to prevent resource-leak
                    ReferenceCountUtil.release(msg);
                    return;
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 异常处理原则
      • 有时候不清楚ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回true
    while (!buffer.release()) {}
    
    • 1

    当ByteBuf 被传到了pipline的head 与tail 时,ByteBuf 会被其中的方法彻底释放,但前提是ByteBuf 被传递到了head 与 tail中

    TailContext 释放源码

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
        } finally {
            // 具体的释放方法
            ReferenceCountUtil.release(msg);
        }
    }
    
    //判断是否是 ByteBuf 因为ByteBuf 实现了ReferenceCounted接口.
    public static boolean release(Object msg) {
    	return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    切片

    ByteBuf 切片是 【零拷贝】 的体现之一,对原始ByteBuf 进行切片成多个 ByteBuf ,切片后的ByteBuf 并没有发生内存复制,还是使用原始ByteBuf的内存,切片后的ByteBuf 维护对应的read、write指针

    得到分片后的 buffer后,要调用其retain 方法,使其内部的引用计数 + 1。避免原ByteBuf 释放,导致切片也无法使用。
    修改原ByteBuf中的值,也会影响切片后得到的ByteBuf
    image.png

    package com.xlg.component.netty.ByteBuf;
    
    import com.xlg.component.ks.utils.ByteBufUtil;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.buffer.CompositeByteBuf;
    
    
    public class TestSlice {
        public static void main(String[] args) {
            // 创建ByteBuf
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
    
    
    
            // 向buffer中写入数据
            buffer.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
    
            // 将buffer分成两部分
            ByteBuf slice1 = buffer.slice(0, 5);
            ByteBuf slice2 = buffer.slice(5, 5);
    
            // 需要让分片的buffer引用计数加一
            // 避免原Buffer释放导致分片buffer无法使用
            slice1.retain();
            slice2.retain();
            
            ByteBufUtil.log(slice1);
            ByteBufUtil.log(slice2);
    
            // 更改原始buffer中的值
            System.out.println("===========修改原buffer中的值===========");
            buffer.setByte(0,5);
    
            System.out.println("===========打印slice1===========");
            ByteBufUtil.log(slice1);
    
    
            // 分片合并
            final CompositeByteBuf byteBufs = ByteBufAllocator.DEFAULT.compositeBuffer(10);
            byteBufs.addComponents(true, slice1, slice1);
            ByteBufUtil.log(slice1);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    当然还有 合并这些分片的方法, 还有copy(复制数据,内存地址变化)等

    优势

    • 池化思想-- 可以重用池中 ByteBuf 实例,更节约内存,减少内存溢出的可能。
    • **读写指针分离 **,不需要像 ByteBuffer 一样切换读写模式
    • 可以自动扩容
    • 支持链式调用,使用更加流畅
    • 很多地方体现零拷贝,比如
      • slice、duplicate、compositeByteBuf
  • 相关阅读:
    WPF 窗口白屏问题分析与初步解决
    bacnet cov机制详细介绍
    Qt5开发从入门到精通——第五篇四节( 文本编辑器 Easy Word 开发 V1.3详解 )
    【专栏】RPC系列(理论)-协议与序列化
    Python3-word文档操作(六):word文档中表格的操作-单元格文字居中,字体颜色等的设置
    齐次变换矩阵、欧拉角
    MATLAB向量化编程基础精讲教程
    python读取yaml文件内容
    HTML5期末大作业【红色的电影售票平台网站】web前端 html+css+javascript网页设计实例 企业网站制作
    自动驾驶(apollo)
  • 原文地址:https://blog.csdn.net/qq_41773026/article/details/125627768