• Netty(二)NIO-入门


    Netty 入门

    1. 概述

    1.1 Netty

    Netty是一个异步的,基于事件驱动的网络应用框架,用于快速开发可维护,高性能的网络服务器和客户端
    Cassandra,Spark,Hadoop,RocketMQ,ElasticSearch,gRPC,Dubbo,Spring5.x,Zookeeper都是基于netty开发。

    1.2 Netty优势

    相比NIO:构建自己的协议,解决TCP传输问题(粘包),epoll空轮询导致CPU100%,对API进行增强(FastThreadLocal-ThreadLocal)。
    相比其他网络框架:比Mina更简洁,存在时间更长

    2. 入门

    2.1 目标

    开发简单客户端和服务端,客户端发送消息,服务区接收消息

    <dependency>
    	<groupId>io.nettygroupId>
    	<artifactId>netty-allartifactId>
    	<version>4.1.39.Finalversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.2 通信代码及流程

    netty执行流程

    • 把channel理解为数据的通道
    • 把msg理解为流动的数据,最开始输入是ByteBuf,但经过pipeline的加工,会变成其他类型对象,最后输出又变成ButeBuf
    • 把handler理解为数据的处理工序:
      • 工序有多道,合起来是pipeline,pipeline负责发布事件(读,读取完成)传播给每个handler,handler对自己感兴趣的事件进行处理(重写对应方法)
      • handler分Inbound和Outbound两类
    • 把eventLoop理解为处理数据的工人
      • 工人可管理多个channel的io操作,并一旦工人负责了某个channel,就要负责到底(绑定)
      • 工人既可以执行io操作,也可进行任务处理,每个工人有任务队列,队列里可堆放多个channel的待处理任务,任务分为普通,定时任务
      • 工人按照pipeline顺序,依次按照handler的规划(代码)处理数据,可为每道工序执行不同的工人

    3. 组件

    3.1 EventLoop

    EventLoop本质是单线程执行器(同时维护一个Selector),里面有run方法处理channel上源源不断的io事件。
    继承关系
    - 一是继承自juc包下的ScheduledExecutorService,因此包含线程池中所有方法
    - 二是继承自netty的OrderedEventExcutor,提供了 inEventLoop(thread)方法判断存在,提供了parent方法查询归属哪个EGroup
    EventLoopGroup是一组EventLoop,Channel一般会调用ELoopGroup的register方法来绑定一个EventLoop,后续这个Channel上的io事件都由此ELoop来处理。(保证io事件处理的线程安全)
    -三是继承netty的EventExecutorGroup。实现了Iterable接口遍历EventLoop的能力,另有next方法获取集合下一个Eloop。

    EventLoopGroup

    EventLoopGroup是一组EventLoop,Channel一般会调用EventLoopGroup的register方法来绑定其中一个EventLoop,后续这个channel上的IO事件都由此EventLoop来处理(线程安全)
    继承自netty自己的EventExecutorGroup:
    - 实现了Iterable接口提供遍历EventLoop的能力
    - 另有next方法获取集合中下一个EventLoop

    代码

    实现:boss处理连接事件,两个worker处理读写事件:

    public class EventLoopServer {
        public static void main(String[] args) {
            //问题:一个handler耗时较长会拖慢整个worker上的channel
            //改进:创建新的EventLoopGroup处理耗时长的
            DefaultEventLoopGroup group = new DefaultEventLoopGroup();
            new ServerBootstrap()
                    // 改进:EventLoopGroup分工明确
                    // 参数一:boss只处理accept时间 参数二:worker只处理socketChannel(多个客户端可多路复用共用channel)读写操作
                    .group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            ch.pipeline().addLast("handler1",new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                    ByteBuf buf = (ByteBuf) msg;
                                    String s = buf.toString(Charset.defaultCharset());
                                    System.out.println(s);
                                    ctx.fireChannelRead(msg);  //将消息传递给下一个handler
                                }
                            }).addLast(group, "handler2",new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                    ByteBuf buf = (ByteBuf) msg;
                                    String s = buf.toString(Charset.defaultCharset());
                                    System.out.println(s);
                                }
                            });
                        }
                    }).bind(8080);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    可以看到两个工人轮流处理 channel,但工人与 channel 之间进行了绑定。
    加入DefaulGroup后nio工人和非nio工人也分别绑定了channel(LoggingHandler由nio工人执行,而我们自己的 handler由非nio工人执行)。

    handler 执行中如何换人?

    如果两个handler绑定的是同一个EventLoop(线程),就直接调用,否则要把被调用的代码放在Runnable对象中传给下一个handler的线程处理
    源码:

    void invokeChannelRead(final AbstractChannelHandlerCOntext next, Object msg){
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            //下一个handler的事件循环是否与当前的事件循环是同一个线程
            EventExecutor executor = next.executor();
            //是,直接调用
            if(executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {  //不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
                executor.execute(new Runnable(){
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);  //下一个handler线程
                    }
                });
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3.2 Channel

    channel主要作用:

    close:关闭channel
    closeFuture:处理channel的关闭(sync同步关闭,addListener异步关闭)
    pipeline:添加流水线处理器
    write数据写入
    writeAndFlush:数据写入并刷出

    ChannelFuture
    		//带有future,promise的类型都是和异步方法一起使用,用来处理结果
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(ChannelInitializer<NioSocketChannel>() ch.pipeline().addLast(new StringEncoder());})
                    //connect:异步非阻塞,main调用,真正执行的连接的是nio线程,连接需要花费1s,如果没有调用sync会无阻塞向下获取channel
                    .connect(new InetSocketAddress("localhost", 8080));
            //1.使用sync方法同步处理结果
            channelFuture.sync();  //阻塞当前线程,直到nio线程建立完毕
            Channel channel = channelFuture.channel();
    		//2.使用addListener(回调对象)方法异步处理结果,主线程不用等,全部交给nio线程处理
    		channelFuture.addListener(new ChannelFutureListener() {
                //在nio线程连接建立好后,会调用operationComplete
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Channel channel = future.channel();
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    CloseFuture
    		Channel channel = channelFuture.sync().channel();
            new Thread(() -> {
                Scanner scanner = new Scanner(System.in);
                while (true) {
                    String line = scanner.nextLine();
                    if("q".equals(line)) {
                        channel.close();  //close异步操作,善后操作不应该在这之后,交给closeFuture处理
                        break;
                    }
                    channel.writeAndFlush(line);
                }
            },"input").start();
            //获取closeFuture对象, 1)同步处理关闭 2)异步处理关闭
            //1.同步:主线程执行关闭
            ChannelFuture closeFuture = channel.closeFuture();
            closeFuture.sync();
            System.out.println("执行善后操作");
            //2.异步:调用线程处理关闭
            closeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("执行善后操作");
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    优雅关闭

    优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。

    		NioEventLoopGroup group = new NioEventLoopGroup();
            closeFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("执行善后操作");
                    group.shutdownGracefully();
                }
            });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    异步思考

    多线程:单线程做所有事
    在这里插入图片描述
    优化:单线程只做一件事:
    在这里插入图片描述

    • 单线程没法异步提高效率,必须多线程,多核cpu才能发挥异步优势
    • 异步没有缩短响应时间,反而又增加
    • 合理任务拆分

    3.3 Future & Promise

    netty的Future继承自jdk的Future,而Promise对netty的Future进行扩展。

    • jdk的Future只能同步等待任务结束才能得到结果
    • netty Future可同步等待任务结束得到结果,也可异步等待结果
    • netty Promise不仅有future共嗯,而且脱离了任务独立存在,只作为两线程间传递结果的容器。
    功能/名称jdk Futurenetty FuturePromise
    cancel取消任务
    isCanceled任务是否取消
    isDone任务是否完成,不能区分成功失败
    get获取任务结果,阻塞等待
    getNow获取任务结果,非阻塞,还未产生结果时返回 null
    await等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断
    sync等待任务结束,如果任务失败,抛出异常
    isSuccess判断任务是否成功
    cause获取失败信息,非阻塞,如果没有失败,返回null
    addLinstener添加回调,异步接收结果
    setSuccess设置成功结果
    setFailure设置失败结果
    代码测试:https://gitee.com/xuyu294636185/netty-demo

    3.4 Handler & Pipeline

    ChannelHandler用来处理Channel上的各种事件,分为入站,出站两种。所有handler连起来就是pipeline。

    • 入站通常是ChannelInboundHandlerAdapter的子类,主要用于读取客户端数据,写回结果
    • 出站通常是ChannelOutboundHandlerAdapter的子类,主要对写回结果进行加工
      每个Channel是加工车间,pipeline是流水线,ChannelHandler是流水线上各道工序,ButeBuf是原材料。
    			new ServerBootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            //1.通过channel拿到pipeline
                            ChannelPipeline pipeline = ch.pipeline();
                            //2.添加处理器 headHandler - addLast(添加位置) h1 - h2 - h3 - tailHandler,底层采用双向链表
                            //入站
                            pipeline.addLast("h1", new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    String str = buf.toString(Charset.defaultCharset());
                                    super.channelRead(ctx, msg);  //将加工后的str传递下一个handler处理,不调用链会断开
    //                                ctx.fireChannelRead(msg);  //或者使用此方法传递
                                }
                            });
                            pipeline.addLast("h2", new ChannelInboundHandlerAdapter(){
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    super.channelRead(ctx, msg);
                                    ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes()));  //从当前的节点向前找出战处理器
                                    ch.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes()));  //从尾部的节点向前找出战处理器
                                }
                            });
                            //出站
                            pipeline.addLast("h3", new ChannelOutboundHandlerAdapter(){
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    super.write(ctx, msg, promise);
                                }
                            });
                        }
                    })
                    .bind(8080);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    3.5 ByteBuf

    是对字节数据的封装

    创建

    ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);

    直接内存&堆内存

    ByteBufAllocator.DEFAULT.heapBuffer(10);//池化基于堆内存
    ByteBufAllocator.DEFAULT.DirecBuffer(10);//池化直接内存,创建销毁代价高,性能好,不受jvmGC管理,需主动释放

    池化&非池化

    池化可以重用ByteBuf。不必每次都创建新的实例,采用类似jemalloc分配,高并发时更节约内存,减少溢出。

    Dio.netty.allocator.type={unpooled 禁用|pooled 开启} 池化开启,通过设置环境变量设置

    组成

    ByteBuf组成

    • 第一个部分是已经丢弃的字节,这部分数据是无效的;(已经读过的内容)
    • 第二部分是可读字节,这部分数据是 ByteBuf 的主体数据, 从 ByteBuf 里面读取的数据都来自这一部分;(已经写入但还未读取的内容)
    • 第三部分数据是可写字节,所有写到 ByteBuf 的数据都会写到这一段;(剩余可写入数据的空间大小)
    • 最后一部分表示的是该 ByteBuf 最多还能扩容多少容量

    从 ByteBuf 中每读取一个字节,readerIndex 自增1,ByteBuf 里面总共有 writerIndex-readerIndex 个字节可读, 由此可以推论出当 readerIndex 与 writerIndex 相等的时候,ByteBuf 不可读;
    写数据是从 writerIndex 指向的部分开始写,每写一个字节,writerIndex 自增1,直到增到 capacity,这个时候,表示 ByteBuf 已经不可写了;
    ByteBuf 里面还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错。

    写入 & 读取
    方法签名含义备注
    writeBoolean(boolean value)写入 boolean 值用一字节 01|00 代表 true|false
    writeByte(int value)写入 byte 值
    writeShort(int value)写入 short 值
    writeInt(int value)写入 int 值Big Endian大端,即 0x250,写入后 00 00 02 50
    writeIntLE(int value)写入 int 值Little Endian小端,即 0x250,写入后 50 02 00 00
    writeLong(long value)写入 long 值
    writeChar(int value)写入 char 值
    writeFloat(float value)写入 float 值
    writeDouble(double value)写入 double 值
    writeBytes(ByteBuf src)写入 netty 的 ByteBuf
    writeBytes(byte[] src)写入 byte[]
    writeBytes(ByteBuffer src)写入 nio 的 ByteBuffer
    int writeCharSequence(CharSequence sequence, Charset charset)写入字符串
    • 方法未指明返回值的,其返回值都是ByteBuf,意味可链式调用
    • 网络传输,默认Big Endian
    buffer.writeBytes(new byte[]{1,2,3,4,});
    buffer.writeInt(5);
    System.out.println(buffer.readByte());
    
    • 1
    • 2
    • 3
    扩容
    • 如何写入后数据大小未超过 512,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16
    • 如果写入后数据大小超过 512,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 210=1024(29=512 已经不够了)
    • 扩容不能超过 max capacity 会报错
    retain & release

    由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是手动来释放,而不是等 GC 垃圾回收。

    • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
    • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
    • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
      Netty 这里采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
    • 每个 ByteBuf 对象的初始计数为 1
    • 调用 release 方法计数减 1,如果计数为 0,ByteBuf 内存被回收
    • 调用 retain 方法计数加 1,表示调用者没用完之前,其它 handler 即使调用了 release 也不会造成回收
    • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还在,其各个方法均无法正常使用

    请思考,因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果在 finally 中 release 了,就失去了传递性(当然,如果在这个 ChannelHandler 内这个 ByteBuf 已完成了它的使命,那么便无须再传递)
    基本规则是,谁是最后使用者,谁负责 release,详细分析如下

    • 起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
    • 入站 ByteBuf 处理原则
      • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
      • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
      • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
      • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
      • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
    • 出站 ByteBuf 处理原则
      • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
    • 异常处理原则
      • 有时候不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
        TailContext 释放未处理消息逻辑
    // io.netty.channel.DefaultChannelPipeline#onUnhandledInboundMessage(java.lang.Object)
    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    具体代码

    // io.netty.util.ReferenceCountUtil#release(java.lang.Object)
    public static boolean release(Object msg) {
        if (msg instanceof ReferenceCounted) {
            return ((ReferenceCounted) msg).release();
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    Slice

    【零拷贝】的体现之一,对原始ByteBuf进行切片成多个ByteBuf,切片后的ByteBuf并没有发生内存复制,还是使用原始ByteBuf内存
    ,切片后ByteBuf维护独立的read,write指针。

    duplicate

    【零拷贝】的体现之一,好比截取原始ByteBuf所有内容,并没有max capacity的限制,与原始ByteBuf使用同一块底层内存,只是读写指针是独立的。
    duplicate

    copy

    会将底层内存数据进行深拷贝,无论读写,都与原始ByteBuf无关。

    compositeBuffer

    【零拷贝】的体现之一,可以将多个 ByteBuf 合并为一个逻辑上的 ByteBuf,避免拷贝
    CompositeByteBuf 是一个组合的 ByteBuf,它内部维护了一个 Component 数组,每个 Component 管理一个 ByteBuf,记录了这个 ByteBuf 相对于整体偏移量等信息,代表着整体中某一段的数据。

    • 优点,对外是一个虚拟视图,组合这些 ByteBuf 不会产生内存复制
    • 缺点,复杂了很多,多次操作会带来性能的损耗
    Unpooled

    提供非池化的ByteBuf创建,组合和复制等操作。

    ByteBuf buf = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
    ByteBufUtil.prettyHexDump(buf);
    
    • 1
    • 2
    ByteBuffer的优势
    • 池化:可以重用池中ByteBuf实例,更节约内存,减少内存溢出的可能。
    • 读写指针分离,不需要像ByteBuffer一样切换读写模式
    • 可自动扩容
    • 支持链式调用
    • 很多方法支持零拷贝

    4 双向通信

    4.1 实现一个echo server

    代码:https://gitee.com/xuyu294636185/netty-demo.git

  • 相关阅读:
    进程管理_Linux渗透_安全加固
    连接到 GBase LDAP 服务器
    php编程入门先学什么 PHP程序员需要具备哪些技能
    Solidus Labs欢迎香港前金融创新主管赵嘉丽担任战略顾问
    python 如何根据索引快速删除列表中的多个元素
    OA项目之会议发布
    内网渗透之Linux反弹shell(综合)
    建筑能源管理(5)——建筑能源审计和审计方法
    深度学习——day39 综述——(2019 计算机学报)深度卷积神经网络的发展及其在计算机视觉领域的应用_张顺
    FlutterUnit 周边 | 收录排序算法可视化
  • 原文地址:https://blog.csdn.net/xy294636185/article/details/132759460