• Java网络编程之阻塞式IO与非阻塞IO


    前言

    最近读完《Java网络编程》之后,感觉Java进行网络编程的时候,通道的设计非常巧妙,这里再进行一下记录

    前置知识

    Http1.1新特性

    • 持久连接
      建立连接,完成一次请求之后保持连接,不会跟之前一样立即关闭。
    • 块编码
      servlet容器可以在接收到一些字节之后就开始发送相应信息,不必等到收到所有信息。HTTP1.1通过transfer-encoding来标识分块发送,XX\r\n 其中XX标识还剩多少数据要传送。
    • 状态码100的使用
      发送较长请求体之前,发送一个短的试探请求,来查看服务器响应与否。如果拒绝接收,那么较长的请求体也没必要再发了,自然减少了没必要的开销。

    TCP/UDP

    传输层协议。

    TCP

    可靠、通信前需要建立连接、基于流、速度相对慢。对数据可靠性有要求的场景下使用,如邮件、HTTP等。

    UDP

    不可靠、不需要建立连接、基于块、速度快。对数据可靠性要求不高,允许些许丢失的场景下使用,如电话、电视等。

    套接字

    如果学过计算机网络,应该都知道通过IP和端口能唯一确认一个应用程序,而Java中的套接字(Socket)作为网络编程的核心类,自然是派生于这个基础知识的。这里不细说了,建议自己看一下源码中的构造方法。

    阻塞式IO vs. 非阻塞式IO

    这里基于多核处理器,不讲述之前的单一CPU的场景。

    阻塞式IO

    服务器Socket监听指定端口(特定应用程序),每个客户端(Socket)请求建立连接,那么在HTTP1.1及之后的版本里,只要客户端不关闭或者服务器不关闭请求,该线程就会一直维护这个连接,显然无法处理其他的事务了;

    虽然现在机器是多CPU,相对以前单一CPU,可以创建线程池让每个线程处理一个连接请求并进行响应,但是CPU终究是有限的,经不起这样子使用,更何况现在大并发动辄百万级别呢?

    非阻塞式IO

    终于Java1.4之后提出了NIO,通过Channel来进行通信。注意这里不再说Socket了,所有的客户端、服务器都指代Channel

    流程

    首先服务器通过监听指定端口,之后客户端发来请求,这个时候服务器不再直接进行连接,而是响应客户端一个端口信息(系统分配),让客户端之后去该端口获取信息,自己则继续监听其余请求了。那么服务器是怎么记录哪些端口已经准备好数据进行响应了、哪些客户端能够继续接收数据了呢?

    这里需要引出Selector类,上述所有过程中出现的客户端以及服务器都会向该Selector中注册自己,并且指明自己关注的主题,那么之后一旦这些客户端或者服务器关心的事情发生了,就会被收集出来进行特定的处理。

    这样子细想一个Selector的容量必然是远远高于你的CPU数量的,且起初响应端口,而不是准备数据直接写回,也减少阻塞的可能。

    案例

    阻塞式IO

    这里是开启了线程池

    package chapter09.demo05;
    
    import chapter09.ThreadPoolUtil;
    
    import java.io.*;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.URLConnection;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     * @description 无论接收什么请求,都返回同一个文件;省略端口默认80,省略编码方式默认ASCII
     * @date:2022/10/28 15:39
     * @author: qyl
     */
    public class SingleFileHTTPServer {
        private static final Logger logger = Logger.getLogger("SingleFileHTTPServer");
        private static final ExecutorService pool = ThreadPoolUtil.getExecutorService();
    
        private final byte[] content;
        private final byte[] header;
        private final int port;
        private final String encoding;
    
    
        public SingleFileHTTPServer(String data, String encoding, String mimeType, int port) throws UnsupportedEncodingException {
            this(data.getBytes(encoding), encoding, mimeType, port);
        }
    
        /**
         * 将编码、首部、数据部分、mine类型
         * @param data
         * @param encoding
         * @param mimeType
         * @param port
         */
        public SingleFileHTTPServer(byte[] data, String encoding, String mimeType, int port) {
            this.content = data;
            this.port = port;
            this.encoding = encoding;
            String header = "HTTP/1.0 200 OK\r\n"
                    + "Server: OneFile 2.0\r\n"
                    + "Content-length: " + this.content.length + "\r\n"
                    + "Content-type: " + mimeType + "; charset=" + encoding + "\r\n\r\n";
            this.header = header.getBytes(StandardCharsets.US_ASCII);
        }
    
        /**
         * 不断监听客户请求,向线程池提交请求处理任务
         */
        public void start() {
            try (ServerSocket server = new ServerSocket(this.port)) {
                {
                    logger.info("Accepting connections on port " + server.getLocalPort());
                    logger.info("Data to be sent:");
                    logger.info(new String(this.content, encoding));
    
                    while (true) {
                        try (Socket conn = server.accept()) {
                            pool.submit(new HttpHandler(conn));
                        } catch (IOException ex) {
                            logger.log(Level.WARNING, "Exception accepting connection", ex);
                        } catch (RuntimeException ex) {
                            logger.log(Level.SEVERE, "Unexpected error", ex);
                        }
                    }
                }
            } catch (IOException ex) {
                logger.log(Level.SEVERE, "Could not start server", ex);
            }
        }
    
        /**
         * 请求处理
         */
        private class HttpHandler implements Callable<Void> {
            private final Socket conn;
    
            private HttpHandler(Socket conn) {
                this.conn = conn;
            }
    
            @Override
            public Void call() throws Exception {
                try {
                    OutputStream out = new BufferedOutputStream(conn.getOutputStream());
                    InputStream in = new BufferedInputStream(conn.getInputStream());
                    // 只读取第一行,这时我们需要的全部内容
                    StringBuilder request = new StringBuilder(80);
                    while (true) {
                        int c = in.read();
                        if (c != '\r' || c == '\n' || c == -1) break;
                        request.append((char) c);
                    }
                    // 如果时HTTP1.0或者以后的版本,则发送一个MIME首部
                    if (request.toString().indexOf("HTTP/") != -1) {
                        out.write(header);
                    }
                    out.write(content);
                    out.flush();
                } catch (IOException ex) {
                    logger.log(Level.WARNING, "Error writing to client", ex);
                } finally {
                    conn.close();
                }
                return null;
            }
        }
    
        public static void main(String[] args) {
            // 设置要监听的端口
            int port;
            try {
                port = Integer.parseInt(args[1]);
                if (port < 1 || port > 65535) port = 80;
            } catch (RuntimeException ex) {
                port = 80;
            }
            String encoding = "UTF-8";
            if (args.length > 2) encoding = args[2];
    
            try {
                // 读取文件内容
                Path path = Paths.get(args[0]);
                byte[] data = Files.readAllBytes(path);
                /**
                 * 获取文件内容类型
                 */
                String contentType = URLConnection.getFileNameMap().getContentTypeFor(args[0]);
                SingleFileHTTPServer server = new SingleFileHTTPServer(data, encoding, contentType, port);
                server.start();;
            } catch (IOException e) {
                logger.severe(e.getMessage());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144

    非阻塞IO

    package chapter11.demo02;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * 只用了一个线程来处理服务器接收请求与对现有的连接提供服务,但是多线程,高优先级接收新连接,低优先级对现有的连接提供服务性能更好。
     * @date:2022/10/29 21:36
     * @author: qyl
     */
    public class ChargenServer {
        public static int DEFAULT_PORT = 19;
    
        public static void main(String[] args) {
            int port;
            try {
                port = Integer.parseInt(args[0]);
            } catch (RuntimeException ex) {
                port = DEFAULT_PORT;
            }
            System.out.println("Listening for connections on port " + port);
    
            // 要发送的数组
            byte[] rotation = new byte[95 * 2];
    
            for (byte i = ' '; i <= '~'; i++) {
                rotation[i - ' '] = i;
                rotation[i + 95 - ' '] = i;
            }
    
            ServerSocketChannel serverChannel;
            Selector selector;
            try {
                serverChannel = ServerSocketChannel.open();
                // Java7之前
                // ServerSocket ss = serverChannel.socket();
                // SocketAddress address = new InetSocketAddress(port);
                // ss.bind(address);
                serverChannel.bind(new InetSocketAddress(port));
                serverChannel.configureBlocking(false);
                selector = Selector.open();
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
                return;
            }
    
            while (true) {
                try {
                    selector.select();
                } catch (IOException e) {
                    e.printStackTrace();
                    break;
                }
    
                Set<SelectionKey> readyKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = readyKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        if (key.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();
                            SocketChannel client = server.accept();
                            System.out.println("Accepted connection from " + client);
                            client.configureBlocking(false);
                            SelectionKey key2 = client.register(selector, SelectionKey.OP_WRITE);
                            ByteBuffer buffer = ByteBuffer.allocate(74);
                            buffer.put(rotation, 0, 72);
                            buffer.put((byte) '\r');
                            buffer.put((byte) '\n');
                            buffer.flip();
                            key2.attach(buffer);
                        } else if (key.isWritable()) {
                            SocketChannel client = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            if (!buffer.hasRemaining()){
                                // 重新写入缓冲区 写位置position回到开头,覆盖原内容做准备
                                buffer.rewind();
                                // 得到上一次的首字符 position++
                                int first = buffer.get();
                                // 准备该笔那缓冲区中的数据 写位置position回到开头,覆盖原内容做准备
                                buffer.rewind();
                                // 找寻rotation中的新的首字符位置
                                int position = first - ' ' + 1;
                                // 将数据从rotation复制到缓冲区
                                buffer.put(rotation,position,72);
                                // 在缓冲区末尾存储一个行分隔符
                                buffer.put((byte) '\r');
                                buffer.put((byte) '\n');
                                // 准备缓冲区进行写入
                                buffer.flip();
                            }
                            // 将buffer读出 写到通道
                            client.write(buffer);
                        }
                    } catch (Exception e) {
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException ex) {
                        }
                    }
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115

    更多的网络编程的知识都记录在了Java学习的net模块了,如果文章有误,欢迎指正!

  • 相关阅读:
    [manacher/二分+哈希]KFC Crazy Thursday 2022牛客多校第5场 G
    机器学习(3)
    Django select_related()方法
    MySQL数据库 -- 表的增删查改
    香橙派orangepi ubuntu 安装安装redis
    架构篇(七)安全架构
    微信小程序clearInterval无法关闭时间间隔器问题解决
    kerberos认证相关概念和流程
    互联网医院系统:数字化时代中医疗服务的未来
    壳聚糖-聚乙二醇-N-羟基琥珀酰亚胺|Chitosan-PEG-NHS
  • 原文地址:https://blog.csdn.net/weixin_43626356/article/details/127695316