• Netty深入浅出Java网络编程学习笔记(二) Netty进阶应用篇


    目录

    四、应用

    1、粘包与半包

    现象分析

    粘包

    半包

    本质

    解决方案

    短链接

    定长解码器

    行解码器

    长度字段解码器——LTC

    2、协议设计与解析

    协议的作用

    Redis协议

    HTTP协议

    自定义协议

    组成要素

    编码器与解码器

    编写测试类

     @Sharable注解

    自定义编解码器能否使用@Sharable注解

    3、在线聊天室

    聊天室业务

    用户登录接口

    用户会话接口

    群聊会话接口

    整体结构

    客户端代码结构

    服务器代码结构

    登录

    客户端代码

    服务器代码

    运行结果

    单聊

    群聊

    创建群聊

    群聊聊天

    加入群聊

    退出

    查看群聊成员

    退出聊天室

    连接假死

    解决方法



    四、应用

    1、粘包与半包

    粘包和半包问题是数据传输中比较常见的问题,所谓的粘包问题是指数据在传输时,在一条消息中读取到了另一条消息的部分数据,这种现象就叫做粘包。比如发送了两条消息,分别为“ABC”和“DEF”,那么正常情况下接收端也应该收到两条消息“ABC”和“DEF”,但接收端却收到的是“ABCD”,像这种情况就叫做粘包,半包问题是指接收端只收到了部分数据,而非完整的数据的情况就叫做半包。比如发送了一条消息是“ABC”,而接收端却收到的是“AB”和“C”两条信息,这种情况就叫做半包

    只要是TCP协议的网络交互都有粘包和半包问题,因为TCP的传输是基于字节的传输方式,数据是以字节的形式进行传输的,并没有明确的边界。因此,在传输过程中,TCP没有办法直接识别数据包的边界,并且在流量控制下,TCP的字节传输还不稳定,当发送方连续发送多个数据包时,这些数据包可能会在网络传输的过程中合并或拆分,导致粘包和半包问题的出现,而UDP则没有这个问题,因为UDP的传输是基于数据报的

    现象分析

    粘包

    现象

    • 发送 abc def,接收 abcdef

    原因

    • 应用层
      • 接收方 ByteBuf 设置太大(Netty 默认 1024)
    • 传输层-网络层
      • 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大(大于256 bytes),这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
      • Nagle 算法:会造成粘包

    Nagle算法的原理如下:当TCP发送方需要发送一个小数据包时,Nagle算法会将这个数据包缓存起来,不立即发送。然后,TCP发送方会继续等待其他数据,直到以下两个条件中的任意一个满足后再发送数据:

    1. 接收到之前发送的数据的确认ACK。
    2. 发送方的发送缓冲区中的数据量达到一定的阈值(一般是MSS,即最大报文长度)。

    之所以要缓存起来是因为一个TCP的请求都是要进行数据报头的添加,而IP的报头+TCP的报头 = 40字节,哪怕你只是发送了1个字节的数据,也会被封装为一个41字节的传输内容,那这样子粘包现象就很严重了,为此解决方法就是,缓存多点字节再一起发过来。然而,当发送方连续发送多个小数据包时,这些数据包可能会在网络传输的过程中被合并成一个大数据包,导致粘包问题的出现。这是因为Nagle算法本身不考虑数据包的边界,只是简单地将小数据包缓存起来,直到条件满足后发送。

    半包

    现象

    • 发送 abcdef,接收 abc def

    原因

    • 应用层
      • 接收方 ByteBuf 小于实际发送数据量
    • 传输层-网络层
      • 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
    • 数据链路层
      • MSS 限制:当发送的数据超过 MSS (最大报文长度)限制后,会将数据切分发送,就会造成半包
    本质

    发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界


    解决方案

    解决方案的思路和我这篇文章的处理方式类似,可以先看一下这个大概思路icon-default.png?t=N7T8https://blog.csdn.net/weixin_73077810/article/details/131843387

    短链接长连接是描述客户端与服务器之间TCP连接持续时间的概念。

    • 短链接:短链接通常指的是一次性的临时连接。在短链接中,客户端与服务器建立连接、交换数据后,连接就会关闭。在每次通信之前,需要重新建立连接,进行握手和协商。

    短链接的优点是简单、轻量,适用于临时的低频率的通信。但在高并发或频繁通信的场景中,频繁的连接建立和关闭会增加网络开销和延迟。

    • 长连接:长连接指的是客户端与服务器之间持久的TCP连接。在长连接中,连接一经建立,客户端和服务器可以多次长时间地进行双向通信。在连接建立后,数据可以实时、便捷地传输。

    长连接的优点是减少连接建立和断开的开销,节省网络资源,减少延迟,提高通信效率。长连接常用于需要实时交互的应用,如即时通信、实时数据传输等。

    需要注意的是,长连接可能会带来一些管理上的挑战。服务器需要维护大量的长连接,消耗资源,需要适当管理连接数和超时机制,防止资源浪费和死连接问题。

    短链接

            客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象

    客户端代码改进

    修改channelActive方法

    1. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    2. log.debug("sending...");
    3. ByteBuf buffer = ctx.alloc().buffer(16);
    4. buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
    5. ctx.writeAndFlush(buffer);
    6. // 使用短链接,每次发送完毕后就断开连接
    7. ctx.channel().close();
    8. }

    定长解码器

    客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度

    服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码,具体使用方法如下

    ch.pipeline().addLast(new FixedLengthFrameDecoder(16));

    客户端代码

    客户端发送数据的代码如下

    1. // 约定最大长度为16
    2. final int maxLength = 16;
    3. // 被发送的数据
    4. char c = 'a';
    5. // 向服务器发送10个报文
    6. for (int i = 0; i < 10; i++) {
    7. ByteBuf buffer = ctx.alloc().buffer(maxLength);
    8. // 定长byte数组,未使用部分会以0进行填充
    9. byte[] bytes = new byte[maxLength];
    10. // 生成长度为0~15的数据
    11. for (int j = 0; j < (int)(Math.random()*(maxLength-1)); j++) {
    12. bytes[j] = (byte) c;
    13. }
    14. buffer.writeBytes(bytes);
    15. c++;
    16. // 将数据发送给服务器
    17. ctx.writeAndFlush(buffer);
    18. }

    服务器代码

    使用FixedLengthFrameDecoder对粘包数据进行拆分,该handler需要添加在LoggingHandler之前,保证数据被打印时已被拆分

    1. // 通过定长解码器对粘包数据进行拆分
    2. ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
    3. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

    行解码器

    行解码器的是通过分隔符对数据进行拆分来解决粘包半包问题的

    • 可以通过LineBasedFrameDecoder(int maxLength)来拆分以换行符(\n  or  \r\n)为分隔符的数据
    • 可以通过DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)指定通过什么分隔符来拆分数据(可以传入多个分隔符)

    两种解码器都需要传入数据的最大长度,若超出最大长度,会抛出TooLongFrameException异常

    以换行符 \n 为分隔符

    客户端代码

    1. // 约定最大长度为 64
    2. final int maxLength = 64;
    3. // 被发送的数据
    4. char c = 'a';
    5. for (int i = 0; i < 10; i++) {
    6. ByteBuf buffer = ctx.alloc().buffer(maxLength);
    7. // 生成长度为0~62的数据
    8. Random random = new Random();
    9. StringBuilder sb = new StringBuilder();
    10. for (int j = 0; j < (int)(random.nextInt(maxLength-2)); j++) {
    11. sb.append(c);
    12. }
    13. // 数据以 \n 结尾
    14. sb.append("\n");
    15. buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
    16. c++;
    17. // 将数据发送给服务器
    18. ctx.writeAndFlush(buffer);
    19. }

    服务器代码

    1. // 通过行解码器对粘包数据进行拆分,以 \n 为分隔符
    2. // 需要指定最大长度
    3. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64));
    4. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

    以自定义分隔符 \c 为分隔符

    客户端代码

    1. // 数据以 \c 结尾
    2. sb.append("\\c");
    3. buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));

    服务器代码

    1. // 将分隔符放入ByteBuf中
    2. ByteBuf bufSet = ch.alloc().buffer().writeBytes("\\c".getBytes(StandardCharsets.UTF_8));
    3. // 通过行解码器对粘包数据进行拆分,以 \c 为分隔符
    4. ch.pipeline().addLast(new DelimiterBasedFrameDecoder(64, ch.alloc().buffer().writeBytes(bufSet)));
    5. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));

    长度字段解码器——LTC

    在传送数据时可以在数据中添加一个用于表示有用数据长度的字段,在解码时读取出这个用于表明长度的字段,同时读取其他相关参数,即可知道最终需要的数据是什么样子的

    LengthFieldBasedFrameDecoder解码器可以提供更为丰富的拆分方法,其构造方法有五个参数

    1. public LengthFieldBasedFrameDecoder(
    2. int maxFrameLength,
    3. int lengthFieldOffset, int lengthFieldLength,
    4. int lengthAdjustment, int initialBytesToStrip)

    参数解析

    • maxFrameLength 数据最大长度

      • 表示数据的最大长度(包括附加信息、长度标识等内容)
    • lengthFieldOffset 数据长度标识的起始偏移量

      • 用于指明数据第几个字节开始是用于标识有用字节长度的,因为前面可能还有其他附加信息
    • lengthFieldLength 数据长度标识所占字节数(用于指明有用数据的长度)

      • 数据中用于表示有用数据长度的标识所占的字节数
    • lengthAdjustment 长度表示为有用数据的偏移量

      • 用于指明数据长度标识和有用数据之间的距离,因为两者之间还可能有附加信息
    • initialBytesToStrip 数据读取起点

      • 读取起点,不读取 0 ~ initialBytesToStrip 之间的数据

    参数图解

    使用

    通过 EmbeddedChannel 对 handler 进行测试

    1. public class EncoderStudy {
    2. public static void main(String[] args) {
    3. // 模拟服务器
    4. // 使用EmbeddedChannel测试handler
    5. EmbeddedChannel channel = new EmbeddedChannel(
    6. // 数据最大长度为1KB,长度标识前后各有1个字节的附加信息,长度标识长度为4个字节(int)
    7. new LengthFieldBasedFrameDecoder(1024, 1, 4, 1, 0),
    8. new LoggingHandler(LogLevel.DEBUG)
    9. );
    10. // 模拟客户端,写入数据
    11. ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    12. send(buffer, "Hello");
    13. channel.writeInbound(buffer);
    14. send(buffer, "World");
    15. channel.writeInbound(buffer);
    16. }
    17. private static void send(ByteBuf buf, String msg) {
    18. // 得到数据的长度
    19. int length = msg.length();
    20. byte[] bytes = msg.getBytes(StandardCharsets.UTF_8);
    21. // 将数据信息写入buf
    22. // 写入长度标识前的其他信息
    23. buf.writeByte(0xCA);
    24. // 写入数据长度标识
    25. buf.writeInt(length);
    26. // 写入长度标识后的其他信息
    27. buf.writeByte(0xFE);
    28. // 写入具体的数据
    29. buf.writeBytes(bytes);
    30. }
    31. }

    运行结果

    1. 146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B
    2. +-------------------------------------------------+
    3. | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
    4. +--------+-------------------------------------------------+----------------+
    5. |00000000| ca 00 00 00 05 fe 48 65 6c 6c 6f |......Hello |
    6. +--------+-------------------------------------------------+----------------+
    7. 146 [main] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 11B
    8. +-------------------------------------------------+
    9. | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
    10. +--------+-------------------------------------------------+----------------+
    11. |00000000| ca 00 00 00 05 fe 57 6f 72 6c 64 |......World |
    12. +--------+-------------------------------------------------+----------------+

    2、协议设计与解析

    协议的作用

    TCP/IP 中消息传输基于字节流的方式,没有边界

    协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

    Redis协议

    如果我们要向Redis服务器发送一条set name Nyima的指令,需要遵守如下协议

    1. // 该指令一共有3部分,每条指令之后都要添加回车与换行符
    2. *3\r\n
    3. // 第一个指令的长度是3
    4. $3\r\n
    5. // 第一个指令是set指令
    6. set\r\n
    7. // 下面的指令以此类推
    8. $4\r\n
    9. name\r\n
    10. $5\r\n
    11. Nyima\r\n

    客户端代码如下

    1. public class RedisClient {
    2. static final Logger log = LoggerFactory.getLogger(StudyServer.class);
    3. public static void main(String[] args) {
    4. NioEventLoopGroup group = new NioEventLoopGroup();
    5. try {
    6. ChannelFuture channelFuture = new Bootstrap()
    7. .group(group)
    8. .channel(NioSocketChannel.class)
    9. .handler(new ChannelInitializer() {
    10. @Override
    11. protected void initChannel(SocketChannel ch) {
    12. // 打印日志
    13. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    14. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
    15. @Override
    16. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    17. // 回车与换行符
    18. final byte[] LINE = {'\r','\n'};
    19. // 获得ByteBuf
    20. ByteBuf buffer = ctx.alloc().buffer();
    21. // 连接建立后,向Redis中发送一条指令,注意添加回车与换行
    22. // set name Nyima
    23. buffer.writeBytes("*3".getBytes());
    24. buffer.writeBytes(LINE);
    25. buffer.writeBytes("$3".getBytes());
    26. buffer.writeBytes(LINE);
    27. buffer.writeBytes("set".getBytes());
    28. buffer.writeBytes(LINE);
    29. buffer.writeBytes("$4".getBytes());
    30. buffer.writeBytes(LINE);
    31. buffer.writeBytes("name".getBytes());
    32. buffer.writeBytes(LINE);
    33. buffer.writeBytes("$5".getBytes());
    34. buffer.writeBytes(LINE);
    35. buffer.writeBytes("Nyima".getBytes());
    36. buffer.writeBytes(LINE);
    37. ctx.writeAndFlush(buffer);
    38. }
    39. });
    40. }
    41. })
    42. .connect(new InetSocketAddress("localhost", 6379));
    43. channelFuture.sync();
    44. // 关闭channel
    45. channelFuture.channel().close().sync();
    46. } catch (InterruptedException e) {
    47. e.printStackTrace();
    48. } finally {
    49. // 关闭group
    50. group.shutdownGracefully();
    51. }
    52. }
    53. }

    Redis中查询执行结果


    HTTP协议

    HTTP协议在请求行请求头中都有很多的内容,自己实现较为困难,可以使用HttpServerCodec作为服务器端的解码器与编码器,来处理HTTP请求

    1. // HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
    2. // Codec(CodeCombine) 一般代表该类既作为 编码器 又作为 解码器
    3. public final class HttpServerCodec extends CombinedChannelDuplexHandler
    4. implements HttpServerUpgradeHandler.SourceCodec

     服务器代码

    1. public class HttpServer {
    2. static final Logger log = LoggerFactory.getLogger(StudyServer.class);
    3. public static void main(String[] args) {
    4. NioEventLoopGroup group = new NioEventLoopGroup();
    5. new ServerBootstrap()
    6. .group(group)
    7. .channel(NioServerSocketChannel.class)
    8. .childHandler(new ChannelInitializer() {
    9. @Override
    10. protected void initChannel(SocketChannel ch) {
    11. ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
    12. // 作为服务器,使用 HttpServerCodec 作为编码器与解码器
    13. ch.pipeline().addLast(new HttpServerCodec());
    14. // 服务器只处理HTTPRequest,具体的限定取决于泛型
    15. ch.pipeline().addLast(new SimpleChannelInboundHandler() {
    16. @Override
    17. protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
    18. // 获得请求uri
    19. log.debug(msg.uri());
    20. // 获得完整响应,设置版本号与状态码
    21. DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
    22. // 设置响应内容
    23. byte[] bytes = "

      Hello, World!

      "
      .getBytes(StandardCharsets.UTF_8);
    24. // 设置响应体长度,避免浏览器一直接收响应内容
    25. response.headers().setInt(CONTENT_LENGTH, bytes.length);
    26. // 设置响应体
    27. response.content().writeBytes(bytes);
    28. // 写回响应
    29. ctx.writeAndFlush(response);
    30. }
    31. });
    32. }
    33. })
    34. .bind(8080);
    35. }
    36. }

    服务器负责处理请求并响应浏览器。所以只需要处理HTTP请求即可

    1. // 服务器只处理HTTPRequest
    2. ch.pipeline().addLast(new SimpleChannelInboundHandler()

    获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为获得CONTENT_LENGTH而一直空转,需要添加CONTENT_LENGTH字段,表明响应体中数据的具体长度

    1. // 获得完整响应,设置版本号与状态码
    2. DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
    3. // 设置响应内容
    4. byte[] bytes = "

      Hello, World!

      "
      .getBytes(StandardCharsets.UTF_8);
    5. // 设置响应体长度,避免浏览器一直接收响应内容
    6. response.headers().setInt(CONTENT_LENGTH, bytes.length);
    7. // 设置响应体
    8. response.content().writeBytes(bytes);

    运行结果

    浏览器

    自定义协议

    组成要素
    • 魔数:用来在第一时间判定接收的数据是否为无效数据包
    • 版本号:可以支持协议的升级
    • 序列化算法:消息正文到底采用哪种序列化反序列化方式
      • 如:json、protobuf、hessian、jdk
    • 指令类型:是登录、注册、单聊、群聊… 跟业务相关
    • 请求序号:为了双工通信,提供异步能力
    • 正文长度
    • 消息正文

    编码器与解码器
    1. public class MessageCodec extends ByteToMessageCodec {
    2. @Override
    3. protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
    4. // 设置魔数 4个字节
    5. out.writeBytes(new byte[]{'N','Y','I','M'});
    6. // 设置版本号 1个字节
    7. out.writeByte(1);
    8. // 设置序列化方式 1个字节
    9. out.writeByte(1);
    10. // 设置指令类型 1个字节
    11. out.writeByte(msg.getMessageType());
    12. // 设置请求序号 4个字节
    13. out.writeInt(msg.getSequenceId());
    14. // 为了补齐为2的次幂个字节,填充1个字节的数据,满足为16字节
    15. out.writeByte(0xff);
    16. // 获得序列化后的msg
    17. ByteArrayOutputStream bos = new ByteArrayOutputStream();
    18. ObjectOutputStream oos = new ObjectOutputStream(bos);
    19. oos.writeObject(msg);
    20. byte[] bytes = bos.toByteArray();
    21. // 获得并设置正文长度 长度用4个字节标识
    22. out.writeInt(bytes.length);
    23. // 设置消息正文
    24. out.writeBytes(bytes);
    25. }
    26. @Override
    27. protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
    28. // 获取魔数
    29. int magic = in.readInt();
    30. // 获取版本号
    31. byte version = in.readByte();
    32. // 获得序列化方式
    33. byte seqType = in.readByte();
    34. // 获得指令类型
    35. byte messageType = in.readByte();
    36. // 获得请求序号
    37. int sequenceId = in.readInt();
    38. // 移除补齐字节
    39. in.readByte();
    40. // 获得正文长度
    41. int length = in.readInt();
    42. // 获得正文
    43. byte[] bytes = new byte[length];
    44. in.readBytes(bytes, 0, length);
    45. ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
    46. Message message = (Message) ois.readObject();
    47. // 将信息放入List中,传递给下一个handler
    48. out.add(message);
    49. // 打印获得的信息正文
    50. System.out.println("===========魔数===========");
    51. System.out.println(magic);
    52. System.out.println("===========版本号===========");
    53. System.out.println(version);
    54. System.out.println("===========序列化方法===========");
    55. System.out.println(seqType);
    56. System.out.println("===========指令类型===========");
    57. System.out.println(messageType);
    58. System.out.println("===========请求序号===========");
    59. System.out.println(sequenceId);
    60. System.out.println("===========正文长度===========");
    61. System.out.println(length);
    62. System.out.println("===========正文===========");
    63. System.out.println(message);
    64. }
    65. }
      • 编码器与解码器方法源于父类ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。此处使用了自定义类Message,代表消息

      public class MessageCodec extends ByteToMessageCodec
      • 编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2n,不足需要补齐。正文内容如果为对象,需要通过序列化将其放入到ByteBuf中
      • 解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个handler

      编写测试类
      1. public class TestCodec {
      2. static final org.slf4j.Logger log = LoggerFactory.getLogger(StudyServer.class);
      3. public static void main(String[] args) throws Exception {
      4. EmbeddedChannel channel = new EmbeddedChannel();
      5. // 添加解码器,避免粘包半包问题
      6. channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0));
      7. // 开启控制台日志
      8. channel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
      9. // 绑定自定义编码器与解码器,其内部重写父类的encode和decode两个handler方法
      10. channel.pipeline().addLast(new MessageCodec());
      11. // 自定义的封装dto类
      12. LoginRequestMessage user = new LoginRequestMessage("Nyima", "123");
      13. // 测试编码与解码
      14. ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer();
      15. // 内部将user正文数据存储到byteBuf的正文位置上
      16. new MessageCodec().encode(null, user, byteBuf);
      17. channel.writeInbound(byteBuf);
      18. }
      19. }
      • 测试类中用到了LengthFieldBasedFrameDecoder,避免粘包半包问题
      • 通过MessageCodec的encode方法将附加信息与正文写入到ByteBuf中,通过channel执行入站操作。入站时会调用decode方法进行解码

      运行结果


       @Sharable注解

      为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作

      1. LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
      2. // 不同的channel中使用同一个handler对象,提高复用率
      3. channel1.pipeline().addLast(loggingHandler);
      4. channel2.pipeline().addLast(loggingHandler);

      但是并不是所有的handler都能通过这种方法来提高复用率的,例如LengthFieldBasedFrameDecoder。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题

      • channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播
      • 此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误

      为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable注解来标明,该handler能否在多个channel中共享。

      只有带有该注解,才能通过对象的方式被共享,否则无法被共享


      自定义编解码器能否使用@Sharable注解

      这需要根据自定义的handler的处理逻辑进行分析

      我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的

      但是实际情况我们并不能添加该注解,会抛出异常信息ChannelHandler cn.nyimac.study.day8.protocol.MessageCodec is not allowed to be shared

      • 因为MessageCodec继承自ByteToMessageCodec,ByteToMessageCodec类的注解如下

      • 这就意味着ByteToMessageCodec不能被多个channel所共享的

        • 原因:因为该类的目标是:将ByteBuf转化为Message,意味着传进该handler的数据还未被处理过。所以传过来的ByteBuf可能并不是完整的数据,如果共享则会出现问题

      如果想要共享,需要怎么办呢?

      继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。实现方式与ByteToMessageCodec类似

      1. @ChannelHandler.Sharable
      2. public class MessageSharableCodec extends MessageToMessageCodec {
      3. @Override
      4. protected void encode(ChannelHandlerContext ctx, Message msg, List out) throws Exception {
      5. ...
      6. }
      7. @Override
      8. protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
      9. ...
      10. }
      11. }

      12. 3、在线聊天室

        聊天室业务

        用户登录接口
        1. public interface UserService {
        2. /**
        3. * 登录
        4. * @param username 用户名
        5. * @param password 密码
        6. * @return 登录成功返回 true, 否则返回 false
        7. */
        8. boolean login(String username, String password);
        9. }
        用户会话接口
        1. public interface Session {
        2. /**
        3. * 绑定会话
        4. * @param channel 哪个 channel 要绑定会话
        5. * @param username 会话绑定用户
        6. */
        7. void bind(Channel channel, String username);
        8. /**
        9. * 解绑会话
        10. * @param channel 哪个 channel 要解绑会话
        11. */
        12. void unbind(Channel channel);
        13. /**
        14. * 获取属性
        15. * @param channel 哪个 channel
        16. * @param name 属性名
        17. * @return 属性值
        18. */
        19. Object getAttribute(Channel channel, String name);
        20. /**
        21. * 设置属性
        22. * @param channel 哪个 channel
        23. * @param name 属性名
        24. * @param value 属性值
        25. */
        26. void setAttribute(Channel channel, String name, Object value);
        27. /**
        28. * 根据用户名获取 channel
        29. * @param username 用户名
        30. * @return channel
        31. */
        32. Channel getChannel(String username);
        33. }
        群聊会话接口
        1. public interface GroupSession {
        2. /**
        3. * 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
        4. * @param name 组名
        5. * @param members 成员
        6. * @return 成功时返回组对象, 失败返回 null
        7. */
        8. Group createGroup(String name, Set members);
        9. /**
        10. * 加入聊天组
        11. * @param name 组名
        12. * @param member 成员名
        13. * @return 如果组不存在返回 null, 否则返回组对象
        14. */
        15. Group joinMember(String name, String member);
        16. /**
        17. * 移除组成员
        18. * @param name 组名
        19. * @param member 成员名
        20. * @return 如果组不存在返回 null, 否则返回组对象
        21. */
        22. Group removeMember(String name, String member);
        23. /**
        24. * 移除聊天组
        25. * @param name 组名
        26. * @return 如果组不存在返回 null, 否则返回组对象
        27. */
        28. Group removeGroup(String name);
        29. /**
        30. * 获取组成员
        31. * @param name 组名
        32. * @return 成员集合, 如果群不存在或没有成员会返回 empty set
        33. */
        34. Set getMembers(String name);
        35. /**
        36. * 获取组成员的 channel 集合, 只有在线的 channel 才会返回
        37. * @param name 组名
        38. * @return 成员 channel 集合
        39. */
        40. List getMembersChannel(String name);
        41. /**
        42. * 判断群聊是否一被创建
        43. * @param name 群聊名称
        44. * @return 是否存在
        45. */
        46. boolean isCreated(String name);
        47. }
        整体结构

        • client包:存放客户端相关类

        • message包:存放各种类型的消息

        • protocol包:存放自定义协议

        • server包:存放服务器相关类

          • service包:存放用户相关类
          • session包:单聊及群聊相关会话类
        客户端代码结构
        1. public class ChatClient {
        2. static final Logger log = LoggerFactory.getLogger(ChatClient.class);
        3. public static void main(String[] args) {
        4. NioEventLoopGroup group = new NioEventLoopGroup();
        5. LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        6. MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
        7. try {
        8. Bootstrap bootstrap = new Bootstrap();
        9. bootstrap.group(group);
        10. bootstrap.channel(NioSocketChannel.class);
        11. bootstrap.handler(new ChannelInitializer() {
        12. @Override
        13. protected void initChannel(SocketChannel ch) throws Exception {
        14. // 自定义的协议解码粘半包处理器
        15. ch.pipeline().addLast(new ProtocolFrameDecoder());
        16. ch.pipeline().addLast(loggingHandler);
        17. ch.pipeline().addLast(messageSharableCodec);
        18. }
        19. });
        20. Channel channel = bootstrap.connect().sync().channel();
        21. channel.closeFuture().sync();
        22. } catch (InterruptedException e) {
        23. e.printStackTrace();
        24. } finally {
        25. group.shutdownGracefully();
        26. }
        27. }
        28. }
        服务器代码结构
        1. public class ChatServer {
        2. static final Logger log = LoggerFactory.getLogger(ChatServer.class);
        3. public static void main(String[] args) {
        4. NioEventLoopGroup boss = new NioEventLoopGroup();
        5. NioEventLoopGroup worker = new NioEventLoopGroup();
        6. LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
        7. MessageSharableCodec messageSharableCodec = new MessageSharableCodec();
        8. try {
        9. ServerBootstrap bootstrap = new ServerBootstrap();
        10. bootstrap.group(boss, worker);
        11. bootstrap.channel(NioServerSocketChannel.class);
        12. bootstrap.childHandler(new ChannelInitializer() {
        13. @Override
        14. protected void initChannel(SocketChannel ch) throws Exception {
        15. ch.pipeline().addLast(new ProtocolFrameDecoder());
        16. ch.pipeline().addLast(loggingHandler);
        17. ch.pipeline().addLast(messageSharableCodec);
        18. }
        19. });
        20. Channel channel = bootstrap.bind(8080).sync().channel();
        21. channel.closeFuture().sync();
        22. } catch (InterruptedException e) {
        23. e.printStackTrace();
        24. } finally {
        25. boss.shutdownGracefully();
        26. worker.shutdownGracefully();
        27. }
        28. }
        29. }

        登录

        客户端代码

        客户端添加如下handler,分别处理登录、聊天等操作

        1. @Slf4j
        2. public class ChatClient {
        3. public static void main(String[] args) {
        4. NioEventLoopGroup group = new NioEventLoopGroup();
        5. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        6. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        7. // 这是一个计数锁,只有当其维护的value减为0的时候才会释放
        8. CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
        9. // 原子变量
        10. AtomicBoolean LOGIN = new AtomicBoolean(false);
        11. try {
        12. Bootstrap bootstrap = new Bootstrap();
        13. bootstrap.channel(NioSocketChannel.class);
        14. bootstrap.group(group);
        15. bootstrap.handler(new ChannelInitializer() {
        16. @Override
        17. protected void initChannel(SocketChannel ch) throws Exception {
        18. ch.pipeline().addLast(new ProcotolFrameDecoder());
        19. ch.pipeline().addLast(LOGGING_HANDLER);
        20. ch.pipeline().addLast(MESSAGE_CODEC);
        21. ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
        22. /**
        23. * 创建连接时执行的处理器,用于执行登陆操作
        24. */
        25. @Override
        26. public void channelActive(ChannelHandlerContext ctx) throws Exception {
        27. // 开辟额外线程(不要让nio的线程被录入阻塞),用于用户登陆及后续操作
        28. new Thread(()->{
        29. Scanner scanner = new Scanner(System.in);
        30. System.out.println("请输入用户名");
        31. String username = scanner.next();
        32. System.out.println("请输入密码");
        33. String password = scanner.next();
        34. // 创建包含登录信息的请求体
        35. LoginRequestMessage message = new LoginRequestMessage(username, password);
        36. // 发送到channel中,注意这里用ctx写出,因为他要从这里找前面的那些处理器进行加工
        37. ctx.writeAndFlush(message);
        38. // 校验登录结果,如果能获取到锁就说明登录成功
        39. if (!loginStatus.get()) {
        40. // 登陆失败,关闭channel并返回
        41. ctx.channel().close();
        42. return;
        43. }
        44. // 登录成功后,执行其他操作
        45. while (true) {
        46. System.out.println("==================================");
        47. System.out.println("send [username] [content]");
        48. System.out.println("gsend [group name] [content]");
        49. System.out.println("gcreate [group name] [m1,m2,m3...]");
        50. System.out.println("gmembers [group name]");
        51. System.out.println("gjoin [group name]");
        52. System.out.println("gquit [group name]");
        53. System.out.println("quit");
        54. System.out.println("==================================");
        55. String command = scanner.nextLine();
        56. // 获得指令及其参数,并发送对应类型消息
        57. // 注意这里!!!!!你发送的消息类型决定了在服务器端处理的handeler
        58. String[] commands = command.split(" ");
        59. switch (commands[0]){
        60. case "send":
        61. ctx.writeAndFlush(new ChatRequestMessage(username, commands[1], commands[2]));
        62. break;
        63. case "gsend":
        64. ctx.writeAndFlush(new GroupChatRequestMessage(username,commands[1], commands[2]));
        65. break;
        66. case "gcreate":
        67. // 分割,获得群员名
        68. String[] members = commands[2].split(",");
        69. Set set = new HashSet<>(Arrays.asList(members));
        70. // 把自己加入到群聊中
        71. set.add(username);
        72. ctx.writeAndFlush(new GroupCreateRequestMessage(commands[1],set));
        73. break;
        74. case "gmembers":
        75. ctx.writeAndFlush(new GroupMembersRequestMessage(commands[1]));
        76. break;
        77. case "gjoin":
        78. ctx.writeAndFlush(new GroupJoinRequestMessage(username, commands[1]));
        79. break;
        80. case "gquit":
        81. ctx.writeAndFlush(new GroupQuitRequestMessage(username, commands[1]));
        82. break;
        83. case "quit":
        84. ctx.channel().close();
        85. return;
        86. default:
        87. System.out.println("指令有误,请重新输入");
        88. continue;
        89. }
        90. }
        91. }, "login channel").start();
        92. }
        93. @Override
        94. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        95. // 注意噢,这个消息的接收是在这里进行输出控制台
        96. log.debug("{}", msg);
        97. if (msg instanceof LoginResponseMessage) {
        98. // 如果是登录响应信息
        99. LoginResponseMessage message = (LoginResponseMessage) msg;
        100. boolean isSuccess = message.isSuccess();
        101. // 登录成功,设置登陆标记
        102. if (isSuccess) {
        103. loginStatus.set(true);
        104. }
        105. // 登陆后,唤醒登陆线程,原始计数为1,减了一个后就变为0,释放锁
        106. waitLogin.countDown();
        107. }
        108. }
        109. });
        服务器代码
        1. @Slf4j
        2. public class ChatServer {
        3. public static void main(String[] args) {
        4. NioEventLoopGroup boss = new NioEventLoopGroup();
        5. NioEventLoopGroup worker = new NioEventLoopGroup();
        6. LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
        7. MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
        8. try {
        9. ServerBootstrap serverBootstrap = new ServerBootstrap();
        10. serverBootstrap.channel(NioServerSocketChannel.class);
        11. serverBootstrap.group(boss, worker);
        12. serverBootstrap.childHandler(new ChannelInitializer() {
        13. @Override
        14. protected void initChannel(SocketChannel ch) throws Exception {
        15. ch.pipeline().addLast(new ProcotolFrameDecoder());
        16. // 日志
        17. ch.pipeline().addLast(LOGGING_HANDLER);
        18. // 自定义的协议编解码操作
        19. ch.pipeline().addLast(MESSAGE_CODEC);
        20. // 只对LoginRequestMessage解码结果进行操作
        21. ch.pipeline().addLast(new SimpleChannelInboundHandler() {
        22. @Override
        23. protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
        24. String username = msg.getUsername();
        25. String password = msg.getPassword();
        26. // 拿着账号密码去后端做校验,校验通过还要把用户名和channel的对
        27. 应关系也要存储起来,用来实现单聊的时候看对方在不在线
        28. boolean login = UserServiceFactory.getUserService().login(username, password);
        29. LoginResponseMessage message;
        30. if(login) {
        31. message = new LoginResponseMessage(true, "登录成功");
        32. } else {
        33. message = new LoginResponseMessage(false, "用户名或密码不正确");
        34. }
        35. ctx.writeAndFlush(message);
        36. }
        37. });
        38. }
        39. });
        40. Channel channel = serverBootstrap.bind(8080).sync().channel();
        41. channel.closeFuture().sync();
        42. } catch (InterruptedException e) {
        43. log.error("server error", e);
        44. } finally {
        45. boss.shutdownGracefully();
        46. worker.shutdownGracefully();
        47. }
        48. }
        49. }
        1. // 该handler处理登录请求
        2. LoginRequestMessageHandler loginRequestMessageHandler = new LoginRequestMessageHandler();
        3. ch.pipeline().addLast(new LoginRequestMessageHandler());
        运行结果

        客户端

        1. 5665 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317, 1, 1, 1, 0, 279
        2. 5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:AbstractResponseMessage{success=true, reason='登陆成功'}
        3. 5667 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='登陆成功'}
        4. success

        服务器

        1. 11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - 1314474317, 1, 1, 0, 0, 217
        2. 11919 [nioEventLoopGroup-3-1] DEBUG cn.nyimac.study.day8.protocol.MessageSharableCodec - message:LoginRequestMessage{username='Nyima', password='123'}
        3. 7946 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x8e7c07f6, L:/127.0.0.1:8080 - R:/127.0.0.1:60572] WRITE: 295B
        4. +-------------------------------------------------+
        5. | 0 1 2 3 4 5 6 7 8 9 a b c d e f |
        6. +--------+-------------------------------------------------+----------------+
        7. |00000000| 4e 59 49 4d 01 01 01 00 00 00 00 ff 00 00 01 17 |NYIM............|
        8. |00000010| ac ed 00 05 73 72 00 31 63 6e 2e 6e 79 69 6d 61 |....sr.1cn.nyima|
        9. |00000020| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
        10. |00000030| 73 61 67 65 2e 4c 6f 67 69 6e 52 65 73 70 6f 6e |sage.LoginRespon|
        11. |00000040| 73 65 4d 65 73 73 61 67 65 e2 34 49 24 72 52 f3 |seMessage.4I$rR.|
        12. |00000050| 07 02 00 00 78 72 00 34 63 6e 2e 6e 79 69 6d 61 |....xr.4cn.nyima|
        13. |00000060| 63 2e 73 74 75 64 79 2e 64 61 79 38 2e 6d 65 73 |c.study.day8.mes|
        14. |00000070| 73 61 67 65 2e 41 62 73 74 72 61 63 74 52 65 73 |sage.AbstractRes|
        15. |00000080| 70 6f 6e 73 65 4d 65 73 73 61 67 65 b3 7e 19 32 |ponseMessage.~.2|
        16. |00000090| 9b 88 4d 7b 02 00 02 5a 00 07 73 75 63 63 65 73 |..M{...Z..succes|
        17. |000000a0| 73 4c 00 06 72 65 61 73 6f 6e 74 00 12 4c 6a 61 |sL..reasont..Lja|
        18. |000000b0| 76 61 2f 6c 61 6e 67 2f 53 74 72 69 6e 67 3b 78 |va/lang/String;x|
        19. |000000c0| 72 00 24 63 6e 2e 6e 79 69 6d 61 63 2e 73 74 75 |r.$cn.nyimac.stu|
        20. |000000d0| 64 79 2e 64 61 79 38 2e 6d 65 73 73 61 67 65 2e |dy.day8.message.|
        21. |000000e0| 4d 65 73 73 61 67 65 dd e9 84 b7 21 db 18 52 02 |Message....!..R.|
        22. |000000f0| 00 02 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 |..I..messageType|
        23. |00000100| 49 00 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 |I..sequenceIdxp.|
        24. |00000110| 00 00 00 00 00 00 00 01 74 00 0c e7 99 bb e9 99 |........t.......|
        25. |00000120| 86 e6 88 90 e5 8a 9f |....... |
        26. +--------+-------------------------------------------------+----------------+

        单聊

        客户端输入send username content即可发送单聊消息,需要服务器端添加处理ChatRequestMessage的handler

        1. @ChannelHandler.Sharable // 必须添加该注解
        2. // 表明只对ChatRequestMessage的消息进行加工
        3. public class ChatRequestMessageHandler extends SimpleChannelInboundHandler {
        4. @Override
        5. protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
        6. // 获得user所在的channel
        7. Channel channel = SessionFactory.getSession().getChannel(msg.getTo());
        8. // 如果双方都在线
        9. if (channel != null) {
        10. // 通过接收方与服务器之间的channel发送信息,注意,这里不是写到byteBuf去
        11. channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
        12. } else {
        13. // 通过发送方与服务器之间的channel发送消息
        14. ctx.writeAndFlush(new ChatResponseMessage(false, "对方用户不存在或离线,发送失败"));
        15. }
        16. }
        17. }
        1. // 该handler处理单聊请求
        2. ChatRequestMessageHandler chatRequestMessageHandler = new ChatRequestMessageHandler();
        3. ch.pipeline().addLast(chatRequestMessageHandler);

        运行结果

        发送方(zhangsan)

        send Nyima hello

        接收方(Nyima)

        1. // 收到zhangsan发来的消息
        2. 20230 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - ChatResponseMessage{from='zhangsan', content='hello'}

        群聊

        创建群聊

        添加处理GroupCreateRequestMessage的handler

        1. @ChannelHandler.Sharable
        2. // 表明只对GroupCreateRequestMessage的消息进行加工
        3. public class GroupCreateMessageHandler extends SimpleChannelInboundHandler {
        4. @Override
        5. protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
        6. // 获得要创建的群聊名,ctx对应的是发送这个创建群聊的业务请求的人的这个channel
        7. String groupName = msg.getGroupName();
        8. // 获得要创建的群聊的成员组(首次拉起形成群聊的那几个人,包含自身才行)
        9. Set members = msg.getMembers();
        10. // 判断该群聊是否创建过,未创建返回null并创建群聊
        11. Group group = GroupSessionFactory.getGroupSession().createGroup(groupName, members);
        12. if (group == null) {
        13. // 向群的创建者发送创建成功消息
        14. GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(true, groupName + "创建成功");
        15. ctx.writeAndFlush(groupCreateResponseMessage);
        16. // 获得在线群员的channel,给群员发送入群聊消息
        17. List membersChannel = GroupSessionFactory.getGroupSession().getMembersChannel(groupName);
        18. groupCreateResponseMessage = new GroupCreateResponseMessage(true, "您已被拉入"+groupName);
        19. // 给每个在线群员发送消息
        20. for(Channel channel : membersChannel) {
        21. channel.writeAndFlush(groupCreateResponseMessage);
        22. }
        23. } else {
        24. // 发送失败消息给创建人
        25. GroupCreateResponseMessage groupCreateResponseMessage = new GroupCreateResponseMessage(false, groupName + "已存在");
        26. ctx.writeAndFlush(groupCreateResponseMessage);
        27. }
        28. }
        29. }
        1. // 该handler处理创建群聊请求
        2. GroupCreateMessageHandler groupCreateMessageHandler = new GroupCreateMessageHandler();
        3. ch.pipeline().addLast(groupCreateMessageHandler);

        运行结果

        创建者客户端

        1. // 首次创建
        2. gcreate Netty学习 zhangsan,lisi
        3. 31649 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='Netty学习创建成功'}
        4. 15244 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}
        5. // 再次创建
        6. gcreate Netty学习 zhangsan,lisi
        7. 40771 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='Netty学习已存在'}

        群员客户端

        28788 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='您已被拉入Netty学习'}

        群聊聊天
        1. @ChannelHandler.Sharable
        2. // 表明只对GroupChatRequestMessage的消息进行加工
        3. public class GroupChatMessageHandler extends SimpleChannelInboundHandler {
        4. @Override
        5. protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
        6. String groupName = msg.getGroupName();
        7. GroupSession groupSession = GroupSessionFactory.getGroupSession();
        8. // 判断群聊是否存在
        9. boolean isCreated = groupSession.isCreated(groupName);
        10. if (isCreated) {
        11. // 给群员发送信息
        12. List membersChannel = groupSession.getMembersChannel(groupName);
        13. for(Channel channel : membersChannel) {
        14. channel.writeAndFlush(new GroupChatResponseMessage(msg.getFrom(), msg.getContent()));
        15. }
        16. } else {
        17. ctx.writeAndFlush(new GroupChatResponseMessage(false, "群聊不存在"));
        18. }
        19. }
        20. }
        1. // 该handler处理群聊聊天
        2. GroupChatMessageHandler groupChatMessageHandler = new GroupChatMessageHandler();
        3. ch.pipeline().addLast(groupChatMessageHandler);

        运行结果

        发送方(群聊存在)

        1. gsend Netty学习 你们好
        2. 45408 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - GroupChatResponseMessage{from='zhangsan', content='你们好'}

        接收方

        48082 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - GroupChatResponseMessage{from='zhangsan', content='你们好'}

         发送方(群聊不存在)

        1. gsend Spring学习 你们好
        2. 25140 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient - AbstractResponseMessage{success=false, reason='群聊不存在'}

        加入群聊
        1. @ChannelHandler.Sharable
        2. public class GroupJoinMessageHandler extends SimpleChannelInboundHandler {
        3. @Override
        4. protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
        5. GroupSession groupSession = GroupSessionFactory.getGroupSession();
        6. // 判断该用户是否在群聊中
        7. Set members = groupSession.getMembers(msg.getGroupName());
        8. boolean joinFlag = false;
        9. // 群聊存在且用户未加入,才能加入
        10. if (!members.contains(msg.getUsername()) && groupSession.isCreated(msg.getGroupName())) {
        11. joinFlag = true;
        12. }
        13. if (joinFlag) {
        14. // 加入群聊
        15. groupSession.joinMember(msg.getGroupName(), msg.getUsername());
        16. ctx.writeAndFlush(new GroupJoinResponseMessage(true,"加入"+msg.getGroupName()+"成功"));
        17. } else {
        18. ctx.writeAndFlush(new GroupJoinResponseMessage(false, "加入失败,群聊未存在或您已加入该群聊"));
        19. }
        20. }
        21. }
        1. // 该handler处理加入群聊
        2. GroupJoinMessageHandler groupJoinMessageHandler = new GroupJoinMessageHandler();
        3. ch.pipeline().addLast(groupJoinMessageHandler);

        运行结果

        正常加入群聊

        94921 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='加入Netty学习成功'}

        加入不能存在或已加入的群聊

        44025 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=false, reason='加入失败,群聊未存在或您已加入该群聊'}

        退出
        1. @ChannelHandler.Sharable
        2. public class GroupQuitMessageHandler extends SimpleChannelInboundHandler {
        3. @Override
        4. protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
        5. GroupSession groupSession = GroupSessionFactory.getGroupSession();
        6. String groupName = msg.getGroupName();
        7. Set members = groupSession.getMembers(groupName);
        8. String username = msg.getUsername();
        9. // 判断用户是否在群聊中以及群聊是否存在
        10. boolean joinFlag = false;
        11. if (groupSession.isCreated(groupName) && members.contains(username)) {
        12. // 可以退出
        13. joinFlag = true;
        14. }
        15. if (joinFlag) {
        16. // 退出成功
        17. groupSession.removeMember(groupName, username);
        18. ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出"+groupName+"成功"));
        19. } else {
        20. // 退出失败
        21. ctx.writeAndFlush(new GroupQuitResponseMessage(false, "群聊不存在或您未加入该群,退出"+groupName+"失败"));
        22. }
        23. }
        24. }
        1. // 该handler处理退出群聊
        2. GroupQuitMessageHandler groupQuitMessageHandler = new GroupQuitMessageHandler();
        3. ch.pipeline().addLast(groupQuitMessageHandler);

        运行结果

        正常退出

        32282 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=true, reason='退出Netty学习成功'}

        退出不存在或未加入的群聊

        67404 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - AbstractResponseMessage{success=false, reason='群聊不存在或您未加入该群,退出Netty失败'}

        查看群聊成员
        1. @ChannelHandler.Sharable
        2. public class GroupMembersMessageHandler extends SimpleChannelInboundHandler {
        3. @Override
        4. protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
        5. ctx.writeAndFlush(new GroupMembersResponseMessage(GroupSessionFactory.getGroupSession().getMembers(msg.getGroupName())));
        6. }
        7. }
        1. // 该handler处理查看成员
        2. GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();
        3. ch.pipeline().addLast(groupMembersMessageHandler);

        运行结果

        46557 [nioEventLoopGroup-2-1] DEBUG cn.nyimac.study.day8.client.ChatClient  - GroupMembersResponseMessage{members=[zhangsan, Nyima]}

        退出聊天室

        1. @ChannelHandler.Sharable
        2. public class QuitHandler extends ChannelInboundHandlerAdapter {
        3. /**
        4. * 断开连接时触发 Inactive事件
        5. */
        6. @Override
        7. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        8. // 解绑
        9. SessionFactory.getSession().unbind(ctx.channel());
        10. }
        11. /**
        12. * 异常退出,需要解绑
        13. */
        14. @Override
        15. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        16. // 解绑
        17. SessionFactory.getSession().unbind(ctx.channel());
        18. }
        19. }
        1. // 该handler处理退出聊天室
        2. ch.pipeline().addLast(quitHandler);
        3. GroupMembersMessageHandler groupMembersMessageHandler = new GroupMembersMessageHandler();

        退出时,客户端会关闭channel并返回

        1. case "quit":
        2. // 关闭channel并返回
        3. ctx.channel().close();
        4. return;

        连接假死

        原因

        • 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源
        • 公网网络不稳定,出现丢包。如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,会白白地消耗资源
        • 应用程序线程阻塞,无法进行数据读写

        问题

        • 假死的连接占用的资源不能自动释放
        • 向假死的连接发送数据,得到的反馈是发送超时
        解决方法

        可以添加IdleStateHandler对空闲时间进行检测,通过构造函数可以传入三个参数

        • readerIdleTimeSeconds 读空闲经过的秒数
        • writerIdleTimeSeconds 写空闲经过的秒数
        • allIdleTimeSeconds 读和写空闲经过的秒数

        当指定时间内未发生读或写事件时,会触发特定事件

        • 读空闲会触发  READER_IDLE
        • 写空闲会触发  WRITE_IDLE
        • 读和写空闲会触发  ALL_IDEL

        将定时任务的周期设置为 0,这意味着不会触发该空闲状态事件。

        想要处理这些事件,需要自定义事件处理函数

        服务器端代码

        1. // 用于空闲连接的检测,5s内未读到数据,会触发READ_IDLE事件
        2. ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
        3. // 添加双向处理器,负责处理READER_IDLE事件
        4. /*ChannelDuplexHandler 是 Netty 框架中的一个特殊类,它是用来处理网络通信中的读写事件的双向处理器。它扩展
        5. 了ChannelInboundHandler和ChannelOutboundHandler,同时负责处理从网络中读取到的数据以及将数据写入到网络中。*/
        6. ch.pipeline().addLast(new ChannelDuplexHandler() {
        7. @Override
        8. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        9. // 获得事件
        10. IdleStateEvent event = (IdleStateEvent) evt;
        11. if (event.state() == IdleState.READER_IDLE) {
        12. // 断开连接
        13. ctx.channel().close();
        14. }
        15. }
        16. });
        • 使用IdleStateHandler进行空闲检测
        • 使用双向处理器ChannelDuplexHandler对入站与出站事件进行处理
          • IdleStateHandler中的事件为特殊事件,需要实现ChannelDuplexHandleruserEventTriggered方法,判断事件类型并自定义处理方式,来对事件进行处理

        避免因非网络等原因引发的WRITER_IDLE事件,比如网络情况良好,只是用户本身没有输入数据,这时发生WRITER_IDLE事件,直接让服务器断开连接是不可取的

        为避免此类情况,需要在客户端向服务器发送心跳包,发送频率要小于服务器设置的IdleTimeSeconds,一般设置为其值的一半

        客户端代码

        1. // 发送心跳包,让服务器知道客户端在线
        2. // 3s未发生WRITER_IDLE,就像服务器发送心跳包
        3. // 该值为服务器端设置的READER_IDLE触发时间的一半左右
        4. ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
        5. ch.pipeline().addLast(new ChannelDuplexHandler() {
        6. @Override
        7. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        8. IdleStateEvent event = (IdleStateEvent) evt;
        9. if (event.state() == IdleState.WRITER_IDLE) {
        10. // 发送心跳包
        11. ctx.writeAndFlush(new PingMessage());
        12. }
        13. }
        14. });

      13. 相关阅读:
        Linux安装jenkins
        恒合仓库 - 采购单管理模块
        注册D8读卡器COM组件
        深度学习:pytorch nn.Embedding详解
        el-table表格设置——动态修改表头
        axios介绍以及对axios进行二次封装
        (1-线性回归问题)RBF神经网络
        C/C++-指针
        从功能测试到自动化测试你都知道他们的有缺点吗?
        网络安全——使用反弹木马进行提权获取主机Shell
      14. 原文地址:https://blog.csdn.net/weixin_73077810/article/details/133755425