TCP是以流的方式来处理数据,
拆包:一个完整的数据包可能会被TCP拆分成多个包进行发送。
粘包:TCP 可能把多个小的包粘成一个大的数据包。
io.netty
netty-all
4.1.29.Final
/**
* 服务端收到客户端的消息后,会进行响应。
*/
public final class EchoServer {
/**
* 端口
*/
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// 配置 EventLoopGroup
// 主从 Reactor 多线程模式,bossGroup是 主 Reactor,workerGroup是 从Reactor
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//初始化服务器的引导类 ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
//指定 EventLoopGroup
serverBootstrap.group(bossGroup, workerGroup)
//指定 channel
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
//指定 ChannelHandler,用于处理 Channel
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//ChannelPipeline,基于责任链模式,可以添加多个 ChannelHandler
ChannelPipeline channelPipeline = ch.pipeline();
//ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
//固定长度的拆包器 FixedLengthFrameDecoder
// channelPipeline.addLast(new FixedLengthFrameDecoder(19));
channelPipeline.addLast(new EchoServerHandler());
}
});
// 开启服务器,将服务器绑定到它要监听连接请求的端口上
ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync();
// 等待直到服务器socket关闭
channelFuture.channel().closeFuture().sync();
} finally {
//关闭所有 eventLoop,终止线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
/**
* 服务端的 ChannelHandler.
*
* ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
* 继承 ChannelInboundHandlerAdapter,用来定义响应入站事件的方法。
*
*/
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* channelRead() :读取 channel 传入的消息
*
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf= (ByteBuf) msg;
log.info("客户端发来的消息:"+ buf.toString(CharsetUtil.UTF_8) +"\n");
}
/**
* channelReadComplete() :表示当前 ChannelHandler 读取完毕.
* 执行后会自动跳转到 ChannelPipeline 中的下一个 ChannelHandler.
*
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
//向客户端返回数据,writeAndFlush() 也可以拆分成 write(msg) 和 flush()
ctx.writeAndFlush(Unpooled.copiedBuffer("见到你,我也很高兴^_^",CharsetUtil.UTF_8));
}
/**
* exceptionCaught(): 在读取操作期间,有异常抛出时会调用。
*
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 发生异常时关闭连接
cause.printStackTrace();
ctx.close();
}
}
/**
* 客户端。发送消息给服务端,并接收服务端的响应。
*
*/
public final class EchoClient {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer() {
@Override
public void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline channelPipeline = socketChannel.pipeline();
//ChannelHandler,用于处理 channel,实现对接收的数据的处理,实现业务逻辑。
//固定长度的拆包器 FixedLengthFrameDecoder
// channelPipeline.addLast(new FixedLengthFrameDecoder(19));
channelPipeline.addLast(new ClientNoDecoderHandler());
}
});
// 开启客户端,连接服务端的端口
ChannelFuture channelFuture = bootstrap.connect(HOST, PORT).sync();
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
public class ClientNoDecoderHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
for (int i = 0; i < 1000; i++) {
ByteBuf buffer = getByteBuf(ctx);
ctx.channel().writeAndFlush(buffer);
}
}
private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
byte[] bytes = "粘包拆包测试.".getBytes(StandardCharsets.UTF_8);
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(bytes);
return buffer;
}
}
客户端发送了多个字节,服务端在接收时可能会发生粘包拆包。
比如以下客户端发送了多个 “粘包拆包测试.” ,结果有好几条数据粘在了一起,有些又被拆开了。
21:32:18.793 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.
21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.�
21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:�包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆�
21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:�测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.粘包拆包测试.
21:32:18.799 [nioEventLoopGroup-3-2] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.粘包拆包测试.粘包拆包测试.
Netty 可以使用多个拆包器处理粘包拆包。
固定长度拆包器,Netty 在消息长度固定的场景下,对固定长度的流数据进行解码。
行拆包器,发送端发送数据包的时候,每个数据包之间以换行符(即\n或者\r\n)作为分隔。
分隔符拆包器。DelimiterBasedFrameDecoder 通过用户指定的分隔符对数据进行粘包和拆包处理。
基于长度域拆包器。最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。
使用 FixedLengthFrameDecoder 进行拆包。
将示例的 EchoClient 、 EchoServer 这两个类注释掉的以下代码放开。
channelPipeline.addLast(new FixedLengthFrameDecoder(19));
可以看到,数据流已经正常展示了,没有粘包拆包。
22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.
22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.
22:50:03.127 [nioEventLoopGroup-3-1] INFO com.example.demo.netty.echo.EchoServerHandler - 客户端发来的消息:粘包拆包测试.
https://zhuanlan.zhihu.com/p/415450910