• 【专栏】RPC系列(番外)-“土气”的IO实现


    公众号【离心计划】,掌握一手文章,一起离开地球表面

    【RPC系列合集】

    【专栏】RPC系列(理论)-夜的第一章

    【专栏】RPC系列(理论)-协议与序列化

    【专栏】RPC系列(理论)-动态代理

    【专栏】RPC系列(实战)-摸清RPC骨架

    【专栏】RPC系列(实战)-优雅的序列化

    【专栏】RPC系列(番外)-IO模型与线程模型

    前言

        前一章番外篇中我们讲解了几种IO模型和线程模型的区别,这一节主要我们动手实现一下从BIO到NIO的过程,实际感受一下。

    BIO

    单线程BIO

    1. public static void main(String[] args) {
    2. byte[] buffer = new byte[1024];
    3. try {
    4. ServerSocket serverSocket = new ServerSocket();
    5. System.out.println("绑定端口:8080");
    6. serverSocket.bind(new InetSocketAddress(8080));
    7. System.out.println("等待连接");
    8. while (true) {
    9. //阻塞点1:建立连接
    10. Socket accept = serverSocket.accept();
    11. System.out.println("-------------------------");
    12. System.out.println("建立连接成功,读取数据");
    13. //阻塞点2:等待数据
    14. accept.getInputStream().read(buffer);
    15. String data = new String(buffer);
    16. System.out.println("接收到数据:" + data);
    17. System.out.println("发送数据...");
    18. accept.getOutputStream().write("我收到数据啦".getBytes(StandardCharsets.UTF_8));
    19. System.out.println("发送完成");
    20. System.out.println("-------------------------");
    21. }
    22. } catch (Exception e) {
    23. e.printStackTrace();
    24. }
    25. }

        这是单线程阻塞IO的实现的Server,上面有两个阻塞点:accept和read。这也是我们在番外篇中理解的阻塞IO,由于没有连接的到来或者没有IO事件的到来就会阻塞在这里,然后我们关注一点ServerSocket的包路径:package java.net,并不是在NIO包下。

        这种单线程阻塞模型的问题很明显,只能连接一个客户端,其他的连接都会被阻塞。

    多线程BIO

        基于上面的问题,由于单线程导致的并发连接阻塞问题,那么我们给每个连接分配一个线程去处理不就行了?按照这个思路,代码就是这样

    1. public static void main(String[] args) {
    2. byte[] buffer = new byte[1024];
    3. try {
    4. ServerSocket serverSocket = new ServerSocket();
    5. System.out.println("绑定端口:8080");
    6. serverSocket.bind(new InetSocketAddress(8080));
    7. System.out.println("等待连接");
    8. while (true) {
    9. Socket accept = serverSocket.accept();
    10. //连接到了后分配一个线程处理这个连接后续的io操作
    11. Thread thread = new Thread(new Runnable() {
    12. @Override
    13. public void run()
    14. {
    15. long threadId = Thread.currentThread().getId();
    16. System.out.println("-------------------------");
    17. System.out.printf("[%s] 建立连接成功\n", threadId);
    18. while (true){
    19. try {
    20. int read = accept.getInputStream().read(buffer);
    21. if(read>0){
    22. String data = new String(buffer);
    23. System.out.println(String.format("[%s] 接收到数据:", threadId) + data);
    24. System.out.println("发送数据...");
    25. accept.getOutputStream().write(String.format("Hello!I have received your message:%s",data).getBytes(StandardCharsets.UTF_8));
    26. }else{
    27. accept.close();
    28. break;
    29. }
    30. } catch (IOException e) {
    31. e.printStackTrace();
    32. break;
    33. }
    34. System.out.println("发送完成");
    35. System.out.println("-------------------------");
    36. }
    37. }
    38. });
    39. thread.start();
    40. }
    41. } catch (Exception e) {
    42. e.printStackTrace();
    43. }
    44. }

        这样的做法在大量连接出现的情况下就会导致C10K问题,也就是服务器创建线程数量的上限引起的瓶颈,通常为了不浪费资源会使用线程池来做线程公用,但是存在两个基本问题:

    1. 线程模型问题。线程池中线程数量够还好,不够还是得阻塞。

    2. IO模型问题,accept这一步的阻塞,一定会导致并发连接出现等待的问题(再短也是等待)。read这一步的阻塞,依旧会导致大量线程的切换。

    NIO

        NIO与BIO的区别在于,accept与read不再阻塞。从阻塞的定义来看:调用方不等待被调用方的真实返回,所以是成立的。

    1. public static void main(String[] args) {
    2. ByteBuffer buffer = ByteBuffer.allocate(1024);
    3. try {
    4. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    5. System.out.println("绑定端口:8080");
    6. serverSocketChannel.bind(new InetSocketAddress(8080));
    7. serverSocketChannel.configureBlocking(false);
    8. System.out.println("等待连接");
    9. while (true) {
    10. SocketChannel socketChannel = serverSocketChannel.accept();
    11. if (socketChannel != null) {
    12. socketChannel.configureBlocking(false);
    13. System.out.println("-------------------------");
    14. System.out.println("建立连接成功,读取数据");
    15. int hasData = socketChannel.read(buffer);
    16. if (hasData != 0) {
    17. String data = new String(buffer.array());
    18. System.out.println("接收到数据:" + data);
    19. System.out.println("发送数据...");
    20. //重置buffer
    21. buffer.flip();
    22. buffer.put("我拿到数据啦".getBytes(StandardCharsets.UTF_8));
    23. socketChannel.write(buffer);
    24. System.out.println("发送完成");
    25. System.out.println("-------------------------");
    26. } else {
    27. System.out.println("还没数据啊");
    28. }
    29. } else {
    30. System.out.println("还没有客户端连接,但是不阻塞");
    31. Thread.sleep(2000);
    32. }
    33. }
    34. } catch (Exception e) {
    35. e.printStackTrace();
    36. }
    37. }

        我们看ServerSocketChannel的包路径在java.nio下面,说明这是nio的socket实现。当我们调用了accept时无论有无连接都会直接返回,这就是nio与bio的区别所在,没有连接就会返回null。

        这个是单线程的NIO模式,问题其实可以想象到,客户端A建立了连接后,客户端B也马上建立了连接,虽然他们之间不会阻塞了,但是Server还没等AB发送数据就已经下一次循环了,Server根本没记住A和B的连接。

    NIO

        要记住每个连接,必然要把连接存储下来,每个连接建立后,放到一个socketList里面,然后有连接进来就放到这个list中,每次判断后都隐形list的循环,看看其中的每个socket是否都有数据进来

    1. publicstatic void main(String[] args) throws InterruptedException {
    2. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    3. List socketList = newArrayList();
    4. try{
    5. //Java为非阻塞设置的类
    6. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    7. serverSocketChannel.bind(newInetSocketAddress(8080));
    8. //设置为非阻塞
    9. serverSocketChannel.configureBlocking(false);
    10. while(true) {
    11. SocketChannel socketChannel = serverSocketChannel.accept();
    12. if(socketChannel==null) {
    13. //表示没人连接
    14. System.out.println("还没连接");
    15. Thread.sleep(5000);
    16. }else{
    17. System.out.println("建立连接成功");
    18. socketList.add(socketChannel);
    19. }
    20. for(SocketChannel socket:socketList) {
    21. socket.configureBlocking(false);
    22. int effective = socket.read(byteBuffer);
    23. if(effective!=0) {
    24. byteBuffer.flip();//切换模式 写-->读
    25. String content = Charset.forName("UTF-8").decode(byteBuffer).toString();
    26. System.out.println("接收到消息:"+content);
    27. byteBuffer.clear();
    28. }else{
    29. System.out.println("没收到消息呢");
    30. }
    31. }
    32. }
    33. } catch(IOException e) {
    34. e.printStackTrace();
    35. }
    36. }

        其实这种“将连接暂存起来”的做法,就是IO多路复用的做法,上面我们看起来很挫的代码体现的也是复用的思想,即一个对外的socket可以同时保持多个连接,并且还能通过轮询的方式监听每个连接有没有数据。说到轮询,问题又来了,这边的for循环其实是同步的,也就意味着如果队列第一个接收的数据特别大,那么就会导致第二个连接的数据读取延迟,越往后延迟越严重,这倒是有点http队头阻塞的感觉,解决这个问题的办法其实在于使用正确的线程模型,也就是在上一篇番外篇中提到的Reactor模型,让处理连接和处理IO事件的线程分离,所以IO复用+Reactor正是最后的大招。

        在引出代码之前我们还需要回想一个问题:IO复用+Reactor的模式可以使用BIO么?BIO是阻塞的,accept和read都阻塞,那么拿accept来说,我们根本不可能有下面这段代码,因为阻塞被唤醒后必然已经是有连接进来了,read操作亦是如此,所以显然这个问题的答案是不能。

    1. if(socketChannel==null) {
    2. //表示没人连接
    3. System.out.println("还没连接");
    4. Thread.sleep(5000);
    5. }else{
    6. System.out.println("建立连接成功");
    7. socketList.add(socketChannel);
    8. }

    IO复用+Reactor

        这一块我们主要复现前一篇番外篇(再贴一下【专栏】RPC系列(番外)-IO模型与线程模型)中所说的三种Reactor线程模型,直观地感受一下,首先是单线程地Reactor

    Reactor单线程

    1. 创建监听的channel

    2. 创建selector,并注册channel的连接事件到selector上

    3. 调用selector.select()阻塞等待连接

    4. 分流连接channel的连接、可读、可写事件分别处理

    1. public class Server {
    2. private Selector selector;
    3. private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    4. private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    5. String str;
    6. public void start() throws IOException {
    7. // 创建channel
    8. ServerSocketChannel ssc = ServerSocketChannel.open();
    9. // 服务器配置为非阻塞
    10. ssc.configureBlocking(false);
    11. ssc.bind(new InetSocketAddress("localhost", 8001));
    12. // 创建selector
    13. selector = Selector.open();
    14. // 将channel注册到selector并监听连接事件
    15. ssc.register(selector, SelectionKey.OP_ACCEPT);
    16. while (!Thread.currentThread().isInterrupted()) {
    17. //这一步是阻塞的
    18. selector.select();
    19. Set keys = selector.selectedKeys();
    20. Iterator keyIterator = keys.iterator();
    21. while (keyIterator.hasNext()) {
    22. SelectionKey key = keyIterator.next();
    23. if (!key.isValid()) {
    24. continue;
    25. }
    26. if (key.isAcceptable()) {
    27. accept(key);
    28. } else if (key.isReadable()) {
    29. read(key);
    30. } else if (key.isWritable()) {
    31. write(key);
    32. }
    33. keyIterator.remove();
    34. }
    35. }
    36. }
    37. private void write(SelectionKey key) throws IOException, ClosedChannelException {
    38. SocketChannel channel = (SocketChannel) key.channel();
    39. System.out.println("write:"+str);
    40. sendBuffer.clear();
    41. sendBuffer.put(str.getBytes());
    42. //转成读模式,重置position
    43. sendBuffer.flip();
    44. channel.write(sendBuffer);
    45. channel.register(selector, SelectionKey.OP_READ);
    46. }
    47. private void read(SelectionKey key) throws IOException {
    48. SocketChannel socketChannel = (SocketChannel) key.channel();
    49. this.readBuffer.clear();
    50. int numRead;
    51. try {
    52. numRead = socketChannel.read(this.readBuffer);
    53. } catch (IOException e) {
    54. key.cancel();
    55. socketChannel.close();
    56. return;
    57. }
    58. str = new String(readBuffer.array(), 0, numRead);
    59. System.out.println(str);
    60. socketChannel.register(selector, SelectionKey.OP_WRITE);
    61. }
    62. private void accept(SelectionKey key) throws IOException {
    63. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    64. SocketChannel clientChannel = ssc.accept();
    65. clientChannel.configureBlocking(false);
    66. //将这个连接channel注册到selector上
    67. clientChannel.register(selector, SelectionKey.OP_READ);
    68. System.out.println("[New Client Connected]"+clientChannel.getRemoteAddress());
    69. }
    70. public static void main(String[] args) throws IOException {
    71. System.out.println("Server started...");
    72. new Server().start();
    73. }
    74. }

        这里代码我们没有用复杂的Reactor模型,可以看到处理读写事件与select阻塞是一个线程,要想变成多线程Reactor模型,其实只要创建一个多线程worker线程池,在select后把处理IO事件的逻辑交给线程池处理就行了。

    单Reactor多线程Reactor

        为了不浪费篇幅,省去了部分去上面重复地代码。我们创建了一个线程池(按照规范不建议这样创建,这里为了方便)并接收IO任务、

    1. public class ServerMulti {
    2. private Selector selector;
    3. private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    4. private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
    5. private ExecutorService executorService = Executors.newFixedThreadPool(50);
    6. String str;
    7. public void start() throws IOException {
    8. // 创建channel
    9. ServerSocketChannel ssc = ServerSocketChannel.open();
    10. // 服务器配置为非阻塞
    11. ssc.configureBlocking(false);
    12. ssc.bind(new InetSocketAddress("localhost", 8001));
    13. // 通过open()方法找到Selector
    14. selector = Selector.open();
    15. // 注册连接事件到selector
    16. ssc.register(selector, SelectionKey.OP_ACCEPT);
    17. while (!Thread.currentThread().isInterrupted()) {
    18. //这一步是阻塞的
    19. selector.select();
    20. Set keys = selector.selectedKeys();
    21. Iterator keyIterator = keys.iterator();
    22. while (keyIterator.hasNext()) {
    23. SelectionKey key = keyIterator.next();
    24. if (!key.isValid()) {
    25. continue;
    26. }
    27. if (key.isAcceptable()) {
    28. accept(key);
    29. } else if (key.isReadable()) {
    30. executorService.submit(() -> {
    31. try {
    32. read(key);
    33. write(key);
    34. } catch (IOException e) {
    35. e.printStackTrace();
    36. }
    37. });
    38. }
    39. keyIterator.remove();
    40. }
    41. }
    42. }
    43. private void write(SelectionKey key) throws IOException {
    44. SocketChannel channel = (SocketChannel) key.channel();
    45. String data = "Hello Client";
    46. System.out.println("write:" + data);
    47. sendBuffer.clear();
    48. sendBuffer.put(data.getBytes());
    49. //转成读模式,重置position
    50. sendBuffer.flip();
    51. channel.write(sendBuffer);
    52. channel.register(selector, SelectionKey.OP_READ);
    53. }
    54. }

    主从Reactor多线程Reactor

        主从Reactor其实是将Reactor拆分,主Reactor只负责获取连接,从Reactor负责监听连接事件的IO事件并分配给工作线程处理对应的IO事件,为了节省篇幅去除了与上面重复的方法。

    1. public class MasterSlaveServer {
    2. static ExecutorService followerExecutor = Executors.newFixedThreadPool(10);
    3. static ExecutorService workerExecutor = Executors.newFixedThreadPool(10);
    4. static class MasterReactor {
    5. private Selector selector;
    6. int slaveNum = 4;
    7. private List slaveReactors = new ArrayList<>(slaveNum);
    8. public void start() throws IOException {
    9. Selector selector = Selector.open();
    10. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    11. serverSocketChannel.configureBlocking(false);
    12. serverSocketChannel.socket().bind(new InetSocketAddress(8001));
    13. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    14. //初始化slaveReactor
    15. for (int i = 0; i < slaveNum; i++) {
    16. SlaveReactor reactor = new SlaveReactor();
    17. slaveReactors.add(reactor);
    18. reactor.run();
    19. }
    20. int index = 0;
    21. while (!Thread.currentThread().isInterrupted()) {
    22. selector.select();
    23. Set keys = selector.selectedKeys();
    24. Iterator keyIterator = keys.iterator();
    25. while (keyIterator.hasNext()) {
    26. SelectionKey key = keyIterator.next();
    27. if (key.isValid() && key.isAcceptable()) {
    28. //为新连接创建channel并分配给slaveReactor
    29. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    30. SocketChannel clientChannel = ssc.accept();
    31. clientChannel.configureBlocking(false);
    32. SlaveReactor slaveReactor = slaveReactors.get(index++ % slaveNum);
    33. slaveReactor.registerChannel(clientChannel);
    34. }
    35. keyIterator.remove();
    36. }
    37. }
    38. }
    39. }
    40. static class SlaveReactor {
    41. private Selector selector;
    42. public SlaveReactor() throws IOException {
    43. selector = Selector.open();
    44. run();
    45. }
    46. //注册连接channel到当前SlaveReactor
    47. public void registerChannel(SocketChannel socketChannel) throws ClosedChannelException {
    48. socketChannel.register(selector, SelectionKey.OP_READ);
    49. }
    50. public void run() {
    51. followerExecutor.submit(() -> {
    52. while (!Thread.currentThread().isInterrupted()) {
    53. try {
    54. //这一步是阻塞的
    55. selector.select(500);
    56. } catch (IOException e) {
    57. e.printStackTrace();
    58. }
    59. Set keys = selector.selectedKeys();
    60. Iterator keyIterator = keys.iterator();
    61. while (keyIterator.hasNext()) {
    62. SelectionKey key = keyIterator.next();
    63. if (!key.isValid()) {
    64. continue;
    65. }
    66. if (key.isReadable()) {
    67. workerExecutor.submit(() -> {
    68. try {
    69. read(key);
    70. write(key);
    71. } catch (IOException e) {
    72. e.printStackTrace();
    73. }
    74. });
    75. }
    76. keyIterator.remove();
    77. }
    78. }
    79. });
    80. }
    81. }
    82. public static void main(String[] args) throws IOException {
    83. MasterReactor masterReactor = new MasterReactor();
    84. masterReactor.start();
    85. }
    86. }

    小结

       今天我们对照上一篇番外篇文章实现了各种IO模型与线程模型,相信大家对IO方面有了更深的认识,这是为更容易上手Netty,Netty也是基于java nio,做了很多抽象与管理工作,因此也产出了很多组件,ok周末快乐~

  • 相关阅读:
    初识 Node.js 与内置模块:fs 文件系统模块及考试成绩整理案例
    云服务器上传文件到阿里云盘
    周赛补题
    LCR 136. 删除链表的节点
    把图片压缩成指定大小,释放你的内存空间
    XGBoost论文翻译
    逻辑器件与热插拔
    系列十二、强引用、软引用、弱引用、虚引用分别是什么?
    centos7安装MySQL—以MySQL5.7.30为例
    SpringBoot之MVC配置(WebMvcConfigurer详解)
  • 原文地址:https://blog.csdn.net/scwMason/article/details/126914856