• Netty(三)NIO-进阶


    Netty进阶

    1. 粘包与半包

    1.1 粘包现象

    //client端分10次每次发送16字节数据
    public void channelActive(ChannelHandlerContext ctx) {
    	for (int i = 0; i < 10; i++) {
    		ByteBuf buf = ctx.alloc().buffer(16);
    		buf.writeBytes(new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
    		ctx.writeAndFlush(buf);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    服务端输出,可以看到一次就收到了160字节数据,而非10次接收。
    粘包现象

    1.2 半包现象

    //client端一次发送160字节数据
    ByteBuf buffer = ctx.alloc().buffer();
    for (int i = 0; i < 10; i++) {
        buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
    }
    ctx.writeAndFlush(buffer);
    //server端修改接收缓冲区
    serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);  //影响底层接收缓冲区(滑动窗口)大小,仅决定netty读取最小单位,实际读取为其整数倍
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在服务端输出中可看到数据被分为两节,一节20字节,一节140字节
    半包现象

    1.3 现象分析

    粘包:发送abc def,接收abcdef。原因:

    1. 应用层:接收方ByteBuf设置太大(Netty默认1024)
    2. 滑动窗口:假设发送方256bytes表示一个完整报文,但由于接收方处理不及时且窗口大小够大,这256字节数据会缓冲在接收方的滑动窗口中,当滑动窗口缓了多个报文就会粘包
    3. Nagle算法:会造成粘包
      半包:发送abcdef,接收abc def。原因:
    4. 应用层:接收方ByteBuf小于实际发送数据大小
    5. 滑动窗口:假设接收方的窗口只剩128字节,发送方发送256字节,只能先发送128自己二,等待ack后才能发送剩余部分
    6. MSS(链路层MTU去掉tcp报头和ip头部分)限制:当发送数据超过MSS限制后,会将数据切分发送

    本质都是因为TCP是流式协议,消息无边界
    Nagle算法:为了提高网络利用率,发送足够多的数据,如果发送数据少的话,则进行延时发送:SO_SNDBUF达到MSS或含有FIN;TCP_NODELAY=TRUE,收到ACK,超时时发送。除了以上几种情况则延时发送。
    MSS限制:不同设备的MTU不同,以太网MTU是1500,FDDI的MTU是4352,本地回环地址的MTU是65535本地不走网卡,
    MSS :是最大段长度,它是MTU刨去 tcp和ip 头后剩余能够作为数据传输的字节数,ipv4tcp头占用 20,ip头占用 20,因此以太网 MSS 的值为1500- 40=1460,TCP在传递大量数据时,会按照 MSS 大小将数据进行分割发送,MSS的值在三次握手时通知对方自己 MSS 的值,然后在两者之间选择一个小值作为MSS。

    1.4 滑动窗口

    TCP以段(Segment)为单位发送一次数据就需要却仍应答ack处理,但是往返时间长性能差,因此引了了窗口概念,窗口大小决定了无需等待应答而可以继续发送数据最大值。
    滑动窗口示意
    滑动窗口起到一个缓冲区的作用,也能进行流量控制。窗口内的数据才允许发送,当应答未到达前,窗口必须停止滑动,接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收。

    1.5 解决方案(短连接,定长数据,分隔符,预设长度)

    1. 短链接,发一个包建立一次连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	log.debug("sending...");
    	ByteBuf buffer = ctx.alloc().buffer();
    	buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
    	ctx.writeAndFlush(buffer);
    	// 发完即关
    	ctx.close();
    }
    //调整netty的接收缓冲区
    serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16,16,16));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 每一条消息采用固定长度,缺点浪费空间
    //让所有数据包长度固定,服务端加入
    ch.pipeline().addLast(new FixedLengthFrameDecoder(8));
    //客户端什么时候 flush 都可以
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	log.debug("sending...");
    	// 发送内容随机的数据包
    	Random r = new Random();
    	char c = 'a';
    	ByteBuf buffer = ctx.alloc().buffer();
    	for (int i = 0; i < 10; i++) {
    		byte[] bytes = new byte[8];
    		for (int j = 0; j < r.nextInt(8); j++) {
    			bytes[j] = (byte) c;
    		}
    		c++;
    		buffer.writeBytes(bytes);
    	}
    	ctx.writeAndFlush(buffer);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    缺点:长度定的太大,浪费,长度定的太小,对某些数据包又显得不够

    1. 每一条消息采用分隔符,例如 \n,缺点需要转义
    //服务端加入,默认以 \n 或 \r\n 作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常
    ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
    //客户端发送+\n
        public static StringBuilder makeString(char c, int len) {
            StringBuilder sb = new StringBuilder(len + 2);
            for (int i = 0; i < len; i++) {
                sb.append(c);
            }
            sb.append("\n");
            return sb;
        }
        ByteBuf buf = ctx.alloc().buffer();
     	char c = '0';
    	Random r = new Random();
    	for (int i = 0; i < 10; i++) {
    		StringBuilder sb = makeString(c, r.nextInt(256) + 1);
    		c++;
    		buf.writeBytes(sb.toString().getBytes());
    	}
    	ctx.writeAndFlush(buf);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    缺点,处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误

    1. 每一条消息分为 head 和 body,head 中包含 body 的长度
    //在发送消息前,先约定用定长字节表示接下来数据的长度
    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1));//最大长度,长度偏移,长度占用字节,长度调整,剥离字节数
    
    • 1
    • 2

    2. 协议设计与解析

    2.1 协议

    TCP/IP 中消息传输基于流的方式,没有边界。协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则。

    2.2 redis 协议举例

    //模拟客户端给本机redis发送set name aric命令
    public static void main(String[] args) {
            final byte[] LINE = {13,10};
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(worker);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) {
                                ByteBuf buf = ctx.alloc().buffer();
                                buf.writeBytes("*3".getBytes());
                                buf.writeBytes(LINE);
                                buf.writeBytes("$3".getBytes());
                                buf.writeBytes(LINE);
                                buf.writeBytes("set".getBytes());
                                buf.writeBytes(LINE);
                                buf.writeBytes("$4".getBytes());
                                buf.writeBytes(LINE);
                                buf.writeBytes("name".getBytes());
                                buf.writeBytes(LINE);
                                buf.writeBytes("$4".getBytes());
                                buf.writeBytes(LINE);
                                buf.writeBytes("aric".getBytes());
                                buf.writeBytes(LINE);
                                ctx.writeAndFlush(buf);
                            }
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                buf.toString(Charset.defaultCharset());
                            }
                        });
                    }
                });
                ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
                channelFuture.channel().closeFuture().sync();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    2.3 http 协议举例

    ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
    	@Override
    	protected void channelRead0(ChannelHandlerContext ctx, HttpRequest httpRequest) throws Exception {
    	log.debug(httpRequest.getUri());
    	//返回响应
    	DefaultFullHttpResponse response = new DefaultFullHttpResponse(httpRequest.protocolVersion(),HttpResponseStatus.OK);
    	byte[] bytes = "

    hello,world!

    "
    .getBytes(); response.headers().setInt(CONTENT_LENGTH, bytes.length) response.content().writeBytes(bytes); //写回响应 ctx.writeAndFlush(response); } });
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2.4 自定义协议要素

    • 魔数:先判断是否无效数据包
    • 版本号:可支持协议的升级
    • 序列化算法:消息正文采用哪种序列化方式如:json,protobuf,hessian,jdk
    • 指令类型:登录,注册,单聊,群聊。。。跟业务相关
    • 请求序号:为了双工通信,提供异步能力
    • 正文长度
    • 消息正文
    public class MessageCodec extends ByteToMessageCodec<Message> {
        //编码:出站前把msg编码成ByteBuf
        @Override
        protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) throws Exception {
            //1. 魔数4字节
            out.writeBytes(new byte[]{1, 2, 3, 4});
            //2. 版本号1字节
            out.writeByte(1);
            //3. 字节序列化算法 jdk 0, json 1
            out.writeByte(0);
            //4. 指令类型1字节
            out.writeByte(message.getMessageType());
            //5. 请求序号4字节
            out.writeInt(message.getSequenceId());
            out.writeByte(0xff);  //用于对齐字节
            //6. 获取内容字节数组
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(message);
            byte[] bytes = bos.toByteArray();
            //7. 长度
            out.writeInt(bytes.length);
            //8。 写入内容
            out.writeBytes(bytes);
        }
    
        //解码:把ByteBuf转化为msg
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            int magicNum = in.readInt();
            byte version = in.readByte();
            byte serializerType = in.readByte();
            byte messageType = in.readByte();
            int sequenceId = in.readInt();
            in.readByte();
            byte length = in.readByte();
            byte[] bytes = new byte[length];
            in.readBytes(bytes, 0, length);
            if (serializerType == 0) {
                ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
                Message message = (Message) ois.readObject();
                out.add(message);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    自定义协议解析

    什么时候可以加@Sharable
    • 当handler不保存状态时,就可以安全地在多线程下被共享
    • 对于编解码器类,不能继承ByteToMessageCodec或CombinedChannelDuplexHandler父类,他们的构造方法对@Sharable有限制。
    • 如果能确保编解码器不会保存状态,可以继承MessageToMessageCodec父类。
    @Slf4j
    @ChannelHandler.Sharable
    /**
     * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
     */
    public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
        @Override
        protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {}
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    3. 聊天室案例

    3.1 聊天室业务介绍

    netty实现聊天室,可以登录,单聊,创建群聊,群聊,加群,退群,退出功能。

    3.2 聊天室业务-登录

    见ch.pipeline().addLast(LOGIN_HANDLER);方法。

    3.3 聊天室业务-单聊

    服务器端将 handler 独立出来。ch.pipeline().addLast(CHAT_HANDLER);

    3.4 聊天室业务-群聊

    ch.pipeline().addLast(GROUP_CREATE_HANDLER); //创建群聊
    ch.pipeline().addLast(GROUP_JOIN_HANDLER); //加入群聊
    ch.pipeline().addLast(GROUP_MEMBER_HANDLER); //查看群成员
    ch.pipeline().addLast(GROUP_QUIT_HANDLER); //退出群聊
    ch.pipeline().addLast(GROUP_CHAT_HANDLER); /群聊消息

    3.5 聊天室业务-退出

    ch.pipeline().addLast(QUIT_HANDLER);

    3.6 聊天室业务-空闲检测

    连接假死

    原因:

    • 网络故障,底层TCP断开连接,应用程序没有感知到,仍占用资源
    • 公网网络不稳,丢包。客户端和服务端都都不到数据
    • 应用程序线程阻塞,无法进行数据读写
      问题:
    • 假死连接占用资源不能自动释放
    • 向假死连接发送数据,得到反馈为发送超时
      netty服务器端解决
      空闲状态检测器:每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死。
    //空闲状态检测器,5s没收客户端消息,会触发IdleState#READER_IDLE事件
    ch.pipeline().addLast(new IdleStateHandler(5,0,0));
    //ChannelDuplexHandler可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {
    	//用来触发特殊事件
    	@Override
    	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    	IdleStateEvent event = (IdleStateEvent) evt;
    	if (event.state() == IdleState.READER_IDLE) {  //触发5s读写空闲事件,断开
    		ctx.channel().close();
    	}
    	super.userEventTriggered(ctx, evt);
    	}
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    客户端定时心跳
    客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器

    4. 代码

    https://gitee.com/xuyu294636185/netty-demo.git

  • 相关阅读:
    并查集介绍 & 代码实现 & 优化思路详解
    Qt-OpenCV学习笔记--边缘检测--Canny()
    SAP AIF BTI750
    引入移码的目的
    Dockerfile自定义镜像、CentOS安装DockerCompose及Docker镜像仓库
    k8s简介以及各个组件
    【Golang】简记操作:Centos安装、卸载、升级Golang运行环境
    性能优化:编译器优化选项 -O2/-O3 究竟有多强大?
    SpringBoot接口 - 如何优雅的写Controller并统一异常处理?
    leetcode day12 二叉树的最大深度
  • 原文地址:https://blog.csdn.net/xy294636185/article/details/132888084