• 12.netty中tcp粘包拆包问题及解决方法


    【README】

    • 1.本文源代码总结自B站《netty-尚硅谷》;
    • 2.本文介绍了tcp粘包拆包问题;
    • 3.本文po 出了粘包拆包问题解决方案及源代码实现;

    【1】tcp粘包拆包问题

    refer2 How to deal with the problem of packet sticking and unpacking during TCP transmission? - 编程知识

    【1.1】粘包拆包问题描述

    • 假设客户端发送2个连续的数据包到服务器,数据包用packet1,packet2分别表示,则服务器接收到的数据可以分为3种情况

    1)情况1: 服务器接收到2个数据包,没有拆包,也没有粘包问题

     2)情况2: 服务器只接收到一个数据包(存在粘包问题

    • 因为tcp不会丢失数据包,因此这一个数据包就封装了2个原生数据包的信息,这种现象叫做粘包
    • 在这种情况,接收者并不知道2个原生包的界限,因此接收者很难处理;

    3)情况3: 接收者接收到2个冗余或不完整的数据包(粘包与拆包问题同时发生

    • 接收者接收到2个数据包,但这2个数据包要么不完整,要么掺杂了其他数据包的部分数据
    • 在这种情况下,粘包与拆包同时发生
    • 如果这2个包不被特殊处理,对于接收者来说也很难处理;


    【1.2】代码演示粘包拆包问题 

    注意:

    • 限于篇幅,本节没有po出全部代码, 能够表达意思即可;

    1)业务场景:客户端连续发送10条消息(字符串)到服务器,查看服务器接收情况;

    2)客户端发送消息代码:

     3)服务器接收消息代码:

    3.1)服务器接收消息的打印效果:

    1. =================================
    2. 服务器收到的数据 hello server0
    3. 服务器累计收到 [1] 个消息包
    4. =================================
    5. 服务器收到的数据 hello server1
    6. 服务器累计收到 [2] 个消息包
    7. =================================
    8. 服务器收到的数据 hello server2
    9. hello server3
    10. hello server4
    11. hello server5
    12. hello server6
    13. 服务器累计收到 [3] 个消息包
    14. =================================
    15. 服务器收到的数据 hello server7
    16. hello server8
    17. hello server9
    18. 服务器累计收到 [4] 个消息包

    【效果解说】

    • 客户端发送了10条消息,服务器接收到了 4个数据包,而不是10个数据包
    • 显然,发生了tcp粘包
    • 这10条消息本来是10个数据报文,却被合并(粘)为4个数据包;
    • 问题是: 如何把这4个数据包还原为10个数据包呢 (在高并发情况下,各式各样的数据包会更多)
      • 如果无法还原,则服务器无法正确解析报文并做相应处理;

     【2】 粘包与拆包原因 

    1)粘包原因:

    • 发送的数据大小 小于 发送缓冲区,tcp就会把发送的数据多次写入缓冲区,此时发生粘包;
    • 接收数据方的应用层没有及时从 接收缓冲区读取数据,也会发生粘包;

    2)拆包原因:

    • 发送的数据大小 大于 tcp发送缓冲区,就会发生拆包;
    • 发送的数据大小 大于 报文最大长度,也会拆包;

    【3】粘包拆包解决方法

    解决粘包拆包的关键在于 为每一个数据包添加界限标识,常用方法如下:

    • 方法1)发送方为每一个数据包添加报文头部。头部至少包含数据包长度(类似http协议的头部length)。 通过这种方式,接收方通过读取头部的长度知道当前数据包的界限,并在界限处停止读取。
    • 方法2)发送方以固定长度封装数据包。如果不足,则补0填充。
    • 方法3)自定义设置数据包的界限标识,如添加特别标识(如======)。接收方通过标识可以识别不同的数据包;

    【4】粘包拆包问题解决的源代码实现

    解决方法是:采用方法1,设置每个数据包的长度到报文头部

    【4.1】协议数据包封装类

    1. /**
    2. * @Description 协议数据包
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class ProtocolMessage {
    8. private int length;
    9. private byte[] content;
    10. /**
    11. * @description 构造器
    12. * @author xiao tang
    13. * @date 2022/9/10
    14. */
    15. public ProtocolMessage() {
    16. }
    17. public int getLength() {
    18. return length;
    19. }
    20. public void setLength(int length) {
    21. this.length = length;
    22. }
    23. public byte[] getContent() {
    24. return content;
    25. }
    26. public void setContent(byte[] content) {
    27. this.content = content;
    28. }
    29. }

    【4.2】服务器

    1)服务器 :

    1. public class ProtocolNettyServer89 {
    2. public static void main(String[] args) throws InterruptedException {
    3. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    4. EventLoopGroup workerGroup = new NioEventLoopGroup();
    5. try {
    6. ServerBootstrap serverBootstrap = new ServerBootstrap();
    7. serverBootstrap.group(bossGroup, workerGroup)
    8. .channel(NioServerSocketChannel.class)
    9. .childHandler(new ProtocolNettyServerInitializer()); // 自定义一个初始化类
    10. // 自动服务器
    11. ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();
    12. System.out.println("服务器启动成功");
    13. // 监听关闭
    14. channelFuture.channel().closeFuture().sync();
    15. } finally {
    16. bossGroup.shutdownGracefully();
    17. workerGroup.shutdownGracefully();
    18. }
    19. }
    20. }

    2) 服务端初始化器:

    1. public class ProtocolNettyServerInitializer extends ChannelInitializer {
    2. @Override
    3. protected void initChannel(SocketChannel ch) throws Exception {
    4. ChannelPipeline pipeline = ch.pipeline();
    5. // 添加入站解码器-把字节转为协议报文便于业务逻辑处理
    6. pipeline.addLast(new ProtocolMessageDecoder());
    7. // 添加出站编码器-把协议报文转为字节便于网络传输
    8. pipeline.addLast(new ProtocolMessageEncoder());
    9. // 添加业务逻辑handler
    10. pipeline.addLast(new ProtocolNettyServerHandler());
    11. }
    12. }

    3)处理器:

    1. public class ProtocolNettyServerHandler extends SimpleChannelInboundHandler {
    2. private int count = 0;
    3. @Override
    4. protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage msg) throws Exception {
    5. // 接收到数据并处理
    6. int length = msg.getLength();
    7. String bodyStr = new String(msg.getContent(), StandardCharsets.UTF_8);
    8. System.out.println("====================================");
    9. System.out.println("服务器接收的消息如下:");
    10. System.out.println("报文长度:" + length);
    11. System.out.println("报文体内容: " + bodyStr);
    12. System.out.println("服务器累计接收到的消息包数量 = " + ++this.count);
    13. // 回复客户端
    14. byte[] body = ("我是服务器" + count).getBytes(StandardCharsets.UTF_8);
    15. int responseLen = body.length;
    16. // 构建一个响应协议包
    17. ProtocolMessage responseMsg = new ProtocolMessage();
    18. responseMsg.setLength(responseLen);
    19. responseMsg.setContent(body);
    20. ctx.writeAndFlush(responseMsg);
    21. }
    22. @Override
    23. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    24. cause.printStackTrace();
    25. ctx.close();
    26. }
    27. }

     【4.3】客户端

    1)客户端:

    1. public class ProtocolNettyClient89 {
    2. public static void main(String[] args) throws InterruptedException {
    3. EventLoopGroup group = new NioEventLoopGroup();
    4. try {
    5. Bootstrap bootstrap = new Bootstrap();
    6. bootstrap.group(group)
    7. .channel(NioSocketChannel.class)
    8. .handler(new ProtocolNettyClientInitializer()); // 自定义一个初始化类
    9. // 连接服务器
    10. ChannelFuture channelFuture = bootstrap.connect("localhost", 8089).sync();
    11. channelFuture.channel().closeFuture().sync();
    12. } finally {
    13. group.shutdownGracefully();
    14. }
    15. }
    16. }

    2)初始化器:

    1. public class ProtocolNettyClientInitializer extends ChannelInitializer {
    2. @Override
    3. protected void initChannel(SocketChannel ch) throws Exception {
    4. ChannelPipeline pipeline = ch.pipeline();
    5. // 添加出站处理器- 协议报文转字节以便网络传输
    6. pipeline.addLast(new ProtocolMessageEncoder());
    7. // 添加入站解码器-把字节转为协议报文对象以便业务逻辑处理
    8. pipeline.addLast(new ProtocolMessageDecoder());
    9. // 添加一个自定义handler,处理业务逻辑
    10. pipeline.addLast(new ProtocolNettyClientHandler());
    11. }
    12. }

    3)处理器:

    1. public class ProtocolNettyClientHandler extends SimpleChannelInboundHandler {
    2. private int count;
    3. @Override
    4. protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage msg) throws Exception {
    5. // 读取服务器响应报文
    6. int length = msg.getLength();
    7. byte[] body = msg.getContent();
    8. System.out.println("=============================");
    9. System.out.println("客户端接收的消息如下:");
    10. System.out.println("长度 = " + length);
    11. System.out.println("报文体 = " + new String(body, StandardCharsets.UTF_8));
    12. System.out.println("客户端累计接收的消息包数量 = " + ++count);
    13. }
    14. @Override
    15. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    16. // 发送10条数据到服务器
    17. for (int i = 1; i <= 5; i++) {
    18. byte[] body = ("你好服务器,我是客户端张三" + i).getBytes(StandardCharsets.UTF_8);
    19. // 创建协议包对象
    20. ProtocolMessage message = new ProtocolMessage();
    21. message.setContent(body);
    22. message.setLength(body.length);
    23. // 发送
    24. ctx.writeAndFlush(message);
    25. }
    26. }
    27. @Override
    28. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    29. cause.printStackTrace();
    30. ctx.close();
    31. }
    32. }

    【4.4】编码器与解码器

    1)解码器

    1. /**
    2. * @Description 协议报文解码器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月10日
    6. */
    7. public class ProtocolMessageDecoder extends ByteToMessageDecoder {
    8. @Override
    9. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
    10. System.out.println("ProtocolMessageDecoder.decode() 被调用");
    11. // 把字节 转为 协议报文
    12. int length = in.readInt();
    13. byte[] body = new byte[length];
    14. in.readBytes(body);
    15. // 封装成 ProtocolMessage,放入out,送入下一个 Handler处理
    16. ProtocolMessage protocolMessage = new ProtocolMessage();
    17. protocolMessage.setLength(length);
    18. protocolMessage.setContent(body);
    19. // 添加到out
    20. out.add(protocolMessage);
    21. }
    22. }
    23. 2)编码器 :

      1. /**
      2. * @Description 协议消息编码器
      3. * @author xiao tang
      4. * @version 1.0.0
      5. * @createTime 2022年09月10日
      6. */
      7. public class ProtocolMessageEncoder extends MessageToByteEncoder {
      8. @Override
      9. protected void encode(ChannelHandlerContext ctx, ProtocolMessage msg, ByteBuf out) throws Exception {
      10. System.out.println("ProtocolMessageEncoder.encode() 被调用");
      11. out.writeInt(msg.getLength());
      12. out.writeBytes(msg.getContent());
      13. }
      14. }

      【4.5】目录结构:

      【4.6】打印效果:

      1)客户端发送5条消息到服务器:

      2)服务器接收的数据包为 5个,如下(显然没有发生拆包粘包问题):

      1. ProtocolMessageDecoder.decode() 被调用
      2. ====================================
      3. 服务器接收的消息如下:
      4. 报文长度:40
      5. 报文体内容: 你好服务器,我是客户端张三1
      6. 服务器累计接收到的消息包数量 = 1
      7. ProtocolMessageEncoder.encode() 被调用
      8. ProtocolMessageDecoder.decode() 被调用
      9. ====================================
      10. 服务器接收的消息如下:
      11. 报文长度:40
      12. 报文体内容: 你好服务器,我是客户端张三2
      13. 服务器累计接收到的消息包数量 = 2
      14. ProtocolMessageEncoder.encode() 被调用
      15. ProtocolMessageDecoder.decode() 被调用
      16. ====================================
      17. 服务器接收的消息如下:
      18. 报文长度:40
      19. 报文体内容: 你好服务器,我是客户端张三3
      20. 服务器累计接收到的消息包数量 = 3
      21. ProtocolMessageEncoder.encode() 被调用
      22. ProtocolMessageDecoder.decode() 被调用
      23. ====================================
      24. 服务器接收的消息如下:
      25. 报文长度:40
      26. 报文体内容: 你好服务器,我是客户端张三4
      27. 服务器累计接收到的消息包数量 = 4
      28. ProtocolMessageEncoder.encode() 被调用
      29. ProtocolMessageDecoder.decode() 被调用
      30. ====================================
      31. 服务器接收的消息如下:
      32. 报文长度:40
      33. 报文体内容: 你好服务器,我是客户端张三5
      34. 服务器累计接收到的消息包数量 = 5
      35. ProtocolMessageEncoder.encode() 被调用

    24. 相关阅读:
      Vue3像Vue2一样在prototype(原型)上挂载数据
      蓝桥杯算法竞赛系列第九章·巧解哈希题,用这3种数据类型足矣
      【重温基础算法】内部排序之桶排序法
      Squid代理服务器
      Python条件语句的用法
      Redis常用命令
      重学java 71.网络编程
      elasticsearch 之时间类型
      B+tree 与 B-tree区别
      java计算机毕业设计学生日常事务管理系统源码+mysql数据库+lw文档+系统+调试部署
    25. 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126798838