• Socket网络编程(五)——TCP数据发送与接收并行


    主要实现需求

    多线程收发并行
    TCP多线程收发协作
    TCP 服务端收发并行重构

    TCP 服务端收发并行重构

    启动main方法重构

    原有的main逻辑如下:
    20240229-034932-Jk.png

    重构后如下:

    public class Server {
     
        public static void main(String[] args) throws IOException {
     
            TCPServer tcpServer = new TCPServer(TCPConstants.PORT_SERVER);
            boolean isSucceed = tcpServer.start();
            if(!isSucceed){
                System.out.println("Start TCP server failed.");
            }
            UDPProvider.start(TCPConstants.PORT_SERVER);
     
            // 键盘输入:
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String str;
            do {
                str = bufferedReader.readLine();
                tcpServer.broadcast(str);
            } while (!"00bye00".equalsIgnoreCase(str));
     
            UDPProvider.stop();
            tcpServer.stop();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    重构后,从while循环不断读取键盘输入信息,当输入“00bye00” 时退出读取。此处只读取键盘输入数据,客户端发送的数据在会重新拆分出来新的线程单独处理。

    重构分离收发消息的操作

    创建 ClientHandler.java 重构收发消息操作:

    public class ClientHandler {
        private final Socket socket;
        private final ClientReadHandler readHandler;
        private final ClientWriteHandler writeHandler;
        private final CloseNotiry closeNotiry;
     
        public ClientHandler(Socket socket, CloseNotiry closeNotiry ) throws IOException {
            this.socket = socket;
            this.readHandler = new ClientReadHandler(socket.getInputStream());
            this.writeHandler = new ClientWriteHandler(socket.getOutputStream());
            this.closeNotiry = closeNotiry;
            System.out.println("新客户链接: " + socket.getInetAddress() + "\tP:" + socket.getPort());
        } 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    重构接收消息的操作

        /**
         * 接收数据
         */
        class ClientReadHandler extends Thread {
     
            private boolean done = false;
            private final InputStream inputStream;
     
            ClientReadHandler(InputStream inputStream){
                this.inputStream = inputStream;
            }
            @Override
            public void run(){
                super.run();
                try {
                    // 得到输入流,用于接收数据
                    BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                    do {
                        // 客户端拿到一条数据
                        String str = socketInput.readLine();
                        if(str == null){
                            System.out.println("客户端已无法读取数据!");
                            // 退出当前客户端
                            ClientHandler.this.exitBySelf();
                            break;
                        }
                        // 打印到屏幕
                        System.out.println(str);
                    }while (!done);
                    socketInput.close();
                }catch (IOException e){
                    if(!done){
                        System.out.println("连接异常断开");
                        ClientHandler.this.exitBySelf();
                    }
                }finally {
                    // 连接关闭
                    CloseUtils.close(inputStream);
                }
            }
            void exit(){
                done = true;
                CloseUtils.close(inputStream);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    创建一个单独的线程进行接收消息,该线程不需要关闭。

    重构发送消息

        /**
         * 发送数据
         */
        class ClientWriteHandler {
            private boolean done = false;
            private final PrintStream printStream;
            private final ExecutorService executorService;
     
            ClientWriteHandler(OutputStream outputStream) {
                this.printStream = new PrintStream(outputStream);
                // 发送消息使用线程池来实现
                this.executorService = Executors.newSingleThreadExecutor();
            }
     
            void exit(){
                done = true;
                CloseUtils.close(printStream);
                executorService.shutdown();
            }
     
            void send(String str) {
                executorService.execute(new WriteRunnable(str));
            }
     
            class WriteRunnable implements  Runnable{
                private final String msg;
     
                WriteRunnable(String msg){
                    this.msg = msg;
                }
                @Override
                public void run(){
                    if(ClientWriteHandler.this.done){
                        return;
                    }
                    try {
                        ClientWriteHandler.this.printStream.println(msg);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    TCPServer调用发送消息的逻辑

        public void broadcast(String str) {
            for (ClientHandler client : clientHandlerList){
                // 发送消息
                client.send(str);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    监听客户端链接逻辑重构

        private List<ClientHandler> clientHandlerList = new ArrayList<>();
     
        /**
         * 监听客户端链接
         */
        private class ClientListener extends Thread {
            private ServerSocket server;
            private boolean done = false;
     
            private ClientListener(int port) throws IOException {
                server = new ServerSocket(port);
                System.out.println("服务器信息: " + server.getInetAddress() + "\tP:" + server.getLocalPort());
            }
     
            @Override
            public void run(){
                super.run();
     
                System.out.println("服务器准备就绪~");
                // 等待客户端连接
                do{
                    // 得到客户端
                    Socket client;
                    try {
                        client = server.accept();
                    }catch (Exception e){
                        continue;
                    }
                    try {
                        // 客户端构建异步线程
                        ClientHandler  clientHandler = new ClientHandler(client, handler -> clientHandlerList.remove(handler));
                        // 启动线程
                        clientHandler.readToPrint();
                        clientHandlerList.add(clientHandler);
                    } catch (IOException e) {
                        e.printStackTrace();
                        System.out.println("客户端连接异常: " + e.getMessage());
                    }
     
                }while (!done);
                System.out.println("服务器已关闭!");
            }
            void exit(){
                done = true;
                try {
                    server.close();
                }catch (IOException e){
                    e.printStackTrace();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    clientHandlerList作为已经建立了连接的客户端的集合,用于管理当前用户的信息。接收与发送都使用该集合。

    Socket、流的退出与关闭

        /**
         * 退出、关闭流
         */
        public void exit(){
            readHandler.exit();
            writeHandler.exit();
            CloseUtils.close(socket);
            System.out.println("客户端已退出:" + socket.getInetAddress() + "\tP:" + socket.getPort());
        }
     
        /**
         * 发送消息
         * @param str
         */
        public void send(String str){
            writeHandler.send(str);
        }
     
        /**
         * 接收消息
         */
        public void readToPrint() {
            readHandler.exit();
        }
     
        /**
         *  接收、发送消息异常,自动关闭
         */
        private void exitBySelf() {
            exit();
            closeNotiry.onSelfClosed(this);
        }
        /**
         *  关闭流
         */
        public interface CloseNotiry{
            void onSelfClosed(ClientHandler handler);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    TCP 客户端收发并行重构

    客户端 main函数重构

        public static void main(String[] args) {
            // 定义10秒的搜索时间,如果超过10秒未搜索到,就认为服务器端没有开机
            ServerInfo info = UDPSearcher.searchServer(10000);
            System.out.println("Server:" + info);
     
            if( info != null){
                try {
                    TCPClient.linkWith(info);
                }catch (IOException e){
                    e.printStackTrace();
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    客户端接收消息重构

        static class ReadHandler extends Thread{
            private boolean done = false;
            private final InputStream inputStream;
     
            ReadHandler(InputStream inputStream){
                this.inputStream = inputStream;
            }
            @Override
            public void run(){
                try {
                    // 得到输入流,用于接收数据
                    BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));
                    do {
                        // 客户端拿到一条数据
                        String str = null;
                        try {
                            str = socketInput.readLine();
                        }catch (SocketTimeoutException e){
     
                        }
                        if(str == null){
                            System.out.println("连接已关闭,无法读取数据!");
                            break;
                        }
                        // 打印到屏幕
                        System.out.println(str);
                    }while (!done);
                    socketInput.close();
                }catch (IOException e){
                    if(!done){
                        System.out.println("连接异常断开:" + e.getMessage());
                    }
                }finally {
                    // 连接关闭
                    CloseUtils.close(inputStream);
                }
            }
            void exit(){
                done = true;
                CloseUtils.close(inputStream);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    创建ReadHandler用单独的线程去接收服务端的消息。连接关闭则exit() 关闭客户端。

    客户端发送消息重构

        private static void write(Socket client) throws IOException {
            // 构建键盘输入流
            InputStream in = System.in;
            BufferedReader input = new BufferedReader(new InputStreamReader(in));
     
            // 得到Socket输出流,并转换为打印流
            OutputStream outputStream = client.getOutputStream();
            PrintStream socketPrintStream = new PrintStream(outputStream);
     
            boolean flag = true;
            do {
                // 键盘读取一行
                String str = input.readLine();
                // 发送到服务器
                socketPrintStream.println(str);
     
                // 从服务器读取一行
                if("00bye00".equalsIgnoreCase(str)){
                    break;
                }
            }while(flag);
            // 资源释放
            socketPrintStream.close();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在linkWith() 中调用write() 发送方法,由 do-while 循环读取本地键盘输入信息进行发送操作。当满足 “00bye00” 时,关闭循环,关闭socket连接,结束该线程。

    客户端 linkWith 主方法重构

         public static void linkWith(ServerInfo info) throws IOException {
            Socket socket = new Socket();
            // 超时时间
            socket.setSoTimeout(3000);
            // 端口2000;超时时间300ms
            socket.connect(new InetSocketAddress(Inet4Address.getByName(info.getAddress()),info.getPort()));//
     
            System.out.println("已发起服务器连接,并进入后续流程~");
            System.out.println("客户端信息: " + socket.getLocalAddress() + "\tP:" + socket.getLocalPort());
            System.out.println("服务器信息:" + socket.getInetAddress() + "\tP:" + socket.getPort());
     
            try {
                ReadHandler readHandler = new ReadHandler(socket.getInputStream());
                readHandler.start();
                // 发送接收数据
                write(socket);
            }catch (Exception e){
                System.out.println("异常关闭");
            }
     
            // 释放资源
            socket.close();
            System.out.println("客户端已退出~");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    原有的逻辑里,是调用 todo() 方法,在todo() 方法里同时进行收发操作。现在是进行读写分离。

    TCP 收发并行重构测试

    服务端重构后执行日志

    20240229-053719-hC.png

    客户端重构后执行日志

    20240229-053740-Qt.png

    源码下载

    下载地址:https://gitee.com/qkongtao/socket_study/tree/master/src/main/java/cn/kt/socket/SocketDemo_L5_TCP_Channel

  • 相关阅读:
    GAT解读graph attention network
    【Image captioning】ruotianluo/self-critical.pytorch之4—模型训练之train.py代码解析
    手撸nano-gpt
    Sentinel 规则持久化
    基于Java的Android计算器设计与实现
    Ceph入门到静态-deep scrub 深度清理处理
    敏捷开发失败的五个原因以及解决方案
    Rockland丨Rockland多克隆抗体阶段执行方案
    今年的 618 不行了?
    php常见的危险函数
  • 原文地址:https://blog.csdn.net/qq_42038623/article/details/136402787