• 4.基于NIO的群聊系统


    【README】

    1.本文总结自B站《netty-尚硅谷》,很不错;

    2.文末有错误及解决方法;


    【1】群聊需求

    1)编写一个 NIO 群聊系统,实现服务器端和客户端之间的数据简单通讯(非
    阻塞)
    2)实现多人群聊;
    3)服务器端:可以监测用户上线,离线,并实现消息转发功能;
    4)客户端:通过channel 可以无阻塞发送消息给其它所有用户,同时可以接受
    其它用户发送的消息(有服务器转发得到);


    【2】概要设计

    1)服务器端:

    •   服务器启动并监听 6667 ;
    •   服务器接收客户端消息,并实现转发,处理上线 与  离线;

    2)客户端

    •   连接服务器;
    •   发送消息;
    •   接收服务器消息 ;

    【3】代码实现及自测

    【3.1】服务器端

    1. /**
    2. * @Description 群聊服务器端
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年08月19日
    6. */
    7. public class NIOGchatServer {
    8. private Selector selector;
    9. private ServerSocketChannel listenChannel;
    10. private static final int PORT = 6667;
    11. /**
    12. * @description 构造器
    13. * @author xiao tang
    14. * @date 2022/8/19
    15. */
    16. public NIOGchatServer() {
    17. try {
    18. // 得到选择器
    19. selector = Selector.open();
    20. // 初始化 ServerSocketChannel
    21. listenChannel = ServerSocketChannel.open();
    22. // 绑定端口
    23. listenChannel.socket().bind(new InetSocketAddress(PORT));
    24. // 设置非阻塞模式
    25. listenChannel.configureBlocking(false);
    26. // 把listenChannel注册到selector,事件为 ACCEPT
    27. listenChannel.register(selector, SelectionKey.OP_ACCEPT);
    28. } catch (Exception e) {
    29. e.printStackTrace();
    30. System.out.println("群聊服务器构造异常");
    31. }
    32. }
    33. public static void main(String[] args) throws IOException {
    34. // 创建服务器对象,并监听端口
    35. new NIOGchatServer().listen();
    36. }
    37. /**
    38. * @description 监听
    39. * @param
    40. * @author xiao tang
    41. * @date 2022/8/19
    42. */
    43. public void listen() throws IOException {
    44. while(true) {
    45. // 等待客户端请求连接
    46. selector.select();
    47. // 获取选择key集合
    48. Iterator iterator = selector.selectedKeys().iterator();
    49. while (iterator.hasNext()) {
    50. SelectionKey key = iterator.next();
    51. if (key.isAcceptable()) { // 通道发生连接事件
    52. SocketChannel sc = listenChannel.accept();
    53. sc.configureBlocking(false); // 设置为非阻塞
    54. // 将 sc 注册到 selector 上
    55. sc.register(selector, SelectionKey.OP_READ);
    56. // 提示
    57. System.out.println(sc.getRemoteAddress() + " connected successfully.");
    58. }
    59. if (key.isReadable()) { // 通道发生 read 事件
    60. // 处理读
    61. this.readData(key);
    62. }
    63. // 用完之后,要移除key
    64. iterator.remove();
    65. }
    66. }
    67. }
    68. /**
    69. * @description 读取客户端消息
    70. * @param key 选择键
    71. * @return
    72. * @author xiao tang
    73. * @date 2022/8/19
    74. */
    75. private void readData(SelectionKey key) {
    76. // 定义一个socketchannel
    77. SocketChannel channel = null;
    78. try {
    79. // 取到关联的channel
    80. channel = (SocketChannel) key.channel();
    81. // 创建缓冲 buffer
    82. ByteBuffer buffer = ByteBuffer.allocate(1024);
    83. int count = channel.read(buffer);
    84. // 根据count的值做处理
    85. if (count > 0) {
    86. // 把缓冲区的数据转为字符串并输出
    87. String msg = new String(buffer.array(), 0, count, StandardCharsets.UTF_8);
    88. // 输出该消息
    89. System.out.println(msg);
    90. // 向其他客户端转发消息
    91. this.forward2OtherClients(msg, channel);
    92. }
    93. } catch (IOException e) {
    94. e.printStackTrace();
    95. try {
    96. System.out.println(channel.getRemoteAddress() + " has been offline.");
    97. // 取消注册
    98. key.channel();
    99. // 关闭通道
    100. channel.close();
    101. } catch (IOException e2) {
    102. e2.printStackTrace();
    103. }
    104. }
    105. }
    106. /**
    107. * @description 消息转发给其他客户端
    108. * @param msg 消息
    109. * @param self 当前 SocketChannel
    110. * @author xiao tang
    111. * @date 2022/8/19
    112. */
    113. private void forward2OtherClients(String msg, SocketChannel self) throws IOException {
    114. // 遍历所有注册到 selector 上的 SocketChannel 并排除自己
    115. for (SelectionKey key : selector.keys()) {
    116. // 排除自己
    117. if (key.equals(self.keyFor(selector))) continue;
    118. // 通过key 取出对应的 SocketChannel
    119. Channel targetChannel = key.channel();
    120. // 消息转发
    121. if (targetChannel instanceof SocketChannel) {
    122. SocketChannel dest = (SocketChannel) targetChannel;
    123. // 把 msg 存储到buffer
    124. ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
    125. // 把buffer数据写入通道
    126. dest.write(buffer);
    127. }
    128. }
    129. }
    130. }

    【3.2】客户端

    1. /**
    2. * @Description NIO群聊客户端
    3. * @author xiao tang
    4. * @version 1.0.0
    5. * @createTime 2022年08月19日
    6. */
    7. public class NIOGchatClient {
    8. // 定义相关的属性
    9. private static final String HOST = "127.0.0.1"; // 服务器ip地址
    10. private static final int PORT = 6667; // 服务器端口
    11. private Selector selector;
    12. private SocketChannel socketChannel;
    13. private String userName;
    14. // 线程池
    15. private static ExecutorService THREAD_POOL = Executors.newCachedThreadPool();
    16. public static void main(String[] args) {
    17. try {
    18. // 启动客户端
    19. NIOGchatClient client = new NIOGchatClient();
    20. // 启动一个线程,每隔3秒读取服务器发送的数据
    21. THREAD_POOL.submit(new Runnable() {
    22. @Override
    23. public void run() {
    24. while(true) {
    25. try {
    26. client.read();
    27. TimeUnit.SECONDS.sleep(3);
    28. } catch (Exception e) {
    29. e.printStackTrace();
    30. break;
    31. }
    32. }
    33. }
    34. });
    35. // 客户端接收控制台输入,并发送数据给服务器
    36. Scanner scanner = new Scanner(System.in);
    37. while(scanner.hasNextLine()) {
    38. client.send(scanner.nextLine());
    39. }
    40. } catch (Exception e) {
    41. e.printStackTrace();
    42. } finally {
    43. THREAD_POOL.shutdown();
    44. }
    45. }
    46. /**
    47. * @description 构造器
    48. * @author xiao tang
    49. * @date 2022/8/19
    50. */
    51. public NIOGchatClient() throws IOException {
    52. this.selector = Selector.open();
    53. // 连接服务器
    54. socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
    55. // 设置非阻塞
    56. socketChannel.configureBlocking(false);
    57. // 将 channel 注册到 selector,事件 READ
    58. socketChannel.register(this.selector, SelectionKey.OP_READ);
    59. // 得到userName
    60. userName = socketChannel.getLocalAddress().toString().substring(1);
    61. System.out.println(userName + " connected server successfully.");
    62. }
    63. /**
    64. * @description 发送消息到服务器
    65. * @param msg 消息
    66. * @author xiao tang
    67. * @date 2022/8/19
    68. */
    69. private void send(String msg) {
    70. msg = userName + ":" + msg;
    71. ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8));
    72. try {
    73. socketChannel.write(buffer);
    74. } catch (IOException e) {
    75. e.printStackTrace();
    76. }
    77. }
    78. /**
    79. * @description 从通道读取数据并显示
    80. * @author xiao tang
    81. * @date 2022/8/20
    82. */
    83. private void read() throws IOException {
    84. selector.select();
    85. // 存在可用通道,读取数据并显示 (注意这里是 selector.selectedKeys() 而不是 selector.keys() )
    86. Iterator iterator = selector.selectedKeys().iterator();
    87. while (iterator.hasNext()) {
    88. SelectionKey key = iterator.next();
    89. // 若可读通道,则读取
    90. if (key.isReadable()) {
    91. SocketChannel sc = (SocketChannel) key.channel();
    92. ByteBuffer buffer = ByteBuffer.allocate(1024);
    93. int count = sc.read(buffer);
    94. System.out.println(new String(buffer.array(), 0, count , StandardCharsets.UTF_8));
    95. }
    96. // 用完key要移除
    97. iterator.remove();
    98. }
    99. }
    100. }

    【3.3】测试效果


    【4】报错及解决

    1)问题1:为什么要移除key ?

    1. // 用完之后,要移除key
    2. iterator.remove();

    refer2 Why the key should be removed in `selector.selectedKeys().iterator()` in java nio? - Stack Overflow

    There are 2 tables in the selector:

    1. registration table: when we call channel.register, there will be a new item(key) into it. Only if we call key.cancel(), it will be removed from this table.

    2. ready for selection table: when we call selector.select(), the selector will look up the registration table, find the keys which are available, copy the references of them to this selection table. The items of this table won't be cleared by selector(that means, even if we call selector.select() again, it won't clear the existing items)

    That's why we have to invoke iter.remove() when we got the key from selection table. If not, we will get the key again and again by selector.selectedKeys() even if it's not ready to use.

    大意就是:选择器中有2个表,分别是 表1是注册表; 表2是就绪选择表

    调用 selector.select() 时, 注册表1中对应通道有事件的key 会被拷贝到就绪选择表2;而 选择器不会清理表2的key;即便我们重复调用 selector.select() 时,它也不会清理表2的key;

    这也就是为什么我们从选择表2中获得key后,会调用 it.remove() 清理掉key;如果不清理,我们重复调用 selector.selectedKeys() 时,还是会获取之前的key,即便这些key对应 通道没有事件,这就会导致报空指针

    2)分清楚 selector.selectedKeys() 和 selector.keys() 的 区别

    • selector.selectedKeys():获取有事件发生的通道对应的键集合,如 ACCEPT事件,READ事件;
    • selector.keys():获取注册到当前选择器的所有通道对应的key集合;(因为通道要先实现多路复用,就需要注册到选择器,选择器会产生一个key,与通道关联起来);

    3)为什么客户端或服务器在读取缓冲区的内容时,我要通过offset + 长度去获取?如 代码:

    1. // 若可读通道,则读取
    2. if (key.isReadable()) {
    3. SocketChannel sc = (SocketChannel) key.channel();
    4. ByteBuffer buffer = ByteBuffer.allocate(1024);
    5. int count = sc.read(buffer);
    6. System.out.println(new String(buffer.array(), 0, count , StandardCharsets.UTF_8));
    7. }

    【代码解说】

    • 上述代码的最后一行,offset 等于0, 长度是count;
    • 因为如果不使用 count 限定buffer范围的话,打印出来有很多换行。(当然是我的测试案例里是有换行 ,有兴趣的同学可以自己试下);
    • 加了count,限定范围后,就没有换行了。
  • 相关阅读:
    数据库设计详细教程上篇(数据库设计流程)
    简单工厂模式、工厂模式、抽象工厂模式和加入反射、配置优化后的抽象工厂模式之间的关系和区别
    分布式电源接入对配电网影响分析(Matlab代码实现)
    【教学类-12-01】20221105《连连看8*4-不重复16个)(小班主题《白天与黑夜》)
    小白科普篇:详解Java对象的强引用、软引用、弱引用和虚引用
    决胜未来:解锁新科技趋势的无尽可能性
    前端笔试题总结,带答案和解析(三)
    【Eclipse】安装教程
    基于matlab的sift变换的图像配准和拼接算法仿真
    setTimeout 、setInterval、requestAnimationFrame
  • 原文地址:https://blog.csdn.net/PacosonSWJTU/article/details/126435122