• JavaIO流:BIO梳理


    BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。

    本篇内容包括:Java BIO 介绍、Java BIO 工作机制、BIO 案例。



    一、Java BIO 介绍

    Java BIO 就是传统的 java io 编程,其相关的类和接口在 java.io

    BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程池机制改善(实现多个客户连接服务器)。

    针对网络通信都是一请求一应答的方式,虽然简化了上层的应用开发,但在性能和可靠性方面存在着巨大瓶颈,试想一下如果每个请求都需要新建一个线程来专门处理,那么在高并发的场景下,机器资源很快就会被耗尽。

    BIO 通信模型图:

    img


    二、Java BIO 工作机制

    BIO模式下的基础工作机制

    对 BIO 编程流程的梳理

    1. 服务端启动一个 ServerSocket,注册端口,调用 accpet 方法监听客户端的 Socket 连接。
    2. 客户端启动 Socket 对服务器进行通信,默认情况下服务器端需要对每个客户建立一个线程与之通讯

    网络编程的基本模型是 Client/Server 模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定IP地址和端口),客户端通过连接操作向服务端监听的端口地址发起连接请求,基于 TCP 协议下进行三次握手连接,连接成功后,双方通过网络套接字(Socket)进行通信。

    传统的同步阻塞模型开发中,服务端 ServerSocket 负责绑定 IP 地址,启动监听端口;客户端 Socket 负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。

    基于 BIO 模式下的通信,客户端 - 服务端是完全同步,完全耦合的。


    三、BIO 案例

    1、实例1:传统的BIO编程

    服务端:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * 服务端
     **/
    public class ServerDemo {
        public static void main(String[] args) throws Exception {
            System.out.println("==服务器的启动==");
            // (1)注册端口
            ServerSocket serverSocket = new ServerSocket(8888);
            //(2)开始在这里暂停等待接收客户端的连接,得到一个端到端的Socket管道
            Socket socket = serverSocket.accept();
            //(3)从Socket管道中得到一个字节输入流。
            InputStream is = socket.getInputStream();
            //(4)把字节输入流包装成自己需要的流进行数据的读取。
            BufferedReader br = new BufferedReader(new InputStreamReader(is));
            //(5)读取数据
            String line;
            while ((line = br.readLine()) != null) {
                System.out.println("服务端收到:" + line);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    客户端:

    import java.io.OutputStream;
    import java.io.PrintStream;
    import java.net.Socket;
    /**
     *  目标: Socket 网络编程。
     *
     *  Java 提供了一个包:java.net 下的类都是用于网络通信。
     *  Java 提供了基于套接字(端口)Socket 的网络通信模式,我们基于这种模式就可以直接实现TCP通信。
     *  只要用 Socke t通信,那么就是基于 TCP 可靠传输通信。
     *
     *  功能1:客户端发送一个消息,服务端接口一个消息,通信结束!!
     *
     *  创建客户端对象:
     *  (1)创建一个 Socket 的通信管道,请求与服务端的端口连接。
     * (2)从 Socket 管道中得到一个字节输出流。
     * (3)把字节流改装成自己需要的流进行数据的发送
     * 创建服务端对象:
     * (1)注册端口
     * (2)开始等待接收客户端的连接,得到一个端到端的Socket管道
     * (3)从 Socket 管道中得到一个字节输入流。
     * (4)把字节输入流包装成自己需要的流进行数据的读取。
     *
     *  Socket的使用:
     *      构造器:public Socket(String host, int port)
     *      方法:  public OutputStream getOutputStream():获取字节输出流
     *      public InputStream getInputStream() :获取字节输入流
     *
     * ServerSocket的使用:
     *      构造器:public ServerSocket(int port)
     *
     *  小结:
     *      通信是很严格的,对方怎么发你就怎么收,对方发多少你就只能收多少!!
     */
    public class ClientDemo {
        public static void main(String[] args) throws Exception {
            System.out.println("==客户端的启动==");
            // (1)创建一个Socket的通信管道,请求与服务端的端口连接。
            Socket socket = new Socket("127.0.0.1",8888);
            // (2)从Socket通信管道中得到一个字节输出流。
            OutputStream os = socket.getOutputStream();
            // (3)把字节流改装成自己需要的流进行数据的发送
            PrintStream ps = new PrintStream(os);
            // (4)开始发送消息
            ps.println("我是客户端,我想约你吃小龙虾!!!");
            ps.println("我是客户端,我想约你吃大闸蟹!!!");
            Thread.sleep(1000);
            ps.println("我是客户端,我想约你吃油泼面!!!");
            ps.flush();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 在以上通信中,服务端会一致等待客户端的消息,如果客户端没有进行消息的发送,服务端将一直进入阻塞状态。
    • 同时服务端是按照行获取消息的,这意味着客户端也必须按照行进行消息的发送,否则服务端将进入等待消息的阻塞状态!
    2、实例2:BIO模式下多发和多收消息

    服务端:同实例1

    客户端:

    import java.io.OutputStream;
    import java.io.PrintStream;
    import java.net.Socket;
    import java.util.Scanner;
    
    /**
     * 目标: Socket网络编程。
     * 功能:客户端可以反复发消息,服务端可以反复收消息
     * 小结:通信是很严格的,对方怎么发你就怎么收,对方发多少你就只能收多少!!
     */
    public class ClientDemo {
        public static void main(String[] args) throws Exception {
            System.out.println("==客户端的启动==");
            // (1)创建一个Socket的通信管道,请求与服务端的端口连接。
            Socket socket = new Socket("127.0.0.1",8888);
            // (2)从Socket通信管道中得到一个字节输出流。
            OutputStream os = socket.getOutputStream();
            // (3)把字节流改装成自己需要的流进行数据的发送
            PrintStream ps = new PrintStream(os);
            // (4)开始发送消息
            Scanner sc = new Scanner(System.in);
            while(true){
                System.out.print("请说:");
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    3、实例3:接受多个客户端

    服务端(同上):

    import java.io.OutputStream;
    import java.io.PrintStream;
    import java.net.Socket;
    import java.util.Scanner;
    
    /**
     * 目标: Socket网络编程。
     * 功能:客户端可以反复发,一个服务端可以接收无数个客户端的消息!!
     * 小结:服务器如果想要接收多个客户端,那么必须引入线程,一个客户端一个线程处理!!
     */
    public class ClientDemo {
        public static void main(String[] args) throws Exception {
            System.out.println("==客户端的启动==");
            // (1)创建一个Socket的通信管道,请求与服务端的端口连接。
            Socket socket = new Socket("127.0.0.1",7777);
            // (2)从Socket通信管道中得到一个字节输出流。
            OutputStream os = socket.getOutputStream();
            // (3)把字节流改装成自己需要的流进行数据的发送
            PrintStream ps = new PrintStream(os);
            // (4)开始发送消息
            Scanner sc = new Scanner(System.in);
            while(true){
                System.out.print("请说:");
                String msg = sc.nextLine();
                ps.println(msg);
                ps.flush();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    服务端:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * 服务端
     *
     * @author liziheng
     * @version 1.0.0
     * @date 2022-06-23 11:23 上午
     */
    public class ServerDemo {
        public static void main(String[] args) throws Exception {
            System.out.println("==服务器的启动==");
            // (1)注册端口
            ServerSocket serverSocket = new ServerSocket(7777);
            while (true) {
                //(2)开始在这里暂停等待接收客户端的连接,得到一个端到端的Socket管道
                Socket socket = serverSocket.accept();
                new ServerReadThread(socket).start();
                System.out.println(socket.getRemoteSocketAddress() + "上线了!");
            }
        }
    }
    
    class ServerReadThread extends Thread {
    
        private final Socket socket;
    
        public ServerReadThread(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            try {
                //(3)从Socket管道中得到一个字节输入流。
                InputStream is = socket.getInputStream();
                //(4)把字节输入流包装成自己需要的流进行数据的读取。
                BufferedReader br = new BufferedReader(new InputStreamReader(is));
                //(5)读取数据
                String line;
                while ((line = br.readLine()) != null) {
                    System.out.println("服务端收到:" + socket.getRemoteSocketAddress() + ":" + line);
                }
            } catch (Exception e) {
                System.out.println(socket.getRemoteSocketAddress() + "下线了!");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 每个Socket接收到,都会创建一个线程,线程的竞争、切换上下文影响性能;
    • 每个线程都会占用栈空间和CPU资源;
    • 并不是每个 socket 都进行 IO 操作,无意义的线程处理;
    • 客户端的并发访问增加时。服务端将呈现 1:1 的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。
    4、实例4:伪异步 I/O 编程

    在上述案例(实例3)中:客户端的并发访问增加时。服务端将呈现 1:1 的线程开销,访问量越大,系统将发生线程栈溢出,线程创建失败,最终导致进程宕机或者僵死,从而不能对外提供服务。

    接下来我们采用一个伪异步 I/O 的通信框架,采用线程池和任务队列实现,当客户端接入时,将客户端的 Socket 封装成一个 Task(该任务实现 java.lang.Runnable 线程任务接口)交给后端的线程池中进行处理。JDK 的线程池维护一个消息队列和 N 个活跃的线程,对消息队列中 Socket 任务进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

    image-20200619085953166

    客户端:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.PrintWriter;
    import java.net.Socket;
    
    public class Client {
        public static void main(String[] args) {
            try {
                // 1.简历一个与服务端的Socket对象:套接字
                Socket socket = new Socket("127.0.0.1", 9999);
                // 2.从socket管道中获取一个输出流,写数据给服务端
                OutputStream os = socket.getOutputStream() ;
                // 3.把输出流包装成一个打印流
                PrintWriter pw = new PrintWriter(os);
                // 4.反复接收用户的输入
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                String line = null ;
                while((line = br.readLine()) != null){
                    pw.println(line);
                    pw.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    线程池:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
    	线程池处理类
     **/
    public class HandlerSocketThreadPool {
    
        // 线程池
        private ExecutorService executor;
    
        public HandlerSocketThreadPool(int maxPoolSize, int queueSize){
    
            this.executor = new ThreadPoolExecutor(
                    3, // 8
                    maxPoolSize,
                    120L,
                    TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(queueSize) );
        }
    
        public void execute(Runnable task){
            this.executor.execute(task);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    服务端:

    import java.io.BufferedReader;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.Reader;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    public class Server {
        public static void main(String[] args) {
            try {
                System.out.println("----------服务端启动成功------------");
                ServerSocket ss = new ServerSocket(9999);
    
                // 一个服务端只需要对应一个线程池
                HandlerSocketThreadPool handlerSocketThreadPool =
                        new HandlerSocketThreadPool(3, 1000);
    
                // 客户端可能有很多个
                while(true){
                    Socket socket = ss.accept() ; // 阻塞式的!
                    System.out.println("有人上线了!!");
                    // 每次收到一个客户端的socket请求,都需要为这个客户端分配一个
                    // 独立的线程 专门负责对这个客户端的通信!!
                    handlerSocketThreadPool.execute(new ReaderClientRunnable(socket));
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    class ReaderClientRunnable implements Runnable{
    
        private Socket socket ;
    
        public ReaderClientRunnable(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            try {
                // 读取一行数据
                InputStream is = socket.getInputStream() ;
                // 转成一个缓冲字符流
                Reader fr = new InputStreamReader(is);
                BufferedReader br = new BufferedReader(fr);
                // 一行一行的读取数据
                String line = null ;
                while((line = br.readLine())!=null){ // 阻塞式的!!
                    System.out.println("服务端收到了数据:"+line);
                }
            } catch (Exception e) {
                System.out.println("有人下线了");
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 伪异步io采用了线程池实现,因此避免了为每个请求创建一个独立线程造成线程资源耗尽的问题,但由于底层依然是采用的同步阻塞模型,因此无法从根本上解决问题。
    • 如果单个消息处理的缓慢,或者服务器线程池中的全部线程都被阻塞,那么后续socket的i/o消息都将在队列中排队。新的Socket请求将被拒绝,客户端会发生大量连接超时。
    5、实例5:基于 BIO 形式下的文件上传

    客户端:

    import java.io.DataOutputStream;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.net.Socket;
    
    /**
     * 目标:实现客户端上传任意类型的文件数据给服务端保存起来。
     */
    public class Client {
        public static void main(String[] args) {
            try (
                    InputStream is = new FileInputStream("/Users/lizhengi/Downloads/BIO模式下的基础工作机制.png");
                ){
                //  1、请求与服务端的Socket链接
                Socket socket = new Socket("127.0.0.1", 8888);
                //  2、把字节输出流包装成一个数据输出流
                DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
                //  3、先发送上传文件的后缀给服务端
                dos.writeUTF(".png");
                //  4、把文件数据发送给服务端进行接收
                byte[] buffer = new byte[1024];
                int len;
                while ((len = is.read(buffer)) > 0) {
                    dos.write(buffer, 0, len);
                }
                dos.flush();
                Thread.sleep(10000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    服务端线程:

    import java.io.DataInputStream;
    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import java.net.Socket;
    import java.util.UUID;
    
    public class ServerReaderThread extends Thread {
    
        private final Socket socket;
    
        public ServerReaderThread(Socket socket) {
            this.socket = socket;
        }
    
        @Override
        public void run() {
            try {
                // 1、得到一个数据输入流读取客户端发送过来的数据
                DataInputStream dis = new DataInputStream(socket.getInputStream());
                // 2、读取客户端发送过来的文件类型
                String suffix = dis.readUTF();
                System.out.println("服务端已经成功接收到了文件类型:" + suffix);
                // 3、定义一个字节输出管道负责把客户端发来的文件数据写出去
                OutputStream os = new FileOutputStream("/Users/lizhengi/test/iodemo/" +
                        UUID.randomUUID().toString() + suffix);
                // 4、从数据输入流中读取文件数据,写出到字节输出流中去
                byte[] buffer = new byte[1024];
                int len;
                while ((len = dis.read(buffer)) > 0) {
                    os.write(buffer, 0, len);
                }
                os.close();
                System.out.println("服务端接收文件保存成功!");
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    服务端:

    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * 目标:服务端开发,可以实现接收客户端的任意类型文件,并保存到服务端磁盘。
     */
    public class Server {
        public static void main(String[] args) {
            try {
                ServerSocket ss = new ServerSocket(8888);
                while (true) {
                    Socket socket = ss.accept();
                    // 交给一个独立的线程来处理与这个客户端的文件通信需求。
                    new ServerReaderThread(socket).start();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    客户端怎么发,服务端就怎么接收

  • 相关阅读:
    vue2 中this.$emit(“update:xx“,value)和xx.sync的用法
    船用低速发动机缸压在线监测系统
    Mybatis缓存机制
    Java入门,运行Java源码之前需要做些什么,命令行Hello World
    webpack优化篇(四十九):使用 webpack 进行图片压缩
    面试题整理:vue 的双向数据绑定的实现原理?
    教你用API插件开发一个AI快速处理图片小助手
    ElasticSearch+Kibana+Logstash实现日志可视化运维监控
    C++ merge()和inplace_merge()函数用法详解(深入了解,一文学会)
    数值分析学习笔记——误差【华科B站教程版本】
  • 原文地址:https://blog.csdn.net/weixin_45187434/article/details/128026700