• 【Netty源码系列(一)】SpringBoot整合Netty实现多端口绑定


    1 新建springboot项目引入jar包

    本次采用jdk1.8、springboot2.7.5、Netty4.1.84.Final版本

    	<dependency>
    	    <groupId>org.springframework.bootgroupId>
    	    <artifactId>spring-boot-starter-webartifactId>
    	dependency>
    	
    	<dependency>
    	    <groupId>io.nettygroupId>
    	    <artifactId>netty-allartifactId>
    	dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2 创建ServerBootstrap Netty服务端启动类

    主要包括以下几步:

    1. 创建NioEventLoopGroup线程组
    2. 创建ServerBootstrap辅助启动类
    3. 设置group线程组
    4. 设置channel通道的类型
    5. 初始化通道,这一步可以设置连接类型,编解码信息及自定义请求处理器
    6. 绑定端口并启动
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    /**
     * @author lics
     * @version 1.0.0
     * @date 2022/10/31 15:14
     */
    public class MyNettyServer {
    
        public void startNettyServer(int... ports) {
    
            /*
             * 配置服务端的NIO线程组
             * NioEventLoopGroup 是用来处理I/O操作的Reactor线程组
             * bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写,
             * bossGroup接收到连接后就会把连接信息注册到workerGroup
             * workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍
             */
            EventLoopGroup bossGroup = new NioEventLoopGroup(2);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                // ServerBootstrap 是一个启动NIO服务的辅助启动类
                ServerBootstrap serverBootstrap = new ServerBootstrap();
    
                // 设置group,将bossGroup, workerGroup线程组传递到ServerBootstrap
                serverBootstrap = serverBootstrap.group(bossGroup, workerGroup);
    
                // ServerSocketChannel是以NIO的selector为基础进行实现的,用来接收新的连接,这里告诉Channel通过NioServerSocketChannel获取新的连接
                serverBootstrap = serverBootstrap.channel(NioServerSocketChannel.class);
                
               // 初始化通道,主要用于网络I/O事件,记录日志,编码、解码消息
                serverBootstrap = serverBootstrap.childHandler(new MyNettyChannelInitializer());
    
                // 绑定端口,同步等待成功
                ChannelFuture[] channelFutures;
                ChannelFuture futureTcp;
                if (ports.length > 0) {
                    channelFutures = new ChannelFuture[ports.length];
                    int i = 0;
                    for (Integer port : ports) {
                        // 绑定端口,同步等待成功 绑定的服务器
                        futureTcp = serverBootstrap.bind(port).sync();
                        channelFutures[i++] = futureTcp;
                        futureTcp.addListener(future -> {
                            if (future.isSuccess()) {
                                System.out.println("netty server 启动成功!" + port);
                            } else {
                                System.out.println("netty server 启动失败!" + port);
                            }
                        });
                    }
                    for (ChannelFuture future : channelFutures) {
                        // 等待服务器监听端口关闭
                        future.channel().closeFuture().sync();
                    }
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // 退出,释放线程池资源
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    3 自定义channel初始化设置

    继承ChannelInitializer类,重写initChannel()方法

    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    
    /**
     * 通道初始化
     *
     * @author lics
     * @version 1.0.0
     * @date 2022/10/31 15:16
     */
    public class MyNettyChannelInitializer extends ChannelInitializer<Channel> {
    
        @Override
        protected void initChannel(Channel channel) {
            /**
             * 处理TCP请求
             */
            // ChannelOutboundHandler,依照逆序执行
            channel.pipeline().addLast("encoder", new StringEncoder());
            // 属于ChannelInboundHandler,依照顺序执行
            channel.pipeline().addLast("decoder", new StringDecoder());
            // 自定义TCP请求的处理器
            channel.pipeline().addLast(new TcpRequestHandler());
            
            /**
             * 下面代码是处理HTTP请求,测试时请把上面的TCP请求设置注释掉
             */
            // HTTP协议解析,用于握手阶段;此处放开则会处理HTTP请求,TCP请求不会处理
            channel.pipeline().addLast(new HttpServerCodec());
            channel.pipeline().addLast(new HttpObjectAggregator(1024 * 1024 * 100));
            // 自定义解析HTTP请求处理器
            channel.pipeline().addLast(new HttpRequestHandler());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    4 自定义I/O数据读写处理类

    TCP请求,创建TcpRequestHandler类,继承ChannelInboundHandlerAdapter,重写里面的方法,channelRead()方法读取客户端传入的数据

    import java.net.InetSocketAddress;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * I/O数据读写处理类 TCP连接
     *
     * @author lics
     * @version 1.0.0
     * @date 2022/10/31 15:17
     */
    @Slf4j
    public class TcpRequestHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * 从客户端收到新的数据时,这个方法会在收到消息时被调用
         *
         * @param ctx
         * @param msg
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("channelRead: " + msg.toString());
            //回应客户端
            ctx.write("I got it");
        }
    
        /**
         * 从客户端收到新的数据、读取完成时调用
         *
         * @param ctx
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            System.out.println("channel Read Complete");
            ctx.flush();
        }
    
        /**
         * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
         *
         * @param ctx
         * @param cause
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            System.out.println("exceptionCaught");
            cause.printStackTrace();
            ctx.close();//抛出异常,断开与客户端的连接
        }
    
        /**
         * 客户端与服务端第一次建立连接时 执行
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
            ctx.channel().read();
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            //此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
            System.out.println("channelActive: " + clientIp + ctx.name());
        }
    
        /**
         * 客户端与服务端 断连时 执行
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            ctx.close(); //断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
            System.out.println("channelInactive: " + clientIp);
        }
    
        /**
         * 服务端当read超时, 会调用这个方法
         *
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            super.userEventTriggered(ctx, evt);
            InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = insocket.getAddress().getHostAddress();
            ctx.close();//超时时断开连接
            System.out.println("userEventTriggered: " + clientIp);
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
            System.out.println("channelRegistered");
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) {
            System.out.println("channelUnregistered");
        }
    
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) {
            System.out.println("channelWritabilityChanged");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115

    HTTP请求,创建HttpRequestHandler类,继承SimpleChannelInboundHandler,重写channelRead0()方法

    import com.alibaba.fastjson.JSONObject;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.multipart.Attribute;
    import io.netty.handler.codec.http.multipart.HttpPostRequestDecoder;
    import io.netty.handler.codec.http.multipart.InterfaceHttpData;
    
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;
    
    /**
     * HTTP连接请求处理
     *
     * @author lics
     * @version 1.0.0
     * @date 2022/11/2 11:03
     */
    public class HttpRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
    
            String uri = fullHttpRequest.uri();
            System.out.println(uri);
            switch (fullHttpRequest.method().name()) {
                case "GET":
                    processGetRequest(fullHttpRequest);
                    break;
                case "POST":
                    if (fullHttpRequest.headers().get("Content-Type").contains("x-www-form-urlencoded")) {
                        processPostFormRequest(fullHttpRequest);
                    } else if (fullHttpRequest.headers().get("Content-Type").contains("application/json")) {
                        processPostJsonRequest(fullHttpRequest);
                    }
                    break;
                default:
            }
            ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer();
            buf.writeCharSequence("sucess", StandardCharsets.UTF_8);
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buf);
            response.headers().set("Content-Type","application/json;charset=UTF-8");
            response.headers().set("Content-Length",response.content().readableBytes());
    
            channelHandlerContext.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    
        /**
         * 处理get请求
         * @param request
         */
        private void processGetRequest(FullHttpRequest request){
            QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
            Map<String, List<String>> params = decoder.parameters();
            params.forEach((key, value) -> System.out.println(key + " ==> "+ value));
        }
    
        /**
         * 处理post form请求
         * @param request
         */
        private void processPostFormRequest(FullHttpRequest request){
            HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(request);
            List<InterfaceHttpData> httpDataList = decoder.getBodyHttpDatas();
            httpDataList.forEach(item -> {
                if (item.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute){
                    Attribute attribute = (Attribute) item;
                    try {
                        System.out.println(attribute.getName() + " ==> " + attribute.getValue());
                    }catch (Exception e){
                        e.printStackTrace();
                    }
    
                }
            });
        }
    
        /**
         * 处理post json请求
         * @param request
         */
        private void processPostJsonRequest(FullHttpRequest request){
            ByteBuf content = request.content();
            byte[] bytes = new byte[content.readableBytes()];
            content.readBytes(bytes);
    
            JSONObject jsonObject = JSONObject.parseObject(new String(bytes));
            jsonObject.getInnerMap().forEach((key,value) -> System.out.println(key + " ==> " + value));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95

    5 启动Netty服务端

    import com.primeton.netty.server.MyNettyServer;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.scheduling.annotation.EnableAsync;
    
    /**
     * 启动类
     *
     * @author lics
     * @version 1.0.0
     * @date 2022/10/31 15:17
     */
    @EnableAsync
    @SpringBootApplication
    public class NettyApplication implements CommandLineRunner {
    
        public static void main(String[] args) {
            SpringApplication app = new SpringApplication(NettyApplication.class);
            app.run(args);
        }
    
        @Async
        @Override
        public void run(String... args) {
            /*
             * 使用异步注解方式启动netty服务端服务
             */
            new MyNettyServer().startNettyServer(5678,8888);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    启动成功后,可以看到控制台会打印两行输出
    在这里插入图片描述

    5.1 TCP连接测试

    使用客户端连接工具进行连接测试
    输入端口号,地址,点击TCP连接测试

    发送 ‘Hello world’ 数据,控制台打印并发送 ‘I got it’ 到客户端
    发送数据后,结果如下

    TCP/UDP连接工具下载
    链接:https://pan.baidu.com/s/1YMg602U9xPgGVsFZx5zVaw
    提取码:144t

    5.2 HTTP连接测试

    这里进行HTTP连接测试时,要把MyNettyChannelInitializer类中有关TCP连接的代码注掉,否则会连接失败

    在这里插入图片描述
    到此,spring boot整合Netty完成,后面将根据这个demo去跟踪学习Netty的源码,喜欢可以关注一下,大家一起学习,共同进步。

  • 相关阅读:
    “节省内存、提升性能:享元模式的神奇之处“
    点到参数曲面的最小距离、参数曲面间的最小距离(如样条曲面)、拓扑面间的最小距离(OpenCascade)
    树与堆(详解)
    【HarmonyOS NEXT星河版开发学习】小型测试案例05-得物列表项
    js-includes()方法
    Ubuntu使用dense_flow提取视频图像的光流图像
    让一个模型处理多种数据的N种方法
    java计算机毕业设计汇美食电子商城源码+mysql数据库+系统+LW文档+部署
    计算机毕业设计之java+javaweb社区共享食堂信息系统
    继承(六)—— 菱形继承的问题以及解决方案(初识虚基表)
  • 原文地址:https://blog.csdn.net/weixin_43407520/article/details/127750062