• netty群聊客户端服务器及心跳检测


    【README】

    1.本文总结自B站《netty-尚硅谷》,很不错;

    2.本文po出了 Unpooled创建缓冲区的 代码示例;

    3.本文示例代码基于netty实现以下功能:

    • 群聊客户端及服务器;
    • 心跳检测;

    【1】Unpooled创建缓冲区

    Unpooled定义:

    • 是Netty 提供的一个专门用来操作缓冲区(即Netty的数据容器)的工具

    【1.1】Unpooled.buffer-申请给定容量的缓冲区

    1)Unpooled.buffer(capacity) 定义:

    1. public static ByteBuf buffer(int initialCapacity) {
    2. return ALLOC.heapBuffer(initialCapacity);
    3. }

    代码示例 :

    1. public class NettyByteBuf61 {
    2. public static void main(String[] args) {
    3. // 创建一个对象,该对象包含一个数组 byte[10]
    4. // 在netty buf中,不需要像nio那样 执行flip 切换读写模式
    5. // 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置
    6. ByteBuf byteBuf = Unpooled.buffer(10);
    7. for (int i = 0; i < 10; i++) {
    8. byteBuf.writeByte(i); // writerIndex 自增
    9. }
    10. // 输出
    11. for (int i = 0; i < byteBuf.capacity(); i++) {
    12. System.out.printf(byteBuf.readByte() + " "); // readerIndex 自增
    13. // System.out.println(byteBuf.getByte(i));
    14. }
    15. // 查看 byteBuf 的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)
    16. System.out.println(byteBuf);
    17. }
    18. }

    【代码解说】

    • 在netty buf中,不需要像nio那样 执行flip 切换读写模式;
    • 因为 netty buf,维护了一个 readerIndex 和 writerIndex,分别表示下一次要读入和写出的位置;

    运行结果:

    0 1 2 3 4 5 6 7 8 9
    UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 10, widx: 10, cap: 10)


    【1.2】Unpooled.copiedBuffer() 创建buf 缓冲区

    copiedBuffer(CharSequence string, Charset charset) 定义:

    • 通过给定的数据和字符编码返回一个 ByteBuf 对象(类似于NIO中的ByteBuffer但有区别)
    1. public static ByteBuf copiedBuffer(CharSequence string, Charset charset) {
    2. if (string == null) {
    3. throw new NullPointerException("string");
    4. }
    5. if (string instanceof CharBuffer) {
    6. return copiedBuffer((CharBuffer) string, charset);
    7. }
    8. return copiedBuffer(CharBuffer.wrap(string), charset);
    9. }

    代码示例

    1. public class NettyByteBuf62 {
    2. public static void main(String[] args) {
    3. // 通过 Unpooled.copiedBuffer 创建 buf缓冲区
    4. ByteBuf byteBuf = Unpooled.copiedBuffer("hello world", StandardCharsets.UTF_8);
    5. // 1 使用相关方法-byteBuf.hasArray()
    6. if (byteBuf.hasArray()) {
    7. String content = new String(byteBuf.array(), StandardCharsets.UTF_8);
    8. System.out.println(content);
    9. // 查看ByteBuf的类型-UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 12, cap: 24)
    10. System.out.println("bytebuf = " + byteBuf);
    11. // 查看偏移量
    12. System.out.println("byteBuf.arrayOffset() = " + byteBuf.arrayOffset()); // 0
    13. // 查看 readerIndex
    14. System.out.println("byteBuf.readerIndex() = " + byteBuf.readerIndex()); // 0
    15. // 查看 writerIndex
    16. System.out.println("byteBuf.writerIndex() = " + byteBuf.writerIndex()); // 12
    17. // 查看 capacity
    18. System.out.println("byteBuf.capacity() = " + byteBuf.capacity());
    19. // 查看可读取的字节数量 12
    20. System.out.println("byteBuf.readableBytes() = " + byteBuf.readableBytes());
    21. // 使用for循环读取byteBuf
    22. for (int i = 0; i < byteBuf.readableBytes(); i++) {
    23. System.out.print((char)byteBuf.getByte(i));
    24. }
    25. System.out.println();
    26. // 读取 byteBuf 其中某一段,从下标4开始,读取6个字节
    27. CharSequence charSequence = byteBuf.getCharSequence(4, 6, StandardCharsets.UTF_8);
    28. System.out.println(charSequence);
    29. }
    30. }
    31. }

    运行结果:

    hello world                      
    bytebuf = UnpooledByteBufAllocator$InstrumentedUnpooledUnsafeHeapByteBuf(ridx: 0, widx: 11, cap: 33)
    byteBuf.arrayOffset() = 0
    byteBuf.readerIndex() = 0
    byteBuf.writerIndex() = 11
    byteBuf.capacity() = 33
    byteBuf.readableBytes() = 11
    hello world
    o worl


    【2】netty群聊客户端与服务器

    需求描述:

    1. 基于Netty 实现 多人群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
    2.  服务器端:可以监测用户上线,离线,并实现消息转发功能;
    3.  客户端: 通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(由服务器转发得到);

    【2.1】netty服务器

    1)群聊服务器代码

    1. /**
    2. * @Description netty群聊服务器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月03日
    6. */
    7. public class NettyGroupChatServer63 {
    8. private int port;
    9. public NettyGroupChatServer63(int port) {
    10. this.port = port;
    11. }
    12. public static void main(String[] args) {
    13. try {
    14. new NettyGroupChatServer63(8089).run();
    15. } catch (InterruptedException e) {
    16. e.printStackTrace();
    17. }
    18. }
    19. public void run() throws InterruptedException {
    20. // 创建两个线程组
    21. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    22. EventLoopGroup workerGroup = new NioEventLoopGroup();
    23. try {
    24. // 服务器启动引导对象
    25. ServerBootstrap serverBootstrap = new ServerBootstrap();
    26. serverBootstrap.group(bossGroup, workerGroup)
    27. .channel(NioServerSocketChannel.class)
    28. .option(ChannelOption.SO_BACKLOG, 128)
    29. .childOption(ChannelOption.SO_KEEPALIVE, true)
    30. .childHandler(new ChannelInitializer() {
    31. @Override
    32. protected void initChannel(SocketChannel socketChannel) throws Exception {
    33. // 获取pipeline
    34. ChannelPipeline pipeline = socketChannel.pipeline();
    35. // 添加解码处理器 编码器
    36. pipeline.addLast("decoder", new StringDecoder());
    37. pipeline.addLast("encoder", new StringEncoder());
    38. // 添加业务处理handler
    39. pipeline.addLast(new NettyGroupChatServerHandler());
    40. }
    41. });
    42. ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
    43. System.out.println("netty服务器启动成功");
    44. // 监听关闭
    45. channelFuture.channel().closeFuture().sync();
    46. } finally {
    47. // 优雅关闭线程
    48. bossGroup.shutdownGracefully().sync();
    49. workerGroup.shutdownGracefully().sync();
    50. }
    51. }
    52. }

    2)群聊服务器处理器

    1. /**
    2. * @Description netty服务器处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月03日
    6. */
    7. public class NettyGroupChatServerHandler extends SimpleChannelInboundHandler {
    8. // 定义一个 channel 组,用于管理channel
    9. // GlobalEventExecutor.INSTANCE 是全局事件执行器,是一个单例
    10. private static final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    11. // 读取数据并转发
    12. @Override
    13. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    14. // 获取当前channel
    15. Channel channel = ctx.channel();
    16. // 遍历 channelGroup, 根据不同情况 回送不同消息
    17. channelGroup.forEach(otherChannel-> {
    18. if (channel != otherChannel) { // 非当前channel, 直接转发
    19. otherChannel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户 " + channel.remoteAddress() + "说:" + msg + "\n");
    20. } else { // 回显自己发送的消息给自己
    21. channel.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "自己说:" + msg + "\n");
    22. }
    23. });
    24. }
    25. // 一旦连接建立,第一个被执行
    26. // 将当前channel 添加到channelGroup
    27. @Override
    28. public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    29. Channel channel = ctx.channel();
    30. // 把客户端加入群组的信息发送到其他客户端
    31. channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 加入聊天");
    32. // 把当前channel 添加到 channel 组
    33. channelGroup.add(channel);
    34. }
    35. // 表示 channel 处于活动状态, 提示 xx 上线
    36. @Override
    37. public void channelActive(ChannelHandlerContext ctx) throws Exception {
    38. System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 上线了");
    39. }
    40. // 表示 channel 处于离线状态, 提示 xx 离线
    41. @Override
    42. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    43. System.out.println("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + ctx.channel().remoteAddress() + " 离开了");
    44. }
    45. // 断开连接,把xx客户离开的信息推送给其他在线客户
    46. @Override
    47. public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    48. Channel channel = ctx.channel();
    49. channelGroup.writeAndFlush("["+ DateUtils.getNowTimestamp() +"]"+ "客户端 " + channel.remoteAddress() + " 离开了");
    50. System.out.println("channelGroup.size() = " + channelGroup.size());
    51. }
    52. // 发送异常如何处理
    53. @Override
    54. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    55. // 关闭通道
    56. ctx.close();
    57. }
    58. }

    【2.2】netty客户端

    1)群聊客户端代码:

    1. /**
    2. * @Description netty群聊客户端
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月03日
    6. */
    7. public class NettyGroupChatClient64 {
    8. /** 主机和端口 */
    9. private final String host;
    10. private final int port;
    11. /**
    12. * @description 构造器
    13. * @author xiao tang
    14. * @date 2022/9/3
    15. */
    16. public NettyGroupChatClient64(String host, int port) {
    17. this.host = host;
    18. this.port = port;
    19. }
    20. public static void main(String[] args) {
    21. try {
    22. new NettyGroupChatClient64("127.0.0.1", 8089).run();
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. }
    27. public void run() throws InterruptedException {
    28. // 事件运行的线程池
    29. EventLoopGroup eventExecutors = new NioEventLoopGroup();
    30. try {
    31. // 客户端启动引导对象
    32. Bootstrap bootstrap = new Bootstrap();
    33. bootstrap.group(eventExecutors)
    34. .channel(NioSocketChannel.class)
    35. .handler(new ChannelInitializer() {
    36. @Override
    37. protected void initChannel(SocketChannel socketChannel) throws Exception {
    38. ChannelPipeline pipeline = socketChannel.pipeline();
    39. // 添加解码器 编码器
    40. pipeline.addLast("decoder", new StringDecoder());
    41. pipeline.addLast("encoder", new StringEncoder());
    42. // 添加业务逻辑的 handler
    43. pipeline.addLast(new NettyGroupChatClietnHandler());
    44. }
    45. });
    46. // 连接给定主机的端口,阻塞直到连接成功
    47. ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
    48. // 得到 channel
    49. Channel channel = channelFuture.channel();
    50. System.out.println("----------" + channel.localAddress() + "----------");
    51. // 客户端需要输入信息,创建一个扫描器
    52. Scanner scanner = new Scanner(System.in);
    53. while (scanner.hasNextLine()) {
    54. String msg = scanner.nextLine();
    55. // 通过channel 发送到服务器
    56. channel.writeAndFlush(msg);
    57. }
    58. } finally {
    59. // 关闭线程池,释放所有资源,阻塞直到关闭成功
    60. eventExecutors.shutdownGracefully().sync();
    61. }
    62. }
    63. }

    2)群聊客户端处理器代码:

    1. /**
    2. * @Description netty群聊客户端处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月03日
    6. */
    7. public class NettyGroupChatClietnHandler extends SimpleChannelInboundHandler {
    8. @Override
    9. protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    10. System.out.println(msg.trim());
    11. }
    12. }

    【2.3】 运行结果:

    1)服务器与客户端: 服务器1个,客户端3个;

    2)客户端离线:

     


    【3】netty心跳检测

    【3.1】netty心跳检测概述

    1)netty定义的空闲状态事件:

    Triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
    * read, write, or both operation for a while.

    当一个通道一段时间内没有执行 读,写,或读写操作时,就会触发 IdleStateEvent事件

    2)需求描述:

    1. 编写一个 Netty心跳检测机制案例, 当服务器超过3秒没有读时,就提示读空闲;
    2. 当服务器超过5秒没有写操作时,就提示写空闲;
    3.  实现当服务器超过7秒没有读或者写操作时,就提示读写空闲;

    【3.2】netty心跳检测代码实现

    1)netty心跳检测服务器

    1. /**
    2. * @Description netty心跳检测服务器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月03日
    6. */
    7. public class NettyHeartbeatCheckServer66 {
    8. public static void main(String[] args) {
    9. try {
    10. new NettyHeartbeatCheckServer66().run();
    11. } catch (InterruptedException e) {
    12. e.printStackTrace();
    13. }
    14. }
    15. public void run() throws InterruptedException {
    16. // 创建线程池执行器
    17. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    18. EventLoopGroup workerGroup = new NioEventLoopGroup(8);
    19. try {
    20. // 服务器启动引导对象
    21. ServerBootstrap serverBootstrap = new ServerBootstrap();
    22. serverBootstrap.group(bossGroup, workerGroup)
    23. .channel(NioServerSocketChannel.class)
    24. .option(ChannelOption.SO_BACKLOG, 128)
    25. .option(ChannelOption.SO_KEEPALIVE, true)
    26. .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
    27. .childHandler(new ChannelInitializer() {
    28. @Override
    29. protected void initChannel(SocketChannel socketChannel) throws Exception {
    30. // 添加处理器
    31. ChannelPipeline pipeline = socketChannel.pipeline();
    32. // 1. 添加空闲状态处理器 :
    33. // readerIdleTime: 表示多长时间没有读入io事件,就会发送一个心跳检测包,检测是否连接状态
    34. // writerIdleTime: 表示多长时间没有写出io事件,就会发送一个心跳检测包,检测是否连接状态
    35. // allIdleTime: 表示多长时间没有读入和写出io事件,就会发送一个心跳检测包,检测是否连接状态
    36. // 2. 文档说明
    37. // Triggers an {@link IdleStateEvent } when a {@link Channel} has not performed
    38. // * read, write, or both operation for a while.
    39. // 3. 当 IdleStateEvent 事件触发后, 就会传递给管道的 下一个处理器 去处理
    40. // 通过调用下一个handler的 userEventTriggered 方法,即在该方法中处理IdleStateEvent 事件;
    41. pipeline.addLast(new IdleStateHandler(4, 5, 7, TimeUnit.SECONDS));
    42. // 添加一个对空闲检测 进一步处理的handler(自定义 )
    43. pipeline.addLast(new NettyHeartbeatCheckServerHandler());
    44. }
    45. });
    46. // 启动服务器,监听端口,阻塞直到启动成功
    47. ChannelFuture channelFuture = serverBootstrap.bind(8089).sync();
    48. // 阻塞直到channel关闭
    49. channelFuture.channel().closeFuture().sync();
    50. } finally {
    51. bossGroup.shutdownGracefully().sync();
    52. workerGroup.shutdownGracefully().sync();
    53. }
    54. }
    55. }

    2)netty心跳检测服务器处理器

    1. /**
    2. * @Description netty心跳检测服务器处理器
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年09月04日
    6. */
    7. public class NettyHeartbeatCheckServerHandler extends ChannelInboundHandlerAdapter {
    8. @Override
    9. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    10. if (evt instanceof IdleStateEvent) {
    11. IdleStateEvent event2 = (IdleStateEvent) evt;
    12. String eventType = ""; // 事件类型
    13. switch (event2.state()) {
    14. case READER_IDLE: eventType = "读空闲"; break;
    15. case WRITER_IDLE: eventType = "写空闲"; break;
    16. case ALL_IDLE: eventType = "读写空闲"; break;
    17. }
    18. System.out.println("客户端" + ctx.channel().remoteAddress() + "--超时事件--" + eventType);
    19. System.out.println("服务器做相应处理");
    20. // 如果发生空闲,马上关闭通道
    21. // System.out.println("一旦发生超时事件,则关闭 channel");
    22. // ctx.channel().close();
    23. }
    24. }
    25. }

    【3.3】运行结果:

    1)以 NettyGroupChatClient64 作为客户端连接到 服务器 NettyHeartbeatCheckServer66;

    2)打印结果如下:

    1. // 控制台打印结果
    2. 客户端/127.0.0.1:61278--超时事件--读空闲
    3. 服务器做相应处理
    4. 客户端/127.0.0.1:61278--超时事件--写空闲
    5. 服务器做相应处理
    6. 客户端/127.0.0.1:61278--超时事件--读写空闲
    7. 服务器做相应处理
    8. 客户端/127.0.0.1:61278--超时事件--读空闲
    9. 服务器做相应处理
    10. 客户端/127.0.0.1:61278--超时事件--写空闲
    11. 服务器做相应处理
    12. 客户端/127.0.0.1:61278--超时事件--读空闲
    13. 服务器做相应处理

  • 相关阅读:
    vue-quill-editor 富文本编辑器上传视频
    LVS-NAT模式实验案例
    Python---upper()--转大写///与lower() --转小写
    docker-compose搭建MongoDB
    MATLB|基于复杂网络的配电系统微电网优化配置
    VR全景技术打造“智慧亚运”,实现720度自由视角
    外包做了3个月,技术退步明显。。。。。
    【Linux】 ubuntu ffmpeg环境配置
    【EI会议征稿】第八届能源系统、电气与电力国际学术会议(ESEP 2023)
    spark原理和实践
  • 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126685268