• Netty的拆包粘包问题


    Netty使用的是TCP/IP协议,必然会遇到拆包粘包的问题,Netty也给出了相关的解决方案,记录下Netty如何解决拆包粘包问题。

    TCP/IP协议是"流"协议,就是类似水流一样的数据传输方式,当我们多次请求的时候,就会存在多发送和少发送的问题,也就是拆包粘包问题,简单来讲,粘包就是本来想独立发送M、N两个数据,但是最后把M、N一起发送了,把MN粘在一起了,就是粘包;发送M数据的时候,M被拆了几份,但是不是自己主动拆的,而是TCP自动拆的,就是拆包。

    Netty的解决方案:

    • 1、消息定长
    • 2、特殊结束符
    • 3、协议

    基础Netty的程序

    服务端

    import java.nio.charset.Charset;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server {
        //监听线程组,监听客户端请求
        private EventLoopGroup acceptorGroup = null;
        //处理客户端相关操作线程组,负责处理与客户端的数据通讯
        private EventLoopGroup clientGroup = null;
        //服务端启动相关配置信息
        private ServerBootstrap bootstrap = null;
    
        public Server(){
            init();
        }
    
        private void init() {
            //初始化线程组
            acceptorGroup = new NioEventLoopGroup();
            clientGroup = new NioEventLoopGroup();
            //初始化服务端配置
            bootstrap = new ServerBootstrap();
            //绑定线程组
            bootstrap.group(acceptorGroup,clientGroup)
            //设定通讯模式为NIO,同步非阻塞
            .channel(NioServerSocketChannel.class)
            //设定缓冲区的大小 缓存区的单位是字节
            .option(ChannelOption.SO_BACKLOG, 1024)
            //SO_SNDBUF 发送缓冲区 SO_RCVBUF 接受缓冲区  SO_KEEPALIVE 开启心跳监测(保证连接有效)
            .option(ChannelOption.SO_SNDBUF, 16*1024)
                    .option(ChannelOption.SO_RCVBUF, 16*1024)
                    .option(ChannelOption.SO_KEEPALIVE, true);
    
        }
    
        /**
         * 监听处理逻辑
         * @param port 端口
         * @return
         * @throws InterruptedException
         */
        public ChannelFuture getChannelFuture(int port ) throws InterruptedException{
            bootstrap.childHandler(new ChannelInitializer() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {        
                    //字符串解码器handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new ServerHandler());
                }
            });
            ChannelFuture future = bootstrap.bind(port).sync();
            return future;
        }
    
        public void release(){
            this.acceptorGroup.shutdownGracefully();
            this.clientGroup.shutdownGracefully();
        }
    
        public static void main(String[] args) {
            ChannelFuture future = null;
            Server server = null;
    
            try{
                server = new Server();
                future = server.getChannelFuture(9999);
                System.out.println("server started ... ");
    
                future.channel().closeFuture().sync();
    
            }catch (Exception e) {
                e.printStackTrace();
            }finally {
                if(null != future){
                    try {
                        future.channel().closeFuture().sync();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if(null != server){
                    server.release();
                }
            }    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96

    服务端消息处理器

    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            String message = msg.toString();
    
            System.out.println("from client : "+ message);
    
            String line = "ok";
            ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
        }
    
        /**
         * 异常处理逻辑,当客户端异常退出时,也会运行
         *     ChannelHandlerContext关闭,也代表当前客户端连接的资源关闭
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("server exceptionCaught method run...");
            ctx.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    客户端

    import java.nio.charset.Charset;
    import java.util.Scanner;
    import java.util.concurrent.TimeUnit;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client {
    
        //处理请求和处理服务端响应的线程组
        private EventLoopGroup group = null;
    
        //客户启动相关配置信息
        private Bootstrap bootstrap = null;
    
        public Client(){
            init();
        }
    
        private void init() {
            group = new NioEventLoopGroup();
            bootstrap= new Bootstrap();
            //绑定线程组
            bootstrap.group(group);
            //设定通讯模式为NIO,同步非阻塞
            bootstrap.channel(NioSocketChannel.class);
        }
    
        public ChannelFuture getChannelFuture(String host,int port) throws InterruptedException{
            this.bootstrap.handler(new ChannelInitializer() {
                protected void initChannel(SocketChannel ch) throws Exception {    
    
                    //字符串解码器handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new ClientHandler());
                }
            });
    
            ChannelFuture future = this.bootstrap.connect(host,port).sync();
            return future;
        }
    
        public void release(){
            this.group.shutdownGracefully();
        }
    
        public static void main(String[] args) {
            Client client = null;
            ChannelFuture future = null;
            Scanner scanner =null;
            try {
                client = new Client();
                future = client.getChannelFuture("localhost", 9999);
    
                while(true){
                    scanner = new Scanner(System.in);
                    System.out.println("enter message send to server > ");
                    String line = scanner.nextLine();
                    future.channel().writeAndFlush(Unpooled.copiedBuffer(line.getBytes("UTF-8")));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally{
                if(null != scanner){
                    scanner.close();
                }
                if(null != future){
                    try {
                        future.channel().closeFuture().sync();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                if(null != client){
                    client.release();
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    客户端消息处理器

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            try{
    
                String message = msg.toString();
                System.out.println("from server : "+message);
            }finally {
                //用于释放缓存。避免内存溢出
                ReferenceCountUtil.release(msg);
            }
    
        }
    
        /**
         * 异常处理逻辑,当客户端异常退出时,也会运行
         *     ChannelHandlerContext关闭,也代表当前客户端连接的资源关闭
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("client exceptionCaught method run...");
            ctx.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    Netty解决方案

    消息定长

    数据用一定的长度发送,如果数据长度不够使用空白字符代替。

    在以下方法中添加定长消息的Handler

    public ChannelFuture getChannelFuture(int port ) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {    
                //通过构造参数设置消息长度(单位是字节)
                ch.pipeline().addLast(new FixedLengthFrameDecoder(3));    
                //字符串解码器handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(new ServerHandler());
            }
        });
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    上述是其中一个Handler,其中还有一个LengthFieldBasedFrameDecoder它是从头部字段确定为帧长,然后从数据流中提取指定的字节数。

    特殊结束符

    客户端和服务端协商特殊的分隔符,以此为结束符。

    在以下方法中添加处理分隔符的Handler

    public ChannelFuture getChannelFuture(int port ) throws InterruptedException{
        bootstrap.childHandler(new ChannelInitializer() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {    
                //数据分割符,定义的数据分隔符一定是一个ByteBuf类型的数据对象
                ByteBuf delimiter = Unpooled.copiedBuffer("$E$".getBytes());
                //使用特殊符号分割
                ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));    
                //字符串解码器handler,会自动处理channelRead方法的msg参数,将ByteBuf类型的数据转换为字符串
                ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                ch.pipeline().addLast(new ServerHandler());
            }
        });
        ChannelFuture future = bootstrap.bind(port).sync();
        return future;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    除了上述的一个Handler,还有一个LineBasedFrameDecoder它是提取由行尾符(\r或\r\n)分割的Handler。它比DelimiterBasedFrameDecoder效率高。

    协议

    服务端提供一个固定的协议标准。然后客户端和服务端以协议来传输数据,类似一个HTTPS协议。协议需要自己定义灵活度比较高,这里就不用代码测试了,当然协议这种方式也是一种相对比较好的方法。

  • 相关阅读:
    使用HTML CSS制作静态网站【中秋节】
    智能可观测性如何赋能智能汽车主机厂
    浅谈微信小程序的功能定位和使用场景
    海康威视面试经历
    .NET Core 日志系统
    Redis 分布式锁
    最快的 TCP 拥塞控制算法
    吉利银河L6顶配续航测试 记录 方便后续对比
    Pytorch常用api详解
    23.09.05内网盲区记录
  • 原文地址:https://blog.csdn.net/flash_love/article/details/132752718