• Netty从入门到实战


    前言

    一、什么是Netty

    1、概述

    在Netty的wiki文档上,是这样说的,“Netty致力于提供异步事件驱动的网络应用程序框架和工具,用于快速开发可维护的高性能和高可扩展性协议服务器和客户端”,wiki地址https://netty.io/wiki/user-guide-for-4.x.html,在百度百科上,这样描述,“Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序”。综上,Netty就是一个基于NIO的客户端和服务端的编程框架,极大的简化了客户端和服务端网络应用的开发。

    2、应用场景

    Netty可构建高性能、低延时的各种Java中间件,例如MQ、分布式服务框架、ESB消息总线等,Netty主要作为基础通信框架提供高性能、低延时的通信服务。例如阿里分布式服务框架 Dubbo,默认使用 Netty 作为基础通信组件,还有 RocketMQ 也是使用 Netty 作为通讯的基础。

    3、什么是NIO

    Java有三种IO模型:BIO、NIO和AIO。BIO就是传统的IO模型,同步阻塞IO,客户端有一个连接请求,服务端就要有一个对应的线程,即使这个连接不做任何事情,这样,极大的造成了服务端的资源浪费。NIO是同步非阻塞IO,服务器中一个线程处理多个连接,即客户端发送的连接请求都会注册到多路复用器上(selector),多路复用器轮询到连接有 I/O 请求就进行处理。如下图所示,分别为BIO和NIO的模型图BIO
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/874ad47e759f458da889a6a44964ff87.png
    NIO三大组件:channel(一个类似于流的通道,但是区别于流,可以同时读写数据、可以异步读写数据、可以读写数据到缓冲区的双向通道)、selector(选择器,可以将多个客户端的连接请求注册到选择器上,判断是否有具体的IO事件,选择其中的连接去让一个线程轮流处理)和buffer(缓冲区,一个可以读写数据的内存块,可以理解成是一个容器对象,该对象提供了一些方法,可以更轻松地使用内存块)

    4、核心组件&流程

    在这里插入图片描述
    先来看一张经典的Netty架构图,图中有两组线程池BossGroup和WorkerGroup,前者是专门处理客户端的连接,后者负责网络的读写,这两者的类型都是NioEventLoopGroup,简单的说,前者资本,后者牛马。

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    
    • 1
    • 2

    BossGroup中的NioEventLoop循环事件主要执行三个步骤:轮询Accept连接事件、处理Accept事件,与客户端建立连接,并生成一个NioSocketChannel然后注册到WorkerGroup下的NioEventLoop的selector上、处理任务队列的任务,也就是runAllTasks
    WorkerGroup下的NioEventLoop循环事件主要执行三个步骤:轮询读/写事件、处理读/写事件,在对应的NioSocketChannel处理、处理任务队列的任务,runAllTasks。在处理时,使用Pipeline管道,其中维护了很多handler处理器用来处理channel中的数据。

    • NioEventLoopGroup:管理NioEventLoop,可以看作一个线程池,内部维护了一组线程,每个线程(NioEventLoop)处理多个channel上的数据,而一个channel值对应一个线程
    Bootstrap b = new Bootstrap(); // (1)
                b.group(workerGroup); // (2)
                b.channel(NioSocketChannel.class); // (3)
                b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
                b.handler(new ChannelInitializer() {
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new TimeClientHandler());
                    }
                });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    接下来就是创建Netty服务,可以看到一个Bootstrap将线程组group、信道channel、设置option、拦截器handler串在一起,初始化了通道对象。group()用来设置工作线程组,例如bossGroup或者workerGroup;channel()用来设置通道类型,建立连接后根据这个设置创建对应的channel实例,具体类型有NioSocketChannel——异步非阻塞的客户端TCP Socket连接、NioServerSocketChannel——异步非阻塞的服务端TCP Socket连接、OioSocketChannel——同步阻塞的客户端连接、OioServerSocketChannel——同步阻塞的服务器端TCP Socket连接,常用的就是前面的Nio;option()和childOption()前者配置服务端接收连接,后者设置提供给管道的连接,通过ChannelOption类设置,常用的参数有SO_RCVBUF——TCP数据缓冲区大小、SO_KEEPALIVE——保持连接状态、SO_BACKLOG——线程队列得到连接个数等;ChannelInitializer初始化通道SocketChannel

    • Bootstrap:顾名思义,这是Netty的启动类,主要作用就是串联整个各个组件,配置Netty程序,其中,ServerBootstrap是服务端的启动,Bootstrap是客户端的启动
    //一系列的拦截器,统称为责任链
                                ChannelPipeline pipeline = ch.pipeline();
                                // 请求解码器
                                pipeline.addLast("http-decoder", new HttpRequestDecoder());
                                // 将HTTP消息的多个部分合成一条完整的HTTP消息
                                pipeline.addLast("http-aggregator", new HttpObjectAggregator(65535));
                                // 响应转码器
                                pipeline.addLast("http-encoder", new HttpResponseEncoder());
                                // 解决大码流的问题,ChunkedWriteHandler:向客户端发送HTML5文件
                                pipeline.addLast("http-chunked", new ChunkedWriteHandler());
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • ChannelPipeline:相当于一个容器,初始化channel时,把channelHandler按顺序放在pipeline中,就可以按顺序执行,主要处理channel的出入站操作。
    ChannelFuture future = bootstrap.bind(8888).sync();
                System.out.println("服务端启动");
                future.channel().closeFuture().sync();
    
    • 1
    • 2
    • 3
    • ChannelFuture:Netty的I/O操作是异步的,因此所有的操作实际返回的是ChannelFuture对象,而不是实际结果,ChannelFuture是采取类似观察者模式的形式进行获取结果,具体操作如下:
      在这里插入图片描述
      综合上面的代码及主要组件,一个完整的Netty服务创建流程如下:
      在这里插入图片描述

    二、Netty实现简易聊天室

    1、引入依赖

    
                io.netty
                netty-all
                4.1.25.Final
            
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、核心代码

    • 服务端
    package com.example.dailyrecords.netty.bs;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * @author zy
     * @version 1.0.0
     * @ClassName MyNettyServer.java
     * @Description 处理客户端的请求
     * @createTime 2022/8/4
     */
    public class MyNettyServer {
        public static void main(String[] args) throws InterruptedException {
            /**
             * bossGroup 和 workerGroup 是两个线程池, 它们默认线程数为 CPU 核心数乘以 2,bossGroup 用于接收客户端传过来的请求
             * 接收到请求后将后续操作交由 workerGroup 处理
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup();//监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的selector中
            EventLoopGroup workerGroup = new NioEventLoopGroup();//处理每一个连接发生的读写事件
            try {
                /**
                 * 创建服务端的启动对象,设置参数
                 */
                ServerBootstrap bootstrap = new ServerBootstrap();
                /**
                 * 设置两个线程组
                 */
                bootstrap.group(bossGroup, workerGroup)
                        /**
                         * 绑定服务端通道实现类型NioServerSocketChannel
                         */
                        .channel(NioServerSocketChannel.class)
                        /**
                         * 设置线程队列得到连接个数
                         */
                        .option(ChannelOption.SO_BACKLOG,128)
                        /**
                         * 设置保持活动连接状态
                         */
                        .childOption(ChannelOption.SO_KEEPALIVE,true)
                        /**
                         * 使用匿名内部类的形式初始化通道对象
                         * 给读写事件的线程通道绑定handler去真正处理读写,ChannelInitializer初始化通道SocketChannel
                         */
                        .childHandler(new ChannelInitializer() {
    
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast("服务端处理器",new MyServerHandler());
                            }
                        });
                /**
                 * 绑定端口,启动服务端
                 */
                ChannelFuture future = bootstrap.bind(8888).sync();
                System.out.println("服务端启动");
                future.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 服务端拦截器
    package com.example.dailyrecords.netty.bs;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * @author zy
     * @version 1.0.0
     * @ClassName MyServerHandler.java
     * @Description TODO
     * @createTime 2022/8/4
     */
    public class MyServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //获取客户端发送过来的消息
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("收到客户端"+ctx.channel().remoteAddress()+"发送的消息:"+byteBuf.toString(CharsetUtil.UTF_8));
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //发送消息给客户端
            ctx.writeAndFlush(Unpooled.copiedBuffer("服务端收到消息,并给你返回一个大大的❤",CharsetUtil.UTF_8));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            //发生异常,关闭通道
            ctx.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 客户端
    package com.example.dailyrecords.netty.bs;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    /**
     * @author zy
     * @version 1.0.0
     * @ClassName MyClient.java
     * @Description TODO
     * @createTime 2022/8/4
     */
    public class MyClient {
        public static void main(String[] args) throws Exception{
            NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
            try{
                //创建bootstrap对象,配置参数
                Bootstrap bootstrap = new Bootstrap();
                //设置线程组
                bootstrap.group(eventExecutors)
                        //设置客户端的通道实现类型
                        .channel(NioSocketChannel.class)
                        //使用匿名内部类初始化通道
                        .handler(new ChannelInitializer() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //添加客户端通道的处理器
                                ch.pipeline().addLast(new MyClientHandler());
                            }
                        });
                System.out.println("客户端准备就绪,随时可以起飞~");
                //连接服务端
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8888).sync();
                //对通道关闭进行监听
                channelFuture.channel().closeFuture().sync();
            }finally {
                //关闭线程组
                eventExecutors.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 客户端拦截器
    package com.example.dailyrecords.netty.bs;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * @author zy
     * @version 1.0.0
     * @ClassName MyClientHandler.java
     * @Description TODO
     * @createTime 2022/8/4
     */
    public class MyClientHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //发送消息到服务端
            ctx.writeAndFlush(Unpooled.copiedBuffer("你好,我是客户端", CharsetUtil.UTF_8));
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //接收服务端发送过来的消息
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    3、运行示例

    先启动服务端在这里插入图片描述
    然后启动客户端
    在这里插入图片描述
    服务端收到消息
    在这里插入图片描述
    客户端收到服务端返回的消息
    在这里插入图片描述

    4、简单总结

    上面建议的在控制台实现了一个聊天室的微小demo,在实际应用中,我们可以对其进行扩展,增加一些必要的页面的参数,实现一对一的聊天或者多对多的群聊,通过这个demo,我们应该比较熟悉Netty的开发过程,下面,展示一个项目中的实际应用。

    三、Netty实现请求转发网关

    1、项目场景

    有一系列的核心数据处理服务1,2,3…,提供了一个sdk给接入端使用(接入端数量很多,同步数据量大,时间不固定),如果直接让各个核心服务直接接收数据,不仅要写很多重复的数据校验,而且核心服务压力很大,还有如果新增服务,会修改很多代码,工作量很大。如下图所示
    在这里插入图片描述
    因此,在核心服务前置一个网关服务(简单的RPC),负责对sdk传入数据进行校验、前置处理、请求转发。网关从缓存读取相关的配置,一个后台管理系统进行服务的相关配置供网关使用,如下图所示:
    在这里插入图片描述

    2、核心代码

    • Netty服务器
    package com.example.dailyrecords.netty.http;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    import io.netty.handler.stream.ChunkedWriteHandler;
    
    /**
     * @author zy
     * @version 1.0.0
     * @ClassName NettyServer.java
     * @Description netty服务端,处理http请求
     * @createTime 2022/7/26
     */
    public class NettyServer {
        public static void main(String[] args) throws InterruptedException {
            /**
             * bossGroup 和 workerGroup 是两个线程池, 它们默认线程数为 CPU 核心数乘以 2,bossGroup 用于接收客户端传过来的请求
             * 接收到请求后将后续操作交由 workerGroup 处理
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                /**
                 * 创建服务端的启动对象,设置参数
                 */
                ServerBootstrap bootstrap = new ServerBootstrap();
                /**
                 * 设置两个线程组
                 */
                bootstrap.group(bossGroup, workerGroup)
                        /**
                         * 绑定服务端通道实现类型NioServerSocketChannel
                         */
                        .channel(NioServerSocketChannel.class)
                        /**
                         * 设置线程队列得到连接个数
                         */
                        .option(ChannelOption.SO_BACKLOG,128)
                        /**
                         * 设置保持活动连接状态
                         */
                        .childOption(ChannelOption.SO_KEEPALIVE,true)
                        /**
                         * 使用匿名内部类的形式初始化通道对象
                         * 给读写事件的线程通道绑定handler去真正处理读写,ChannelInitializer初始化通道SocketChannel
                         */
                        .childHandler(new ChannelInitializer() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //一系列的拦截器,统称为责任链
                                ChannelPipeline pipeline = ch.pipeline();
                                // 请求解码器
                                pipeline.addLast("http-decoder", new HttpRequestDecoder());
                                // 将HTTP消息的多个部分合成一条完整的HTTP消息
                                pipeline.addLast("http-aggregator", new HttpObjectAggregator(65535));
                                // 响应转码器
                                pipeline.addLast("http-encoder", new HttpResponseEncoder());
                                // 解决大码流的问题,ChunkedWriteHandler:向客户端发送HTML5文件
                                pipeline.addLast("http-chunked", new ChunkedWriteHandler());
                                //自定义拦截器,进行业务数据处理,请求转发等
                                pipeline.addLast("自定义拦截器",new NettyParamHandler());
                                pipeline.addLast("自定义拦截器2",new SecondNettyHandler());
                            }
                        });
                /**
                 * 绑定端口,启动服务端
                 */
                ChannelFuture future = bootstrap.bind(8888).sync();
                System.out.println("服务端启动");
                future.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 自定义拦截器,进行一系列的数据处理,例如接入校验、数据校验、数据加解密、请求转发等
    package com.example.dailyrecords.netty.http;
    
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.codec.http.FullHttpRequest;
    import io.netty.handler.codec.http.HttpMethod;
    
    import java.util.Map;
    
    /**
     * @author zy
     * @version 1.0.0
     * @ClassName NettyHandler.java
     * @Description TODO
     * @createTime 2022/7/26
     */
    public class NettyParamHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
            //获取httpRequest
            Map paramMap = null;
            if (fullHttpRequest.method() == HttpMethod.GET) {
                paramMap = ParamUtil.getGetParamsFromChannel(fullHttpRequest);
            } else if (fullHttpRequest.method() == HttpMethod.POST) {
                paramMap = ParamUtil.getPostParamsFromChannel(fullHttpRequest);
            } else {
                ctx.writeAndFlush("出错了").channel().close();
                return;
            }
            if(!paramMap.get("name").equals("zhangyang")){
                System.out.println("*****************");
            }else{
                System.out.println("=================");
            }
    
            //传递给下一个拦截器处理
            ctx.fireChannelRead(paramMap);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("参数处理器发生异常:{}" + cause);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    package com.example.dailyrecords.netty.http;
    
    
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpResponseStatus;
    
    import java.util.Map;
    
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    
    /**
     * @author zy
     * @version 1.0.0
     * @ClassName SecondNettyHandler.java
     * @Description TODO
     * @createTime 2022/7/27
     */
    public class SecondNettyHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Map paramMap = (Map) msg;
            paramMap.put("name",paramMap.get("name").equals("zhangyang")?"yes":"no");
            System.out.println(paramMap.get("name"));
            /**
             * 组装返回信息,调用结束,关闭连接
             */
            FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 作为SpringBoot项目启动,可以给Netty服务端添加一个初始化器,去掉上面的main方法,然后SpringBoot的启动器初始化,或者直接使用SpringBoot的启动器初始化服务端,如下
    package com.example.dailyrecords.netty.http;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * @className: ServerInit
     * @description: 服务端初始化
     * @author: zhangyang
     **/
    @Slf4j
    public class ServerInit {
        public static void init() throws Exception {
            String portStr = 从配置文件获取;
            int port = Integer.parseInt(portStr);
            NettyServerserver = new NettyServer();
            server.init(port);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    package com.example.dailyrecords.netty.http;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class StartServer {
        public static void main(String[] args) {
            try {
                ServerInit.init();
                或者直接NettyServer初始化
            } catch (Exception e) {
                log.info("网关启动时发生异常:",e);
            }
           log.info("server has closed..........................");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    3、简单总结

    上面是一个实际项目中的Netty服务端作为请求转发网关的实例,代码已经做过了处理,只是一个demo级别的展示,实际应用中上面的网关服务代码也是可以直接使用,只需要在对应的业务上进行自定义拦截器的定制开发。

    4、FAQ

    * netty是一个基于NIO、开源、非阻塞、事件驱动的单线程模型Java网络编程框架,支持多种协议,包括但不限于HTTP、WebSocket、TCP、UDP等,具有高性能、可扩展和易于使用的特点
     *
     * netty工作流程:
     * 1、创建事件循环组和相关对象,用于监听和处理网络事件
     * 2、配置netty服务端或客户端的启动参数,包括线程组、通道类型、TCP参数等
     * 3、给服务器或客户端的ChannelPipeline添加各种ChannelHandler,用于处理不同网络事件
     * 4、绑定端口启动服务器或连接服务器
     * 5、等待服务器或客户端连接关闭,释放相关资源
     *
     * 或者
     *
     * 1、创建EventLoopGroup对象,用于管理和调度EventLoop事件
     * 2、创建ServerBootStrap或BootStrap对象,分别是服务端和客户端的启动器
     * 3、配置启动器,绑定事件组,配置channel参数,例如传输协议、端口等
     * 4、绑定ChannelHandler,用于处理请求的一系列处理器
     * 5、启动服务端或客户端,进行初始化
     *
     * netty相关组件:
     * 1、ByteBuf:netty的字节容器,类似于Java的ByteBuffer,更加强大简洁安全,用于在网络传输二进制数据
     * 2、EventLoopGroup:netty的事件循环组,用于管理和调度事件循环EventLoop
     * 3、ServerBootStrap:netty的服务器启动类,用于启动配置TCP/IP服务器
     * 4、BootStrap:netty的客户端启动类,用于启动配置TCP/IP客户端
     * 5、Channel:netty的核心概念,用于表示一个通信通道,读写数据
     * 6、ChannelPipeline:netty的Channel处理器,用于在传入数据上执行一组ChannelHandler
     * 7、ChannelHandler:netty的核心组件,用于处理各种通信事件,例如读写数据,建立连接、编码解码
     * 8、EventLoop:事件循环器(一个不断循环的I/O线程,负责一个或多个Channel的I/O事件),用于处理所有I/O事件和请求
     * netty的I/O操作都是异步非阻塞的,由EventLoop处理并以事件的方式触发回调函数
     *
     * netty的线程模型?如何优化?
     * netty的线程模型是基于事件驱动的Reactor模型,它使用少量的线程来处理大量的连接和数据传输,以提高性能和吞吐量。在netty中,每个连接
     * 分配一个单独的EventLoop线程,处理连接的相关事件,多个连接可以共享一个EventLoop线程
     * netty提供了一些线程模型和线程池配置选项,以适应不同的应用场景和性能要求。例如,可以使用不同的EventLoopGroup实现不同的线程模型,如
     * 单线程模型、多线程模型和主从线程模型等。同时,还可以设置不同的线程池参数,如线程数、任务队列大小、线程优先级等,以调整线程池的工作负载和性能表现
     * 还可以通过优化网络协议、数据结构、业务逻辑等方面来提高Netty的性能
     *
     * netty长连接 & 心跳机制
     * 长连接:客户端与服务端建立的连接可以保持一段时间,避免了频繁建立和关闭连接的开销,提高数据传输效率
     * netty长连接:通过Channel的keepalive选项保持长连接
     * 心跳机制:通过定期向对方发送心跳消息,来检测连接释放正常,如果一段时间没收到心跳消息,认为连接断开,进行重新连接
     * netty心跳机制:提供了一个IdleStateHandler类,可以设置多个超时时间,当连接空闲时间超过设定时间,触发一个事件,在事件处理方法中进行相应处理,例如发送心跳消息
     * 定义心跳消息类型————>在客户端和服务端的ChannelPipeline中添加IdleStateHandler————>重写useEventTriggered方法————>重写channelRead方法
     *
     * eventLoop和chanel的关系
     * channel代表一个开放的网络连接,用来读取和写入数据
     * eventLoop代表一个执行任务的线程,负责处理channel上的事件和操作
     * 每个channel都与一个eventLoop关联,一个eventLoop可以关联多个channel
     *
     * channelPipeline:
     * 用于处理channel上的读写请求,是基于事件驱动的处理机制,由一系列处理器(handler)组成,每个处理器负责处理一个或多个事件,工作流程
     * 入站:由channel接收到的事件,入站事件将从channelPipeline的第一个InboundHandler开始流动,直到最后一个InboundHandler
     * 出站:由channel发送出的事件,出站事件将从channelPipeline的最后一个OutboundHandler开始流动,直到第一个OutboundHandler
     * ChannelHandlerContext:表示处理器和ChannelPipeline之间的关联关系,每个ChannelHandler都有一个ChannelHandlerContext,在ChannelPipeline中的事件流中向前或向后传递事件
     *
     * ChannelFuture:
     * ChannelFuture表示异步的I/O操作的结果,当执行一个异步操作时,ChannelFuture立即返回,并在将来某个时刻通知操作结果,而不是等待操作完成
     * ChannelFuture在异步操作结束后,将ChannelFuture对象返回调用方,调用方可以添加一个ChannelFutureListener来处理结果
     * ChannelFuture还提供了检查操作是否成功、等待操作完成、添加监听器等
     * ChannelFuture是Netty中异步I/O操作的基础
     *
     * Netty的TCP粘包/拆包
     * 粘包/拆包:在TCP传输过程中,由于TCP不了解上层应用协议的消息边界,会将多个小消息组合成一个大消息,或将一个大消息拆分成多个小消息
     * 解决:消息定长、消息分隔符、消息头部加长度字段
     * 消息定长:将消息固定长度发送,接收端根据固定长度拆分消息
     * 消息分隔符:将消息以特定的分隔符分开,接收端,根据分割符对消息进行拆分
     * 消息头部加长度字段:在消息头部加上消息长度字段,发送端先发送消息长度,再发送消息;接收端先读取消息长度,再根据长度读取消息
     *
     * Netty高性能如何体现:
     * 异步非阻塞I/O:基于NIO的异步非阻塞I/O模型,减少线程阻塞等待事件,提高响应速度和吞吐量
     * 零拷贝技术:避免数据在内核和用户空间之间的多次复制,提高数据传输效率
     * 线程模型优化:根据不同业务场景选取不同线程模型,对于低延迟高吞吐量选择Reactor模型,对于简单场景选择单线程模型
     * 内存池技术:基于内存池的ByteBuf缓冲区,可以重用内存空间,减少内存分配和回收次数
     * 处理器链式调用:按照处理器链的顺序依次调用处理器,从而实现对事件的处理,减少了线程上下文切换和锁竞争等问题
     *
     * Netty的线程模型:
     * 单线程模型:所有I/O操作都由一个线程执行
     * 多线程模型:所有I/O操作由一组线程执行,其中一个线程负责监听客户端的连接请求,其他线程负责处理I/O操作,默认线程数为核心线程数*2,例如客户端的一个EventLoopGroup
     * 主从多线程模型:所有的I/O操作都由一组NIO线程来执行,其中一个主线程负责监听客户端的连接请求,其他从线程负责处理I/O操作,例如服务端的BossGroup负责监听,WorkGroup负责处理事件
     *
     * Netty如何保持长连接:
     * 心跳机制:IdleStateHandler,心跳机制可以定期向服务器发送一个简短的数据包,以保持连接处于活动状态。如果在一段时间内没有收到心跳包,就可以认为连接已经断开,从而及时重新建立连接
     * 断线重连机制:定期检查连接状态,并在连接断开时尝试重新连接,通过ChannelFutureListener和ChannelFuture实现
     * 基于HTTP/1.1协议的长连接:HTTP/1.1协议支持长连接,可以在一个TCP连接上多次发送请求和响应,HttpObjectAggregator
     * WebSocket协议:WebSocket协议也支持长连接,可以在一个TCP连接上双向通信,实现实时数据交换
     *
     * Netty发送消息的方式:
     * Channel.write:通过Channel写入消息,消息会被缓存到Channel的发送缓冲区中,等待下一次调用flush()将消息发送出去
     * ChannelHandlerContext.write:将消息写入ChannelHandlerContext的缓冲区,等待下次调用flush()将消息发出
     * channelHandlerContext.writeAndFlush:相当于合并了写入缓冲区和发送
     *
     * Netty的内存管理机制:
     * 主要通过ByteBuf类实现,Netty的ByteBuf内存管理分为两种方式:
     * 堆内存:ByteBuf以普通字节数据为基础,在JVM堆上分配内存,适用于小型数据传输
     * 直接内存:ByteBuf使用操作系统的对外内存,由操作系统分配和回收内存,适用于大型数据传输
     *
     * Netty和Tomcat都是应用服务器,有什么区别:
     * 底层网络通信模型:netty基于NIO非阻塞模型;tomcat基于BIO阻塞模型
     * 线程模型:netty使用EventLoop线程模型,每个EventLoop负责处理多个连接,通过线程池EventLoopGroup管理;tomcat使用传统多线程,每个请求一个线程
     * 支持协议:netty支持HTTP、HTTPS、TCP、UDP、WebSocket等;tomcat支持HTTP、HTTPS
     * 应用场景:netty适合高性能、低延迟的网络应用,如游戏服务器、即时通讯服务器;tomcat适合传统web程序
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
  • 相关阅读:
    基于SqlSugar的数据库访问处理的封装,在.net6框架的Web API上开发应用
    JSON parse error: Cannot deserialize instance of `xxx` out of START_ARRAY token
    软件建模知识点
    Java AtomicInteger 学习以及理解
    在20.04.4 LTS (Focal Fossa)学习CommonAPI-C---D-Bus
    html网页设计大学生作业成品——公益校园网站设计与实现(HTML+CSS+JavaScript)
    Eclipse ABAP ADT 集成详细安装教程
    求职刷题力扣DAY14 ---二叉树的基础遍历,迭代、递归
    智慧公厕助力数字强市建设,打造善感知新型信息化公共厕所
    LeetCode 74. 搜索二维矩阵
  • 原文地址:https://blog.csdn.net/qq_36933421/article/details/127714734