• Netty高级应用及聊天室实战


    Netty 高级应用

    1. 编解码器

    概念:在网络应用中,需要实现某种编解码器。将原始字节数据与自定义消息数据进行相互转换。网络中都是以字节码的形式传输的。

    对Netty而言,编解码器由两部分组成:编码器、解码器

    • 编码器:将消息对象转为字节或其他序列形式在网络上传输
    • 解码器:负责将字节或其他序列形式转为指定的消息对象

    Netty的编解码器实现了ChannelHandlerAdapter,也是一种特殊的ChannelHandler,所以依赖与ChannelPipeline,可以将多个编解码器链接在一起,以实现复杂的转换逻辑。

    1. 解码器
    • ByteToMessageDecoder:用于将字节转为消息,需要检查缓冲区是否有足够的字节
    • ReplayingDecoder:继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecode
    • MessageToMessageDecoder:用于从一种消息解码为另一种消息(如POJO到POJO)

    解码器示例:

    public class DemoDecoder extends MessageToMessageDecoder<ByteBuf> {
        
        @Override
        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            String msg = byteBuf.toString(CharsetUtil.UTF_8);
            list.add(msg);
        }
    }
    

    通道里加入解码器:

     protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new DemoDecoder());
                            socketChannel.pipeline().addLast(new DemoNettyServerHandle());
                        }
    
    1. 编码器
    • MessageToByteEncoder:将消息转为字节
    • MessageToMessageEncoder:用于从一种消息编码为另外一种消息(例如POJO到POJO)

    编码器示例:

    public class DemoEncoder extends MessageToMessageEncoder<String> {
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, String s, List<Object> list) throws Exception {
            list.add(Unpooled.copiedBuffer(s,CharsetUtil.UTF_8));
        }
    }
    
    1. 编码解码器Codec

    同时具备编码与解码功能

    • ByteToMessageCodec
    • MessageToMessageCodec

    2. 基于Netty的HTTP服务器开发

    效果如图:

    代码如下:

    public class NettyHttpServer {
    
        private int port;
    
        public NettyHttpServer(int port) {
            this.port = port;
        }
    
        public static void main(String[] args) {
            new NettyHttpServer(8090).run();
        }
    
        public void run(){
            EventLoopGroup bossGroup=null;
            EventLoopGroup workerGroup=null;
            try{
                bossGroup=new NioEventLoopGroup(1);
                workerGroup=new NioEventLoopGroup();
                ServerBootstrap serverBootstrap=new ServerBootstrap();
                serverBootstrap.group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG,128)
                        .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                //添加编解码器
                                socketChannel.pipeline().addLast(new HttpServerCodec());
                                socketChannel.pipeline().addLast(new NettyHttpServerHandler());
    
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    
    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
            if(httpObject instanceof HttpRequest){
                DefaultHttpRequest request=(DefaultHttpRequest)httpObject;
                if(request.uri().equals("/favicon.ico")){
                    //图标不响应
                    return;
                }
                System.out.println("接收到请求:"+request.uri());
                ByteBuf byteBuf = Unpooled.copiedBuffer("你好,我是服务端", CharsetUtil.UTF_8);
                DefaultFullHttpResponse response=new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,byteBuf);
                //设置响应头
                response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());
                channelHandlerContext.writeAndFlush(response);
            }
        }
    }
    

    3. 粘包和拆包

    简介:粘包和拆包是TCP网络编程中不可避免的,无论客户端还是服务端,当我们读取或发送消息的时候都要考虑TCP底层的粘包/拆包机制。

    粘包产生的原因:

    • 应用程序写入的数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上
    • 接收方不及时读取套接字缓冲区数据
    • TCP默认使用Nagle算法,将小数据包合并

    拆包产生的原因:

    • 数据太大超过剩余缓冲区的大小
    • 数据太大超过MSS最大报文长度

    粘包和拆包的解决方案

    1. 消息长度固定,累计读取到定长的报文就认为是一个完整的信息
    2. 将换行符作为消息结束符
    3. 将特殊的分隔符作为消息的结束标志
    4. 通过在消息头中定义长度字段来标识消息总长度

    Netty中粘包和拆包的解决方案

    Netty提供了4种解码器来解决:

    1. 固定长度拆包器FixedLengthFrameDecoder
    2. 行拆包器LineBasedFrameDecoder,以换行符作为分隔符
    3. 分隔符拆包器DelimiterBasedFrameDecoder,通过自定义的分隔符进行拆分
    4. 基于数据包长度的拆包器LengthFieldBasedFrameDecoder,将应用层数据包的长度最为拆分一句。要求应用层协议中包含数据包的长度。

    DelimiterBasedFrameDecoder示例:

    ByteBuf byteBuf =
    Unpooled.copiedBuffer("$".getBytes(StandardCharsets.UTF_8));
    ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048, byteBuf));
    

    LengthFieldBasedFrameDecoder构造器参数讲解:
    public LengthFieldBasedFrameDecoder(
    ByteOrder byteOrder,
    int lengthFieldOffset,
    int lengthFieldLength,
    int lengthAdjustment,
    int initialBytesToStrip,
    boolean failFast)

    • byteOrder是指明Length字段是大端序还是小端序,因为Netty要读取Length字段的值,所以大端小端要设置好,默认Netty是大端序ByteOrder.BIG_ENDIAN。

    • maxFrameLength是指最大包长度,如果Netty最终生成的数据包超过这个长度,Netty就会报错。

    • lengthFieldOffset是指明Length的偏移位

    • lengthFieldLength是Length字段长度

    • lengthAdjustment 这个参数很多时候设为负数,这是最让小伙伴们迷惑的。下面我用一整段话来解释这个参数

    当Netty利用lengthFieldOffset(偏移位)和lengthFieldLength(Length字段长度)成功读出Length字段的值后,Netty认为这个值是指从Length字段之后,到包结束一共还有多少字节,如果这个值是13,那么Netty就会再等待13个Byte的数据到达后,拼接成一个完整的包。但是更多时候,Length字段的长度,是指整个包的长度,如果是这种情况,当Netty读出Length字段的时候,它已经读取了包的4个Byte的数据,所以,后续未到达的数据只有9个Byte,即13 - 4 = 9,这个时候,就要用lengthAdjustment来告诉Netty,后续的数据并没有13个Byte,要减掉4个Byte,所以lengthAdjustment要设为 -4!!!

    • initialBytesToStrip,跳过的个数。比如这里initialBytesToStrip设置为4,那么Netty就会跳过前4位解析后面的内容

    • failFast 参数一般设置为true,当这个参数为true时,netty一旦读到Length字段,并判断Length超过maxFrameLength,就立即抛出异常。

    示例:

     @Override
        public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
            for (int i=0;i<100;i++){
                byte[] bytes = "你好,我是客户端".getBytes(CharsetUtil.UTF_8);
                ByteBuf byteBuf = Unpooled.buffer();
                byteBuf.writeInt(bytes.length);
                byteBuf.writeBytes(bytes);
                channelHandlerContext.writeAndFlush(byteBuf);
            }
        }
    

    第2个参数和第三个参数表示:0-4个字节是内容长度字段,第五个参数的4代表跳过前4个字节。

     socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(60535,0,4,0,4));
    

    最后输出的内容:

    4. 基于Netty和WebSocket的聊天室案例

    1. WebSocket简介

    WebSocket是一种在单个TCP连接上进行全双工通信的协议。相比HTTP协议,WebSocket具备如下特点:

    1. 支持双向通信,实时性更强
    2. 更好的二进制支持
    3. 较少的开销:协议控制的数据包头部较小

    应用场景:

    • 社交订阅
    • 协同编辑
    • 股票基金报价
    • 体育实况更新
    • 多媒体聊天
    • 在线教育

    2. 服务端开发

    1. 引入依赖

    基于SpringBoot环境

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!--添加thymeleaf依赖 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-thymeleaf</artifactId>
            </dependency>
    
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.72.Final</version>
            </dependency>
    
    1. 核心后端代码
    @Component
    public class NettyWebSocketServer implements Runnable {
    
    
        @Autowired
        private NettyConfig nettyConfig;
    
        @Autowired
        private WebSocketChannelInit webSocketChannelInit;
    
        private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        private EventLoopGroup wokerGroup = new NioEventLoopGroup();
    
    
        @PreDestroy
        public void close(){
            bossGroup.shutdownGracefully();
            wokerGroup.shutdownGracefully();
        }
    
        @Override
        public void run() {
            try{
                ServerBootstrap serverBootstrap=new ServerBootstrap();
                serverBootstrap.group(bossGroup,wokerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(webSocketChannelInit);
                ChannelFuture channelFuture = serverBootstrap.bind(nettyConfig.getPort()).sync();
                System.out.println("Netty服务端启动成功");
                channelFuture.channel().closeFuture().sync();
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                bossGroup.shutdownGracefully();
                wokerGroup.shutdownGracefully();
            }
    
        }
    }
    
    @Component
    public class WebSocketChannelInit extends ChannelInitializer {
    
        @Autowired
        private NettyConfig nettyConfig;
    
        @Autowired
        private WebSocketHandler webSocketHandler;
    
        @Override
        protected void initChannel(Channel channel) throws Exception {
            ChannelPipeline pipeline = channel.pipeline();
            //对http协议的支持
            pipeline.addLast(new HttpServerCodec());
            //对大数据流的支持
            pipeline.addLast(new ChunkedWriteHandler());
            //post请求分为3部分。request line、request header、body
            //HttpObjectAggregator将多个信息转化为单一的request或者response对象
            pipeline.addLast(new HttpObjectAggregator(8000));
            //将http协议升级为ws协议,websocket的支持
            pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath()));
    
            pipeline.addLast(webSocketHandler);
        }
    }
    
    
    @Component
    @ChannelHandler.Sharable  //设置通道共享
    public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    
        private List<Channel> channels=new ArrayList<>();
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            channels.add(ctx.channel());
            System.out.println("有新的连接了...");
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            channels.remove(ctx.channel());
            System.out.println("连接下线了");
        }
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {
    
            String text = textWebSocketFrame.text();
            Channel currentChannel = channelHandlerContext.channel();
            for (Channel channel:channels){
                //自己不给自己发消息
                if(!channel.equals(currentChannel)){
                    channel.writeAndFlush(new TextWebSocketFrame(text));
                }
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            Channel channel = ctx.channel();
            channels.remove(channel);
        }
    }
    

    3. 前端js代码

    $(function () {
        //这里需要注意的是,prompt有两个参数,前面是提示的话,后面是当对话框出来后,在对话框里的默认值
        var username = "";
        while (true) {
            //弹出一个输入框,输入一段文字,可以提交
            username = prompt("请输入您的名字", ""); //将输入的内容赋给变量 name ,
            if (username.trim() === "")//如果返回的有内容
            {
                alert("名称不能输入空")
            } else {
                $("#username").text(username);
                break;
            }
        }
    
        var ws = new WebSocket("ws://localhost:8081/chatService");
        ws.onopen = function () {
            console.log("连接成功.")
        };
        ws.onmessage = function (evt) {
            showMessage(evt.data);
        };
        ws.onclose = function (){
            console.log("连接关闭")
        };
    
        ws.onerror = function (){
            console.log("连接异常")
        };
    
        function showMessage(message) {
            // 张三:你好
            var str = message.split(":");
            $("#msg_list").append('<li class="active"}>\n' +
                '                                  <div class="main">\n' +
                '                                    <img class="avatar" width="30" height="30" src="/img/user.png">\n' +
                '                                    <div>\n' +
                '                                        <div class="user_name">'+str[0]+'</div>\n' +
                '                                        <div class="text">'+str[1]+'</div>\n' +
                '                                    </div>                       \n' +
                '                                   </div>\n' +
                '                              </li>');
            // 置底
            setBottom();
        }
    
        $('#my_test').bind({
            focus: function (event) {
                event.stopPropagation();
                $('#my_test').val('');
                $('.arrow_box').hide()
            },
            keydown: function (event) {
                event.stopPropagation();
                if (event.keyCode === 13) {
                    if ($('#my_test').val().trim() === '') {
                        this.blur();
                        $('.arrow_box').show();
                        setTimeout(this.focus(),1000);
                    } else {
                        $('.arrow_box').hide();
                        //发送消息
                        sendMsg();
                        this.blur();
                        setTimeout(this.focus())
                    }
                }
            }
        });
        $('#send').on('click', function (event) {
            event.stopPropagation();
            if ($('#my_test').val().trim() === '') {
                $('.arrow_box').show()
            } else {
                sendMsg();
            }
        });
    
        function sendMsg() {
            var message = $("#my_test").val();
            $("#msg_list").append('<li class="active"}>\n' +
                '                                  <div class="main self">\n' +
                '                                      <div class="text">'+message+'</div>\n' +
                '                                  </div>\n' +
                '                              </li>');
            $("#my_test").val('');
    
            //发送消息
            message = username + ":" + message;
            ws.send(message);
            // 置底
            setBottom();
        }
    
        // 置底
        function setBottom() {
            // 发送消息后滚动到底部
            var container = $('.m-message');
            var scroll = $('#msg_list');
            container.animate({
                scrollTop: scroll[0].scrollHeight - container[0].clientHeight + container.scrollTop() + 100
            });
        }
    });
    
  • 相关阅读:
    【web渗透思路】敏感信息泄露(网站+用户+服务器)
    【无标题】清空吗
    天才在左,疯子在右读书笔记 -- 高铭著
    基于python的智慧城市社区服务平台及养老服务子系统的设计与实现
    20220701 Barbalat引理证明
    【华为OD机试真题 python】分班问题 【2022 Q4 | 100分】
    实践案例丨CenterNet-Hourglass论文复现
    翻译: Github Copilot 可以创作艺术吗?
    KMP算法详解以及Java代码实现
    【Python 千题 —— 基础篇】进制转换:十进制转十六进制
  • 原文地址:https://www.cnblogs.com/javammc/p/15907602.html