• Spring Boot与Netty打造TCP服务端(解决粘包问题)


    欢迎来到我的博客,代码的世界里,每一行都是一个故事


    在这里插入图片描述

    前言

    物联网时代,设备之间的通信变得愈发重要。本文将带你踏上一场关于如何用Spring Boot和Netty搭建TCP服务端的冒险之旅。无论是智能家居、工业自动化还是其他物联网应用,构建一个稳健的通信桥梁将成为连接未来的关键。

    功能目标

    • 实现springboot+netty整合TCP服务端(基础)
    • 实现消息回复功能
    • 实现消息太长导致的粘包问题(比如发送一个base64的图片信息)
    • 实现在自定义Handler中注入spring的bean
    • 保证完成任务,哈哈哈哈哈

    项目实现

    maven坐标

    
    <dependency>
      <groupId>io.nettygroupId>
      <artifactId>netty-commonartifactId>
      <version>4.1.79.Finalversion>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    构建自定义Handler

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    
    /**
     * I/O数据读写处理类
     *
     * @author xiaobo
     */
    @Slf4j
    public class CarTcpNettyChannelInboundHandlerAdapter extends ChannelInboundHandlerAdapter {
    
    
        /**
         * 从客户端收到新的数据时,这个方法会在收到消息时被调用
         *
         * @param ctx
         * @param msg
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
            // 这里是在前面的DelimiterBasedFrameDecoder转为了ByteBuf,验证是否是ByteBuf
            if (msg instanceof ByteBuf) {
                ByteBuf byteBuf = (ByteBuf) msg;
                try {
                    String receivedData = byteBuf.toString(CharsetUtil.UTF_8);
                    // 接收完整数据
                    handleReceivedData(receivedData);
                } finally {
                    // 释放 ByteBuf 占用的资源
                    byteBuf.release();
                    // 回复消息
                    ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8));
                }
            }
        }
    
        private void handleReceivedData(String receivedData) {
            // 数据处理
            // 这里如果想实现spring中bean的注入,可以用geBean的方式获取
        }
    
        /**
         * 从客户端收到新的数据、读取完成时调用
         *
         * @param ctx
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
            log.info("channelReadComplete");
            ctx.flush();
        }
    
        /**
         * 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
         *
         * @param ctx
         * @param cause
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {
            cause.printStackTrace();
            ctx.close();// 抛出异常,断开与客户端的连接
        }
    
        /**
         * 客户端与服务端第一次建立连接时 执行
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {
            super.channelActive(ctx);
            ctx.channel().read();
            InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = socket.getAddress().getHostAddress();
            // 此处不能使用ctx.close(),否则客户端始终无法与服务端建立连接
            System.out.println("channelActive:" + clientIp + ctx.name());
          	// 这里是向客户端发送回应
            ctx.writeAndFlush(Unpooled.copiedBuffer("收到over", CharsetUtil.UTF_8));
        }
    
        /**
         * 客户端与服务端 断连时 执行
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
            super.channelInactive(ctx);
            InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = socket.getAddress().getHostAddress();
            // 断开连接时,必须关闭,否则造成资源浪费,并发量很大情况下可能造成宕机
            ctx.close();
            log.info("channelInactive:{}", clientIp);
        }
    
        /**
         * 服务端当read超时, 会调用这个方法
         *
         * @param ctx
         * @param evt
         * @throws Exception
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
            super.userEventTriggered(ctx, evt);
            InetSocketAddress socket = (InetSocketAddress) ctx.channel().remoteAddress();
            String clientIp = socket.getAddress().getHostAddress();
            ctx.close();// 超时时断开连接
            log.info("userEventTriggered:" + clientIp);
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            log.info("channelRegistered");
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            log.info("channelUnregistered");
        }
    
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            log.info("channelWritabilityChanged");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137

    ChannelInitializer实现

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    
    /**
     * description: 

    通道初始化

    * * @author bo * @version 1.0 * @date 2024/2/27 16:13 */
    public class CarTcpNettyChannelInitializer<SocketChannel> extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf delemiter = Unpooled.buffer(); delemiter.writeBytes("$".getBytes()); // 这里就是解决数据过长问题,而且数据是以$结尾的 ch.pipeline().addLast(new DelimiterBasedFrameDecoder(907200, true, true, delemiter)); // 自定义ChannelInboundHandlerAdapter ch.pipeline().addLast(new CarTcpNettyChannelInboundHandlerAdapter()); } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    server实现

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.AdaptiveRecvByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    
    /**
     * description: 

    netty创建的TCP

    * * @author bo * @version 1.0 * @date 2024/2/27 16:25 */
    @Slf4j public class CarTcpNettyServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 // NioEventLoopGroup 是用来处理I/O操作的Reactor线程组 // bossGroup:用来接收进来的连接,workerGroup:用来处理已经被接收的连接,进行socketChannel的网络读写, // bossGroup接收到连接后就会把连接信息注册到workerGroup // workerGroup的EventLoopGroup默认的线程数是CPU核数的二倍 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) // netty 默认数据包传输大小为1024字节, 设置它可以自动调整下一次缓冲区建立时分配的空间大小,避免内存的浪费 最小 初始化 最大 (根据生产环境实际情况来定) // 使用对象池,重用缓冲区 .option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576)) .childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(64, 10496, 1048576)) // 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息 .childHandler(new CarTcpNettyChannelInitializer<SocketChannel>()); log.info("<===========netty server start success!==============>"); // 绑定端口,同步等待成功 ChannelFuture f = serverBootstrap.bind(port).sync(); // 等待服务器监听端口关闭 f.channel().closeFuture().sync(); } finally { // 退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
  • 相关阅读:
    操作系统安全:Windows与Linux的安全标识符,身份鉴别和访问控制
    多商户商城系统功能拆解43讲-平台端应用-客服话术
    lio-sam框架:后端里程计、回环、gps融合
    算法题:给定一个字符串,字符串中包含一些空格,将字符串中由空格隔开的单词反序,并反转每个字符的大小写。
    14.haproxy+keepalived负载均衡和高可用
    茶百道:门店数量狂飙,食品安全问题成最大绊脚石
    几种点云(网格)孔洞填充方法(1)
    SwiftUI 动态岛开发教程之 07 Live Activities实时活动的要求和限制
    基于云服务MRS构建DolphinScheduler2调度系统
    TiUP 镜像参考指南
  • 原文地址:https://blog.csdn.net/Mrxiao_bo/article/details/136435751