• Netty——搭建一个聊天室(笔记)


    一、黏包与半包处理

    1.1 黏包与半包演示

    package com.yjx23332.netty.test;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class HelloWorldServer {
        void start(){
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .channel(NioServerSocketChannel.class)
                        /**
                         * 滑动缓冲区-接收大小设置,方便展示半包现象
                         * 不设置会自适应,TCP协议的连接双方会自动协调
                         * */
                        .option(ChannelOption.SO_RCVBUF,10)
                        .group(boss,worker)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("server error {}",e);
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
        public static void main(String[] args){
            HelloWorldServer helloWorldServer = new HelloWorldServer();
            helloWorldServer.start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    package com.yjx23332.netty.test;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class HelloWorldClient {
        public static void main(String[] args){
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                Bootstrap boostrap = new Bootstrap();
                boostrap
                        .channel(NioSocketChannel.class)
                        .group(worker)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        for(int i = 0;i < 10;i++) {
                                            ByteBuf byteBuf = ctx.alloc().buffer(16);
                                            byteBuf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
                                            ctx.writeAndFlush(byteBuf);
                                        }
                                    }
                                });
                            }
                        });
                ChannelFuture channelFuture = boostrap.connect("localhost",8080).sync();
                channelFuture.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                log.error("client error {}",e);
            }finally {
                worker.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    1.2 滑动窗口协议

    参考滑动窗口协议详解

    1.3 现象原因

    黏包

    • 现象:发送abc、def,收到abcdef
    • 原因:
      • 应用层:接收方ByteBuf设置太大(Net它默认1024)
      • 滑动窗口:假设发送方256bytes表示一个完整报文,但由于接收方处理不及时且窗口足够大,这256bytes字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲多了个报文就会黏包
      • Nagle算法:会造成黏包TCP系列29—窗口管理&流控—3、Nagle算法

    半包

    • 现象:发送abcdef,收到abc、def
    • 应用层:接收方ByteBuf小于实际发送数量
    • 滑动窗口:假设接收方的窗口128bytes,发送方报文为256bytes,这时就放不下了,只能先发送前128bytes,等待ack后,才能发送剩余部分,这时就会造成半包
    • MSS限制(Maximum Segment Size,最大报文限制。网卡对于文件的限制):发送数据超过MSS限制,会将数据切分发送,就会造成半包

    1.4 解决方法-短连接

    package com.yjx23332.netty.test;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class HelloWorldServer {
        void start(){
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .channel(NioServerSocketChannel.class)
                        //.option(ChannelOption.SO_RCVBUF,10)
                        .group(boss,worker)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("server error {}",e);
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
        public static void main(String[] args){
            HelloWorldServer helloWorldServer = new HelloWorldServer();
            helloWorldServer.start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    package com.yjx23332.netty.test;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class HelloWorldClient {
        private static void send(){
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                Bootstrap boostrap = new Bootstrap();
                boostrap
                        .channel(NioSocketChannel.class)
                        .group(worker)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        ByteBuf byteBuf = ctx.alloc().buffer(16);
                                        byteBuf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15});
                                        ctx.writeAndFlush(byteBuf);
                                        ctx.channel().close();
                                    }
                                });
                            }
                        });
                ChannelFuture channelFuture = boostrap.connect("localhost",8080).sync();
                channelFuture.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                log.error("client error {}",e);
            }finally {
                worker.shutdownGracefully();
            }
        }
        public static void main(String[] args){
            for(int i = 0;i <10;i++){
                send();
            }
            log.debug("finish !");
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    通过发一次就断开,保证数据可以分开。

    但无法解决半包问题,我们可以让滑动窗口的接收缓冲区变小,来测试。
    调整服务端缓冲区

    					/**
                         * 滑动缓冲区-接收大小设置
                         * 调整serverBootstrap整个滑动窗口大小,全局设置
                         * */
                        //.option(ChannelOption.SO_RCVBUF,10)
                        /**
                         * 滑动缓冲区-接收大小设置
                         * 调整child连接的滑动窗口大小
                         * 最小值(16的整数倍),初始值,最大值
                         * */
                        .childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    调整客户端

    byteBuf.writeBytes(new byte[]{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,17,18});
    
    • 1

    缺点:

    1. 效率低
    2. 无法避免半包问题

    1.5 解决方法-定长帧解码器

    客户端与服务器约定固定的消息长度,少于则等待,大于则截断。

    package com.yjx23332.netty.test;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import lombok.extern.slf4j.Slf4j;
    
    
    import java.nio.charset.Charset;
    import java.util.Arrays;
    import java.util.Random;
    
    @Slf4j
    public class HelloWorldClient {
        private static void send(){
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                Bootstrap boostrap = new Bootstrap();
                boostrap
                        .channel(NioSocketChannel.class)
                        .group(worker)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        ByteBuf buf = ctx.alloc().buffer();
                                        char c = '0';
                                        Random r = new Random();
                                        for(int i = 0;i < 10;i++){
                                            byte[] msg = fill10Bytes(c,r.nextInt(10)+1);
                                            c++;
                                            buf.writeBytes(msg);
                                        }
                                        ctx.writeAndFlush(buf);
                                    }
                                });
                            }
                        });
                ChannelFuture channelFuture = boostrap.connect("localhost",8080).sync();
                channelFuture.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                log.error("client error {}",e);
            }finally {
                worker.shutdownGracefully();
            }
        }
    
        public static void main(String[] args){
            send();
        }
    
        /**
         * 长度填充,总长度为10
         * */
        public static byte[] fill10Bytes(char c, int len) {
            if(len > 10)
                len = 10;
            byte[] bytes = new byte[10];
            Arrays.fill(bytes, (byte) '_');
            for (int i = 0; i < len; i++) {
                bytes[i] = (byte) c;
            }
            log.info(new String(bytes, Charset.forName("utf-8")));
            return bytes;
        }
    }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    package com.yjx23332.netty.test;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.AdaptiveRecvByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class HelloWorldServer {
        void start(){
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16))
                        .group(boss,worker)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //设置定长解码器
                                ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
                                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("server error {}",e);
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
        public static void main(String[] args){
            HelloWorldServer helloWorldServer = new HelloWorldServer();
            helloWorldServer.start();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    缺点:

    1. 消息的最大长度不能大于定长
    2. 占用字节数比较多,因为消息的长度定了,即使字节没哟那么长

    1.6 解决方法-基于行的帧解码器

    通过分割符号来对消息进行分割。
    行解码器就是以换行符号作为消息的分割。指定构造时,需要确定最大长度。当达到最大长度时,仍然不能找到分隔符号,(如果failFast为true)就会抛出异常。

    	void start(){
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .channel(NioServerSocketChannel.class)
                        .childOption(ChannelOption.RCVBUF_ALLOCATOR,new AdaptiveRecvByteBufAllocator(16,16,16))
                        .group(boss,worker)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //设置定长解码器
                                ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("server error {}",e);
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    	private static void send(){
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                Bootstrap boostrap = new Bootstrap();
                boostrap
                        .channel(NioSocketChannel.class)
                        .group(worker)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        ByteBuf buf = ctx.alloc().buffer();
                                        char c = '0';
                                        Random r = new Random();
                                        for(int i = 0;i < 10;i++){
                                            StringBuilder stringBuilder = makeString(c,r.nextInt(256) + 1);
                                            c++;
                                            buf.writeBytes(stringBuilder.toString().getBytes(StandardCharsets.UTF_8));
                                        }
                                        ctx.writeAndFlush(buf);
                                    }
                                });
                            }
                        });
                ChannelFuture channelFuture = boostrap.connect("localhost",8080).sync();
                channelFuture.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                log.error("client error {}",e);
            }finally {
                worker.shutdownGracefully();
            }
        }
    
        public static void main(String[] args){
            send();
        }
    
    
        /**
         * 长度填充+换行符
         * */
        public static StringBuilder makeString(char c,int len){
            StringBuilder stringBuilder = new StringBuilder(len + 2);
            for(int i = 0;i < len;i++){
                stringBuilder.append(c);
            }
            stringBuilder.append("\n");
            return stringBuilder;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    1.7 解决方法-基于长度字段的帧解码器

    发送消息时,会把有关消息内容长度的信息一起发送,接收消息时,就可以处理黏包半包问题。

    包含如下参数。

    • maxFrameLength:最大帧长度
    • lengthFieldOffset:长度字节偏移量(表示长度的字节所在的位置)
    • lengthFieldLength:长度字段长度(表示长度的字节占几个字节)
    • lengthAdjustment:长度字段为基准,还有几个字节之后是内容(从表示长度的字节的位置之后,距离内容中间隔了几个字节)
    • initialBytesToStrip:从头剥离几个字节(最后从哪里开始截取报文,即,不要哪些信息)
    					.childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                //设置定长解码器
                                /**
                                 * @param maxFrameLength,lengthFieldOffset,lengthFieldLength,lengthAdjustment,initialBytesToStrip
                                 */
                                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,0,4,0,0));
                                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                            }
                        });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    private static void send(){
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                Bootstrap boostrap = new Bootstrap();
                boostrap
                        .channel(NioSocketChannel.class)
                        .group(worker)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        ByteBuf buf = ctx.alloc().buffer();
                                        char c = '0';
                                        Random r = new Random();
                                        sendMsg(buf,"hello,world");
                                        sendMsg(buf,"hi!");
                                        ctx.writeAndFlush(buf);
                                    }
                                });
                            }
                        });
                ChannelFuture channelFuture = boostrap.connect("localhost",8080).sync();
                channelFuture.channel().closeFuture().sync();
            }catch (InterruptedException e) {
                log.error("client error {}",e);
            }finally {
                worker.shutdownGracefully();
            }
        }
    
        public static void main(String[] args){
            send();
        }
    
    
        /**
         * 发出内容写入
         * */
        private static void sendMsg(ByteBuf byteBuf,String content){
                byte[] bytes = content.getBytes();
                int length = bytes.length;
                byteBuf.writeInt(length);
                byteBuf.writeBytes(bytes);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    二、协议的设计与解析

    2.1 向redis发出set key value

    set key value
    依据协议,需要替换为,单词之间换行
     *3 			$3			set		 $4 name $8 zhangsan
    多少个词 第一个词的字符串 	词 ...
    
    • 1
    • 2
    • 3
    • 4
    package com.yjx23332.netty.test;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    import java.nio.charset.StandardCharsets;
    
    
    public class helloRedis {
        public static void main(String[] args){
            final  byte[] LINE = {13,10}; //回车换行
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try{
                Bootstrap boostrap = new Bootstrap();
                boostrap
                        .channel(NioSocketChannel.class)
                        .group(worker)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        ByteBuf buf = ctx.alloc().buffer();
                                        buf.writeBytes("*3".getBytes(StandardCharsets.UTF_8));
                                        buf.writeBytes(LINE);
                                        buf.writeBytes("$3".getBytes(StandardCharsets.UTF_8));
                                        buf.writeBytes(LINE);
                                        buf.writeBytes("set".getBytes(StandardCharsets.UTF_8));
                                        buf.writeBytes(LINE);
                                        buf.writeBytes("$4".getBytes(StandardCharsets.UTF_8));
                                        buf.writeBytes(LINE);
                                        buf.writeBytes("name".getBytes(StandardCharsets.UTF_8));
                                        buf.writeBytes(LINE);
                                        buf.writeBytes("$8".getBytes(StandardCharsets.UTF_8));
                                        buf.writeBytes(LINE);
                                        buf.writeBytes("zhangsan".getBytes(StandardCharsets.UTF_8));
                                        buf.writeBytes(LINE);
                                        ctx.writeAndFlush(buf);
                                    }
    
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        ByteBuf byteBuf = (ByteBuf) msg;
                                        System.out.println(byteBuf.toString(StandardCharsets.UTF_8));
                                    }
                                });
                            }
                        });
                ChannelFuture channelFuture = boostrap.connect("IP",端口).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                worker.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    在这里插入图片描述

    在这里插入图片描述

    2.2 http协议

    Netty已经帮助我们封装好了,我们只需要关注业务即可。

    HttpServerCodec:解码器,组合了HttpRequestDecoder与HttpResponseEncoder
    包括了请求解码与回复编码

    将请求拆分为了:请求行请求头+请求体
    在这里插入图片描述

    package com.yjx23332.netty.test;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import static org.springframework.http.HttpHeaders.CONTENT_LENGTH;
    
    @Slf4j
    public class TestHttp {
        public static void main(String[] args){
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup workers = new NioEventLoopGroup();
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .channel(NioServerSocketChannel.class)
                        .group(boss,workers)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                                ch.pipeline().addLast(new HttpServerCodec());
    
    //                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
    //                                @Override
    //                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //                                    log.debug("{}",msg.getClass());
    //                                    if(msg instanceof HttpRequest){//请求头
    //
    //                                    }else if (msg instanceof HttpContent){//请求体
    //
    //                                    }
    //                                }
    //                            });
    
                                //只关心某种类型的消息
                                ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                                    @Override
                                    protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                                        //请求行
                                        log.debug("请求行:{}",msg.uri());
                                        //请求头
                                        log.debug("请求头:{}",msg.headers());
    
                                        //返回响应
                                        DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(),HttpResponseStatus.OK);
                                        byte[] bytes = "

    hello,world!

    "
    .getBytes(); response.headers().setInt(CONTENT_LENGTH,bytes.length); response.content().writeBytes(bytes); ctx.writeAndFlush(response); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { boss.shutdownGracefully(); workers.shutdownGracefully(); } } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    在这里插入图片描述

    2.3 自定义协议

    • 魔数:用来在第一时间判定是否是无效数据包
    • 版本号,可以支持协议的升级
    • 序列算法:协议正文到底采用那种序列化与反序列化方式,可以由此扩展。如json、protobuf、hessian、jdk
    • 指令集类型:登陆、注册、单聊、群聊……与业务相关
    • 请求序号,为了双工通信,提供异步能力
    • 正文长度
    • 消息正文

    创建大致如下结构

    在这里插入图片描述

    Message消息类

    package com.yjx23332.netty.test.entity;
    
    import com.yjx23332.netty.test.entity.vo.req.*;
    import com.yjx23332.netty.test.entity.vo.resp.*;
    import lombok.Data;
    
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    
    @Data
    public abstract  class Message implements Serializable {
        public static Class<?> getMessageClass(int messageType){ return messageClasses.get(messageType);}
    
        private int sequenceId;
    
        private int messageType;
    
        public  abstract int getMessageType();
    
        /**
         * 指令类型
         * */
        public static final int LoginRequestMessage = 0;
    
        public  static final int LoginResponseMessage = 1;
    
        public static final int ChatRequestMessage = 2;
    
        public  static final int ChatResponseMessage = 3;
    
        public static final int GroupCreateRequestMessage = 4;
    
        public static final int GroupCreateResponseMessage = 5;
    
        public static final int GroupJoinRequestMessage = 6;
    
        public static final int GroupJoinResponseMessage = 7;
    
        public static final int GroupQuitRequestMessage = 8;
    
        public static final int GroupQuitResponseMessage = 9;
    
        public static final int GroupChatRequestMessage = 10;
    
        public static final int GroupChatResponseMessage = 11;
    
        public static final int GroupMembersRequestMessage = 12;
    
        public static final int GroupMembersResponseMessage = 13;
    
        public static final int PingMessage = 14;
    
        private static final Map<Integer,Class<?>> messageClasses = new HashMap<>();
    
        static{
            messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
            messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
            messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
            messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
            messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
            messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
            messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
            messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
            messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
            messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
            messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
            messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
            messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
            messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
            messageClasses.put(PingMessage,PingMessage.class);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    LoginRequestMessage与LoginResponseMessage

    以LoginRequestMessage为例,先创建类似如下格式文件。主要先把getMessageType实现了。

    package com.yjx23332.netty.test.entity.vo.req;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    //将父类中的属性也算到tostring中
    @ToString(callSuper = true)
    public class LoginRequestMessage extends Message {
        private String username;
        private String password;
    
        public LoginRequestMessage(){}
    
        public LoginRequestMessage(String username,String password){
            this.username = username;
            this.password = password;
        }
    
        @Override
        public int getMessageType() {
            return LoginRequestMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    package com.yjx23332.netty.test.entity.vo.resp;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class LoginResponseMessage extends Message {
        private boolean success;
        private String message;
    
        LoginResponseMessage(boolean success, String message){
            this.success= success;
            this.message = message;
        }
        @Override
        public int getMessageType() {
            return LoginResponseMessage;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    2.3.1 自定义编解码操作

    package com.yjx23332.netty.test.protocol;
    
    import com.yjx23332.netty.test.entity.Message;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageCodec;
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.List;
    
    @Slf4j
    public class MessageCodec extends ByteToMessageCodec<Message> {
        /***
         * 编码
         */
        @Override
        protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
            //1. 4 字节表示魔数
            out.writeBytes(new byte[]{1,2,3,4});
            //2. 1 字节表示版本
            out.writeByte(1);
            //3. 1 字节表示序列化方式,0 jdk,1 json
            out.writeByte(0);
            //4. 1 字节表示指令类型
            out.writeByte(msg.getMessageType());
            //5. 4 字节表示序列号
            out.writeInt(msg.getSequenceId());
    
            // 上述和为15,为了满足2^n倍,让内存对齐。填入一个无意义字节
            out.writeByte(0xff);
    
            //6. 获取内容的字节数组
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeObject(msg);
            byte[] bytes = byteArrayOutputStream.toByteArray();
    
            //7. 4字节表示长度
            out.writeInt(bytes.length);
            //8. 写入内容
            out.writeBytes(bytes);
            if(objectOutputStream != null)
                objectOutputStream.close();
            if(byteArrayOutputStream != null)
                byteArrayOutputStream.close();
        }
    
        /**
         * 解码
         * */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            int magicNum = in.readInt();
            byte version = in.readByte();
            byte serializerType = in.readByte();
            byte messageType = in.readByte();
            int sequenceId = in.readInt();
            in.readByte();
            int length = in.readInt();
            byte[] bytes = new byte[length];
            /**
             * 从当前读指针位置开始读
             * */
            in.readBytes(bytes,0,length);
            if(serializerType == 0){
                ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
                Message message = (Message) objectInputStream.readObject();
                log.debug("{},{},{},{},{},{}",magicNum,version,serializerType,messageType,sequenceId,length);
                log.debug("{}",message);
                out.add(message);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    测试,放在同一个包下,方便调用内部方法

    package com.yjx23332.netty.test.protocol;
    
    import com.yjx23332.netty.test.entity.vo.req.LoginRequestMessage;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.logging.LoggingHandler;
    
    public class TestMessage {
        public static void main(String[] args) throws Exception {
            EmbeddedChannel channel = new EmbeddedChannel(
                    new LoggingHandler(),
                    new MessageCodec()
            );
            //encode
            LoginRequestMessage loginRequestMessage = new LoginRequestMessage("zhangsan","123","张三");
            channel.writeOutbound(loginRequestMessage);
            //decode
            ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
            new MessageCodec().encode(null,loginRequestMessage,byteBuf);
            //入站
            channel.writeInbound(byteBuf);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    2.3.2 黏包与半包

    这里黏包不大会有问题,但是半包问题仍会发生。因此我们加入帧解码器

    	EmbeddedChannel channel = new EmbeddedChannel(
                    new LengthFieldBasedFrameDecoder(1024,12,4,0,0),
                    new LoggingHandler(),
                    new MessageCodec()
            );
    
    • 1
    • 2
    • 3
    • 4
    • 5

    半包测试
    我们把内容进行切片,来测试半包即可

      		ByteBuf s1 = byteBuf.slice(0,100);
            s1.retain();
            ByteBuf s2 = byteBuf.slice(100, byteBuf.readableBytes() - 100);
            s2.retain();
            //入站
            channel.writeInbound(s1);//会自动调用 s1.release()
            channel.writeInbound(s2);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.3.3 @Sharable

    一个Handler实例,让其它管道共用可以吗?比如如下写法

    		LengthFieldBasedFrameDecoder lengthFieldBasedFrameDecoder = new LengthFieldBasedFrameDecoder(1024,12,4,0,0);
            LoggingHandler loggingHandler = new LoggingHandler();
            EmbeddedChannel channel = new EmbeddedChannel(
                    lengthFieldBasedFrameDecoder,
                    loggingHandler,
                    new MessageCodec()
            );
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 对于loggingHandler 没有问题,因为它只是用于日志记录,且不存储中间数据。
    • 但是对于lengthFieldBasedFrameDecoder ,如果有多个用户同时使用lengthFieldBasedFrameDecoder ,且有半包情况时,就会出错,线程不安全。这也是ThreadLocal出现的目的。
    • 因此需要用来记录状态的或者会被改变的方法,不要进行共享

    对于Netty,如果它的Handler类上加了 @Sharable 的话,就说明它考虑到了多线程,可以被共享。

    在这里插入图片描述

    那我们自己的编解码器可以用吗?
    我们编解码器继承了该类,该类要求其子类不能使用@Sharable
    它认为我们可能会需要处理中间状态,因此强制要求不能共享
    在这里插入图片描述
    可以看到有这一句

    `protected ByteToMessageCodec(boolean preferDirect) {
            ensureNotSharable();
            outboundMsgMatcher = TypeParameterMatcher.find(this, ByteToMessageCodec.class, "I");
            encoder = new Encoder(preferDirect);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    通过下列语句,来判断是否有该注解,

     	protected void ensureNotSharable() {
            if (this.isSharable()) {
                throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
            }
        }
    
        public boolean isSharable() {
            Class<?> clazz = this.getClass();
            Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
            Boolean sharable = (Boolean)cache.get(clazz);
            if (sharable == null) {
                sharable = clazz.isAnnotationPresent(ChannelHandler.Sharable.class);
                cache.put(clazz, sharable);
            }
    
            return sharable;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    所以我们换一个即可,换成一个认为已经接收完毕所有消息的去继承

    package com.yjx23332.netty.test.protocol;
    
    import com.yjx23332.netty.test.entity.Message;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageCodec;
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.List;
    
    /**
     * 必须确保收到的消息是完整的,才可以使用
     * */
    @ChannelHandler.Sharable
    @Slf4j
    public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf,Message> {
        /***
         * 编码
         */
        @Override
        protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
            ByteBuf outTemp = ctx.alloc().buffer();
            //1. 4 字节表示魔数
            outTemp.writeBytes(new byte[]{1,2,3,4});
            //2. 1 字节表示版本
            outTemp.writeByte(1);
            //3. 1 字节表示序列化方式,0 jdk,1 json
            outTemp.writeByte(0);
            //4. 1 字节表示指令类型
            outTemp.writeByte(msg.getMessageType());
            //5. 4 字节表示序列号
            outTemp.writeInt(msg.getSequenceId());
    
            // 上述和为15,为了满足2^n倍,让内存对齐。填入一个无意义字节
            outTemp.writeByte(0xff);
    
            //6. 获取内容的字节数组
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
            objectOutputStream.writeObject(msg);
            byte[] bytes = byteArrayOutputStream.toByteArray();
    
            //7. 4字节表示长度
            outTemp.writeInt(bytes.length);
            //8. 写入内容
            outTemp.writeBytes(bytes);
            if(objectOutputStream != null)
                objectOutputStream.close();
            if(byteArrayOutputStream != null)
                byteArrayOutputStream.close();
            out.add(outTemp);
        }
    
        /**
         * 解码
         * */
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            int magicNum = in.readInt();
            byte version = in.readByte();
            byte serializerType = in.readByte();
            byte messageType = in.readByte();
            int sequenceId = in.readInt();
            in.readByte();
            int length = in.readInt();
            byte[] bytes = new byte[length];
            /**
             * 从当前读指针位置开始读
             * */
            in.readBytes(bytes,0,length);
            if(serializerType == 0){
                ObjectInputStream objectInputStream = new ObjectInputStream(new ByteArrayInputStream(bytes));
                Message message = (Message) objectInputStream.readObject();
                log.debug("{},{},{},{},{},{}",magicNum,version,serializerType,messageType,sequenceId,length);
                log.debug("{}",message);
                out.add(message);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84

    三、聊天室业务

    3.1 搭建基本结构

    就着前面的代码,完成一个简易聊天室
    在这里插入图片描述

    3.1.1 Userservice

    package com.yjx23332.netty.test.server.service;
    
    public interface UserService {
        /**
         * 登录
         * @param username 用户名
         * @param password 密码
         * */
        boolean login(String username,String password);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    此处不连接数据库,直接获取值

    package com.yjx23332.netty.test.server.service.Impl;
    
    import com.yjx23332.netty.test.server.service.UserService;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class UserServiceMemoryImpl implements UserService {
        private Map<String,String> allUserMap = new ConcurrentHashMap<>();
    
        {
            allUserMap.put("zhangsan","123");
            allUserMap.put("lisi","123");
            allUserMap.put("wangwu","123");
        }
    
        @Override
        public boolean login(String username, String password) {
            String pass = allUserMap.get(username);
            if(pass == null){
                return false;
            }
            return pass.equals(password);
        }
    }
    
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    3.1.2 group

    package com.yjx23332.netty.test.entity.dto;
    
    import lombok.Data;
    
    import java.util.Collections;
    import java.util.Set;
    @Data
    public class Group {
        private String name;
        private Set<String> members;
    
        public static final Group EMPTY_GROUP = new Group("empty", Collections.emptySet());
    
        public Group(String name, Set<String> members) {
            this.name = name;
            this.members = members;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.1.3 session

    package com.yjx23332.netty.test.server.session;
    
    import io.netty.channel.Channel;
    
    public interface Session {
        /**
         * 绑定会话
         * @param channel 哪一个channel
         * @param username 绑定用户
         * */
        void bind(Channel channel, String username);
    
        /**
         * 解绑会话
         * @param channel 哪一个channel
         * */
        void unbind(Channel channel);
    
        /**
         * 获取属性
         * @param channel 哪一个channel
         * @param name 属性名
         * @return 属性值
         * */
        Object getAttribute(Channel channel,String name);
    
        /**
         * 设置属性
         * @param channel 哪一个channel
         * @param name 属性名
         * @param value 属性值
         * */
        void setAttribute(Channel channel, String name, Object value);
    
        /**
         * 获取管道
         * @param username 哪一个用户
         * */
        Channel getChannel(String username);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    可以发现,很类似WebSocket搭建聊天室的设计

    package com.yjx23332.netty.test.server.session;
    
    import io.netty.channel.Channel;
    
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class SessionMemoryImpl implements Session{
        private final Map<String,Channel> usernameChannelMap = new ConcurrentHashMap<>();
        private final Map<Channel,String> channelUsernameMap = new ConcurrentHashMap<>();
        private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();
    
    
        @Override
        public void bind(Channel channel, String username) {
            usernameChannelMap.put(username,channel);
            channelUsernameMap.put(channel,username);
            channelAttributesMap.put(channel,new ConcurrentHashMap<>());
        }
    
        @Override
        public void unbind(Channel channel) {
            String username = channelUsernameMap.remove(channel);
            usernameChannelMap.remove(username);
            channelAttributesMap.remove(channel);
        }
    
        @Override
        public Object getAttribute(Channel channel, String name) {
            return channelAttributesMap.get(channel).get(name);
        }
    
        @Override
        public void setAttribute(Channel channel, String name, Object value) {
            channelAttributesMap.get(channel).put(name,value);
        }
    
        @Override
        public Channel getChannel(String username) {
            return usernameChannelMap.get(username);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    3.1.4 GroupSession

    package com.yjx23332.netty.test.server.session;
    
    import io.netty.channel.Channel;
    
    import java.util.List;
    import java.util.Set;
    
    public interface GroupSession {
    
        /**
         * 创建一个聊天组,如果不存在才能创建成功,否则返回null
         * @param name 组名
         * @param members 成员
         * @return  成功则返回组对象
         * */
    
        Group createGroup(String name, Set<String> members);
    
        /**
         * 加入一个聊天组,如果不存在才能创建成功,否则返回null
         * @param name 组名
         * @param member 成员
         * @return  成功则返回组对象
         * */
        Group joinMember(String name ,String member);
    
        /**
         * 移除组成员
         * @param name 组名
         * @param member 成员
         * @Return 如果组不存在则返回null,否则返回组对象
         * */
        Group removeMember(String name,String member);
    
        /**
         * 移除聊天组
         * @param name 组名
         * @Return 如果组不存在则返回null,否则返回组对象
         * */
        Group removeGroup(String name);
    
        /**
         * 获取组成员
         * @param name 组名
         * @Return 如果组不存在则返回null,否则返回组对象
         */
        Set<String> getMembers(String name);
    
        /**
         * 获取组成员的channel几何,只有在线的channel再回返回
         * @param name 组名
         * */
        List<Channel> getMembersChannel(String name);
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    package com.yjx23332.netty.test.server.session.Impl;
    
    import com.yjx23332.netty.test.entity.dto.Group;
    import com.yjx23332.netty.test.server.session.GroupSession;
    import com.yjx23332.netty.test.server.factory.SessionFactory;
    import io.netty.channel.Channel;
    
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.stream.Collectors;
    
    public class GroupSessionMemoryImpl implements GroupSession {
        private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
    
        @Override
        public Group createGroup(String name, Set<String> members) {
            Group group = new Group(name, members);
            return groupMap.putIfAbsent(name,group);
        }
    
        @Override
        public Group joinMember(String name, String member) {
            return groupMap.computeIfPresent(name, (key, value) -> {
                value.getMembers().add(member);
                return value;
            });
        }
    
        @Override
        public Group removeMember(String name, String member) {
            return groupMap.computeIfPresent(name, (key, value) -> {
                value.getMembers().remove(member);
                return value;
            });
        }
    
        @Override
        public Group removeGroup(String name) {
            return groupMap.remove(name);
        }
    
        @Override
        public Set<String> getMembers(String name) {
            return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
        }
    
        @Override
        public List<Channel> getMembersChannel(String name) {
            if(groupMap.get(name) == null){
                return null;
            }
            return getMembers(name).stream()
                    .map(member -> SessionFactory.getSession().getChannel(member))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        }
    
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60

    3.1.5 ProcotolFrameDecoder

    替换掉原来默认的基于长度字段的帧解码器

    package com.yjx23332.netty.test.protocol;
    
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    
    public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
    
        public ProcotolFrameDecoder(){
            super(1024,12,4,0,0);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    服务器运行类

    package com.yjx23332.netty.test.server;
    
    import com.yjx23332.netty.test.protocol.MessageCodec;
    import com.yjx23332.netty.test.protocol.ProcotolFrameDecoder;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class ChatServer {
        public static void main(String[] args){
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler();
            MessageCodec MESSAGE_CODEC = new MessageCodec();
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .channel(NioServerSocketChannel.class)
                        .group(boss,worker)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new ProcotolFrameDecoder());
                                socketChannel.pipeline().addLast(LOGGING_HANDLER);
                                socketChannel.pipeline().addLast(MESSAGE_CODEC);
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.debug("{}",e);
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    3.1.6 Factory

    package com.yjx23332.netty.test.server.factory;
    
    import com.yjx23332.netty.test.server.session.Impl.SessionMemoryImpl;
    import com.yjx23332.netty.test.server.session.Session;
    
    public abstract class SessionFactory {
    
        private static Session session = new SessionMemoryImpl();
    
        public static Session getSession() {
            return session;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    package com.yjx23332.netty.test.server.factory;
    
    import com.yjx23332.netty.test.server.service.Impl.UserServiceMemoryImpl;
    import com.yjx23332.netty.test.server.service.UserService;
    
    public abstract class UserServiceFacotory {
        private static UserService userService = new UserServiceMemoryImpl();
    
        public static UserService getUserService(){
            return  userService;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    package com.yjx23332.netty.test.server.factory;
    
    import com.yjx23332.netty.test.server.session.GroupSession;
    import com.yjx23332.netty.test.server.session.Impl.GroupSessionMemoryImpl;
    
    public abstract class GroupSessionFactory {
        private static GroupSession groupSession = new GroupSessionMemoryImpl();
    
        public static GroupSession getGroupSession() {
            return groupSession;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    3.2 消息

    登录相关的消息在2.3就已贴出源码,此处就不再给出

    3.2.1 ChatMessage

    package com.yjx23332.netty.test.entity.vo.req;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class ChatRequestMessage extends Message {
        private String content;
        private String to;
        private String from;
    
        public  ChatRequestMessage(String from,String to,String content){
            this.content = content;
            this.from = from;
            this.to = to;
        }
        @Override
        public int getMessageType() {
            return ChatRequestMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    package com.yjx23332.netty.test.entity.vo.resp;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class ChatResponseMessage extends Message {
        private String from;
        private String content;
    
        private boolean success;
    
        public ChatResponseMessage(String from, String content){
            this.from = from;
            this.success = true;
            this.content = content;
        }
    
        public ChatResponseMessage(boolean success,String content){
            this.success = success;
            this.content = content;
        }
    
        @Override
        public int getMessageType() {
            return ChatResponseMessage;
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    3.2.2 GroupChatMessage

    package com.yjx23332.netty.test.entity.vo.req;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class GroupChatRequestMessage extends Message {
        private String content;
        private String groupName;
        private String from;
        public GroupChatRequestMessage(String from,String groupName,String content){
            this.from = from;
            this.groupName = groupName;
            this.content = content;
        }
        @Override
        public int getMessageType() {
            return GroupChatRequestMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    package com.yjx23332.netty.test.entity.vo.resp;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class GroupChatResponseMessage extends Message {
        private String from;
        private String content;
    
        public GroupChatResponseMessage(String from,String content){
            this.from = from;
            this.content = content;
        }
    
        @Override
        public int getMessageType() {
            return GroupChatResponseMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    3.2.3 GroupCreate

    package com.yjx23332.netty.test.entity.vo.req;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    import java.util.Set;
    @Data
    @ToString(callSuper = true)
    public class GroupCreateRequestMessage extends Message {
        private String groupName;
        private Set<String> members;
    
        public GroupCreateRequestMessage(String groupName,Set<String> members){
            this.groupName  = groupName;
            this.members = members;
        }
    
        @Override
        public int getMessageType() {
            return GroupCreateRequestMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    package com.yjx23332.netty.test.entity.vo.resp;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class GroupCreateResponseMessage extends Message {
        String message;
        boolean success;
        public GroupCreateResponseMessage(boolean success, String message){
            this.message = message;
            this.success = success;
        }
        @Override
        public int getMessageType() {
            return GroupCreateResponseMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.2.4 GroupMembers

    package com.yjx23332.netty.test.entity.vo.req;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class GroupMembersRequestMessage extends Message {
        private String groupName;
    
        public GroupMembersRequestMessage(String groupName){
            this.groupName = groupName;
        }
    
        @Override
        public int getMessageType() {
            return GroupMembersRequestMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.2.5 GroupJoin

    package com.yjx23332.netty.test.entity.vo.req;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class GroupJoinRequestMessage extends Message {
        private String groupName;
    
        private String username;
    
        public GroupJoinRequestMessage(String username,String groupName){
            this.username = username;
            this.groupName = groupName;
        }
    
        @Override
        public int getMessageType() {
            return GroupJoinRequestMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    package com.yjx23332.netty.test.entity.vo.resp;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class GroupJoinResponseMessage extends Message {
        String message;
        boolean success;
        public GroupJoinResponseMessage(boolean success,String message){
            this.message = message;
            this.success = success;
        }
        @Override
        public int getMessageType() {
            return GroupJoinResponseMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    3.2.6 GroupQuit

    package com.yjx23332.netty.test.entity.vo.req;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class GroupQuitRequestMessage extends Message {
        private String groupName;
    
        private String username;
    
        public GroupQuitRequestMessage(String username,String groupName){
            this.username = username;
            this.groupName = groupName;
        }
    
        @Override
        public int getMessageType() {
            return GroupQuitRequestMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    package com.yjx23332.netty.test.entity.vo.resp;
    
    import com.yjx23332.netty.test.entity.Message;
    import lombok.Data;
    import lombok.ToString;
    
    @Data
    @ToString(callSuper = true)
    public class GroupQuitResponseMessage extends Message {
        boolean success;
        String message;
    
        public GroupQuitResponseMessage(boolean success , String message){
            this.success = success;
            this.message = message;
        }
    
        @Override
        public int getMessageType() {
            return GroupQuitResponseMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    3.3 服务端与客户端

    3.3.1 客户端

    图方便,这里就不检验各个输入的正确性
    且用一个比较简陋的处理方式,接收与处理所有消息

    package com.yjx23332.netty.test.client;
    
    import ch.qos.logback.classic.pattern.SyslogStartConverter;
    import com.yjx23332.netty.test.entity.vo.req.*;
    import com.yjx23332.netty.test.entity.vo.resp.LoginResponseMessage;
    import com.yjx23332.netty.test.protocol.MessageCodec;
    import com.yjx23332.netty.test.protocol.ProcotolFrameDecoder;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.Arrays;
    import java.util.Scanner;
    import java.util.Set;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.stream.Collectors;
    
    
    @Slf4j
    public class ChatClient {
        public static void main(String[] args) {
            NioEventLoopGroup group = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler();
            MessageCodec MESSAGE_CODEC = new MessageCodec();
            // 计数器,用于检测是否有消息
            CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
            // 记录登录状态
            AtomicBoolean LoginStatus = new AtomicBoolean(false);
            try{
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class)
                        .group(group)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new ProcotolFrameDecoder());
                                socketChannel.pipeline().addLast(LOGGING_HANDLER);
                                socketChannel.pipeline().addLast(MESSAGE_CODEC);
                                socketChannel.pipeline().addLast("client handler",new ChannelInboundHandlerAdapter(){
                                    @Override
                                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                        //建立连接后,创建一个线程去接收用户在控制的输入,负责向服务器发送各种消息
                                        new Thread(()->{
                                            Scanner scanner = new Scanner(System.in);
                                            System.out.println("请输入用户名:");
                                            String userName = scanner.nextLine();
                                            System.out.println("请输入密码:");
                                            String password = scanner.nextLine();
                                            //此处忽略校验
                                            ctx.writeAndFlush(new LoginRequestMessage(userName,password));
                                            //阻塞,直到计数为0
                                            try {
                                                WAIT_FOR_LOGIN.await();
                                            } catch (InterruptedException e) {
                                                log.error("client login error {}",e);
                                            }
                                            //登录失败则关闭连接
                                            if(!LoginStatus.get()){
                                                ctx.channel().close();
                                                return;
                                            }
                                            while(true){
                                                System.out.println("=============================");
                                                System.out.println("send [userName] [content]");
                                                System.out.println("gsend [group name] [content]");
                                                System.out.println("gcreate [group name] [m1,m2,m3...]");
                                                System.out.println("gmemebers [group name]");
                                                System.out.println("gjoin [group name]");
                                                System.out.println("gquit [group name]");
                                                System.out.println("quit");
                                                System.out.println("=============================");
                                                String command = scanner.nextLine();
                                                String[] s = command.split(" ");
                                                switch (s[0]){
                                                    case "send":
                                                        ctx.writeAndFlush(new ChatRequestMessage(userName,s[1],s[2]));
                                                        break;
                                                    case "gsend":
                                                        ctx.writeAndFlush(new GroupChatRequestMessage(userName,s[1],s[2]));
                                                        break;
                                                    case "gcreate":
                                                        Set<String> members = Arrays.stream(s[2].split(",")).collect(Collectors.toSet());
                                                        members.add(userName);
                                                        ctx.writeAndFlush(new GroupCreateRequestMessage(s[1],members));
                                                        break;
                                                    case "gmembers":
                                                        ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
                                                        break;
                                                    case "gjoin":
                                                        ctx.writeAndFlush(new GroupJoinRequestMessage(userName,s[1]));
                                                        break;
                                                    case "gquit":
                                                        ctx.writeAndFlush(new GroupQuitRequestMessage(userName,s[1]));
                                                        break;
                                                    case "quit":
                                                        ctx.channel().close();
                                                        return;
                                                    default:
                                                        System.out.println("请参照输入格式");
                                                        break;
                                                }
                                            }
                                        },"system.in").start();
    
                                        super.channelActive(ctx);
                                    }
    
                                    @Override
                                    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                        System.out.println("msg:" + msg);
                                        if(msg instanceof  LoginResponseMessage){
                                            LoginResponseMessage responseMessage = (LoginResponseMessage) msg;
                                            if(responseMessage.isSuccess()){
                                                LoginStatus.set(true);
                                            }
                                            //数字 -1,唤醒阻塞的线程
                                            WAIT_FOR_LOGIN.countDown();
                                        }
                                        super.channelRead(ctx, msg);
                                    }
                                });
                            }
                        });
                ChannelFuture channelFuture = bootstrap.connect("localhost",8080).sync();
                channelFuture.channel().closeFuture().sync();
            }catch (Exception e){
                log.error("client error {}",e);
            }
            finally {
                group.shutdownGracefully();
                log.debug("连接正在关闭");
            }
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144

    3.3.2 handler

    由于服务端使用的处理器比较多,匿名的话比较难阅读,我们再建一个handler类,用于流水线上的处理方法
    在这里插入图片描述

    LoginRequestMessageHandler
    package com.yjx23332.netty.test.handler;
    
    import com.yjx23332.netty.test.entity.vo.req.LoginRequestMessage;
    import com.yjx23332.netty.test.entity.vo.resp.LoginResponseMessage;
    import com.yjx23332.netty.test.server.factory.SessionFactory;
    import com.yjx23332.netty.test.server.factory.UserServiceFacotory;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    /**
     * 该处理器只处理 LoginRequestMessage,因此用 SimpleChannelInboundHandler
     * */
    @ChannelHandler.Sharable
    public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
            @Override
            protected void channelRead0(ChannelHandlerContext channelHandlerContext, LoginRequestMessage loginRequestMessage) throws Exception {
                String username = loginRequestMessage.getUsername();
                String password = loginRequestMessage.getPassword();
                boolean login = UserServiceFacotory.getUserService().login(username,password);
                LoginResponseMessage loginResponseMessage = null;
                if(login){
                    /**
                     * 将登录的管道以及对应的用户名放入session之中
                     * */
                    SessionFactory.getSession().bind(channelHandlerContext.channel(),username);
                    loginResponseMessage = new LoginResponseMessage(true,"登录成功");
                }else{
                    loginResponseMessage = new LoginResponseMessage(false,"用户名或密码不正确");
                }
                channelHandlerContext.writeAndFlush(loginResponseMessage);
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    ChatRequestMessageHandler
    package com.yjx23332.netty.test.handler;
    
    import com.yjx23332.netty.test.entity.vo.req.ChatRequestMessage;
    import com.yjx23332.netty.test.entity.vo.resp.ChatResponseMessage;
    import com.yjx23332.netty.test.server.factory.SessionFactory;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    @ChannelHandler.Sharable
    public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, ChatRequestMessage chatRequestMessage) throws Exception {
            String to = chatRequestMessage.getTo();
            Channel channel = SessionFactory.getSession().getChannel(to);
    
            //在线
            if(channel != null){
                channel.writeAndFlush(new ChatResponseMessage(chatRequestMessage.getFrom(),chatRequestMessage.getContent()));
            }
            //不在线
            else{
                channelHandlerContext.writeAndFlush(new ChatResponseMessage(false,"对方用户不存在或未在线"));
            }
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    GroupCreateRequestMessageHandler
    package com.yjx23332.netty.test.handler;
    
    import com.yjx23332.netty.test.entity.dto.Group;
    import com.yjx23332.netty.test.entity.vo.req.GroupCreateRequestMessage;
    import com.yjx23332.netty.test.entity.vo.resp.GroupCreateResponseMessage;
    import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
    import com.yjx23332.netty.test.server.session.GroupSession;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    import java.util.List;
    import java.util.Set;
    
    @ChannelHandler.Sharable
    public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupCreateRequestMessage groupCreateRequestMessage) throws Exception {
            String groupName = groupCreateRequestMessage.getGroupName();
            Set<String> members = groupCreateRequestMessage.getMembers();
    
            //群管理器
            GroupSession groupSession = GroupSessionFactory.getGroupSession();
            Group group = groupSession.createGroup(groupName,members);
            if(group == null){
                channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(true,groupName + ",创建成功"));
                //发送拉群消息
                List<Channel> channels = groupSession.getMembersChannel(groupName);
                for(Channel channel : channels){
                    channel.writeAndFlush(new GroupCreateResponseMessage(true,"您已被拉入:" + groupName));
                }
            }
            else{
                channelHandlerContext.writeAndFlush(new GroupCreateResponseMessage(false,groupName + ",已经存在"));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    GroupQuitRequestMessageHandler
    package com.yjx23332.netty.test.handler;
    
    import com.yjx23332.netty.test.entity.dto.Group;
    import com.yjx23332.netty.test.entity.vo.req.GroupQuitRequestMessage;
    import com.yjx23332.netty.test.entity.vo.resp.GroupQuitResponseMessage;
    import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
    import com.yjx23332.netty.test.server.factory.SessionFactory;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    @ChannelHandler.Sharable
    public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupQuitRequestMessage groupQuitRequestMessage) throws Exception {
            String groupName = groupQuitRequestMessage.getGroupName();
            String userName = groupQuitRequestMessage.getUsername();
            Group group = GroupSessionFactory.getGroupSession().removeMember(groupName,userName);
            if(group.getMembers().contains(userName)){
                channelHandlerContext.writeAndFlush(new GroupQuitResponseMessage(false,"删除失败"));
            }
            else{
                channelHandlerContext.writeAndFlush(new GroupQuitResponseMessage(true,"删除成功,当前成员为:" + group.toString()));
                SessionFactory.getSession().getChannel(userName).writeAndFlush(new GroupQuitResponseMessage(true,"您已被移除群聊:" + groupName));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    GroupJoinRequestMessageHandler
    package com.yjx23332.netty.test.handler;
    
    import com.yjx23332.netty.test.entity.dto.Group;
    import com.yjx23332.netty.test.entity.vo.req.GroupJoinRequestMessage;
    import com.yjx23332.netty.test.entity.vo.resp.GroupJoinResponseMessage;
    import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
    import com.yjx23332.netty.test.server.factory.SessionFactory;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    @ChannelHandler.Sharable
    public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupJoinRequestMessage groupJoinRequestMessage) throws Exception {
            String groupName = groupJoinRequestMessage.getGroupName();
            String userName = groupJoinRequestMessage.getUsername();
            Group group = GroupSessionFactory.getGroupSession().joinMember(groupName,userName);
            if(group.getMembers().contains(userName)){
                channelHandlerContext.writeAndFlush(new GroupJoinResponseMessage(true,"添加成员成功,当前成员:" + group.toString()));
                SessionFactory.getSession().getChannel(userName).writeAndFlush(new GroupJoinResponseMessage(true,"您已加入:" + groupName));
            }
            else{
                channelHandlerContext.writeAndFlush(new GroupJoinResponseMessage(false,"添加失败 "));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    GroupChatRequestMessageHandler
    package com.yjx23332.netty.test.handler;
    
    import com.yjx23332.netty.test.entity.vo.req.GroupChatRequestMessage;
    import com.yjx23332.netty.test.entity.vo.resp.GroupChatResponseMessage;
    import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    
    import java.util.List;
    
    @ChannelHandler.Sharable
    public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupChatRequestMessage groupChatRequestMessage) throws Exception {
            String groupName = groupChatRequestMessage.getGroupName();
            String userName = groupChatRequestMessage.getFrom();
            List<Channel> channelList = GroupSessionFactory.getGroupSession().getMembersChannel(groupChatRequestMessage.getGroupName());
            for(Channel channel: channelList){
                channel.writeAndFlush(new GroupChatResponseMessage(groupName+":"+userName,groupChatRequestMessage.getContent()));
            }
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    GroupMembersRequestMessageHanlder

    这里就不详细处理了

    
    package com.yjx23332.netty.test.handler;
    
    import com.yjx23332.netty.test.entity.vo.req.GroupMembersRequestMessage;
    import com.yjx23332.netty.test.entity.vo.resp.GroupMembersResponseMessage;
    import com.yjx23332.netty.test.server.factory.GroupSessionFactory;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    @ChannelHandler.Sharable
    public class GroupMembersRequestMessageHanlder extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, GroupMembersRequestMessage groupMembersRequestMessage) throws Exception {
           channelHandlerContext.channel().writeAndFlush(new GroupMembersResponseMessage(GroupSessionFactory.getGroupSession().getMembers(groupMembersRequestMessage.getGroupName())));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    QuitHandler

    注意,此处没有处理Group中的失效连接

    package com.yjx23332.netty.test.handler;
    
    import com.yjx23332.netty.test.server.factory.SessionFactory;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import lombok.extern.slf4j.Slf4j;
    
    
    @ChannelHandler.Sharable
    @Slf4j
    public class QuitHandler extends ChannelInboundHandlerAdapter {
        /**
         * 正常关闭
         * */
        @Override
        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            SessionFactory.getSession().unbind(channelHandlerContext.channel());
            log.debug("{},已经正常断开",channelHandlerContext.channel());
        }
        /**
         * 异常关闭
         * */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            SessionFactory.getSession().unbind(ctx.channel());
             log.debug("{},已经异常断开, 异常内容:{}",ctx.channel(),cause);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    3.3.3 服务端

    package com.yjx23332.netty.test.server;
    
    
    import com.yjx23332.netty.test.handler.*;
    import com.yjx23332.netty.test.protocol.MessageCodecSharable;
    import com.yjx23332.netty.test.protocol.ProcotolFrameDecoder;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.IdleStateHandler;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class ChatServer {
        public static void main(String[] args){
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler();
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
            LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
            ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
            GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
            GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
            GroupMembersRequestMessageHanlder GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHanlder();
            GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
            GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();
            QuitHandler QUIT_HANDLER = new QuitHandler();
            try{
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap
                        .channel(NioServerSocketChannel.class)
                        .group(boss,worker)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new ProcotolFrameDecoder());
                                socketChannel.pipeline().addLast(LOGGING_HANDLER);
                                socketChannel.pipeline().addLast(MESSAGE_CODEC);
                                socketChannel.pipeline().addLast(LOGIN_HANDLER);
                                socketChannel.pipeline().addLast(CHAT_HANDLER);
                                socketChannel.pipeline().addLast(GROUP_CREATE_HANDLER);
                                socketChannel.pipeline().addLast(GROUP_JOIN_HANDLER);
                                socketChannel.pipeline().addLast(GROUP_MEMBERS_HANDLER);
                                socketChannel.pipeline().addLast(GROUP_QUIT_HANDLER);
                                socketChannel.pipeline().addLast(GROUP_CHAT_HANDLER);
                                socketChannel.pipeline().addLast(QUIT_HANDLER);
                            }
                        });
                ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.debug("{}",e);
            }finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    3.4 连接假死

    原因

    • 网络设备出现故障,比如网卡机房等底层TCP连接断开,但应用程序没有感知。任然占用资源
    • 公网网络不稳定,出现丢包。连续丢包,将导致可u段数据发不出去,服务端而无法接收应用程序阻塞,无法进行数据读写
    • 应用程序线程阻塞,无法进行数据读写

    问题

    • 假死的连接占用不能自动释放
    • 向加死的连接发送数据,得到的反馈是发送超时

    3.4.1 空闲检测器以及处理

       						//用来判断是不是读空闲事件过长,或者写空闲事件过长
                               /**
                                * 单位为秒,为0则是不使用。
                                * @param 读时间
                                * @param 写实间
                                * @param 读与写共同空闲时间
                                * 超时触发 IdleStateEvent
                                * */
                               socketChannel.pipeline().addLast(new IdleStateHandler(5,0,0));
                               /**
                                * 同时为出入站处理器
                                * */
                               socketChannel.pipeline().addLast(new ChannelDuplexHandler(){
                                   /**
                                    * 用来触发特殊事件
                                    * */
                                   @Override
                                   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                       IdleStateEvent event = (IdleStateEvent) evt;
                                       if(event.state() == IdleState.READER_IDLE){
                                           log.debug("已经5秒没有读取到数据");
                                           // 主动关闭连接
                                           ctx.channel().close();                                     
                                       }
                                   }
                               });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    3.4.2 心跳包

    但是如果连接好好的,只是没有东西发怎么办?这个可以由客户端去自动发一个心跳包。应该与业务无关。

    package com.yjx23332.netty.test.entity.vo.resp;
    
    import com.yjx23332.netty.test.entity.Message;
    
    public class PingMessage extends Message {
        @Override
        public int getMessageType() {
            return PingMessage;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    							//如果3秒没有向服务器写数据,触发一个事件
                                socketChannel.pipeline().addLast(new IdleStateHandler(0,3,0));
                                socketChannel.pipeline().addLast(new ChannelDuplexHandler(){
                                    @Override
                                    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                                        IdleStateEvent event = (IdleStateEvent) evt;
                                        if(event.state() == IdleState.WRITER_IDLE){
                                            log.debug("已经3秒没有写数据了,发送一个心跳消息");
                                            ctx.writeAndFlush(new PingMessage());
                                        }
                                    }
                                });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    优化以及拓展见下一篇笔记。

    参考文献

    [1]黑马程序员Netty全套教程

  • 相关阅读:
    flutter EventBus
    RTMP摄像头直播-CameraX数据采集处理
    Prometheus与Grafana监控SpringBoot应用
    VSCode里源代码管理出现新旧代码左右的更改对比窗口
    行为型模式-观察者模式
    k8s创建pod - 启动pod的流程
    Tomcat一机多实例部署
    实验四:健康打卡
    【2024最新华为OD-C/D卷试题汇总】[支持在线评测] 密码解密(100分) - 三语言AC题解(Python/Java/Cpp)
    通过代理模式 + 责任链模式实现对目标执行方法拦截和增强功能
  • 原文地址:https://blog.csdn.net/weixin_46949627/article/details/126768821