• Netty(10)协议设计与解析(IdleStateHandler:空闲检测器、心跳)


    为什么需要协议?

    TCP/IP 中消息传输基于流的方式,没有边界。

    协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

    协议举例

    redis 协议

    客户端代码

    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import java.nio.charset.Charset;
    
    @Slf4j
    public class TestRedis {
        /*
        向redis发送这样的一条命令:set name zhangsan
           redis将上面的一条命令看做数组,依次发送下面的内容
                1. 规定*号后面的就是数组元素的个数
                    *3:元素的个数为3
                2. 发送每个命令每个键值的长度,多个命令之间使用回车+换行进行分隔
                    $3: set命令是三个字节
                    set
                    $4: key的长度是四个字节
                    name
                    $8: value的长度是八个字节
                    zhangsan
          下面演示的客户端怎么发送:
         */
        public static void main(String[] args) {
            /*
                13: 回车
                10: 换行
             */
            final byte[] LINE = {13, 10};
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(worker);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new LoggingHandler());
                        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) {
                                //因为使用的是英文和拼音,所以不考虑编码问题
                                ByteBuf buf = ctx.alloc().buffer();
                                buf.writeBytes("*3".getBytes());
                                buf.writeBytes(LINE);//回车换行
                                buf.writeBytes("$3".getBytes());//set命令的长度
                                buf.writeBytes(LINE);//回车换行
                                buf.writeBytes("set".getBytes());//set命令
                                buf.writeBytes(LINE);//回车换行
                                buf.writeBytes("$4".getBytes());//key的长度
                                buf.writeBytes(LINE);
                                buf.writeBytes("name".getBytes());
                                buf.writeBytes(LINE);
                                buf.writeBytes("$8".getBytes());//value的长度
                                buf.writeBytes(LINE);
                                buf.writeBytes("zhangsan".getBytes());
                                buf.writeBytes(LINE);
                                ctx.writeAndFlush(buf);
                            }
                            //接收redis返回到的结果
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                ByteBuf buf = (ByteBuf) msg;
                                System.out.println(buf.toString(Charset.defaultCharset()));
                            }
                        });
                    }
                });
                //连接本地的redis服务器
                ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("client error", e);
            } finally {
                worker.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86

    上面的客户端启动后,会向本机的redis发送数据,最终可以在本机的redis读取到set的内容“zhangsan”

    http协议

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.HttpRequest;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.codec.http.HttpServerCodec;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import lombok.extern.slf4j.Slf4j;
    
    import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
    
    @Slf4j
    public class TestHttp {
        public static void main(String[] args) {
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
                        /*
                            http协议的编解码器:
                                HttpServerCodec: 作为http协议的服务器端
                                Codec: 即包含解密又包含编码
                                    Decode:解码
                                    EnCode:编码
                                    二者组合在一起便是Codec
                            当http请求来了过后,便会从上往下经过一个个的handler
                                到了HttpServerCodec时,便会对请求进行解码,解码的内容将会被分为下面的两部分
                                    1. HttpRequest:包含请求行的请求头
                                    2. HttpContent:代表请求体
                         */
                        //HttpServerCodec: 即是入站处理器,又是出站处理器
                        ch.pipeline().addLast(new HttpServerCodec());
                        //解码后,对解码后的内容进行处理
    
                        /*
                            SimpleChannelInboundHandler:入站处理器,只关心某一种类型的消息;
                                                         可以对消息进行区分,进行选择处理
                                SimpleChannelInboundHandler():只关心HttpRequest类型的消息;
                                                                            其他类型消息便会被跳过
                         */
                        ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {
                                //msg: 代表请求头和请求行
                                // 获取请求行
                                log.debug(msg.uri());
                                // 给浏览器返回响应
                                /*
                                    DefaultFullHttpResponse: 响应结果便会结果出站处理器HttpServerCodec;
                                                           编码成为ByteBuf,符合http协议的响应
                                       protocolVersion:     http协议的版本
                                       HttpResponseStatus:  响应的状态码
                                 */
                                DefaultFullHttpResponse response =
                                        new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
    
                                byte[] bytes = "

    Hello, world!

    "
    .getBytes(); //加入响应头,设置其响应的消息长度 response.headers().setInt(CONTENT_LENGTH, bytes.length); //写入一些响应的内容给response对象 response.content().writeBytes(bytes); // 写回响应 ctx.writeAndFlush(response); } }); /* //不进行消息的区分,使用instanceof判断消息的类型 ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("{}", msg.getClass()); //判断msg是否是HttpRequest类型,是的话便去处理请求行,请求头 if (msg instanceof HttpRequest) { // 请求行,请求头 } else if (msg instanceof HttpContent) { //请求体 //判断msg是否是HttpRequest类型,是的话便去处理请求体 } } }); */ } }); //监听本机的8080端口,当使用浏览器访问localhost:8080时,当前这个方法便会被触发,进行上面的处理 ChannelFuture channelFuture = serverBootstrap.bind(8080).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108

    当浏览器访问:localhost:8080这个链接后,浏览器页面将会显示写入的html内容

    在控制台也会进行相应的输出

    自定义协议

    自定义协议要素

    • 魔数,用来在第一时间判定是否是无效数据包

      发送的数据,一般前几个就是魔术值
      通过魔术值,来判断这个消息是不是需要的,可以理解为对暗号

    • 版本号,可以支持协议的升级

    • 序列化算法消息正文到底采用哪种序列化反序列化方式,可以由此扩展

      例如:将消息正文格式为(**常见的序列化方式**为)json、protobuf(谷歌出品的)、hessian、jdk

    • 指令类型,是登录、注册、单聊、群聊… 跟业务相关

      消息是什么类型

    • 请求序号,为了双工通信,提供异步能力

      消息发送出去,接收的时候不一定是按照发送出去的顺序接收,所以需要一个序号去判断这次响应对应的是之前的那一次请求

    • 正文长度

      通过正文长度去判断需要读取的字节数是多少

    • 消息正文

    实战

    自定义消息的类型

    所有消息的公共父类(Message)

    @Data
    public abstract class Message implements Serializable {
    
        /**
         * 根据消息类型字节,获得对应的消息 class
         * @param messageType 消息类型字节
         * @return 消息 class
         */
        public static Class<? extends Message> getMessageClass(int messageType) {
            return messageClasses.get(messageType);
        }
    
        private int sequenceId;
    
        private int messageType;
    
    	//获取消息类型,其他继承这个方法的重新该方法,如此,子类便可以通过调用这个方法知道是什么消息类型
        public abstract int getMessageType();
    
        public static final int LoginRequestMessage = 0;
        public static final int LoginResponseMessage = 1;
        public static final int ChatRequestMessage = 2;
        public static final int ChatResponseMessage = 3;
        public static final int GroupCreateRequestMessage = 4;
        public static final int GroupCreateResponseMessage = 5;
        public static final int GroupJoinRequestMessage = 6;
        public static final int GroupJoinResponseMessage = 7;
        public static final int GroupQuitRequestMessage = 8;
        public static final int GroupQuitResponseMessage = 9;
        public static final int GroupChatRequestMessage = 10;
        public static final int GroupChatResponseMessage = 11;
        public static final int GroupMembersRequestMessage = 12;
        public static final int GroupMembersResponseMessage = 13;
        public static final int PingMessage = 14;
        public static final int PongMessage = 15;
        /**
         * 请求类型 byte 值
         */
        public static final int RPC_MESSAGE_TYPE_REQUEST = 101;
        /**
         * 响应类型 byte 值
         */
        public static final int  RPC_MESSAGE_TYPE_RESPONSE = 102;
    
        private static final Map<Integer, Class<? extends Message>> messageClasses = new HashMap<>();
    
        static {
            messageClasses.put(LoginRequestMessage, LoginRequestMessage.class);
            messageClasses.put(LoginResponseMessage, LoginResponseMessage.class);
            messageClasses.put(ChatRequestMessage, ChatRequestMessage.class);
            messageClasses.put(ChatResponseMessage, ChatResponseMessage.class);
            messageClasses.put(GroupCreateRequestMessage, GroupCreateRequestMessage.class);
            messageClasses.put(GroupCreateResponseMessage, GroupCreateResponseMessage.class);
            messageClasses.put(GroupJoinRequestMessage, GroupJoinRequestMessage.class);
            messageClasses.put(GroupJoinResponseMessage, GroupJoinResponseMessage.class);
            messageClasses.put(GroupQuitRequestMessage, GroupQuitRequestMessage.class);
            messageClasses.put(GroupQuitResponseMessage, GroupQuitResponseMessage.class);
            messageClasses.put(GroupChatRequestMessage, GroupChatRequestMessage.class);
            messageClasses.put(GroupChatResponseMessage, GroupChatResponseMessage.class);
            messageClasses.put(GroupMembersRequestMessage, GroupMembersRequestMessage.class);
            messageClasses.put(GroupMembersResponseMessage, GroupMembersResponseMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_REQUEST, RpcRequestMessage.class);
            messageClasses.put(RPC_MESSAGE_TYPE_RESPONSE, RpcResponseMessage.class);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    各种消息类型的类

    LoginRequestMessage
    @Data
    @ToString(callSuper = true)
    public class LoginRequestMessage extends Message {
        private String username;
        private String password;
    
        public LoginRequestMessage() {
        }
    
        public LoginRequestMessage(String username, String password) {
            this.username = username;
            this.password = password;
        }
    
        @Override
        public int getMessageType() {
            return LoginRequestMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    LoginResponseMessage
    @Data
    @ToString(callSuper = true)
    public class LoginResponseMessage extends AbstractResponseMessage {
    
        public LoginResponseMessage(boolean success, String reason) {
            super(success, reason);
        }
    
        @Override
        public int getMessageType() {
            return LoginResponseMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    ChatRequestMessage
    @Data
    @ToString(callSuper = true)
    public class ChatRequestMessage extends Message {
        private String content;
        private String to;
        private String from;
    
        public ChatRequestMessage() {
        }
    
        public ChatRequestMessage(String from, String to, String content) {
            this.from = from;
            this.to = to;
            this.content = content;
        }
    
        @Override
        public int getMessageType() {
            return ChatRequestMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    GroupCreateRequestMessage
    @Data
    @ToString(callSuper = true)
    public class GroupCreateRequestMessage extends Message {
        private String groupName;
        private Set<String> members;
    
        public GroupCreateRequestMessage(String groupName, Set<String> members) {
            this.groupName = groupName;
            this.members = members;
        }
    
        @Override
        public int getMessageType() {
            return GroupCreateRequestMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    GroupCreateRequestMessage
    @Data
    @ToString(callSuper = true)
    public class GroupCreateRequestMessage extends Message {
        private String groupName;
        private Set<String> members;
    
        public GroupCreateRequestMessage(String groupName, Set<String> members) {
            this.groupName = groupName;
            this.members = members;
        }
    
        @Override
        public int getMessageType() {
            return GroupCreateRequestMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    GroupMembersRequestMessage
    @Data
    @ToString(callSuper = true)
    public class GroupMembersRequestMessage extends Message {
        private String groupName;
    
        public GroupMembersRequestMessage(String groupName) {
            this.groupName = groupName;
        }
    
        @Override
        public int getMessageType() {
            return GroupMembersRequestMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    GroupJoinRequestMessage
    @Data
    @ToString(callSuper = true)
    public class GroupJoinRequestMessage extends Message {
        private String groupName;
    
        private String username;
    
        public GroupJoinRequestMessage(String username, String groupName) {
            this.groupName = groupName;
            this.username = username;
        }
    
        @Override
        public int getMessageType() {
            return GroupJoinRequestMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    GroupQuitRequestMessage
    @Data
    @ToString(callSuper = true)
    public class GroupQuitRequestMessage extends Message {
        private String groupName;
    
        private String username;
    
        public GroupQuitRequestMessage(String username, String groupName) {
            this.groupName = groupName;
            this.username = username;
        }
    
        @Override
        public int getMessageType() {
            return GroupQuitRequestMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    PingMessage
    public class PingMessage extends Message {
        @Override
        public int getMessageType() {
            return PingMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    PongMessage
    public class PongMessage extends Message {
        @Override
        public int getMessageType() {
            return PongMessage;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    自定义消息的编解码器

    @Slf4j
    //加了Sharable就说明以后在使用这个类进行实例化时,可以将其抽取出来共享, 但是不能继承ByteToMessageCodec这个方法
    //@ChannelHandler.Sharable
    public class MessageCodec extends ByteToMessageCodec<Message> {
    
        @Override
        public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
            // 1. 4 字节的魔数:类似与一种暗号
            out.writeBytes(new byte[]{1, 2, 3, 4});
            // 2. 1 字节的版本,
            out.writeByte(1);
            // 3. 1 字节的序列化方式, 约定:jdk: 0 , json: 1
            out.writeByte(0);//jdk的序列化方式
            // 4. 1 字节的指令类型
            out.writeByte(msg.getMessageType());
            // 5. 4 个字节的请求序号
            out.writeInt(msg.getSequenceId());
            // 无意义,对齐填充, 满足2的n次方
            out.writeByte(0xff);
            // 6. 获取内容的字节数组: 序列化为二进制的字节数组
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(msg);
            byte[] bytes = bos.toByteArray();
            // 7. 长度
            out.writeInt(bytes.length);
            // 8. 写入内容, 在写入内容前面的长度部分是固定的:15+1
            out.writeBytes(bytes);
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            //获取魔数值:四个字节
            int magicNum = in.readInt();
            //获取版本:一字节
            byte version = in.readByte();
            //获取序列化算法:一字节
            byte serializerType = in.readByte();
            //获取消息类型:一字节
            byte messageType = in.readByte();
            //消息序号:四字节
            int sequenceId = in.readInt();
            //填充,对齐的一字节
            in.readByte();
            //长度:四字节
            int length = in.readInt();
            //分配缓冲区
            byte[] bytes = new byte[length];
            //通过长度去读取多少个字节
            in.readBytes(bytes, 0, length);//内容
    //        if (messageType == 0){//如果是什么类型,就进行反序列化
    //            //。。。。
    //        }
            //将内容进行反序列化
            ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
            //将其转换为Message
            Message message = (Message) ois.readObject();
    
            //将内容进行打印
            log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
            log.debug("{}", message);
            //将netty解码后的结果存入对应的参数里面去,不存入的话接下来的handler将拿不到结果
            out.add(message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    测试

    编码
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.logging.LoggingHandler;
    
    public class TestMessageCodec {
        public static void main(String[] args) throws Exception {
            EmbeddedChannel channel = new EmbeddedChannel(
                    //打印日志
                    new LoggingHandler(),
                    new MessageCodec()
            );
            // encode
            LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
    
            channel.writeInbound(message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述

    解码
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.logging.LoggingHandler;
    
    public class TestMessageCodec {
        public static void main(String[] args) throws Exception {
            EmbeddedChannel channel = new EmbeddedChannel(
                    new LoggingHandler(),
                    new MessageCodec()
            );
            // encode
            LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
            // decode
            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
            new MessageCodec().encode(null, message, buf);
            //入站
            channel.writeInbound(buf );
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里插入图片描述

    黏包/半包问题解决
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.ByteBufAllocator;
    import io.netty.channel.embedded.EmbeddedChannel;
    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    import io.netty.handler.logging.LoggingHandler;
    
    public class TestMessageCodec {
        public static void main(String[] args) throws Exception {
            EmbeddedChannel channel = new EmbeddedChannel(
                    new LoggingHandler(),
                    //解决黏包/半包问题
                    //最大长度1024,长度字段偏移量12,长度字段长度四字节,长度字段不需要调整0,不需要去除前面的部分0
                    new LengthFieldBasedFrameDecoder(
                            1024, 12, 4, 0, 0),
                    new MessageCodec()
            );
            // encode
            LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123");
            // decode
            ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
            new MessageCodec().encode(null, message, buf);
    		//将buf进行切片为两部分
            ByteBuf s1 = buf.slice(0, 100);
            ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
            //writeInbound会在ByteBuf写完后,调用release方法,这时其引用记便变为0,
            //这时候ByteBuf占用的内存都被释放掉了
            s1.retain(); // 引用计数+1 = 2
            //模拟半包
            channel.writeInbound(s1); // release = 引用计数 - 1 = 1
            channel.writeInbound(s2);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    @Sharable什么情况下才能使用

    • 当 handler 不保存状态时,就可以安全地在多线程下被共享

    • 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制, 如果使用了便会报下面的错
      在这里插入图片描述

    • 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类

    正确的编解码代码

    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToMessageCodec;
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.util.List;
    
    @Slf4j
    @ChannelHandler.Sharable
    /**
     * 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
     */
    public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
        @Override
        public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
            ByteBuf out = ctx.alloc().buffer();
            // 1. 4 字节的魔数
            out.writeBytes(new byte[]{1, 2, 3, 4});
            // 2. 1 字节的版本,
            out.writeByte(1);
            // 3. 1 字节的序列化方式 jdk 0 , json 1
            out.writeByte(0);
            //out.writeByte(Config.getSerializerAlgorithm().ordinal());
            // 4. 1 字节的指令类型
            out.writeByte(msg.getMessageType());
            // 5. 4 个字节
            out.writeInt(msg.getSequenceId());
            // 无意义,对齐填充
            out.writeByte(0xff);
            // 6. 获取内容的字节数组
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream ();
            oos.wirteObject(msg);
            byte[] bytes = bos.toByteArray();
            //byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
            // 7. 长度
            out.writeInt(bytes.length);
            // 8. 写入内容
            out.writeBytes(bytes);
            outList.add(out);
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            int magicNum = in.readInt();
            byte version = in.readByte();
            //序列化算法
            byte serializerAlgorithm = in.readByte(); // 0 或 1
            byte messageType = in.readByte(); // 0,1,2...
            int sequenceId = in.readInt();
            in.readByte();
            int length = in.readInt();
            byte[] bytes = new byte[length];
            in.readBytes(bytes, 0, length);
    
            
            // 确定具体消息类型
            byte[] bytes = new byte[length];
            in.readBytes(bytes, 0, length);
            ObjectOutputStream oos = new ObjectOutputStream (new ByteArrayOutputStream(bytes));
            Message message = (Message)ois.readObject();
            /*
            // 找到反序列化算法
            Serializer.Algorithm algorithm = Serializer.Algorithm.values()[serializerAlgorithm];
            // 确定具体消息类型
            Class messageClass = Message.getMessageClass(messageType);
            Message message = algorithm.deserialize(messageClass, bytes);
            */
    //        log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
    //        log.debug("{}", message);
            out.add(message);
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    config
    public abstract class Config {
        static Properties properties;
        static {
            try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
                properties = new Properties();
                properties.load(in);
            } catch (IOException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
        public static int getServerPort() {
            String value = properties.getProperty("server.port");
            if(value == null) {
                return 8080;
            } else {
                return Integer.parseInt(value);
            }
        }
        public static Serializer.Algorithm getSerializerAlgorithm() {
            String value = properties.getProperty("serializer.algorithm");
            if(value == null) {
                return Serializer.Algorithm.Java;
            } else {
                return Serializer.Algorithm.valueOf(value);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    application.properties

    serializer.algorithm=Json
    cn.abc.server.service.HelloService=cn.itcast.server.service.HelloServiceImpl
    
    • 1
    • 2

    聊天室业务说明

    在这里插入图片描述

    Service

    /**
     * 用户管理接口
     */
    public interface UserService {
    
        /**
         * 登录
         * @param username 用户名
         * @param password 密码
         * @return 登录成功返回 true, 否则返回 false
         */
        boolean login(String username, String password);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    方便起见,这里直接使用map去存储用户信息

    public class UserServiceMemoryImpl implements UserService {
        private Map<String, String> allUserMap = new ConcurrentHashMap<>();
    
        {
            allUserMap.put("zhangsan", "123");
            allUserMap.put("lisi", "123");
            allUserMap.put("wangwu", "123");
            allUserMap.put("zhaoliu", "123");
            allUserMap.put("qianqi", "123");
        }
    
        @Override
        public boolean login(String username, String password) {
            String pass = allUserMap.get(username);
            if (pass == null) {
                return false;
            }
            return pass.equals(password);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    session

    /**
     * 会话管理接口
     */
    public interface Session {
    
        /**
         * 绑定会话
         * @param channel 哪个 channel 要绑定会话
         * @param username 会话绑定用户
         */
        void bind(Channel channel, String username);
    
        /**
         * 解绑会话
         * @param channel 哪个 channel 要解绑会话
         */
        void unbind(Channel channel);
    
        /**
         * 获取属性
         * @param channel 哪个 channel
         * @param name 属性名
         * @return 属性值
         */
        Object getAttribute(Channel channel, String name);
    
        /**
         * 设置属性
         * @param channel 哪个 channel
         * @param name 属性名
         * @param value 属性值
         */
        void setAttribute(Channel channel, String name, Object value);
    
        /**
         * 根据用户名获取 channel
         * @param username 用户名
         * @return channel
         */
        Channel getChannel(String username);
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    这里使用两个map集合去记录 用户名 跟 channel 的对应关系

    public class SessionMemoryImpl implements Session {
    
        private final Map<String, Channel> usernameChannelMap = new ConcurrentHashMap<>();
        private final Map<Channel, String> channelUsernameMap = new ConcurrentHashMap<>();
        private final Map<Channel,Map<String,Object>> channelAttributesMap = new ConcurrentHashMap<>();
    
        @Override
        public void bind(Channel channel, String username) {
            usernameChannelMap.put(username, channel);
            channelUsernameMap.put(channel, username);
            channelAttributesMap.put(channel, new ConcurrentHashMap<>());
        }
    
        @Override
        public void unbind(Channel channel) {
            String username = channelUsernameMap.remove(channel);
            usernameChannelMap.remove(username);
            channelAttributesMap.remove(channel);
        }
    
        @Override
        public Object getAttribute(Channel channel, String name) {
            return channelAttributesMap.get(channel).get(name);
        }
    
        @Override
        public void setAttribute(Channel channel, String name, Object value) {
            channelAttributesMap.get(channel).put(name, value);
        }
    
        @Override
        public Channel getChannel(String username) {
            return usernameChannelMap.get(username);
        }
    
        @Override
        public String toString() {
            return usernameChannelMap.toString();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    
    /**
     * 聊天组会话管理接口
     */
    public interface GroupSession {
    
        /**
         * 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
         * @param name 组名
         * @param members 成员
         * @return 成功时返回组对象, 失败返回 null
         */
        Group createGroup(String name, Set<String> members);
    
        /**
         * 加入聊天组
         * @param name 组名
         * @param member 成员名
         * @return 如果组不存在返回 null, 否则返回组对象
         */
        Group joinMember(String name, String member);
    
        /**
         * 移除组成员
         * @param name 组名
         * @param member 成员名
         * @return 如果组不存在返回 null, 否则返回组对象
         */
        Group removeMember(String name, String member);
    
        /**
         * 移除聊天组
         * @param name 组名
         * @return 如果组不存在返回 null, 否则返回组对象
         */
        Group removeGroup(String name);
    
        /**
         * 获取组成员
         * @param name 组名
         * @return 成员集合, 如果群不存在或没有成员会返回 empty set
         */
        Set<String> getMembers(String name);
    
        /**
         * 获取组成员的 channel 集合, 只有在线的 channel 才会返回
         * @param name 组名
         * @return 成员 channel 集合
         */
        List<Channel> getMembersChannel(String name);
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    SessionFactory

    public abstract class SessionFactory {
    
        private static Session session = new SessionMemoryImpl();
    
        public static Session getSession() {
            return session;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    GroupSession

    /**
     * 聊天组会话管理接口
     */
    public interface GroupSession {
    
        /**
         * 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
         * @param name 组名
         * @param members 成员
         * @return 成功时返回组对象, 失败返回 null
         */
        Group createGroup(String name, Set<String> members);
    
        /**
         * 加入聊天组
         * @param name 组名
         * @param member 成员名
         * @return 如果组不存在返回 null, 否则返回组对象
         */
        Group joinMember(String name, String member);
    
        /**
         * 移除组成员
         * @param name 组名
         * @param member 成员名
         * @return 如果组不存在返回 null, 否则返回组对象
         */
        Group removeMember(String name, String member);
    
        /**
         * 移除聊天组
         * @param name 组名
         * @return 如果组不存在返回 null, 否则返回组对象
         */
        Group removeGroup(String name);
    
        /**
         * 获取组成员
         * @param name 组名
         * @return 成员集合, 如果群不存在或没有成员会返回 empty set
         */
        Set<String> getMembers(String name);
    
        /**
         * 获取组成员的 channel 集合, 只有在线的 channel 才会返回
         * @param name 组名
         * @return 成员 channel 集合
         */
        List<Channel> getMembersChannel(String name);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    GroupSessionMemoryImpl

    实现GroupSession里面的方法

    public class GroupSessionMemoryImpl implements GroupSession {
        private final Map<String, Group> groupMap = new ConcurrentHashMap<>();
    
        @Override
        public Group createGroup(String name, Set<String> members) {
            Group group = new Group(name, members);
            return groupMap.putIfAbsent(name, group);
        }
    
        @Override
        public Group joinMember(String name, String member) {
            return groupMap.computeIfPresent(name, (key, value) -> {
                value.getMembers().add(member);
                return value;
            });
        }
    
        @Override
        public Group removeMember(String name, String member) {
            return groupMap.computeIfPresent(name, (key, value) -> {
                value.getMembers().remove(member);
                return value;
            });
        }
    
        @Override
        public Group removeGroup(String name) {
            return groupMap.remove(name);
        }
    
        @Override
        public Set<String> getMembers(String name) {
            return groupMap.getOrDefault(name, Group.EMPTY_GROUP).getMembers();
        }
    
        @Override
        public List<Channel> getMembersChannel(String name) {
            return getMembers(name).stream()
                    .map(member -> SessionFactory.getSession().getChannel(member))
                    .filter(Objects::nonNull)
                    .collect(Collectors.toList());
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    ProcotolFrameDecoder

    处理黏包和半包的代码一般是不会进行变化的,所以将这种处理的方法抽取出来,在这里提供一种无参构造

    import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
    
    public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
    
        public ProcotolFrameDecoder() {
            this(1024, 12, 4, 0, 0);
        }
    
        public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    过程代码

    连接建立后,就向服务器发送一个登录的请求(发一个登录消息),服务器去验证这个消息是否正确

    通过多个客户端,连接同一个服务器端,对二者的客户端进行发送消息,进行通信

    客户端代码

    @Slf4j
    public class ChatClient {
        public static void main(String[] args) {
            NioEventLoopGroup group = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
            //使两个线程之间进行通信
            CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
            //登录的状态
            AtomicBoolean LOGIN = new AtomicBoolean(false);
            AtomicBoolean EXIT = new AtomicBoolean(false);
            Scanner scanner = new Scanner(System.in);
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.group(group);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProcotolFrameDecoder());
                        ch.pipeline().addLast(MESSAGE_CODEC);
                        // 用来判断是不是 读空闲时间过长,或 写空闲时间过长
                        // 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
                        ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
                        // ChannelDuplexHandler 可以同时作为入站和出站处理器
                        ch.pipeline().addLast(new ChannelDuplexHandler() {
                            // 用来触发特殊事件
                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
                                IdleStateEvent event = (IdleStateEvent) evt;
                                // 触发了写空闲事件
                                if (event.state() == IdleState.WRITER_IDLE) {
    //                                log.debug("3s 没有写数据了,发送一个心跳包");
                                    ctx.writeAndFlush(new PingMessage());
                                }
                            }
                        });
                        ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
                            // 接收响应消息
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                log.debug("msg: {}", msg);
                                if ((msg instanceof LoginResponseMessage)) {
                                    LoginResponseMessage response = (LoginResponseMessage) msg;
                                    if (response.isSuccess()) {
                                        // 如果登录成功, 设置登录状态为true
                                        LOGIN.set(true);
                                    }
                                    // 使记数减1:唤醒 system in 线程
                                    WAIT_FOR_LOGIN.countDown();
                                }
                            }
    
                            // 在连接建立后触发 active 事件
                            @Override
                            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                // 负责接收用户在控制台的输入,负责向服务器发送各种消息
                                /*
                                创建一个新线程,不创建新线程的话,其是在Nio的线程里面执行,会阻塞
                                 */
                                new Thread(() -> {
                                    System.out.println("请输入用户名:");
                                    String username = scanner.nextLine();
                                    if(EXIT.get()){
                                        return;
                                    }
                                    System.out.println("请输入密码:");
                                    String password = scanner.nextLine();
                                    if(EXIT.get()){
                                        return;
                                    }
                                    // 构造消息对象
                                    LoginRequestMessage message = new LoginRequestMessage(username, password);
                                    System.out.println(message);
                                    // 使用ctx对象,发送消息
                                    ctx.writeAndFlush(message);
                                    System.out.println("等待后续操作...");
                                    try {
                                        //阻塞线程,直到记数减为0(当channelRead里调用WAIT_FOR_LOGIN.countDown()时,该线程将被唤醒)
                                        WAIT_FOR_LOGIN.await();
                                    } catch (InterruptedException e) {
                                        e.printStackTrace();
                                    }
                                    // 如果登录失败
                                    if (!LOGIN.get()) {
                                        //关闭channel
                                        ctx.channel().close();
                                        return;
                                    }
                                    //如果登录没有失败,则向下运行
                                    while (true) {
                                        System.out.println("==================================");
                                        System.out.println("send [username] [content]");
                                        System.out.println("gsend [group name] [content]");
                                        System.out.println("gcreate [group name] [m1,m2,m3...]");
                                        System.out.println("gmembers [group name]");
                                        System.out.println("gjoin [group name]");
                                        System.out.println("gquit [group name]");
                                        System.out.println("quit");
                                        System.out.println("==================================");
                                        String command = null;
                                        try {
                                            command = scanner.nextLine();
                                        } catch (Exception e) {
                                            break;
                                        }
                                        if(EXIT.get()){
                                            return;
                                        }
                                        String[] s = command.split(" ");
                                        switch (s[0]){
                                            case "send":
                                                ctx.writeAndFlush(new ChatRequestMessage(username, s[1], s[2]));
                                                break;
                                            case "gsend":
                                                ctx.writeAndFlush(new GroupChatRequestMessage(username, s[1], s[2]));
                                                break;
                                            case "gcreate":
                                                //组里面的成员
                                                Set<String> set = new HashSet<>(Arrays.asList(s[2].split(",")));
                                                set.add(username); // 加入自己
                                                ctx.writeAndFlush(new GroupCreateRequestMessage(s[1], set));
                                                break;
                                            case "gmembers":
                                                ctx.writeAndFlush(new GroupMembersRequestMessage(s[1]));
                                                break;
                                            case "gjoin":
                                                ctx.writeAndFlush(new GroupJoinRequestMessage(username, s[1]));
                                                break;
                                            case "gquit":
                                                ctx.writeAndFlush(new GroupQuitRequestMessage(username, s[1]));
                                                break;
                                            case "quit":
                                                ctx.channel().close();
                                                return;
                                        }
                                    }
                                }, "system in").start();
                            }
    
                            // 在连接断开时触发
                            @Override
                            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                                log.debug("连接已经断开,按任意键退出..");
                                EXIT.set(true);
                            }
    
                            // 在出现异常时触发
                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                log.debug("连接已经断开,按任意键退出..{}", cause.getMessage());
                                EXIT.set(true);
                            }
                        });
                    }
                });
                Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
                channel.closeFuture().sync();
            } catch (Exception e) {
                log.error("client error", e);
            } finally {
                group.shutdownGracefully();
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166

    服务器端代码

    @Slf4j
    public class ChatServer {
        public static void main(String[] args) {
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
            LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
            MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
            LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
            ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
            GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
            GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
            GroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler();
            GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
            GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();
            QuitHandler QUIT_HANDLER = new QuitHandler();
            try {
                ServerBootstrap serverBootstrap = new ServerBootstrap();
                serverBootstrap.channel(NioServerSocketChannel.class);
                serverBootstrap.group(boss, worker);
                serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline().addLast(new ProcotolFrameDecoder());
                        ch.pipeline().addLast(LOGGING_HANDLER);
                        ch.pipeline().addLast(MESSAGE_CODEC);
                        // 用来判断是不是 读空闲时间过长,或 写空闲时间过长
                        // 5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
                        ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
                        // ChannelDuplexHandler 可以同时作为入站和出站处理器
                        ch.pipeline().addLast(new ChannelDuplexHandler() {
                            // 用来触发特殊事件
                            @Override
                            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
                                IdleStateEvent event = (IdleStateEvent) evt;
                                // 触发了读空闲事件
                                if (event.state() == IdleState.READER_IDLE) {
                                    log.debug("已经 5s 没有读到数据了");
                                    ctx.channel().close();
                                }
                            }
                        });
                        ch.pipeline().addLast(LOGIN_HANDLER);
                        ch.pipeline().addLast(CHAT_HANDLER);
                        ch.pipeline().addLast(GROUP_CREATE_HANDLER);
                        ch.pipeline().addLast(GROUP_JOIN_HANDLER);
                        ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);
                        ch.pipeline().addLast(GROUP_QUIT_HANDLER);
                        ch.pipeline().addLast(GROUP_CHAT_HANDLER);
                        ch.pipeline().addLast(QUIT_HANDLER);
                    }
                });
                Channel channel = serverBootstrap.bind(8080).sync().channel();
                channel.closeFuture().sync();
            } catch (InterruptedException e) {
                log.error("server error", e);
            } finally {
                boss.shutdownGracefully();
                worker.shutdownGracefully();
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    handler

    将服务器端的处理handler抽取处理,避免服务器端里面的代码过多

    LoginRequestMessageHandler

    登录的信息处理

    
    @ChannelHandler.Sharable
    public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
            String username = msg.getUsername();
            String password = msg.getPassword();
            boolean login = UserServiceFactory.getUserService().login(username, password);
            LoginResponseMessage message;
            if (login) {
                SessionFactory.getSession().bind(ctx.channel(), username);
                message = new LoginResponseMessage(true, "登录成功");
            } else {
                message = new LoginResponseMessage(false, "用户名或密码不正确");
            }
            ctx.writeAndFlush(message);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    ChatRequestMessageHandler

    聊天的信息处理

    
    @ChannelHandler.Sharable
    public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
            String to = msg.getTo();
            Channel channel = SessionFactory.getSession().getChannel(to);
            // 在线
            if (channel != null) {
                channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
            }
            // 不在线
            else {
                ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或者不在线"));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    GroupCreateRequestMessageHandler

    创建群

    
    @ChannelHandler.Sharable
    public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
            String groupName = msg.getGroupName();
            //成员
            Set<String> members = msg.getMembers();
            // 群管理器
            GroupSession groupSession = GroupSessionFactory.getGroupSession();
            Group group = groupSession.createGroup(groupName, members);
            //创建成功
            if (group == null) {
                // 发送成功消息给群的创建者
                ctx.writeAndFlush(new GroupCreateResponseMessage(true, groupName + "创建成功"));
                // 发送拉群消息:给每一个群员发送消息
                //得到该群里面的所有在线人员
                List<Channel> channels = groupSession.getMembersChannel(groupName);
                //遍历channel集合,对这些channel发送消息
                for (Channel channel : channels) {
                    channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已被拉入" + groupName));
                }
            } else {
            	//给创建者发送失败消息
                ctx.writeAndFlush(new GroupCreateResponseMessage(false, groupName + "已经存在"));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    GroupJoinRequestMessageHandler

    @ChannelHandler.Sharable
    public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
            Group group = GroupSessionFactory.getGroupSession().joinMember(msg.getGroupName(), msg.getUsername());
            if (group != null) {
                ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群加入成功"));
            } else {
                ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    GroupMembersRequestMessageHandler

    @ChannelHandler.Sharable
    public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
            Set<String> members = GroupSessionFactory.getGroupSession()
                    .getMembers(msg.getGroupName());
            ctx.writeAndFlush(new GroupMembersResponseMessage(members));
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    GroupQuitRequestMessageHandler

    @ChannelHandler.Sharable
    public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
            Group group = GroupSessionFactory.getGroupSession().removeMember(msg.getGroupName(), msg.getUsername());
            if (group != null) {
                ctx.writeAndFlush(new GroupJoinResponseMessage(true, "已退出群" + msg.getGroupName()));
            } else {
                ctx.writeAndFlush(new GroupJoinResponseMessage(true, msg.getGroupName() + "群不存在"));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    GroupChatRequestMessageHandler

    群组里面发送信息

    
    @ChannelHandler.Sharable
    public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
            List<Channel> channels = GroupSessionFactory.getGroupSession()
                    .getMembersChannel(msg.getGroupName());
    		//遍历所有群成员的channel,对这些channel发送消息
            for (Channel channel : channels) {
            	//发消息
                channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    QuitHandler

    处理退出事件

    1. 异常退出
    2. 正常退出
    @Slf4j
    @ChannelHandler.Sharable
    public class QuitHandler extends ChannelInboundHandlerAdapter {
    
        // 当连接断开时触发 inactive 事件
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            SessionFactory.getSession().unbind(ctx.channel());
            log.debug("{} 已经断开", ctx.channel());
        }
    
        // 当出现异常时触发
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            SessionFactory.getSession().unbind(ctx.channel());
            log.debug("{} 已经异常断开 异常是{}", ctx.channel(), cause.getMessage());
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    空闲检测: 连接假死(IdleStateHandler

    原因

    • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源。
    • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,就这么一直耗着
    • 应用程序线程阻塞,无法进行数据读写

    问题

    • 假死的连接占用的资源不能自动释放
    • 向假死的连接发送数据,得到的反馈是发送超时

    服务器端解决

    • 怎么判断客户端连接是否假死呢?如果能收到客户端数据,说明没有假死。因此策略就可以定为,每隔一段时间就检查这段时间内是否接收到客户端数据,没有就可以判定为连接假死
    /*
    	参数1:检查读的空闲时间超过对应的秒数(默认是秒)
    	参数2:检查写的空闲时间超过对应的秒数
    	参数3:读写都空闲的时间超过对应的秒数
    	参数4:时间的单位
    */
    /*
    下面这个设置的意思是:
    	5秒钟没有收到客户端发过来的数据(这时候就认为读的空闲时间太长)
    	不关心其他的空闲
    
    5s 内如果没有收到 channel 的数据,会触发一个 IdleState#READER_IDLE 事件
    
    */
    new IdleStateHandler(5, 0, 0);
    // ChannelDuplexHandler 可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {
        // 用来触发特殊事件: 如读空闲事件
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
        	//获取 IdleState 事件
            IdleStateEvent event = (IdleStateEvent) evt;
            // 触发了读空闲事件
            if (event.state() == IdleState.READER_IDLE) {
                log.debug("已经 5s 没有读到数据了");
                ctx.channel().close();
            }
        }
    });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    心跳包

    客户端定时心跳

    • 客户端可以定时向服务器端发送数据,只要这个时间间隔小于服务器定义的空闲检测的时间间隔,那么就能防止前面提到的误判,客户端可以定义如下心跳处理器
    
    // 用来判断是不是 读空闲时间过长,或 写空闲时间过长
    // 3s 内如果没有向服务器写数据,会触发一个 IdleState#WRITER_IDLE 事件
    ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
    // ChannelDuplexHandler 可以同时作为入站和出站处理器
    ch.pipeline().addLast(new ChannelDuplexHandler() {
        // 用来触发特殊事件
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception{
            IdleStateEvent event = (IdleStateEvent) evt;
            // 触发了写空闲事件
            if (event.state() == IdleState.WRITER_IDLE) {
                //                                log.debug("3s 没有写数据了,发送一个心跳包");
                ctx.writeAndFlush(new PingMessage());
            }
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
  • 相关阅读:
    jQuery常用API--选择器
    RabbitMQ快速入门笔记
    拓扑关系如何管理?
    RK3568开发笔记(十一):开发版buildroot固件移植一个ffmpeg播放rtsp的播放器Demo
    《有钱人和你想的不一样》书籍分享
    【最新】如何在CSDN个人主页左侧栏添加二维码?侧边推广怎么弄?
    P2P分布式搜索引擎YaCy
    《高效便捷,探索快递柜系统架构的智慧之路》
    Redis数据结构之quicklist
    【C++】红黑树的模拟实现
  • 原文地址:https://blog.csdn.net/yyuggjggg/article/details/126495440