• Java 网络编程之TCP(五):分析服务端注册OP_WRITE写数据的各种场景(三)


    在服务端使用多线程对同个客户端进行读写,会带来意想不到的问题。

    前面的文章中,服务端都是在一个单线程main中,处理所有接收到的IO事件,为了提高效率,会自然的想到,为OP_READ和OP_WRITE事件分配多线程处理。

    需求:服务端把接收到的数据,原样返回给客户端

    服务端代码如下:

    直接在单线程的代码上,把单线程的read和write逻辑,放入一个单独的线程

    服务代码如下:

    1. import java.io.IOException;
    2. import java.net.InetSocketAddress;
    3. import java.nio.ByteBuffer;
    4. import java.nio.channels.SelectionKey;
    5. import java.nio.channels.Selector;
    6. import java.nio.channels.ServerSocketChannel;
    7. import java.nio.channels.SocketChannel;
    8. import java.util.Iterator;
    9. import java.util.Set;
    10. public class SocketMultiplexingSingleThreadv2_2 {
    11. private ServerSocketChannel server = null;
    12. private Selector selector = null; //linux 多路复用器(select poll epoll) nginx event{}
    13. int port = 9090;
    14. public void initServer() {
    15. try {
    16. server = ServerSocketChannel.open();
    17. server.configureBlocking(false);
    18. server.bind(new InetSocketAddress(port));
    19. selector = Selector.open(); // select poll *epoll
    20. server.register(selector, SelectionKey.OP_ACCEPT);
    21. } catch (IOException e) {
    22. e.printStackTrace();
    23. }
    24. }
    25. public void start() {
    26. initServer();
    27. System.out.println("服务器启动了。。。。。");
    28. try {
    29. while (true) {
    30. // Set keys = selector.keys();
    31. // System.out.println(keys.size()+" size");
    32. while (selector.select(50) > 0) {
    33. Set selectionKeys = selector.selectedKeys();
    34. Iterator iter = selectionKeys.iterator();
    35. while (iter.hasNext()) {
    36. SelectionKey key = iter.next();
    37. iter.remove();
    38. if (key.isAcceptable()) {
    39. acceptHandler(key);
    40. } else if (key.isReadable()) {
    41. // key.cancel(); //现在多路复用器里把key cancel了
    42. System.out.println("in.....");
    43. readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发
    44. } else if(key.isWritable()){ //我之前没讲过写的事件!!!!!
    45. //写事件<-- send-queue 只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法
    46. //你真的要明白:什么时候写?不是依赖send-queue是不是有空间
    47. //1,你准备好要写什么了,这是第一步
    48. //2,第二步你才关心send-queue是否有空间
    49. //3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册
    50. //4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
    51. // key.cancel();
    52. key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    53. writeHandler(key);
    54. }
    55. }
    56. }
    57. }
    58. } catch (IOException e) {
    59. e.printStackTrace();
    60. }
    61. }
    62. private void writeHandler(SelectionKey key) {
    63. new Thread(()->{
    64. System.out.println("write handler...");
    65. SocketChannel client = (SocketChannel) key.channel();
    66. ByteBuffer buffer = (ByteBuffer) key.attachment();
    67. buffer.flip();
    68. while (buffer.hasRemaining()) {
    69. try {
    70. int write = client.write(buffer);
    71. System.out.println("write " + Thread.currentThread().getName()+ " " + write);
    72. } catch (IOException e) {
    73. e.printStackTrace();
    74. }
    75. }
    76. try {
    77. Thread.sleep(2000);
    78. } catch (InterruptedException e) {
    79. e.printStackTrace();
    80. }
    81. buffer.clear();
    82. // key.cancel();
    83. // try {
    84. client.shutdownOutput();
    85. //
    86. client.close();
    87. //
    88. // } catch (IOException e) {
    89. // e.printStackTrace();
    90. // }
    91. }).start();
    92. }
    93. public void acceptHandler(SelectionKey key) {
    94. try {
    95. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    96. SocketChannel client = ssc.accept();
    97. client.configureBlocking(false);
    98. ByteBuffer buffer = ByteBuffer.allocate(8192);
    99. client.register(selector, SelectionKey.OP_READ, buffer);
    100. System.out.println("-------------------------------------------");
    101. System.out.println("新客户端:" + client.getRemoteAddress());
    102. System.out.println("-------------------------------------------");
    103. } catch (IOException e) {
    104. e.printStackTrace();
    105. }
    106. }
    107. public void readHandler(SelectionKey key) {
    108. new Thread(()->{
    109. System.out.println("read handler.....");
    110. SocketChannel client = (SocketChannel) key.channel();
    111. ByteBuffer buffer = (ByteBuffer) key.attachment();
    112. buffer.clear();
    113. int read = 0;
    114. try {
    115. while (true) {
    116. read = client.read(buffer);
    117. System.out.println("read " + Thread.currentThread().getName()+ " " + read);
    118. if (read > 0) {
    119. client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);
    120. } else if (read == 0) {
    121. break;
    122. } else {
    123. client.close();
    124. break;
    125. }
    126. }
    127. } catch (IOException e) {
    128. try {
    129. System.out.println("client " + client.getRemoteAddress() + " disconnected");
    130. client.close();
    131. } catch (IOException ex) {
    132. throw new RuntimeException(ex);
    133. }
    134. e.printStackTrace();
    135. }
    136. }).start();
    137. }
    138. public static void main(String[] args) {
    139. SocketMultiplexingSingleThreadv2_2 service = new SocketMultiplexingSingleThreadv2_2();
    140. service.start();
    141. }
    142. }

    测试:

    先启动一个服务端,再启动一个客户端,客户端发送数据

    服务端日志:

    1. 服务器启动了。。。。。
    2. -------------------------------------------
    3. 新客户端:/127.0.0.1:21598
    4. -------------------------------------------
    5. in.....
    6. in.....
    7. read handler.....
    8. in.....
    9. read handler.....
    10. read Thread-0 5
    11. read Thread-1 0
    12. read handler.....
    13. read Thread-2 0
    14. read Thread-0 0
    15. write handler...

    客户端日志:

    1. client connected to server
    2. 1234
    3. client receive data from consolejava.io.BufferedInputStream@6acfcaf3 : 1234

    可以看到,客户端发送数据,没有接收到服务端返回的数据;

    服务端接收到数据后,在写数据的时候,buffer中没有数据可写;

    再仔细看下服务端的日志,可以同个客户端只发送一条数据的时候,有3个线程来处理,其他两个线程读到的数据都是0;


    一个客户端的读事件,分配一个线程处理,但是线程还没处理完,下个读事件就来了,就又分配一个线程处理。。。而同一个客户端共享一个buffer,在register OP_READ的时候attach的。
    这样使得buffer中的数据还没来得及写出去,就被其他读线程给冲掉了(read == 0);

    tip:read事件来的时候,如果不读取数据,read事件会一直有的

    解决方法:不可以并发读同一个client, 在处理一个Client的 OP_READ的时候先取消 OP_READ的注册,读完了后,在注册一个 OP_READ

    新的服务端代码:

    1. import java.io.IOException;
    2. import java.net.InetSocketAddress;
    3. import java.nio.ByteBuffer;
    4. import java.nio.channels.*;
    5. import java.util.Iterator;
    6. import java.util.Set;
    7. public class SocketMultiplexingSingleThreadv2 {
    8. private ServerSocketChannel server = null;
    9. private Selector selector = null; //linux 多路复用器(select poll epoll) nginx event{}
    10. int port = 9090;
    11. public void initServer() {
    12. try {
    13. server = ServerSocketChannel.open();
    14. server.configureBlocking(false);
    15. server.bind(new InetSocketAddress(port));
    16. selector = Selector.open(); // select poll *epoll
    17. server.register(selector, SelectionKey.OP_ACCEPT);
    18. } catch (IOException e) {
    19. e.printStackTrace();
    20. }
    21. }
    22. public void start() {
    23. initServer();
    24. System.out.println("服务器启动了。。。。。");
    25. try {
    26. while (true) {
    27. // Set keys = selector.keys();
    28. // System.out.println(keys.size()+" size");
    29. while (selector.select(50) > 0) {
    30. Set selectionKeys = selector.selectedKeys();
    31. Iterator iter = selectionKeys.iterator();
    32. while (iter.hasNext()) {
    33. SelectionKey key = iter.next();
    34. iter.remove();
    35. if (key.isAcceptable()) {
    36. acceptHandler(key);
    37. } else if (key.isReadable()) {
    38. // key.cancel(); //现在多路复用器里把key cancel了
    39. System.out.println("in.....");
    40. // 同一个Client,读之前先取消OP_READ,防止多线程冲突吹
    41. key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
    42. readHandler(key);//还是阻塞的嘛? 即便以抛出了线程去读取,但是在时差里,这个key的read事件会被重复触发
    43. } else if(key.isWritable()){ //我之前没讲过写的事件!!!!!
    44. //写事件<-- send-queue 只要是空的,就一定会给你返回可以写的事件,就会回调我们的写方法
    45. //你真的要明白:什么时候写?不是依赖send-queue是不是有空间
    46. //1,你准备好要写什么了,这是第一步
    47. //2,第二步你才关心send-queue是否有空间
    48. //3,so,读 read 一开始就要注册,但是write依赖以上关系,什么时候用什么时候注册
    49. //4,如果一开始就注册了write的事件,进入死循环,一直调起!!!
    50. // key.cancel();
    51. key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE);
    52. writeHandler(key);
    53. }
    54. }
    55. }
    56. }
    57. } catch (IOException e) {
    58. e.printStackTrace();
    59. }
    60. }
    61. private void writeHandler(SelectionKey key) {
    62. new Thread(()->{
    63. System.out.println("write handler...");
    64. SocketChannel client = (SocketChannel) key.channel();
    65. ByteBuffer buffer = (ByteBuffer) key.attachment();
    66. buffer.flip();
    67. while (buffer.hasRemaining()) {
    68. try {
    69. int write = client.write(buffer);
    70. System.out.println("write " + Thread.currentThread().getName()+ " " + write);
    71. } catch (IOException e) {
    72. e.printStackTrace();
    73. }
    74. }
    75. try {
    76. Thread.sleep(2000);
    77. } catch (InterruptedException e) {
    78. e.printStackTrace();
    79. }
    80. buffer.clear();
    81. // key.cancel();
    82. // try {
    83. client.shutdownOutput();
    84. //
    85. client.close();
    86. //
    87. // } catch (IOException e) {
    88. // e.printStackTrace();
    89. // }
    90. }).start();
    91. }
    92. public void acceptHandler(SelectionKey key) {
    93. try {
    94. ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    95. SocketChannel client = ssc.accept();
    96. client.configureBlocking(false);
    97. ByteBuffer buffer = ByteBuffer.allocate(8192);
    98. client.register(selector, SelectionKey.OP_READ, buffer);
    99. System.out.println("-------------------------------------------");
    100. System.out.println("新客户端:" + client.getRemoteAddress());
    101. System.out.println("-------------------------------------------");
    102. } catch (IOException e) {
    103. e.printStackTrace();
    104. }
    105. }
    106. public void readHandler(SelectionKey key) {
    107. new Thread(()->{
    108. System.out.println("read handler.....");
    109. SocketChannel client = (SocketChannel) key.channel();
    110. ByteBuffer buffer = (ByteBuffer) key.attachment();
    111. buffer.clear();
    112. int read = 0;
    113. try {
    114. while (true) {
    115. read = client.read(buffer);
    116. System.out.println(Thread.currentThread().getName()+ " " + read);
    117. if (read > 0) {
    118. // 同一个Client,读完数据后,要再次注册OP_READ,读后面发送过来的数据
    119. key.interestOps( SelectionKey.OP_READ);
    120. client.register(key.selector(), key.interestOps() + SelectionKey.OP_WRITE,buffer);
    121. } else if (read == 0) {
    122. break;
    123. } else {
    124. client.close();
    125. break;
    126. }
    127. }
    128. } catch (IOException e) {
    129. try {
    130. System.out.println("client " + client.getRemoteAddress() + " disconnected");
    131. client.close();
    132. } catch (IOException ex) {
    133. throw new RuntimeException(ex);
    134. }
    135. e.printStackTrace();
    136. }
    137. }).start();
    138. }
    139. public static void main(String[] args) {
    140. SocketMultiplexingSingleThreadv2 service = new SocketMultiplexingSingleThreadv2();
    141. service.start();
    142. }
    143. }

    测试:

    先启动一个服务端,再启动一个客户端1,客户端1发送数据

    服务端日志:

    1. 服务器启动了。。。。。
    2. -------------------------------------------
    3. 新客户端:/127.0.0.1:24029
    4. -------------------------------------------
    5. in.....
    6. read handler.....
    7. Thread-0 8
    8. Thread-0 0
    9. write handler...
    10. write Thread-1 8

    客户端1日志:

    1. client connected to server
    2. client1
    3. client receive data from consolejava.io.BufferedInputStream@65231a33 : client1
    4. client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1

    可以看到,客户单和服务端都可以正常接收和发送数据。

    再添加一个客户端2,发送数据

    服务端日志:

    1. 服务器启动了。。。。。
    2. -------------------------------------------
    3. 新客户端:/127.0.0.1:24029
    4. -------------------------------------------
    5. in.....
    6. read handler.....
    7. Thread-0 8
    8. Thread-0 0
    9. write handler...
    10. write Thread-1 8
    11. -------------------------------------------
    12. 新客户端:/127.0.0.1:24105
    13. -------------------------------------------
    14. in.....
    15. read handler.....
    16. Thread-2 8
    17. Thread-2 0
    18. write handler...
    19. write Thread-3 8

    客户端2的日志:

    1. client connected to server
    2. client2
    3. client receive data from consolejava.io.BufferedInputStream@65231a33 : client2
    4. client receive data from serverjava.net.Socket$SocketInputStream@27f8302d data size:8: client2

    可以看到,客户端2和服务端都可以正常接收和发送数据。

    客户端1,再次发送数据

    客户端日志:

    1. client connected to server
    2. client1
    3. client receive data from consolejava.io.BufferedInputStream@65231a33 : client1
    4. client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:8: client1
    5. clent1_2
    6. client receive data from consolejava.io.BufferedInputStream@65231a33 : clent1_2
    7. client receive data from serverjava.net.Socket$SocketInputStream@4629104a data size:9: clent1_2

    服务端日志:

    1. 服务器启动了。。。。。
    2. -------------------------------------------
    3. 新客户端:/127.0.0.1:24029
    4. -------------------------------------------
    5. in.....
    6. read handler.....
    7. Thread-0 8
    8. Thread-0 0
    9. write handler...
    10. write Thread-1 8
    11. -------------------------------------------
    12. 新客户端:/127.0.0.1:24105
    13. -------------------------------------------
    14. in.....
    15. read handler.....
    16. Thread-2 8
    17. Thread-2 0
    18. write handler...
    19. write Thread-3 8
    20. in.....
    21. read handler.....
    22. Thread-4 9
    23. Thread-4 0
    24. write handler...
    25. write Thread-5 9

    从服务端日志中,可以看到,每个客户端的读事件,只有一个线程处理。

    整个处理流程是服务预期的。

  • 相关阅读:
    函数——两个数的合并
    One Last Kiss风格封面生成器;程序内存分析工具;Python入门课程资料;神经文本语音合成教程;前沿论文 | ShowMeAI资讯日报
    Promise笔记-同步回调-异步回调-JS中的异常error处理-Promis的理解和使用-基本使用-链式调用-七个关键问题
    课题学习(九)----阅读《导向钻井工具姿态动态测量的自适应滤波方法》论文笔记
    什么是RESTful API,Spring MVC如何支持RESTful架构
    麒麟系统安装找不到安装源!!!!设置基础软件仓库时出错
    复杂分数 马蹄集
    CTO 说了,用错@Autowired 和@Resource 的人可以领盒饭了
    Java 18 新功能介绍
    使用ElementUI结合Mock完成主页的搭建
  • 原文地址:https://blog.csdn.net/u013771019/article/details/138182198