• 【Netty】一、高性能NIO通信框架Netty-快速入门


    一、What is Netty?

    Netty is an asynchronous event-driven network application framework
    for rapid development of maintainable high performance protocol servers & clients.
    Netty是由JBOSS提供的一个java开源框架,是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端;
    Netty 是对Java的高级网络编程的封装,隐藏了Java背后的复杂性,提供了一个易于使用的 API 客户端/服务器框架;底层是JDK里面的java.net.、java.nio.、java.util.concurrent.* 包下的封装;
    官网:https://netty.io/
    Github:https://github.com/netty/netty

    Netty内部架构模块:在这里插入图片描述

    Netty在大量的开源项目中应用:

    Dubbo、RocketMQ、Spark、ElasticSearch、Cassandra、Flink、gPRC、xxl-job、lettuce、redission等等都使用了Netty;

    二、 Netty程序开发

    请求 - 响应
    客户端 - 服务端

            <!-- netty-all -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.51.Final</version>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    1、Netty编写一个服务端

    Netty 实现一个服务端需要两个核心步骤:
    1、一个服务端 handler:这个组件实现了服务端的业务逻辑,决定了连接创建后和接收到信息后该如何处理;
    2、ServerBootstrap: 这个是配置服务端的启动代码,最少需要设置服务器绑定的端口,用来监听连接请求;

    public class EchoServer {
    
        public static final int PORT = 1234;
    
        public static void main(String[] args) {
            EchoServer echoServer = new EchoServer();
            echoServer.run();
        }
    
        /**
         * web程序下,在webListener、servlet.init 可以调一下该方法,那么netty服务端就启动起来了
         */
        public void run() {
            final EchoServerHandler echoServerHandler = new EchoServerHandler();
            final EchoServerHandler2 echoServerHandler2 = new EchoServerHandler2();
    
            //1、创建一个线程池
            EventLoopGroup boosLoopGroup = new NioEventLoopGroup();
            EventLoopGroup workLoopGroup = new NioEventLoopGroup();
            try {
                //2、启动引导类
                ServerBootstrap bootstrap = new ServerBootstrap();
    
                //3、给启动引导类做一些配置:
                // - NioServerSocketChannel
                // - childHandler
                // - bind端口
                bootstrap.group(boosLoopGroup, workLoopGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .handler(new ChannelInitializer<NioSocketChannel>() {
                            @Override
                            protected void initChannel(NioSocketChannel ch) throws Exception {
                                System.out.println("服务端启动中.....");
                            }
                        })
                        .childOption(ChannelOption.SO_REUSEADDR, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline channelPipeline = socketChannel.pipeline();
                                //channelPipeline.addLast(new LoggingHandler(LogLevel.INFO));
                                //netty的粘包和拆包
                                channelPipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
                                channelPipeline.addLast(new LengthFieldPrepender(4));
                                channelPipeline.addLast(new StringEncoder());
                                channelPipeline.addLast(new StringDecoder());
    
                                channelPipeline.addLast(echoServerHandler);
                                channelPipeline.addLast(echoServerHandler2);
                            }
                        });
    
                //4、绑定一个端口,返回未来的通道
                ChannelFuture channelFuture = bootstrap.bind(PORT).sync();
    
                //5、当channel被关闭的时候会通知此处关闭chanel(closeFuture方法)
                channelFuture.channel().closeFuture().sync();
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                Future future = boosLoopGroup.shutdownGracefully();
                workLoopGroup.shutdownGracefully();
                //阻塞等待
                future.syncUninterruptibly();
                System.out.println("1111111111");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    通过ChannelHandler实现服务端逻辑

    ChannelHandler将在某些事件中触发调用,可以实现ChannelInboundHandler 接口或者继承ChannelInboundHandlerAdapter
    类,用来定义处理入站事件的方法。ChannelInboundHandlerAdapter这个类 提供了默认ChannelInboundHandler 的实现,所以只需要覆盖下面的方法:
    channelRead() - 每个信息入站都会调用;
    channelReadComplete() - 通知处理器最后的 channelread() 是当前批处理中的最后一条消息时调用;
    exceptionCaught()- 读操作时捕获到异常时调用;

    
    
    /**
     * 服务端的handler,用于业务处理
     *
     * 基于事件的异步通信框架
     *
     */
    @ChannelHandler.Sharable
    public class EchoServerHandler2 extends ChannelInboundHandlerAdapter {
    
        public EchoServerHandler2() {
            System.out.println("EchoServerHandler2构造方法执行.......");
        }
    
        /**
         * 通道是活跃的,有数据过来了,触发该方法
         *
         * @param ctx
         * @param msg
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelRead数据读取方法执行2.......");
    
            // msg 这个就是客户端发送过来的数据: ByteBuf包装的
            ByteBuf byteBuf = (ByteBuf) msg;
            String message = byteBuf.toString(CharsetUtil.UTF_8);
    
            System.out.println("接收到的消息2:" + message);
            //也可以根据实际情况然后给客户端一个响应,比如把接收到的数据原封不动地写回去
            ctx.writeAndFlush(msg);
        }
    
        /**
         * 数据读取完的方法
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelReadComplete2方法执行.......");
            //数据读完了,就把通道关闭
            //ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    
        /**
         * 发生异常,触发该方法
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("exceptionCaught方法执行2.......");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    通过ServerBootstrap启动服务端

    ServerBootstrap引导服务器启动服务端;
    监听和接收进来的连接请求;
    配置 Channel来通知一个入站消息的 EchoServerHandler 实例;

    2、Netty写一个客户端

    public class EchoClient {
    
        public static final String IP = "127.0.0.1";
    
        public static final int PORT = 1234;
    
        public static void main(String[] args) {
            EchoClient echoClient = new EchoClient();
            echoClient.run();
        }
    
        public void run() {
    
            //1、创建一个线程池
            EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
    
            try {
                //2、启动引导类 (客户端需要new Bootstrap)
                Bootstrap bootstrap = new Bootstrap();
    
                //3、给启动引导类做一些配置:
                // - NioServerSocketChannel
                // - childHandler
                // - bind端口
                bootstrap.group(eventLoopGroup)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .option(ChannelOption.SO_REUSEADDR, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                ChannelPipeline channelPipeline = socketChannel.pipeline();
    
                                //netty的粘包和拆包
                                channelPipeline.addLast(new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
                                channelPipeline.addLast(new LengthFieldPrepender(4));
    
                                channelPipeline.addLast(new StringEncoder());
                                channelPipeline.addLast(new StringDecoder());
    
                                channelPipeline.addLast(new EchoClientHandler());
                            }
                        });
    
                //绑定一个端口,返回未来的通道 .bind() --> udp
                ChannelFuture channelFuture = bootstrap.connect(IP, PORT).sync();
    
                //连接之后再加option就无效了
                //bootstrap.option();
    
                //得到一个通道
                Channel channel = channelFuture.channel();
    
                //从控制台输入数据,往服务端发送
                Scanner scanner = new Scanner(System.in);
                for (;;) {
                    String line = scanner.nextLine();
                    if (line.equals("bye")) {
                        break;
                    }
    
                    //数据需要写到一个ByteBuf, 然后把ByteBuf写到服务端
                    channel.writeAndFlush(Unpooled.copiedBuffer(line, CharsetUtil.UTF_8));
                }
    
                //4、当channel被关闭的时候会通知此处关闭chanel(closeFuture方法)
                channelFuture.channel().closeFuture().sync();
    
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                eventLoopGroup.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75

    通过ChannelHandler 实现客户端逻辑

    跟编写服务端一样,也是通过ChannelInboundHandler 来处理数据,也可以采用 SimpleChannelInboundHandler 或者
    ChannelInboundHandlerAdapter来处理所有的任务,需要覆盖三个方法:
    channelActive() - 服务器的连接被建立后调用
    channelRead0() - 数据后从服务器接收到调用
    exceptionCaught() - 捕获一个异常时调用
    ByteBuf(Netty的字节容器)‘

    /**
     * 客户端的handler,用于业务处理
     *
     * 基于事件的异步通信框架,触发该类中方法
     *
     */
    public class EchoClientHandler extends ChannelInboundHandlerAdapter {
    
        public EchoClientHandler() {
            System.out.println("EchoServerHandler构造方法执行.......");
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelRegistered方法执行.......");
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelUnregistered方法执行.......");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelActive方法执行.......");
    
            StringBuffer stringBuffer = new StringBuffer();
            for (int i=0; i<100; i++) {
                stringBuffer.append("sdkfhsfbsdfsdhbfsdfjksdjkfhsdjkfhhhhhhhhhhhhhhhhhhhhhhhhhhhhh");
            }
    
            //通道被激活了,我们也可以往服务端写数据
            for (int i=0; i<20; i++) {
                ctx.writeAndFlush(stringBuffer.toString());
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelInactive方法执行.......");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelRead方法执行.......");
    
            // msg 这个就是客户端发送过来的数据: ByteBuf包装的
            //ByteBuf byteBuf = (ByteBuf) msg;
            String message = (String) msg;
    
            //String message = byteBuf.toString(CharsetUtil.UTF_8);
            System.out.println("客户端接收到的消息:" + message);
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelReadComplete方法执行.......");
            //数据读完了,就把通道关闭
            //ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("userEventTriggered方法执行.......");
        }
    
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            System.out.println("channelWritabilityChanged方法执行.......");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("exceptionCaught方法执行.......");
        }
    
        @Override
        protected void ensureNotSharable() {
            System.out.println("ensureNotSharable方法执行.......");
        }
    
        @Override
        public boolean isSharable() {
            System.out.println("isSharable方法执行.......");
            return super.isSharable();
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerAdded方法执行.......");
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            System.out.println("handlerRemoved方法执行.......");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97

    三、Netty的核心组件

    Netty 的核心组件包括:

    • Bootstrap 和 ServerBootstrap
    • Channel
    • ChannelHandler
    • ChannelPipeline
    • EventLoop
    • ChannelFuture
    • ChannelHandlerContext

    Bootstrap 和 ServerBootstrap

    Netty应用程序通过设置 Bootstrap类开始,该类提供了一个用于应用程序网络层配置的容器;
    在这里插入图片描述
    Bootstrap有以下两种类型:

    • 一种是用于客户端的Bootstrap
    • 一种是用于服务端的ServerBootstrap
      不管程序使用哪种协议,创建的是一个客户端还是服务器,“引导”类都是必须要使用到的;

    客户端启动引导类Bootstrap

    抽象通用的引导类AbstractBootstrap,具体的客户端或服务器引导类分别是Bootstrap 和 ServerBootstrap 处理,这两个类可以进行流式编程;
    其方法说明如下:
    1、Bootstrap group(EventLoopGroup) 设置用于处理Channel所有事件的 EventLoopGroup
    2、Bootstrap channel(Class) channel()方法指定了Channel的实现类;
    3、Bootstrap localAddress(SocketAddress) 指定Channel应该绑定到的本地地址,如果没有指定,则由操作系统创建一个随机的地址,也可以通过bind()或者connect()方法指定localAddress;
    4、 Bootstrap option(ChannelOption option,T value)设置ChannelOption,其将被应用到每一个新创建的Channel 的 ChannelConfig,这些选项将会通过bind()或者connect()方法设置到Channel,这个方法在 Channel已经被创建后再调用将不会有任何的效果,支持的
    ChannelOption 取决于使用的Channel类型;

    • 1、ChannelOption.SO_BACKLOG
      ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,
      服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
    • 2、ChannelOption.SO_REUSEADDR
      ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,
      比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设SO_REUSEADDR就无法正常使用该端口;
    • 3、ChannelOption.SO_KEEPALIVE
      Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接,当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文;
    • 4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF
      ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF,这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功;
    • 5、ChannelOption.SO_LINGER
      ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送;
    • 6、ChannelOption.TCP_NODELAY
      ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关;
      Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输;

    5、Bootstrap handler(ChannelHandler) 设置将被添加到ChannelPipeline 以接收事件通知的ChannelHandler;
    6、Bootstrap remoteAddress(SocketAddress) 设置远程地址,也可以通过 connect()方法来指定;
    7、ChannelFuture connect() 连接到远程节点并返回一个ChannelFuture,其将会在连接操作完成后接收到通知;
    8、ChannelFuture bind() 绑定Channel并返回一个ChannelFuture,其将会在绑定操作完成后接收到通知;
    注意:
    在引导的过程中,在调用bind()或者connect()方法之前,必须调用以下方法来设置所需的组件:
    group();
    channel();
    handler();
    否则将会导致IllegalStateException异常;
    服务端启动引导类ServerBootStrap

    服务端启动引导类ServerBootStrap

    1、group()方法设置ServerBootstra要用的EventLoopGroup,这个 EventLoopGroup将用于 ServerChannel 和被接受的子 Channel 的 I/O 处理;
    2、Channel()方法,设置将要被实例化的ServerChannel类;
    3、localAddress()方法,指定ServerChannel应该绑定到的本地地址,如果没有指定,则将由操作系统使用一个随机地址,也可以通过 bind()方法来指定该 localAddress;
    4、option()方法,指定要应用到新创建的ServerChannel的ChannelConfig的ChannelOption,这些选项将会通过bind()方法设置到Channel,在bind()方法被调用之后,设置或者改变ChannelOption都不会有任何的效果,所支持的ChannelOption取决于所使用的Channel类型;
    5、childOption()方法,指定当子Channel被接受时,应用到子Channel的 ChannelConfig的ChannelOption,所支持的ChannelOption取决于所使用的Channel的类型;
    6、handler()方法,设置被添加到ServerChannel的ChannelPipeline中的ChannelHandler,更加常用的方法参见childHandler();
    7、childHandler()方法,设置将被添加到已被接受的子Channel的 ChannelPipeline中的ChannelHandler,handler()方法和 childHandler()方法之间的区别是:前者所添加的 ChannelHandler由接受子Channel 的 ServerChannel 处理,而childHandler()方法所添加的 ChannelHandler 将由已被接受的子 Channel处理,其代表一个绑定到远程节点的套接字;
    8、bind()方法, 绑定ServerChannel并且返回一个ChannelFuture,其将会在绑定操作完成后收到通知(带着成功或者失败的结果);

    Channel

    Channel是底层网络传输操作的接口,如读,写,连接,绑定等等,相当于一个Socket,Channel定义了与Socket 丰富的交互操作集:bind, close, config, connect, isActive, isOpen, isWritable, read, write 等等,常用的 Channel 类型:
    NioSocketChannel,异步的客户端TCP Socket连接;
    NioServerSocketChannel,异步的服务器端TCP Socket连接;
    NioDatagramChannel,异步的UDP连接;
    NioSctpChannel,异步的客户端Sctp连接;
    NioSctpServerChannel,异步的服务器端Sctp连接;

    ChannelHandler

    ChannelHandler由特定事件触发,ChannelHandler可专用于几乎所有的动作,包括将一个对象转为字节(或相反),执行过程中抛出的异常处理;
    常用的一个接口是 ChannelInboundHandler,这个类型接收到入站事件(包括接收到的数据)可以处理应用程序逻辑,当你需要提供响应时,你也可以从 ChannelInboundHandler写出数据,所有业务逻辑经常存活于一个或者多个 ChannelInboundHandler中;
    实现业务处理的handler有两种方式,

    • 一种是继承ChannelInboundHandlerAdapter类,覆盖里面的方法;
    @Sharable
    public class DiscardHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    ReferenceCountUtil.release(msg);
    }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这种实现方式,消息接收后,正确的做法是调用
    ReferenceCountUtil.release(msg);方法释放已接收的消息,避免内存溢出问题;

    • 另一种做法是继承SimpleChannelInboundHandler类,并覆盖里面的方法;
    public class CoderClientHandler extends SimpleChannelInboundHandler
    
    • 1

    这种实现方式,消息接收后,不需要释放已接收的消息,另外这种实现方式,如果有指定的编码器和解码器,可以通过类的泛型直接得到泛型对象,不需要通过ByteBuf读取字节流;

    ChannelPipeline

    ChannelPipeline就是 ChannelHandler链的容器;
    ChannelPipeline提供了一个容器给ChannelHandler链并提供了一个API 用于管理沿着链入站和出站事件的流动,每个Channel都有自己的ChannelPipeline,ChannelHandler 是如何设置到ChannelPipeline?主要是实现了ChannelHandler的抽象 ChannelInitializer,ChannelInitializer子类通过ServerBootstrap 进行注册,当它的方法initChannel()被调用时,这个对象将设置自定义的ChannelHandler集到pipeline,当这个操作完成时,ChannelInitializer子类则从ChannelPipeline 自动删除自身;
    所以ChannelPipeline保存ChannelHandler的List,用于处理或拦截 Channel 的入站事件和出站操作;
    它实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler 如何相互交互;

    EventLoop

    EventLoop用于处理 Channel 的 I/O 操作,维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的run方法,执行 I/O任务和非I/O任务:
    NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个Channel只对应于一个线程;

    ChannelFuture

    Netty 所有的 I/O 操作都是异步的,因为一个操作可能无法立即返回,我们需要有一种方法在以后确定它的结果,出于这个目的,Netty 提供了接口 ChannelFuture,它的addListener方法注册了一个ChannelFutureListener,当操作完成时,可以被通知(不管成功与否);
    可以想象成一个 ChannelFuture 对象作为一个未来执行操作结果的占位符,何时执行取决于一些因素,因此不可能预测与精确,但我们可以肯定的是,它会被执行;

    ChannelHandlerContext

    保存 Channel 相关的所有上下文信息,同时关联一个ChannelHandler 对象;

    • Handler处理的传播
      可以通过ChannelHandlerContext对象的方法将事件传给下一个handler继续处理;
    ctx.fireChannelRegistered();
    ctx.fireChannelUnregistered();
    ctx.fireChannelActive();
    ctx.fireChannelInactive();
    ctx.fireChannelRead(msg);
    ctx.fireChannelReadComplete();
    ctx.fireUserEventTriggered(evt);
    ctx.fireExceptionCaught(cause);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    四、Netty优雅关闭

    调用EventLoopGroup.shutdownGracefully()方法,它将处理任何挂起的事件和任务,并关闭EventLoopGroup,随后释放所有活动的线程;
    这个方法调用将会返回一个Future,该Future将在关闭完成时接收到通知,shutdownGracefully()方法也是一个异步的操作,可以调用阻塞等待方法直到它完成,或者向所返回的Future注册一个监听器以在关闭完成时获得通知;

    Future<?> future = group.shutdownGracefully();
    // block until the group has shutdown
    future.syncUninterruptibly();
    
    • 1
    • 2
    • 3
    另外显式地关闭活动的Channel ,可以调用Channel.close()方法;
    
    • 1

    五、Netty开发总结

    1、浏览器 --> netty服务器 (http协议、webscoket协议),不需要自定义协议(也就是说不需要自定义编码器和解码器,netty已经提供好了)
    2、Netty客户端 --> Netty服务端 (可以用netty内置的编码器和解码器,也可以自己开发编码器和解码器,自己开发编码器和解码器相当于是自定义私有协议,自定义私有协议就是自己定义数据包的格式,那么就需要自己开发编码器和解码器,像Dubbo的rpc调用网络通信是自定义协议)
    3、其他的netty代码开发都是模板式的,可以多看一下那些类里面的方法;

  • 相关阅读:
    09 # 手写 some 方法
    Git常用命令
    Redis持久化方案 RDB,AOF
    JMH-并发包分析
    泛微开发修炼之旅--15后端开发连接外部数据源,实现在ecology系统中查询其他异构系统数据库得示例和源码
    如何利用现代工具来管理多项目
    845. 八数码(Acwing)
    小猫来了~Tomcat以及安装
    学习分享-微服务的相关概念
    SpringCloud 客户端负载均衡:Ribbon
  • 原文地址:https://blog.csdn.net/weixin_43333483/article/details/127465890