在基于TCP协议的网络编程中,不可避免地都会遇到粘包和拆包的问题。
先来看个例子,还是上篇文章 《Java网络编程——NIO的阻塞IO模式、非阻塞IO模式、IO多路复用模式的使用》 中“IO多路复用模式”一节中的代码:
服务端
@Slf4j
public class NIOServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080), 50);
Selector selector = Selector.open();
SelectionKey serverSocketKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
log.info("select event count:" + count);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
}
else if (selectionKey.isReadable()) {
handleRead(selectionKey);
}
iterator.remove();
}
}
}
private static void handleAccept(SelectionKey selectionKey) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (Objects.nonNull(socketChannel)) {
log.info("receive connection from client. client:{}", socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
Selector selector = selectionKey.selector();
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
private static void handleRead(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(4);
int length = socketChannel.read(readBuffer);
if (length > 0) {
log.info("receive message from client. client:{} message:{}", socketChannel.getRemoteAddress()
, new String(readBuffer.array(), 0, length, "UTF-8"));
} else if (length == -1) {
socketChannel.close();
return;
}
}
}
客户端
@Slf4j
public class NIOClient {
@SneakyThrows
public static void main(String[] args) {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
ByteBuffer byteBuffer1, byteBuffer2;
socketChannel.write(byteBuffer1 = ByteBuffer.wrap("你".getBytes(StandardCharsets.UTF_8)));
socketChannel.write(byteBuffer2 = ByteBuffer.wrap("好".getBytes(StandardCharsets.UTF_8)));
log.info("client send finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
socketChannel.close();
}
}
}
Run模式启动服务端后后,再运行客户端,发现服务端接收并打印的结果如下:
receive message from client. client:/127.0.0.1:63618 message:你�
receive message from client. client:/127.0.0.1:63618 message:��
咦?客户端发送的虽然是汉字,但发送和接收的都是UTF-8编码格式,怎么会乱码呢?而且第一个“你”字也被服务端解析出来了,并没有乱码。
再分别以Debug模式启动服务端、客户端来分析:
当客户端运行到log.info("client send finished");
时,可以发现“你”转化为UTF-8的字节数组为[-28, -67, -96]
,“好”转化为UTF-8的字节数组为其中“你”转化为[-27, -91, -67]
,先后两次分别向服务端发送了3个字节的数据:
服务端读数据的Buffer大小为4字节,所以得分两次读取,第一次读取了前4个字节[-28, -67, -96, -27]
:
在第一次读取到前4个字节并根据UTF-8规则解析为汉字时,前3个字节是完整的,可以转换为“你”,但第4个字节只是“好”对应的UTF-8字节数组的一部分,是不完整的,所以在解析的时候失败了,就显示出了乱码符号。
同理,在第二次读取的后2个字节也是不完整的,解析也不会成功,也显示了2个乱码符号。
那就有人会说了,不能在读取的时候把Buffer的大小置为3、6、9吗?
这只是模拟这种情况的一个简单的例子,如果把Buffer大小设置为6,那客户端要发送“你a好”呢(UTF-8字节数组为[-28, -67, -96, 97, -27, -91, -67]
)?还是可能会乱码(还是会把UTF-8字节数组拆开为[-28, -67, -96, 97, -27, -91]
和[-67]
),服务端分收到这两段数据后同样无法成功解析。
这就是我们常说的拆包(也有人叫半包),对应的还有粘包,就是在通过TCP协议交互数据过程中,TCP底层并不了解它的上层业务数据(比如此文的例子中放入ByteBuffer中要发送的数据,或者HTTP报文等)的具体含义,可能会根据实际情况(比如TCP缓冲区或者此文中定义的NIO接收数据的缓冲区ByteBuffer)对数据包进行拆分或合并。
当客户端发送了一段较长的数据包时,在客户端可能会分成若干个较小的数据包分别发送,或者在服务端也可能分成了若干个较小的数据包来接收。用一句话总结就是,客户端发送的一段数据包到服务端接收后可能会被拆分为多个数据包。
当客户端发送了若干个较短的数据包时,在发送端可能会拼接成一个较大的数据包来发送,在接收端也可能会合并成一个较大的数据包来接收。用一句话总结就是,客户端发送的多段数据包到服务端接收后可能会合并分为一个数据包。
在之前的文章 《TCP协议学习笔记、报文分析》 中也遇到了粘包的情况,客户端先后向服务端分别发送了长度为20、30、40的字符串,但是通过tcpdump抓包分析的结果是客户端向服务端只发送了一条length=90的TCP报文。
如上图所示:
从逻辑上来说,客户端先后向服务端发送了两段数据包:“你”和“好”对应的字节数组。实际上可能会发生很多种情况:
理想情况:理想情况下,数据包会按照正常逻辑一样,一个一个发送到服务端。
粘包:在某些情况下,比如当TCP缓冲区剩余空间大于所有数据包的大小,且发送时间间隔很短时,客户端也有可能会把这两段数据包合并成一个进行发送。
拆包:在某些情况下,比如当TCP缓冲区剩余空间大于某个数据包的大小时,客户端可能会把这个大的数据包拆分成若干个小的数据包分别发送。
解决粘包、拆包问题的核心,就是要确认消息边界,当接收到一个较大的数据包时,可以正确把它拆分为几段正确的、有意义的业务数据,当收到若干个小的数据包时,也可以根据消息边界把他们合并、再拆分为正确的、有意义的业务数据。
常见的解决粘包、拆包的思路有:分隔符、固定消息长度、TLV格式消息等。
可以用特定的分隔符来分隔消息,比如当发送“你好”([-28, -67, -96, -27, -91, -67])时,需要让“你”对应的字节数组([-28, -67, -96])作为一个整体被服务端解析,让“好”对应的字节数组([-27, -91, -67])作为一个整体被服务端解析,所以就可以在发送的时候,在“你”和“好”后面加一个分隔符(比如 “\n”),当服务端解析到“\n”就表示一个完整的数据包结束了。
当发生粘包时,服务端把“\n”之前的数据当成一个完整的数据包来处理,然后继续读取数据直到再遇到“\n”时,说明又读取到了一个完整的数据包,…… 直到把数据读完。需要注意的是,在最后一段数据最后也需要加分隔符,因为不加的话服务端可能会认为还有数据没发送完,就不会把最后一段数据当作一个完整的数据包。
在发生拆包①时,服务端读取到第一个数据包([-28, -67, -96, 10, -27])后,只会把 [-28, -67, -96] 当成一个完整的数据包来处理,然后把剩余的 [-27] 缓存起来,到了后面遇到“\n”后,再把 [-27] 和“\n”前面的 [-91, -67] 拼接起来当成一个完整的数据包,就可以解析成“好”。拆包②也一样。
代码实现
服务端
@Slf4j
public class NIOServer {
private final static char SPLIT = '\n';
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080), 50);
Selector selector = Selector.open();
SelectionKey serverSocketKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
} else if (selectionKey.isReadable()) {
handleRead(selectionKey);
}
iterator.remove();
}
}
}
private static void handleAccept(SelectionKey selectionKey) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (Objects.nonNull(socketChannel)) {
log.info("receive connection from client. client:{}", socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
Selector selector = selectionKey.selector();
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
private static void handleRead(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = Objects.isNull(selectionKey.attachment()) ? ByteBuffer.allocate(4) : (ByteBuffer) selectionKey.attachment();
int length = socketChannel.read(readBuffer);
if (length > 0) {
String readMessage = getSplitMessage(readBuffer);
log.info("receive message from client. client:{} message:{}", socketChannel.getRemoteAddress(), readMessage);
if (readBuffer.position()==readBuffer.capacity()){
ByteBuffer newReadBuffer=ByteBuffer.allocate(readBuffer.capacity()*2);
readBuffer.flip();
newReadBuffer.put(readBuffer);
readBuffer=newReadBuffer;
}
selectionKey.attach(readBuffer);
} else if (length == -1) {
socketChannel.close();
return;
}
}
private static String getSplitMessage(ByteBuffer readBuffer) throws UnsupportedEncodingException {
readBuffer.flip();
StringBuilder receivedMessage = new StringBuilder();
for (int i = 0; i < readBuffer.limit(); i++) {
byte split = readBuffer.get(i);
if (split == SPLIT) {
int length = i - readBuffer.position();
ByteBuffer byteBuffer = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
byteBuffer.put(readBuffer.get());
}
readBuffer.get();//把间隔符取出来
receivedMessage.append(new String(byteBuffer.array(),0,length,"UTF-8")).append("\n");
}
}
readBuffer.compact();
return receivedMessage.toString();
}
}
客户端
@Slf4j
public class NIOClient {
@SneakyThrows
public static void main(String[] args) {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
ByteBuffer byteBuffer1, byteBuffer2, byteBuffer3, byteBuffer4;
socketChannel.write(byteBuffer1 = ByteBuffer.wrap("你".getBytes(StandardCharsets.UTF_8)));
socketChannel.write(byteBuffer2 = ByteBuffer.wrap("\n".getBytes(StandardCharsets.UTF_8)));
socketChannel.write(byteBuffer3 = ByteBuffer.wrap("好".getBytes(StandardCharsets.UTF_8)));
socketChannel.write(byteBuffer4 = ByteBuffer.wrap("\n".getBytes(StandardCharsets.UTF_8)));
log.info("client send finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
socketChannel.close();
}
}
}
让每个具有意义的数据包占用固定长度的空间进行传送,当实际数据长度小于固定长度时用某种无意义的数据填充(比如空格)。假设固定长度为4,用空格填充无效数据,当发送“你好”([-28, -67, -96, -27, -91, -67])时,需要把“你”对应的字节数组([-28, -67, -96])放到一个固定长度为4的数组里([-28, -67, -96, 32]),因为“你”对应字节数组只占3位,所以剩余的一位用空格(32)来填充。同理,“好”也放到一个长度为4的字节数组中([-27, -91, -67, 32])。
当发生粘包时,服务端会依次把每4(约定的固定长度)个字节当成一个完整的数据包来处理,如果收到的数据包长度不是4的倍数,说明有拆包的情况,会把剩余数据缓存起来,等后面读取到新的数据包,会把加上之前剩余未处理的数据再次每4个字节、4个字节地读取。
代码实现
服务端
@Slf4j
public class NIOServer {
private final static int FIXED_LENGTH = 4;
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("10.100.63.93", 8080), 50);
Selector selector = Selector.open();
SelectionKey serverSocketKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
} else if (selectionKey.isReadable()) {
handleRead(selectionKey);
}
iterator.remove();
}
}
}
private static void handleAccept(SelectionKey selectionKey) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (Objects.nonNull(socketChannel)) {
log.info("receive connection from client. client:{}", socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
Selector selector = selectionKey.selector();
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
private static void handleRead(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = Objects.isNull(selectionKey.attachment()) ? ByteBuffer.allocate(4) : (ByteBuffer) selectionKey.attachment();
int length = socketChannel.read(readBuffer);
if (length > 0) {
String readMessage = getSplitMessage(readBuffer);
log.info("receive message from client. client:{} message:{}", socketChannel.getRemoteAddress(), readMessage);
if (readBuffer.position() == readBuffer.capacity()) {
ByteBuffer newReadBuffer = ByteBuffer.allocate(readBuffer.capacity() * 2);
readBuffer.flip();
newReadBuffer.put(readBuffer);
readBuffer = newReadBuffer;
}
selectionKey.attach(readBuffer);
} else if (length == -1) {
socketChannel.close();
return;
}
}
private static String getSplitMessage(ByteBuffer readBuffer) throws UnsupportedEncodingException {
readBuffer.flip();
StringBuilder receivedMessage = new StringBuilder();
while ((readBuffer.limit() - readBuffer.position()) >= FIXED_LENGTH) {
ByteBuffer byteBuffer = ByteBuffer.allocate(FIXED_LENGTH);
for (int i = 0; i < FIXED_LENGTH; i++) {
byteBuffer.put(readBuffer.get());
}
receivedMessage.append(new String(byteBuffer.array(), 0, FIXED_LENGTH, "UTF-8")).append("\n");
}
readBuffer.compact();
return receivedMessage.toString();
}
}
客户端
@Slf4j
public class NIOClient {
@SneakyThrows
public static void main(String[] args) {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
ByteBuffer byteBuffer1, byteBuffer2;
socketChannel.write(byteBuffer1 = ByteBuffer.wrap("你 ".getBytes(StandardCharsets.UTF_8)));
socketChannel.write(byteBuffer2 = ByteBuffer.wrap("好 ".getBytes(StandardCharsets.UTF_8)));
log.info("client send finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
socketChannel.close();
}
}
}
TLV(Type Length Value),Type表示数据类型,Length表示数据长度,Value表示数据本身。类似于常见的网络协议(如TCP协议规定一个TCP报文由TCP首部和数据部分组成,TCP首部存放了源主机和目标主机的ip地址、端口号等信息,数据部分存放真正要传输的数据;还有HTTP协议,也是由HTTP头部和body两部分组成),把消息分成消息头和消息体(还可以有消息尾部,比如数据链路层的数据帧尾部就有4个字节校验位,用于保证数据包的完整性)。
为了防止粘包/拆包造成数据不可读的情况,可以约定把消息分为两部分:消息长度+消息内容,客户端发送数据之前,先把数据长度放到数据包的开头,接着再加上真正的数据内容,服务端读取的时候,可以先获取到消息的长度(比如是n),然后再依次读取n个字节数据,就是这个数据包的有效部分。
代码实现
服务端
@Slf4j
public class NIOServer {
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 8080), 50);
Selector selector = Selector.open();
SelectionKey serverSocketKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int count = selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
handleAccept(selectionKey);
} else if (selectionKey.isReadable()) {
handleRead(selectionKey);
}
iterator.remove();
}
}
}
private static void handleAccept(SelectionKey selectionKey) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
if (Objects.nonNull(socketChannel)) {
log.info("receive connection from client. client:{}", socketChannel.getRemoteAddress());
socketChannel.configureBlocking(false);
Selector selector = selectionKey.selector();
socketChannel.register(selector, SelectionKey.OP_READ);
}
}
private static void handleRead(SelectionKey selectionKey) throws IOException {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer readBuffer = Objects.isNull(selectionKey.attachment()) ? ByteBuffer.allocate(4) : (ByteBuffer) selectionKey.attachment();
int length = socketChannel.read(readBuffer);
if (length > 0) {
String readMessage = getSplitMessage(readBuffer);
log.info("receive message from client. client:{} message:{}", socketChannel.getRemoteAddress(), readMessage);
if (readBuffer.position() == readBuffer.capacity()) {
ByteBuffer newReadBuffer = ByteBuffer.allocate(readBuffer.capacity() * 2);
readBuffer.flip();
newReadBuffer.put(readBuffer);
readBuffer = newReadBuffer;
}
selectionKey.attach(readBuffer);
} else if (length == -1) {
socketChannel.close();
return;
}
}
private static String getSplitMessage(ByteBuffer readBuffer) throws UnsupportedEncodingException {
readBuffer.flip();
StringBuilder receivedMessage = new StringBuilder();
while (readBuffer.position() < readBuffer.limit() && readBuffer.get(readBuffer.position()) <= (readBuffer.limit() - readBuffer.position())) {
int length = readBuffer.get();// 有效数据长度
ByteBuffer byteBuffer = ByteBuffer.allocate(length);
for (int i = 0; i < length; i++) {
byteBuffer.put(readBuffer.get());
}
receivedMessage.append(new String(byteBuffer.array(), 0, length, "UTF-8")).append("\n");
}
readBuffer.compact();
return receivedMessage.toString();
}
}
客户端
@Slf4j
public class NIOClient {
@SneakyThrows
public static void main(String[] args) {
SocketChannel socketChannel = SocketChannel.open();
try {
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
ByteBuffer byteBuffer1, byteBuffer2;
socketChannel.write(byteBuffer1 = generateTLVBuffer("你"));
socketChannel.write(byteBuffer2 = generateTLVBuffer("好"));
log.info("client send finished");
} catch (Exception e) {
e.printStackTrace();
} finally {
socketChannel.close();
}
}
// 开头用1个字节来表示有效数据的长度(假设每个有效数据包长度不大于127),因此整个数据包长度为(1+有效数据的长度)
private static ByteBuffer generateTLVBuffer(String msg) {
byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
ByteBuffer byteBuffer = ByteBuffer.allocate(1 + bytes.length);
byteBuffer.put((byte) bytes.length);
byteBuffer.put(bytes);
byteBuffer.flip();
return byteBuffer;
}
}
以上代码只是用NIO来简单演示一下,解决粘包/拆包的方法,在Netty中提供了丰富的解码器(如StringDecoder、LineBasedFrameDecoder、DelimiterBasedFrameDecoder、FixedLengthFrameDecoder等)来解决粘包/拆包问题,开发者在开发过程中无需再考虑粘包/拆包的情况。
转载请注明出处——胡玉洋 《Java网络编程——粘包拆包出现的原因及解决方式》