• Netty实践 -- Netty处理粘包拆包


    TCP 粘包/拆包

    TCP是以流的方式来处理数据,

    拆包:一个完整的数据包可能会被TCP拆分成多个包进行发送。

    粘包:TCP 可能把多个小的包粘成一个大的数据包。

    引入依赖包:

            
                io.netty
                netty-all
                4.1.29.Final
            
    
    • 1
    • 2
    • 3
    • 4
    • 5

    粘包拆包示例:

    • 服务端 EchoServer:
    /**
     * 服务端收到客户端的消息后,会进行响应。
     */
    public final class EchoServer {
        /**
         * 端口
         */
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
    
        public static void main(String[] args) throws Exception {
            // 配置 EventLoopGroup
            // 主从 Reactor 多线程模式,bossGroup是 主 Reactor,workerGroup是 从Reactor
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //初始化服务器的引导类 ServerBootstrap
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                //指定 EventLoopGroup
                serverBootstrap.group(bossGroup, workerGroup)
                    //指定 channel
                 .channel(NioServerSocketChannel.class)
                 .option(ChannelOption.SO_BACKLOG, 100)
                    //指定 ChannelHandler,用于处理 Channel
                 .handler(new LoggingHandler(LogLevel.INFO))
                 .childHandler(new ChannelInitializer() {
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         //ChannelPipeline,基于责任链模式,可以添加多个 ChannelHandler
                         ChannelPipeline channelPipeline = ch.pipeline();
                         //ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
                         //固定长度的拆包器 FixedLengthFrameDecoder
    //                     channelPipeline.addLast(new FixedLengthFrameDecoder(19));
                         channelPipeline.addLast(new EchoServerHandler());
                     }
                 });
    
                // 开启服务器,将服务器绑定到它要监听连接请求的端口上
                ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
    
                // 等待直到服务器socket关闭
                channelFuture.channel().closeFuture().sync();
            } finally {
                //关闭所有 eventLoop,终止线程
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 服务端 EchoServerHandler:
    
    /**
     * 服务端的 ChannelHandler.
     *
     * ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
     * 继承 ChannelInboundHandlerAdapter,用来定义响应入站事件的方法。
     *
     */
    @Sharable
    public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    
        /**
         * channelRead() :读取 channel 传入的消息
         *
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ByteBuf buf= (ByteBuf) msg;
            log.info("客户端发来的消息:"+ buf.toString(CharsetUtil.UTF_8) +"\n");
        }
    
        /**
         * channelReadComplete() :表示当前 ChannelHandler 读取完毕.
         * 执行后会自动跳转到 ChannelPipeline 中的下一个 ChannelHandler.
         *
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) {
            //向客户端返回数据,writeAndFlush() 也可以拆分成 write(msg) 和 flush()
            ctx.writeAndFlush(Unpooled.copiedBuffer("见到你,我也很高兴^_^",CharsetUtil.UTF_8));
        }
    
        /**
         * exceptionCaught(): 在读取操作期间,有异常抛出时会调用。
         *
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
            // 发生异常时关闭连接
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 客户端 EchoClient
    /**
     * 客户端。发送消息给服务端,并接收服务端的响应。
     *
     */
    public final class EchoClient {
    
        static final String HOST = System.getProperty("host", "127.0.0.1");
        static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
        static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
    
        public static void main(String[] args) throws Exception {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)
                 .channel(NioSocketChannel.class)
                 .option(ChannelOption.TCP_NODELAY, true)
                 .handler(new ChannelInitializer() {
                     @Override
                     public void initChannel(SocketChannel socketChannel) throws Exception {
                         ChannelPipeline channelPipeline = socketChannel.pipeline();
                         //ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
                         //固定长度的拆包器 FixedLengthFrameDecoder
    //                     channelPipeline.addLast(new FixedLengthFrameDecoder(19));
                         channelPipeline.addLast(new ClientNoDecoderHandler());
                     }
                 });
    
                // 开启客户端,连接服务端的端口
                ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
    
                channelFuture.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 客户端 ClientNoDecoderHandler:
    public class ClientNoDecoderHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            for (int i = 0; i < 1000; i++) {
                ByteBuf buffer = getByteBuf(ctx);
                ctx.channel().writeAndFlush(buffer);
            }
        }
    
        private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
            byte[] bytes = "粘包拆包测试.".getBytes(StandardCharsets.UTF_8);
            ByteBuf buffer = ctx.alloc().buffer();
            buffer.writeBytes(bytes);
            return buffer;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    测试结果:

    客户端发送了多个字节,服务端在接收时可能会发生粘包拆包。

    比如以下客户端发送了多个 “粘包拆包测试.” ,结果有好几条数据粘在了一起,有些又被拆开了。

    21:32:18.793 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.
    
    21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.�
    
    21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:�包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆�
    
    21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:�测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.
    
    21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.粘包拆包测试.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Netty解决粘包拆包

    Netty 可以使用多个拆包器处理粘包拆包。

    • FixedLengthFrameDecoder:

    固定长度拆包器,Netty 在消息长度固定的场景下,对固定长度的流数据进行解码。

    • LineBasedFrameDecoder:

    行拆包器,发送端发送数据包的时候,每个数据包之间以换行符(即\n或者\r\n)作为分隔。

    • DelimiterBasedFrameDecoder:

    分隔符拆包器。DelimiterBasedFrameDecoder 通过用户指定的分隔符对数据进行粘包和拆包处理。

    • LengthFieldBasedFrameDecoder:

    基于长度域拆包器。最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。

    测试:

    使用 FixedLengthFrameDecoder 进行拆包。

    将示例的 EchoClient 、 EchoServer 这两个类注释掉的以下代码放开。

    channelPipeline.addLast(new FixedLengthFrameDecoder(19));
    
    • 1

    可以看到,数据流已经正常展示了,没有粘包拆包。

    22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.
    
    22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.
    
    22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.
    
    • 1
    • 2
    • 3
    • 4
    • 5

    参考资料:

    https://zhuanlan.zhihu.com/p/415450910

  • 相关阅读:
    网络安全(黑客)自学
    用汇编语言编程的计算机
    乱飞的悟空,看我程咬金天罡3斧
    Java后端面试该复习什么?只需一张图
    leetcode363周赛
    HJ3 明明的随机数
    Ubuntu:编译升级Linux内核
    解决办法‘npm‘ 不是内部或外部命令,也不是可运行的程序或批处理文件。
    编程杂谈|十余年后再做课堂练习题
    pixel2的root过程
  • 原文地址:https://blog.csdn.net/sinat_32502451/article/details/134001469