• 聊聊分布式架构06——[NIO入门]简单的Netty NIO示例


    目录

    Java NIO和Netty NIO比较

    Java NIO:

    Netty:

    Netty NIO中的主要模块

    Transport(传输层)

    Buffer(缓冲区)

    Codec(编解码器)

    Handler(处理器)

    EventLoop(事件循环)

    Bootstrap和Channel(引导和通道)

    Future和Promise(异步编程)

    Netty示例

    服务端时序图

    服务端代码

    客户端时序图

    客户端代码

    总结


    Java NIO和Netty NIO比较

    Java NIO:
    1. 原生Java库: Java NIO是Java标准库的一部分,提供了非阻塞I/O的核心功能。它是Java平台上的底层API,允许开发者直接操作通道、缓冲区和选择器等组件。

    2. 较低级别: Java NIO是相对较低级别的API,需要开发者编写更多的底层代码来处理网络通信和协议。这可以提供更多的控制权,但也增加了开发复杂性。

    3. 多路复用: Java NIO通过选择器(Selector)实现多路复用,允许一个线程管理多个通道。这对于处理大量并发连接非常有用。

    4. 手动管理缓冲区: Java NIO需要开发者手动管理缓冲区,包括分配、读取、写入和释放缓冲区。这可能导致更复杂的代码结构。

    5. 适用场景: Java NIO适用于需要高度自定义的网络协议网络服务器。它在性能和灵活性方面提供了更多的控制权。

    Netty:
    1. 高级别框架: Netty是一个基于Java NIO的高级别框架,它封装了底层的NIO细节,提供了更简单和强大的API来处理网络通信。

    2. 事件驱动: Netty是事件驱动的框架,使用事件处理器(Event Handlers)来处理入站和出站事件。这使得编写网络应用程序更加模块化和可维护。

    3. 自动管理缓冲区: Netty自动管理缓冲区,无需开发者手动分配和释放缓冲区。这减轻了内存管理的负担。

    4. 丰富的功能: Netty提供了丰富的功能,包括HTTP、WebSocket、TLS/SSL支持、UDP通信、拆包和粘包处理等。它还支持异步和同步I/O操作。

    5. 社区和生态系统: Netty拥有强大的社区支持和丰富的生态系统,有大量的扩展和插件可用。这使得开发人员可以更快地构建复杂的网络应用。

    6. 适用场景: Netty适用于构建高性能、可扩展、可维护的网络应用程序,特别是在处理协议复杂、并发连接众多的情况下。

    为什么选择Netty?

    Java NIONetty NIO
    API和类库繁杂麻烦,需掌握Selector、Channel、Buffer等封装简单,门槛低
    扩展实现需要熟悉多线程和网络编程保证代码质量通过ChannelHandler对框架灵活扩展
    可靠性可靠性能力需手动补齐,工作量和难度大预编码、多协议,功能强,性能高
    Bug Fixed臭名昭著的epoll bug(selector空轮询,CPU到100%)1.6版本说修复,1.7版本还在,只是调低了触发率稳定,成熟,修复了所有已发现的Java NIO Bug
    社区生态/社区活跃,迭代周期短
    生存迭代/经历大规模商业应用考验,质量得到验证

    基础篇我们使用Java NIO举例演示了简单的RPC通信。代码的行数和步骤确实挺繁琐,不如来看看Netty 的操作,是骡子是马牵出来溜溜先,就当诸君饭后消个食。

    Netty NIO中的主要模块

    Netty是一个强大的网络编程框架,它由多个主要模块组成,每个模块负责不同的功能。以下是Netty中的一些主要模块:

    1. Transport(传输层
      • NIO: 这个模块实现了基于Java NIO的传输层,提供了非阻塞的网络通信功能。它包括NioEventLoopGroupNioServerSocketChannelNioSocketChannel等类,用于创建和管理NIO通道。

    2. Buffer(缓冲区)
      • Java NIO中的ByteBuffer局限性:

        • ByteBuffer长度固定,不能动态伸缩和扩展,编码对象时容易引起索引越界异常

        • ByteBuffer只有一个标识位置的指针position,需要手工调用flip()和rewind(),不方便

        • ByteBuffer的API功能有限,需要使用者自己编程实现一些高级和实用的特性

      • 为了弥补这些不足,Netty NIO提供了自己的实现——ByteBuf:

        Netty提供了高性能的缓冲区实现ByteBuf,用于处理数据的读取和写入。ByteBuf提供了直接和间接缓冲区,并支持池化,以减少内存分配和回收的开销。

    3. Codec(编解码器)
      • 编码器和解码器: 这个模块包括一系列编解码器,用于将数据序列化和反序列化为字节,以便在网络中传输。Netty提供了JSON、Protobuf、HTTP、WebSocket等多种编解码器。

    4. Handler(处理器)
      • ChannelHandler: ChannelHandler是Netty中的核心概念,用于处理事件和数据。它可以自定义,用于构建处理链。Netty提供了各种内置的ChannelHandler,如SimpleChannelInboundHandlerChannelDuplexHandler等。

    5. EventLoop(事件循环)
      • EventLoopGroup: 这个模块包括了EventLoopGroupEventLoop,用于实现事件循环机制。EventLoopGroup管理一组EventLoop,每个EventLoop负责处理一组通道上的事件。事件循环是Netty实现异步和事件驱动的关键。

    6. Bootstrap和Channel(引导和通道)
      • ServerBootstrap和Bootstrap: 这两个类用于引导Netty应用程序的启动。ServerBootstrap用于启动服务器端,而Bootstrap用于启动客户端。

      • Channel和ChannelPipeline: Channel表示通道,它代表了一个网络连接。在Channel接口层,采用Fade模式进行统一封装。ChannelPipeline是处理链,包含一系列ChannelHandler,用于处理事件和数据。

    7. Future和Promise(异步编程)
      • Future: Netty使用Future来表示异步操作的结果,允许开发者异步地等待操作完成。

      • Promise: PromiseFuture的扩展,允许开发者设置操作的结果,使得异步编程更加方便。

    Netty示例

    服务端时序图

    服务端代码
    /**
     * Netty Server
     * 开始:需要绑定端口用于启动
     * 1.创建线程组bossGroup用于处理客户端连接
     * 2.创建线程组workGroup用于socket网络读写
     * 3.创建Bootstrap服务启动辅助类,类似serverSocketChannel
     * 4.链式编程,构造线程组、serverChannel、options、channelHandle
     * 5.ChannelHandle继承自ChannelInitializer进行功能扩展
     * 结束:优雅退出关闭资源
     */
    public class Server {
        public static void main(String[] args) {
            new Server().bind(8088);
        }
    ​
        /**
         * 绑定端口用于启动
         * @param port 服务端口
         */
        public void bind(int port) {
            // 1.创建线程组bossGroup处理客户端连接
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            // 2.创建线程组workGroup用于socket网络读写
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                // 3.创建Bootstrap服务启动辅助类,类似serverSocketChannel
                ServerBootstrap server = new ServerBootstrap();
                // 4.链式编程,构造线程组、serverChannel、options、channelHandle
                server.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChildChannelHandler());
                System.out.println("server start on port:" + port);
    ​
                // 绑定端口异步调用连接客户端,同步阻塞等待(连接结果)连接成功
                ChannelFuture channelFuture = server.bind(port).sync();
    ​
                // 异步调用close关闭链路,同步阻塞等待(关闭结果)关闭成功后退出main函数
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 优雅退出关闭资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    ​
        /**
         * 5.ChannelHandle继承自ChannelInitializer进行功能扩展
         */
        private class ChildChannelHandler extends ChannelInitializer {
    ​
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new ServerHandler());
            }
        }
    ​
        private class ServerHandler extends ChannelHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                byte[] req = new byte[buf.readableBytes()]; // 读取缓冲区可读取长度
                buf.readBytes(req);
                String body = new String(req, "UTF-8");
                System.out.println("Server receive msg:" + body);
                String currTime = "query time".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "error code";
                ByteBuf resp = Unpooled.copiedBuffer(currTime.getBytes());
                ctx.write(resp);
            }
    ​
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.close();
            }
    ​
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.flush(); // 将发送缓冲数组中的消息通过flush()写入socketChannel发送,避免了频繁调用selector进行发送
            }
        }
    }
    客户端时序图

    客户端代码
    /**
     * 开始:连接服务端ip:port
     * 1.创建EventLoopGroup管理socket读取
     * 2.创建客户端辅助类Bootstrap,类似于之前的SocketChannel
     * 3.链式编程,构造客户端流程client、option、handler
     * 4.handler使用的是ChannelInitializer
     * 结束:优雅的关闭资源
     */
    public class Client {
        public static void main(String[] args) {
            new Client().connect("localhost", 8088);
        }
    ​
        /**
         * 连接服务端ip:port
         * @param ip 服务端地址
         * @param port 服务端端口
         */
        public void connect(String ip, int port) {
            // 1.创建EventLoopGroup管理socket读取
            EventLoopGroup group =new NioEventLoopGroup();
            try {
                // 2.创建客户端辅助类Bootstrap,类似于之前的SocketChannel
                Bootstrap client = new Bootstrap();
                // 3.链式编程,构造客户端流程client、option、handler
                client.group(group).channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new ClientHandler());
                            }
                        });
    ​
                // 发起异步连接操作
                ChannelFuture channelFuture = client.connect(ip, port).sync();
    ​
                // 等待服务端链路关闭
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 优雅退出,释放资源
                group.shutdownGracefully();
            }
        }
    ​
        /**
         * 4.handler使用的是ChannelInitializer
         */
        public class ClientHandler extends ChannelHandlerAdapter {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                String body = new String(req, "UTF-8");
                System.out.println("client receive server send:" + body);
                channelActive(ctx);
            }
    ​
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                Scanner scanner = new Scanner(System.in);
                System.out.println("请输入");
                String clientIn = scanner.nextLine();
                ByteBuf firstMessage = Unpooled.buffer(clientIn.getBytes().length);
                firstMessage.writeBytes(clientIn.getBytes());
                System.out.println("client send:" + clientIn);
                ctx.writeAndFlush(firstMessage);
            }
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.close();
            }
        }
    }

    运行结果:

    总结

    不难看出,不管是流程步骤还是实现功能的代码行数,Netty NIO都是优于Java原生NIO的。到此,一个简单的NIO入门示例完成。

    资料参考:《Netty权威指南》

  • 相关阅读:
    通过nginx访问另一台服务器上的图片文件
    SpringBoot整合七牛云实现图片的上传管理
    VSCode:使用CMakeLists.txt构建C++项目
    精通Spring Boot单元测试:构建健壮的Java应用
    [React] React-Redux 快速入门
    OpenLayers实战,OpenLayers调用手机陀螺仪方向实现指南针效果
    SLAM从入门到精通(a*搜路算法)
    git 本地分支基础操作
    第28篇 Spring Boot简介
    探索 Symfony 框架:工作原理、特点及技术选型
  • 原文地址:https://blog.csdn.net/Elaine2391/article/details/133709269