• Java基础之《netty(6)—NIO快速入门》


    一、案例
    1、编写一个NIO入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞)
    2、目的:理解NIO非阻塞网络编程机制
    3、代码

     

    NIOServer.java

    1. package netty.niostart;
    2. import java.io.IOException;
    3. import java.net.InetSocketAddress;
    4. import java.nio.ByteBuffer;
    5. import java.nio.channels.SelectionKey;
    6. import java.nio.channels.Selector;
    7. import java.nio.channels.ServerSocketChannel;
    8. import java.nio.channels.SocketChannel;
    9. import java.util.Iterator;
    10. import java.util.Set;
    11. public class NIOServer {
    12. public static void main(String[] args) throws Exception {
    13. //创建ServerSocketChannel -> 类似于ServerSocket
    14. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    15. //得到一个Selector对象
    16. Selector selector = Selector.open();
    17. //绑定一个端口6666,在服务器端监听
    18. serverSocketChannel.socket().bind(new InetSocketAddress(6666));
    19. //设置为非阻塞
    20. serverSocketChannel.configureBlocking(false);
    21. //把serverSocketChannel注册到selector,关心事件为OP_ACCEPT
    22. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    23. //循环等待客户端连接
    24. while(true) {
    25. if (selector.select(5*1000) == 0) {
    26. //等待5秒钟,如果没有事件发生,返回
    27. System.out.println("服务器等待了5秒,无连接");
    28. continue;
    29. }
    30. //如果返回的>0,就获取到相关的selectionKey集合
    31. //1.如果返回的>0,表示已经获取到关注的事件
    32. //2.selector.selectedKeys()返回关注的集合
    33. //3.通过selectionKeys反向获取通道
    34. Set selectionKeys = selector.selectedKeys();
    35. //遍历集合
    36. Iterator keyIterator = selectionKeys.iterator();
    37. while (keyIterator.hasNext()) {
    38. //获取到SelectionKey
    39. SelectionKey key = keyIterator.next();
    40. //根据key,对应的通道发生的事件,做相应的处理
    41. if (key.isAcceptable()) {
    42. //如果是OP_ACCEPT,有新的客户端连接
    43. //给该客户端生成一个SocketChannel
    44. SocketChannel socketChannel = serverSocketChannel.accept();
    45. //将socketChannel设置为非阻塞模式
    46. socketChannel.configureBlocking(false);
    47. //将客户端的socketChannel也注册到selector,关注事件为SelectionKey.OP_READ
    48. //同时给该socketChannel关联一个buffer
    49. socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(10240));
    50. System.out.println("from " + socketChannel.hashCode() + " 客户端建立了连接");
    51. } else if (key.isReadable()) {
    52. //发生OP_READ
    53. //通过key反向获取到对应的channel
    54. SocketChannel socketChannel = (SocketChannel)key.channel();
    55. //获取到该channel关联的buffer
    56. ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
    57. byteBuffer.clear();
    58. //把当前通道的数据读到buffer中
    59. try {
    60. if (socketChannel.read(byteBuffer) == -1) {
    61. System.out.println("from " + socketChannel.hashCode() + " 客户端断开了连接");
    62. keyIterator.remove();
    63. socketChannel.close();
    64. continue;
    65. }
    66. //解析客户端数据
    67. //socket通讯格式可以自己定义:4字节报文长度+报文体
    68. String request = new String(byteBuffer.array(), 0, byteBuffer.position(), "utf-8");
    69. System.out.println("from " + socketChannel.hashCode() + " 客户端:" + request);
    70. //----------------------------------------------------------------//
    71. /**
    72. * 业务模块处理(能否做成异步,传入socketChannel对象)主线程只负责网络IO读写
    73. */
    74. //业务处理
    75. System.out.println("业务处理...");
    76. //使用新buffer返回
    77. ByteBuffer newByteBuffer = ByteBuffer.allocate(10240);
    78. newByteBuffer.clear();
    79. //获取返回数据
    80. String response = "result ok";
    81. newByteBuffer.put(response.getBytes("utf-8"));
    82. newByteBuffer.put((byte)'9');
    83. //注册写事件
    84. socketChannel.register(selector, SelectionKey.OP_WRITE, newByteBuffer);
    85. //----------------------------------------------------------------//
    86. } catch (IOException ioe) {
    87. ioe.printStackTrace();
    88. keyIterator.remove();
    89. socketChannel.close();
    90. continue;
    91. }
    92. } else if (key.isWritable()) {
    93. //通过key反向获取到对应的channel
    94. SocketChannel socketChannel = (SocketChannel)key.channel();
    95. //获取到该channel关联的buffer
    96. ByteBuffer byteBuffer = (ByteBuffer)key.attachment();
    97. //读写切换
    98. byteBuffer.flip(); //只有buffer出数据需要切换
    99. //把buffer数据写入到channel
    100. try {
    101. socketChannel.write(byteBuffer);
    102. } catch (IOException ioe) {
    103. ioe.printStackTrace();
    104. keyIterator.remove();
    105. socketChannel.close();
    106. continue;
    107. }
    108. String response = new String(byteBuffer.array(), 0, byteBuffer.position(), "utf-8");
    109. System.out.println("to " + socketChannel.hashCode() + " 服务端:" + response);
    110. //如果长连接继续注册事件等待
    111. //注册读事件
    112. //socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(10240));
    113. //如果短连接则断开
    114. socketChannel.close();
    115. } else if (key.isConnectable()) {
    116. System.out.println("Connectable");
    117. } else if (key.isValid()) {
    118. System.out.println("Valid");
    119. }
    120. //手动从集合中移除当前的SelectionKey,防止重复操作
    121. keyIterator.remove();
    122. }
    123. }
    124. }
    125. }

    NIOClient.java

    1. package netty.niostart;
    2. import java.net.InetSocketAddress;
    3. import java.nio.ByteBuffer;
    4. import java.nio.channels.SocketChannel;
    5. public class NIOClient {
    6. public static void main(String[] args) throws Exception {
    7. //得到一个网络通道
    8. SocketChannel socketChannel = SocketChannel.open();
    9. //设置非阻塞模式
    10. socketChannel.configureBlocking(false);
    11. //提供服务器端的ip和端口
    12. InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
    13. //连接服务器
    14. if (!socketChannel.connect(inetSocketAddress)) {
    15. while(!socketChannel.finishConnect()) {
    16. System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作");
    17. }
    18. }
    19. //如果连接成功,就发送数据
    20. String str = "111";
    21. //客户端也要关联buffer
    22. ByteBuffer byteBuffer = ByteBuffer.wrap(str.getBytes("utf-8")); //根据字节数组产生buffer
    23. //Thread.sleep(30*1000); //模拟线程阻塞
    24. //发送数据
    25. //将buffer数据写入channel
    26. socketChannel.write(byteBuffer);
    27. //获取返回
    28. byteBuffer.clear();
    29. int numBytesRead;
    30. while ((numBytesRead = socketChannel.read(byteBuffer)) != -1) { //-1是读完
    31. if (numBytesRead == 0) { //0是读到0个
    32. if (byteBuffer.limit() == byteBuffer.position()) {
    33. byteBuffer.clear();
    34. }
    35. continue;
    36. }
    37. //buffer是数组用这个
    38. //System.out.println("客户端收到:" + new String(byteBuffer.array(), byteBuffer.arrayOffset(), byteBuffer.arrayOffset()+byteBuffer.position(), "utf-8"));
    39. //单个buffer用这个
    40. System.out.println("客户端收到:" + new String(byteBuffer.array(), 0, byteBuffer.position(), "utf-8"));
    41. }
    42. System.out.println("断开连接...");
    43. socketChannel.close();
    44. //System.in.read();
    45. }
    46. }

    二、用telnet测试
    1、打开cmd
    2、执行:chcp 65001,将编码方式改为“utf-8”
    3、telnet连接
    4、按Ctrl + ]
    5、执行send xxx
    6、但是telnet传输字符长度有限制?

    三、服务端设想

    1、前端请求建立socket连接
    2、网络通讯为NIO模型
    3、将请求消息和socketChannel对象传到一个队列
    4、业务模块维护一个线程池从队列获取请求并处理,还是一请求一线程模式
    5、业务线程处理完成后注册写事件,结束
    这样网络读写是异步的,通讯和业务处理也是异步的
     

  • 相关阅读:
    windows操作系统通过浏览器调用本地程序
    中电金信开启2023公益助学活动,以公益之光,守护童年梦想!
    一加手机 执行adb错误问题 需要权限
    基于Intel MediaSDK的低延迟编码实现
    记录下电脑windows安装Tina的过程
    软件测试周刊(第77期):只要放弃一次,就会滋生放弃的习性, 原本可以解决的问题也会变得无法解决。
    每天一点python——day74
    java使用stream流把集合中元素的属性空值赋值为0,BigDecimal类型属性使用reduce()自定义运算符,避免计算的时候导致报错
    D. Ball-(CDQ分治)
    基于享元模式实现连接池
  • 原文地址:https://blog.csdn.net/csj50/article/details/128132723