• Netty学习(二)概述+EventLoop+Channel+ByteBuf


    概述

    netty 不是 异步io ,只是调用时候的异步 ,有事件才处理 。

    • group:接收事件 你什么事件 交给一个合适的 Handler
    • initChannel:里面注册Handler
    • StringDecoder: 真正的处理事件的Handler 解码
    • SimpleChannelInboundHandler: 真正的处理事件的Handler 打印数据

    在这里插入图片描述

    服务端代码
    package com.example.nettytest01.netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class HelloServer {
        public static void main(String[] args) {
            // 服务器端启动器 组装netty组件
            new ServerBootstrap()
                    //包含线程和选择器
                    .group(new NioEventLoopGroup())  // 监听accept事件 找到对应的一个accept
                    //OIO BIO 选择服务器的 NioServerSocketChannel
                    .channel(NioServerSocketChannel.class)
                    //boss 处理连接 ,worker 处理读写 ,简单理解是 handler
                    .childHandler(
                            //Channel 数据读写通道  Initializer初始化器 添加别的handler
                            new ChannelInitializer() {
                        @Override   // 在连接建立之后执行
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                           //将传输来的数据转化为字符串  ,然后给下一个处理器 SimpleChannelInboundHandler
                            nioSocketChannel.pipeline().addLast(new StringDecoder()) ;
    
                            //自定义的Handler
                            nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler() {
    
                                //读事件  处理传过来的读事件   String:接收过来的数据
                                @Override
                                protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
                                   // 打印上一步处理好的字符串
                                    System.out.println(s);
                                }
                            });
                        }
                    })
                    //监听端口
                    .bind(8080) ;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    客户端代码
    package com.example.nettytest01.netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    
    public class HelloClient {
    
        public static void main(String[] args) throws InterruptedException {
            //启动类
            new Bootstrap()
                    //添加EventLoop
                    .group(new NioEventLoopGroup()) //关注Accept
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() { //添加处理器 ,只有在连接建立才回执行initChannel()
                        //在连接建立后 被调用
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { // 添加处理器
                            nioSocketChannel.pipeline().addLast(new StringEncoder()) ;
                        }
                    })
                    .connect("localhost",8080)
                    .sync()     // 阻塞方法  ,知道连接建立
                    .channel()  //代表一个channel
                    .writeAndFlush("hello word") ; // 把数据转为ByteBuf
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    res :
    在这里插入图片描述
    在这里插入图片描述

    EventLoop

    本质上就是个单线程执行器 同时维护了一个(selector :事件监听),里面的 这个线程里面 的run方法 处理源源不断的channel上的事件 。
    在这里插入图片描述
    EventLoop还是单线程 ,一个线程对应多个channel 。
    在这里插入图片描述

    事件循环组

    EventLoopGroup: 是一组 EventLoop ,channel 一般回调用EventLoopGroup的register方法绑定 其中一个EventLoop
    后续就像结婚一样 ,以后所有Channel的io事件都由 EVentLoop来处理
    一条通道绑定到一个工人身上
    在这里插入图片描述

    
            // 创建事件循环组     默认线程数 = 你 cup核心数*2
            NioEventLoopGroup group = new NioEventLoopGroup(4); //io 普通任务 定时任务
    
           while (true){
           System.out.println(group.next());
           }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    提交任务

       public static void main(String[] args) {
    
            // 创建事件循环组     默认线程数 = 你 cup核心数*2
            NioEventLoopGroup group = new NioEventLoopGroup(4); //io 普通任务 定时任务
    
            //提交普通任务
            group.next().submit(()->{
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }) ;
            
            //提交定时任务 
            group.next().scheduleAtFixedRate(()->{
                log.info("ok");
            },0,1, TimeUnit.SECONDS) ;
            
            log.debug("mian ");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    为了防止单个线程io时间过长 , 需要独立出来一个 EventLoopGroup 专门处理io事情

    在这里插入图片描述

    服务端
    public class MyServer {
        public static void main(String[] args) {
    
            //处理耗时较长的Handler
            DefaultEventLoopGroup defaultEventLoopGroup = new DefaultEventLoopGroup();
    
            new ServerBootstrap()
                    // 1. NioEventLoopGroup:Boss 处理serverSockChannel 上的一个  accept 事件,
                    //                           因为serverSockChannel只有一个。NioEventLoopGroup 所有只有一个线程
                    // 2. NioEventLoopGroup:woker 处理 sockChannel上的读写事件
                    .group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
    
                            // 执行读取 时间 ,让一个 defaultEventLoopGroup 进行关联 ,下面的用的就是
                            //  defaultEventLoopGroup 中的线程 不是 new NioEventLoopGroup(2) 中的线程咯
                            // 说白了就是 分工细化
                            socketChannel.pipeline().addLast(defaultEventLoopGroup,"handler1 ",
                                    new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                    ctx.fireChannelRead(msg) ; //将消息传给下一个handler
                                }
                            }).addLast(defaultEventLoopGroup,"handler2",new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf buf = (ByteBuf) msg;
                                        System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                    }
                            });
    
                        }
                    })
                    .bind(8080);
        }
    }
    
    
    客户端
    
    public class MyClient {
        public static void main(String[] args) throws IOException, InterruptedException {
            Channel channel = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost", 8080))
                    .sync()
                    .channel();
            System.out.println(channel);
            // 此处打断点调试,调用 channel.writeAndFlush(...);
            System.in.read();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    handler 中如何换人

    第一个EventLoop 获取下一个EventLoop ,看是不是自己 , 是自己 这个任务就交给自己执行 ,
    否则就交给 下一个EventLoop
    在这里插入图片描述

    Channel

    close关闭
    closeFuture将channel关闭
    sync阻塞方法 ,直到连接建立,同步等待channel关闭
    addListener异步等待 channel关闭
    pipeline()添加处理器
    writer()将数据写入
    writeAndFlush将数据写入并且刷出 == writer() + flush()
    
    public class MyClient {
        public static void main(String[] args) throws IOException, InterruptedException {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 1. 连接到服务器   mian线程 将连接服务器的请求交给别的线程 去执行 -》 异步非阻塞
                    // 是另一个线程建立的连接 ( nio 线程 ) 大约 1 s
                    .connect(new InetSocketAddress("localhost", 8080));
    
            
             // 如果不使用此方法 , 主线程会真正的执行 channel.writeAndFlush("ni hao ya mememem ");
            //   但是此时  主线程 还没有和 服务器建立连接  就会报错。没有管道向管道输出
             channelFuture.sync() ;
    
            Channel channel = channelFuture.channel();
    
    
    
            System.out.println(channel);
    
            for (int i = 0 ;i <10;i++){
                Thread.sleep(3000);
                channel.writeAndFlush("ni hao ya mememem ");
            }
    
            // 此处打断点调试,调用 channel.writeAndFlush(...);
            System.in.read();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    在这里插入图片描述

    上面第 36 行是错的 !!
    我试了一下也是,感觉应该是write方法会等到连接建立好了再写?因为另外一个线程在执行connect方法,只不过main这边获得的channel只是还没建立连接,但是已经有了,到了write

    优雅的循环向服务器写,并且优雅的异步关闭客户端

    
    @Slf4j
    public class EventLoopGroupTest02 {
    
        public static void main(String[] args) throws InterruptedException {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
    
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost",8080)) ;
    
            Channel channel = channelFuture.sync().channel();
    
            new Thread(()->{
                Scanner scanner = new Scanner(System.in);
                while (true){
                    String line = scanner.nextLine();
                    log.info("当前时间:{},当前线程:{}",new Date(),Thread.currentThread().getName());
                    if (line.equals("q")){
                        channel.close(); // channel.close()是一个异步操作 !!假设3秒执行完毕关闭管道
                                         //  log.info("处理关闭之后的 操作 "); 在第一秒就执行了
                       //  log.info("处理关闭之后的 操作 ");
                        break;
                    }
                    log.info("当前时间:{},当前线程:{}",new Date(),Thread.currentThread().getName());
    
                    channel.writeAndFlush(line) ;
                }
            },"input").start();
    
            //处理关闭方法 -- 善后处理
            ChannelFuture closeFuture = channel.closeFuture();
      /*
          同步执行关闭 , 在主线程上执行善后处理
          log.info("waiting close....");
            //阻塞当前线程 ,只有当   channel.close();  执行之后才能继续向下运行
            closeFuture.sync() ;
            log.info("开始执行善后方法咯....");   */
    
            //异步执行执行善后方法   channelFuture     closeFuture
            closeFuture.addListener(
                    // 回调对象 执行关闭方法 在异步线程上执行 善后处理
                    new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    log.info("当前时间:{},当前线程:{}",new Date(),Thread.currentThread().getName());
                    log.info("开始执行善后方法咯....");
                }
            });
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    为什么要异步

    • 任务划分
    • 异步没有缩短响应时间 ,反而有所增加
    • 合理任务拆分
      在这里插入图片描述
      在这里插入图片描述

    在这里插入图片描述

    Jdk 同步获取异步任务结果

    future 可以看作一个书包 , 异步线程是你同学 ,你同学往你书包里面放 integer  ,放完了 你调用书包的get()方法 获得 integer  
    future 就是在线程间 传递结果的容器
    @Slf4j
    public class JdkFuture {
    
        //异步提交任务 主线程同步获取结果
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);
    
            Future future = executorService.submit(new Callable() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(2000);
                    return 50;
                }
            });
            log.info("waiting...res:{}",new Date(),Thread.currentThread().getName());
            //主线程通过futer 获取结果
            Integer integer = future.get();
            log.info("当前时间:{},当前线程:{},获取的结果:{}",new Date(),Thread.currentThread().getName(),integer);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    netty NioEventLoopGroup中的 线程获取结果

    
    @Slf4j
    public class NettyFuter {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            NioEventLoopGroup loopGroup = new NioEventLoopGroup();
            EventLoop eventLoop = loopGroup.next();
    
            Future future = eventLoop.submit(new Callable() {
    
                @Override
                public Integer call() throws Exception {
                    log.info("当前时间:{},当前线程:{}", new Date(), Thread.currentThread().getName());
                    log.info("正在获取结果");
                    Thread.sleep(3000);
                    return 10;
                }
            });
            log.info("主线程正在获取结果");
            log.info("当前时间:{},当前线程:{},获取的结果:{}",new Date(),Thread.currentThread().getName(),future.get());
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    异步获取结果

    
    @Slf4j
    public class NettyFuter {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            NioEventLoopGroup loopGroup = new NioEventLoopGroup();
            EventLoop eventLoop = loopGroup.next();
    
            Future future = eventLoop.submit(new Callable() {
    
                @Override
                public Integer call() throws Exception {
                    log.info("当前时间:{},当前线程:{}", new Date(), Thread.currentThread().getName());
                    log.info("正在获取结果");
                    Thread.sleep(3000);
                    return 10;
                }
            });
          /*  log.info("主线程正在获取结果");
            log.info("当前时间:{},当前线程:{},获取的结果:{}",new Date(),Thread.currentThread().getName(),future.get());
    */
    
            // 异步获取结果
            future.addListener(new GenericFutureListener>() {
                @Override
                public void operationComplete(Future future) throws Exception {
                    //getNow 立刻获得结果 非阻塞  ,因为 这是触发回调执行的 ,结果已经有了
                    log.info("当前时间:{},当前线程:{},获取的结果:{}",new Date(),Thread.currentThread().getName(),future.getNow());
    
                }
            }) ;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    在这里插入图片描述

    promise 同步获取结果

    promise 相当于是一个书包 ,但是这个书包是我们自己手动创建的

    @Slf4j
    public class PromiseNetty {
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            EventLoop group = new NioEventLoopGroup().next();
    
            DefaultPromise promise = new DefaultPromise<>(group);
    
            new Thread(()->{
                // 完成计算后向promise 中放结果
                log.info("开始计算~~");
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                //往包里面放结果
                promise.setSuccess(8) ;
            }).start();
    
            log.info("这在等待结果,当前时间:{},当前线程:{}", new Date(), Thread.currentThread().getName());
            // 同步获取结果
            //promise.get() 是同步阻塞 1.主线程获取结果 2.没有结果这里会一直阻塞
            log.info("这在等待结果,当前时间:{},当前线程:{},结果是:{}", new Date(), Thread.currentThread().getName(),promise.get());
    
            log.info("我已经获取结果咯,当前时间:{},当前线程:{}", new Date(), Thread.currentThread().getName());
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    在这里插入图片描述

    Handler & Pipeline

    分为入站 和出战 handler

    • 入站Handler : ChannelInboundHandlerAdapter 的子类 ,用来读取客户端数据 ,写回结果
    • 出站Handler : ChannelOutboundHandlerAdapter 的子类 ,用来 对写回结果进行加工
    客户端
    
    @Slf4j
    public class EventLoopGroupTest02 {
    
        public static void main(String[] args) throws InterruptedException {
    
            NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
    
            ChannelFuture channelFuture = new Bootstrap()
                    .group(eventExecutors)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost",8080)) ;
    
            Channel channel = channelFuture.sync().channel();
    
            new Thread(()->{
                Scanner scanner = new Scanner(System.in);
                while (true){
                    String line = scanner.nextLine();
                    log.info("当前时间:{},当前线程:{}",new Date(),Thread.currentThread().getName());
                    if (line.equals("q")){
                        channel.close(); // channel.close()是一个异步操作 !!假设3秒执行完毕关闭管道
                                         //  log.info("处理关闭之后的 操作 "); 在第一秒就执行了
                       //  log.info("处理关闭之后的 操作 ");
                        break;
                    }
                    log.info("当前时间:{},当前线程:{}",new Date(),Thread.currentThread().getName());
    
                    channel.writeAndFlush(line) ;
                }
            },"input").start();
    
            //处理关闭方法 -- 善后处理
            ChannelFuture closeFuture = channel.closeFuture();
      /*
          同步执行关闭 , 在主线程上执行善后处理
          log.info("waiting close....");
            //阻塞当前线程 ,只有当   channel.close();  执行之后才能继续向下运行
            closeFuture.sync() ;
            log.info("开始执行善后方法咯....");   */
    
            //异步执行执行善后方法   channelFuture     closeFuture
            // 回调对象 执行关闭方法 在异步线程上执行 善后处理
            closeFuture.addListener((ChannelFutureListener) channelFuture1 -> {
                        log.info("当前时间:{},当前线程:{}",new Date(),Thread.currentThread().getName());
                        log.info("开始执行善后方法咯....");
                        log.info("eventExecutors还没关闭哦....下面执行 异步线程组的关闭!!");
                        //  eventExecutors.shutdownGracefully()  执行关闭 ,把没发完的数据发完
                        eventExecutors.shutdownGracefully() ;
                    });
    
    
        }
    }
    
    服务端
    
    @Slf4j
    public class TsetPeline {
        public static void main(String[] args) {
            new ServerBootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer() {
                        @Override
                        protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                            // 1. 通过channel 拿到 pipeline
                            ChannelPipeline pipeline = nioSocketChannel.pipeline();
               // 添加处理器 head <-> h1 (pipeline.addLast()) <-> h2 <->tail
                            pipeline.addLast("h1",new ChannelInboundHandlerAdapter(){
                                @Override
                                // 入站handler 一般关注读事件
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    log.info("h1,这里是ChannelInboundHandlerAdapter 当前线程:{}",Thread.currentThread().getName());
                                    log.info("当前时间:{},当前线程:{}", new Date(), Thread.currentThread().getName());
                                    ByteBuf name = (ByteBuf) msg ;
    
                                    Person student = new Person(name.toString(Charset.defaultCharset()));
                                    //  super.channelRead(ctx,student) ==  ctx.fireChannelRead(msg) 将数据传递给下一个handler
                                    super.channelRead(ctx,student);
                                }
                            });
    
                            pipeline.addLast("h2",new ChannelInboundHandlerAdapter(){
                                @Override
                                // 入站handler 一般关注读事件
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    log.info("h2,这里是ChannelInboundHandlerAdapter 当前线程:{}",Thread.currentThread().getName());
                                    log.info("h2获得了信息:{}",(Person)msg);
                                    super.channelRead(ctx,msg);
                                    // 注意千万别是 ctx : ctx 是向前找 ChannelOutboundHandlerAdapter ,
                                     //nioSocketChannel 是从当前往后找ChannelOutboundHandlerAdapter 
                                    nioSocketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("ni hao ya client".getBytes()));
                                }
                            });
    
    
    
                            pipeline.addLast("h3",new ChannelOutboundHandlerAdapter(){
                                //出站处理器 只有向channel 写数据才会触发
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    log.info("h3");
                                    super.write(ctx,msg,promise);
                                }
                            }) ;
    
                            pipeline.addLast("h4",new ChannelOutboundHandlerAdapter(){
                                //出站处理器 只有向channel 写数据才会触发
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    log.info("h4");
    
                                    super.write(ctx,msg,promise);
                                }
                            }) ;
                        }
                    })
                    .bind(8080) ;
        }
    
    
        @Data
        @ToString
        @AllArgsConstructor
        static class Person{
            private String name ;
        }
    
    }
    ctx : ctx 是向前找 ChannelOutboundHandlerAdapter ,
                                     //nioSocketChannel 是从当前往后找ChannelOutboundHandlerAdapter
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140

    客户端

    ByteBuf

    ByteBuf优势
    1.池化: ByteBuf可以重用, 类似于java的常量池,减少内存溢出
    2. 读写指针分离:不需要像ByteBuffer切换读写指针
    3.可以自动扩容
    4.支持链式调用
    5.很多地方体现零拷贝(不用复制数据的拷贝,但是切记 设置retaiin() :引用计数){下面会具体使用}

    可以动态扩容
    在这里插入图片描述
    在这里插入图片描述

    默认容量太大了 ,可以指定一个 最大容量,超过容量会申请扩容

    直接内存VS 堆内存

    在这里插入图片描述
    默认是直接内存的!!

    池化vs非池化

    在这里插入图片描述

    有点类似于 线程池,默认开启
    在这里插入图片描述

    写入方法

    在这里插入图片描述
    Big Endian 大端(网络编程一般大端): 先写高位再写地位
    Little Endian 小端:相反
    在这里插入图片描述

    扩容

    在这里插入图片描述

    读取

    例如读取四个字节
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    retain & release

    UnpooledHeapByteBuf 使用的是JVM内存 只需要GC回收内存
    UnpooledDirByteBuf 使用的是直接内存 需要特殊方法进行回收
    PooledByteBuf 和它的子类使用了池化机制 ,需要更加复杂的规则回收
    ad
    有点像信号量
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在handlerAdapter调用链中 故谁最后使用了ByteBuf谁释放
    head :实现了 处理入站 和出站 Hadler
    在这里插入图片描述

    slice

    零拷贝: 网络传输数据的时候 数据不经过java内存直接从文件传到socket,减少多次内存复制,提高传输性能
    在这里插入图片描述

    
        public static void main(String[] args) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
           buffer.writeBytes(new byte[]{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) ;
            ByteBuf s1 = buffer.slice(0, 5);
            ByteBuf s2 = buffer.slice(6, 10);
    
            log.debug("s1:{}",s1);
            log.debug("s2:{}",s2);
    
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    切片不能增加数据 会报错
    在这里插入图片描述
    2.假如buf被release 切片数据也会消失。

        public static void main(String[] args) {
            ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
           buffer.writeBytes(new byte[]{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) ;
            ByteBuf s1 = buffer.slice(0, 5);
            ByteBuf s2 = buffer.slice(6, 10);
    
          //buffer 的引用计数加一 ,  buffer.release() 不会释放
            // 但是调用2 次就会被释放
            s1.retain() ;
    
            log.debug("s1:{}",s1);
            log.debug("s2:{}",s2);
            buffer.release() ;
            buffer.release() ;
            log.debug("s1:{}",s1);
            log.debug("s2:{}",s2);
            log.debug("buffer:{}",buffer);
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    在这里插入图片描述
    做一个切片 调用一个 slice.retain()(buffer的引用计数器加一) ,使用完毕后 记得 buffer.release() 一次 ,避免 别人调用 buffer.release() 将你的数据清空

    duplicate

    在这里插入图片描述

    零拷贝跟 slice相似 ,但是不同的是 duplicate (是复制的意思 ),截取的是从头到尾

    copy

    将底层数据进行深拷贝,因此无论读写 , 都与Bytebuf无关

    CompositeByteBuf

    在这里插入图片描述

    CompositeByteBuf:避免了数据的复制
    // increaseWriterIndex: 设置是否自动增长 不然他的 widx(写指针)不动
    //CompositeByteBuf(ridx: 0, widx: 0, cap: 16, components=2)-》
    //CompositeByteBuf(ridx: 0, widx: 16, cap: 16, components=2)
    // 也需要注意引用计数问题!!! ,避免引用计数被减到零

           ByteBuf buffer1 = ByteBufAllocator.DEFAULT.buffer(16, 20);
            buffer1.writeBytes(new byte[]{0,1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) ;
    
            ByteBuf buffer2= ByteBufAllocator.DEFAULT.buffer(16, 20);
            buffer1.writeBytes(new byte[]{0,1, 2, 3, 4}) ;
    
            //组合两个buffer且 不发生复制
            CompositeByteBuf compositeBuffer = ByteBufAllocator.DEFAULT.compositeBuffer();
            // increaseWriterIndex: 设置是否自动增长 不然他的 widx(写指针)不动
            //CompositeByteBuf(ridx: 0, widx: 0, cap: 16, components=2)-》
            //CompositeByteBuf(ridx: 0, widx: 16, cap: 16, components=2)
            compositeBuffer.addComponent(true,buffer1);
            compositeBuffer.addComponent(true,buffer2) ;
    
            log.info("compositeBuffer:{}",compositeBuffer);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
  • 相关阅读:
    移动硬盘raw怎么办?一招教你解决RAW格式的文件
    有关供应链大赛的一些习得的经验和感受
    Zabbix 5.0 监控教程(三)
    我的毕业设计思路
    学信息系统项目管理师第4版系列31_信息系统工程
    B2C在线教育商城--前后端分离部署
    PyCharm配置Anaconda PyQt5开发环境
    第九届世界渲染大赛什么时候开始举办?
    kafka-生产者事务-数据传递语义&事务介绍&事务消息发送
    JavaScript-Object.defineProperty函数
  • 原文地址:https://blog.csdn.net/weixin_45699541/article/details/126331484