• Java I/O(四)示例代码


    示例代码

    1 客户端/服务端聊天程序

    以下这些例子分别使用Java 中的BIO,NIO,AIO,实现了一个简单的客户端/服务端聊天系统。

    1.1 Java BIO 基于TCP协议的服务端,客户端通信(单线程)

    package com.qupeng.io.bio.tcp.singlethread;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * Single-threaded blocking TCP server listening on port 8080.
     *
     * <p>Connections are served one at a time: the server reads everything the client sends
     * until end-of-stream, logs it, and answers with a greeting that repeats the received text.
     */
    public class SocketServer {
        public static void main(String[] args) {
            try (ServerSocket serverSocket = new ServerSocket(8080)) {
                System.out.println("Server started.");
                while (true) {
                    // A failing accept() means the listener itself is broken, so it is
                    // deliberately left to the outer catch, which stops the server.
                    Socket accepted = serverSocket.accept();
                    try (Socket socket = accepted;
                         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                         PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {
                        StringBuilder clientMessage = new StringBuilder();
                        // readLine() reports failures as a checked IOException, unlike
                        // lines(), which would surface them as UncheckedIOException.
                        String line;
                        while ((line = bufferedReader.readLine()) != null) {
                            clientMessage.append(line);
                        }
                        System.out.println("From socket client: " + clientMessage.toString());
                        printWriter.write("Hello, socket client. Got your message: " + clientMessage.toString());
                        printWriter.flush();
                    } catch (IOException e) {
                        // Only this connection failed; keep serving the next clients.
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    package com.qupeng.io.bio.tcp.singlethread;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.net.UnknownHostException;
    import java.util.Scanner;
    
    /**
     * Console client for the blocking TCP server on localhost:8080.
     *
     * <p>Every line typed on standard input is sent over its own connection; the client then
     * half-closes the socket and prints each line of the server's answer.
     */
    public class SocketClient {
        public static void main(String[] args) {
            System.out.println("Client start.");
            System.out.println("Pls type here: ");
            try (Scanner scanner = new Scanner(System.in)) {
                while (scanner.hasNextLine()) {
                    // Consume the line BEFORE connecting: if the connection fails the line must
                    // still be removed from the scanner, otherwise hasNextLine() stays true and
                    // the loop retries the same line forever.
                    String input = scanner.nextLine();
                    try (Socket socket = new Socket("localhost", 8080);
                         PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
                         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                        printWriter.write(input);
                        printWriter.flush();
                        // Half-close so the server sees end-of-stream and starts answering.
                        socket.shutdownOutput();

                        String line;
                        while ((line = bufferedReader.readLine()) != null) {
                            System.out.println("From socket server: " + line);
                        }
                    } catch (IOException e) {
                        // Also covers UnknownHostException, which is an IOException.
                        e.printStackTrace();
                    }
                    System.out.println("Pls type here: ");
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    1.2 Java BIO 基于TCP协议的服务端,客户端通信(多线程)

    package com.qupeng.io.bio.tcp.multithread;
    
    import com.qupeng.concurrent.thread.MyThreadFactory;
    import com.qupeng.concurrent.thread.ThreadUtils;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Blocking TCP server on port 8080 that hands the listening socket to two
     * {@link ServerSocketRunnable} workers running on a fixed pool of two threads.
     *
     * <p>The main thread only keeps the server socket open and emits a message every five seconds.
     */
    public class SocketServer {
        public static void main(String[] args) throws InterruptedException {
            ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2, new MyThreadFactory());
            try (ServerSocket serverSocket = new ServerSocket(8080)) {
                System.out.println("Server started.");
                threadPoolExecutor.execute(new ServerSocketRunnable(serverSocket));
                threadPoolExecutor.execute(new ServerSocketRunnable(serverSocket));
                while (true) {
                    Thread.sleep(5000);
                    ThreadUtils.printThreadMessage("Waiting for client connecting...");
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Runs on every exit path, including an InterruptedException escaping the loop,
                // so the pool is always told to stop once the server socket has been closed.
                threadPoolExecutor.shutdownNow();
            }
        }
    }
    
    /**
     * Worker that repeatedly accepts a connection from a shared {@link ServerSocket}, reads the
     * client's text until end-of-stream and answers with a greeting repeating that text.
     *
     * <p>The loop ends when {@link #interrupted} is set, when the current thread is interrupted,
     * or when the server socket has been closed.
     */
    class ServerSocketRunnable implements Runnable {

        ServerSocket serverSocket;

        // volatile so that a value written by another thread is seen by the accept loop.
        volatile boolean interrupted = false;

        public ServerSocketRunnable(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        @Override
        public void run() {
            // Without the isClosed() check a closed listener makes accept() throw on every
            // iteration, turning this loop into an endless stream of stack traces.
            while (!interrupted && !Thread.currentThread().isInterrupted() && !serverSocket.isClosed()) {
                try (Socket socket = serverSocket.accept();
                     BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                     PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {
                    ThreadUtils.printThreadMessage("Thread id: " + Thread.currentThread().getId());
                    StringBuilder clientMessage = new StringBuilder();
                    // readLine() reports failures as a checked IOException handled below.
                    String line;
                    while ((line = bufferedReader.readLine()) != null) {
                        clientMessage.append(line);
                    }
                    ThreadUtils.printThreadMessage("From socket client: " + clientMessage.toString());
                    printWriter.write("Hello, socket client. Got your message: " + clientMessage.toString());
                    printWriter.flush();
                } catch (IOException e) {
                    // A failure caused by the listener being closed is the normal shutdown path.
                    if (!serverSocket.isClosed()) {
                        e.printStackTrace();
                    }
                }
            }

        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    package com.qupeng.io.bio.tcp.multithread;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.net.UnknownHostException;
    import java.util.Scanner;
    
    /**
     * Console client for the multi-threaded blocking TCP server on localhost:8080.
     *
     * <p>{@link #run()} holds the logic so that other entry points can reuse it.
     */
    public class SocketClient {
        public static void main(String[] args) {
            SocketClient.run();
        }

        /**
         * Reads lines from standard input until it is exhausted. Each line is sent over a new
         * connection, the output side is half-closed, and every line of the answer is printed.
         */
        public static void run() {
            System.out.println("Client start.");
            System.out.println("Pls type here: ");
            try (Scanner scanner = new Scanner(System.in)) {
                while (scanner.hasNextLine()) {
                    // Consume the line BEFORE connecting: if the connection fails the line must
                    // still be removed from the scanner, otherwise hasNextLine() stays true and
                    // the loop retries the same line forever.
                    String input = scanner.nextLine();
                    try (Socket socket = new Socket("localhost", 8080);
                         PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
                         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                        printWriter.write(input);
                        printWriter.flush();
                        // Half-close so the server sees end-of-stream and starts answering.
                        socket.shutdownOutput();

                        String line;
                        while ((line = bufferedReader.readLine()) != null) {
                            System.out.println("From socket server: " + line);
                        }
                    } catch (IOException e) {
                        // Also covers UnknownHostException, which is an IOException.
                        e.printStackTrace();
                    }
                    System.out.println("Pls type here: ");
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    package com.qupeng.io.bio.tcp.multithread;
    
    /**
     * Additional entry point that simply delegates to {@code SocketClient.run()}.
     * NOTE(review): presumably exists so a second client process can be started alongside
     * {@code SocketClient} to exercise the multi-threaded server; confirm.
     */
    public class SocketClient1 {
        public static void main(String[] args) {
            SocketClient.run();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    1.3 Java BIO 基于UDP协议的服务端,客户端通信

    package com.qupeng.io.bio.udp;
    
    import java.io.IOException;
    import java.net.DatagramPacket;
    import java.net.DatagramSocket;
    import java.net.SocketException;
    
    /**
     * UDP server that binds port 8088 once and prints the text of every datagram it receives.
     */
    public class SocketServer {
        public static void main(String[] args) {
            System.out.println("Server start.");
            // Bind once for the lifetime of the server. Re-binding per datagram drops packets
            // that arrive while the socket is closed and spins forever if the port is taken.
            try (DatagramSocket datagramSocket = new DatagramSocket(8088)) {
                byte[] buffer = new byte[1024];
                while (true) {
                    // A fresh packet resets the receive length to the full buffer size.
                    DatagramPacket datagramPacket = new DatagramPacket(buffer, buffer.length);
                    try {
                        datagramSocket.receive(datagramPacket);
                        System.out.println(decode(datagramPacket));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            } catch (SocketException e) {
                e.printStackTrace();
            }
        }

        /**
         * Decodes the payload of a received datagram with the platform default charset.
         *
         * @param packet a datagram filled in by {@code DatagramSocket.receive}
         * @return only the bytes actually received, not the unused tail of the backing buffer
         */
        static String decode(DatagramPacket packet) {
            return new String(packet.getData(), packet.getOffset(), packet.getLength());
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    package com.qupeng.io.bio.udp;
    
    import java.io.IOException;
    import java.net.*;
    import java.util.Scanner;
    
    /**
     * UDP console client: every line typed on standard input is sent as one datagram to
     * localhost:8088 through a short-lived socket.
     */
    public class SocketClient {
        public static void main(String[] args) {
            System.out.println("Client start.");
            try (Scanner stdin = new Scanner(System.in)) {
                while (stdin.hasNextLine()) {
                    sendLine(stdin.nextLine());
                }
            }
        }

        /** Sends one line as a single datagram; any failure is printed and otherwise ignored. */
        private static void sendLine(String input) {
            try {
                InetAddress target = InetAddress.getByName("localhost");
                byte[] payload = input.getBytes();
                DatagramPacket outgoing = new DatagramPacket(payload, payload.length, target, 8088);
                try (DatagramSocket sender = new DatagramSocket()) {
                    sender.send(outgoing);
                }
            } catch (IOException e) {
                // UnknownHostException and SocketException are both IOExceptions.
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    1.4 Java NIO 基于TCP协议的服务端,客户端通信(阻塞)

    package com.qupeng.io.nio.blocking;
    
    import com.qupeng.io.nio.Utils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    
    /**
     * Blocking NIO server on port 8080.
     *
     * <p>For each connection it performs a single read, decodes the bytes as 2-byte chars,
     * logs the text, sends a fixed reply and closes the connection.
     */
    public class SocketServer {

        public static void main(String[] args) {
            try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8080))
            ) {
                System.out.println("Server started.");
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                Utils.printBufferInfo(byteBuffer);
                while (true) {
                    // A failing accept() is left to the outer catch: the listener is broken.
                    SocketChannel accepted = serverSocketChannel.accept();
                    try (SocketChannel socketChannel = accepted) {
                        // Start clean even if the previous connection failed half-way.
                        byteBuffer.clear();
                        int readCount = socketChannel.read(byteBuffer);
                        Utils.printBufferInfo(byteBuffer);
                        if (-1 != readCount) {
                            System.out.println("Got " + readCount + " bytes from client.");
                            StringBuilder clientMessage = new StringBuilder();
                            byteBuffer.flip();
                            Utils.printBufferInfo(byteBuffer);
                            // getChar() needs two bytes; an odd trailing byte would otherwise
                            // raise BufferUnderflowException.
                            while (byteBuffer.remaining() >= 2) {
                                clientMessage.append(byteBuffer.getChar());
                                Utils.printBufferInfo(byteBuffer);
                            }
                            System.out.println("From socket client: " + clientMessage.toString());
                        }

                        byteBuffer.clear();
                        Utils.printBufferInfo(byteBuffer);

                        // Encode through the buffer itself so that flip() limits the write to
                        // the reply, instead of sending all 1024 bytes padded with zeros.
                        String reply = "0123456789-0123456789-0123456789-0123456789";
                        for (int i = 0; i < reply.length(); i++) {
                            byteBuffer.putChar(reply.charAt(i));
                        }
                        byteBuffer.flip();
                        while (byteBuffer.hasRemaining()) {
                            int writeCount = socketChannel.write(byteBuffer);
                            Utils.printBufferInfo(byteBuffer);
                            System.out.println("Sent " + writeCount + " bytes to client.");
                        }

                        byteBuffer.clear();
                        Utils.printBufferInfo(byteBuffer);
                    } catch (IOException e) {
                        // Only this connection failed; keep serving the next clients.
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    package com.qupeng.io.nio.blocking;
    
    import com.qupeng.io.nio.Utils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;
    
    /**
     * Blocking NIO console client for the server on localhost:8080.
     *
     * <p>Each input line is encoded as 2-byte chars, sent over a new connection, and the
     * answer is read until the server closes the connection.
     */
    public class SocketClient {

        public static void main(String[] args) {
            try (Scanner scanner = new Scanner(System.in)) {
                System.out.println("Client started.");
                System.out.println("Pls type here: ");
                while (scanner.hasNextLine()) {
                    String input = scanner.nextLine();
                    try (SocketChannel socketChannel = SocketChannel.open()) {
                        if (socketChannel.connect(new InetSocketAddress("localhost", 8080))) {
                            // At least 32 bytes as before (short input stays zero-padded), but
                            // large enough for longer input, which used to overflow the buffer.
                            ByteBuffer byteBuffer = ByteBuffer.allocate(Math.max(32, input.length() * 2));
                            Utils.printBufferInfo(byteBuffer);
                            // The char view does not move the byte buffer's position, so the
                            // whole buffer (position 0 .. limit) is written below.
                            byteBuffer.asCharBuffer().put(input);
                            Utils.printBufferInfo(byteBuffer);
                            while (byteBuffer.hasRemaining()) {
                                int writeCount = socketChannel.write(byteBuffer);
                                Utils.printBufferInfo(byteBuffer);
                                System.out.println("Sent " + writeCount + " bytes to server.");
                            }

                            byteBuffer.clear();
                            Utils.printBufferInfo(byteBuffer);

                            StringBuilder serverMessage = new StringBuilder();
                            int readCount;
                            while (-1 != (readCount = socketChannel.read(byteBuffer))) {
                                System.out.println("Got " + readCount + " bytes from server.");
                                byteBuffer.flip();
                                while (byteBuffer.remaining() >= 2) {
                                    serverMessage.append(byteBuffer.getChar());
                                    Utils.printBufferInfo(byteBuffer);
                                }
                                // Keeps an odd trailing byte so it pairs with the next read.
                                byteBuffer.compact();
                            }

                            System.out.println("From socket server: " + serverMessage.toString());

                            byteBuffer.clear();
                            Utils.printBufferInfo(byteBuffer);
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Pls type here: ");
                }
            }

        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    package com.qupeng.io.nio;
    
    import java.nio.ByteBuffer;
    
    /** Debug helper shared by the NIO samples. */
    public class Utils {
        /** Switch for {@link #printBufferInfo(ByteBuffer)}; nothing is printed while false. */
        static boolean isPrintBufferInfo = false;

        /**
         * Prints position, limit and capacity of the buffer to standard output when
         * {@link #isPrintBufferInfo} is enabled. The buffer itself is only inspected.
         *
         * @param byteBuffer buffer whose state is reported
         */
        public static void printBufferInfo(ByteBuffer byteBuffer) {
            if (!isPrintBufferInfo) {
                return;
            }
            String summary = String.format(
                    "ByteBuffer: {position: %d, limit: %d, capacity: %d}",
                    byteBuffer.position(),
                    byteBuffer.limit(),
                    byteBuffer.capacity());
            System.out.println(summary);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    1.5 Java NIO 基于TCP协议的服务端,客户端通信(非阻塞)

    package com.qupeng.io.nio.nonblocking;
    
    import com.qupeng.io.nio.Utils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    
    /**
     * NIO server on port 8080 whose listening channel is non-blocking: it polls
     * {@code accept()} once per second until a client connects.
     *
     * <p>For each connection it performs a single read, decodes the bytes as 2-byte chars,
     * waits one second, sends a fixed reply and closes the connection.
     */
    public class SocketServer {

        public static void main(String[] args) throws InterruptedException {
            try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8080))
            ) {
                serverSocketChannel.configureBlocking(false);
                System.out.println("Server started.");
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                Utils.printBufferInfo(byteBuffer);
                while (true) {
                    SocketChannel accepted = serverSocketChannel.accept();
                    while (null == accepted) {
                        System.out.println("Waiting for client connecting ...");
                        Thread.sleep(1000);
                        accepted = serverSocketChannel.accept();
                    }
                    // try-with-resources closes the accepted channel; previously it was only
                    // half-closed and never released.
                    try (SocketChannel socketChannel = accepted) {
                        // Start clean even if the previous connection failed half-way.
                        byteBuffer.clear();
                        int readCount = socketChannel.read(byteBuffer);
                        Utils.printBufferInfo(byteBuffer);
                        if (-1 != readCount) {
                            System.out.println("Got " + readCount + " bytes from client.");
                            StringBuilder clientMessage = new StringBuilder();
                            byteBuffer.flip();
                            Utils.printBufferInfo(byteBuffer);
                            // getChar() needs two bytes; an odd trailing byte would otherwise
                            // raise BufferUnderflowException.
                            while (byteBuffer.remaining() >= 2) {
                                clientMessage.append(byteBuffer.getChar());
                                Utils.printBufferInfo(byteBuffer);
                            }
                            System.out.println("From socket client: " + clientMessage.toString());
                        }

                        byteBuffer.clear();
                        Utils.printBufferInfo(byteBuffer);

                        // Delay before answering so the client has to wait for data.
                        Thread.sleep(1000);

                        // Encode through the buffer itself so that flip() limits the write to
                        // the reply, instead of sending all 1024 bytes padded with zeros.
                        String reply = "0123456789-0123456789-0123456789-0123456789";
                        for (int i = 0; i < reply.length(); i++) {
                            byteBuffer.putChar(reply.charAt(i));
                        }
                        byteBuffer.flip();
                        while (byteBuffer.hasRemaining()) {
                            int writeCount = socketChannel.write(byteBuffer);
                            Utils.printBufferInfo(byteBuffer);
                            System.out.println("Sent " + writeCount + " bytes to client.");
                        }
                        socketChannel.shutdownOutput();

                        byteBuffer.clear();
                        Utils.printBufferInfo(byteBuffer);
                    } catch (IOException e) {
                        // Only this connection failed; keep serving the next clients.
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    package com.qupeng.io.nio.nonblocking;
    
    import com.qupeng.io.nio.Utils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;
    
    /**
     * Non-blocking NIO console client for the server on localhost:8080.
     *
     * <p>Each input line is encoded as 2-byte chars and sent over a new non-blocking
     * connection; the answer is polled until the server signals end-of-stream.
     */
    public class SocketClient {

        public static void main(String[] args) throws InterruptedException {
            try (Scanner scanner = new Scanner(System.in)) {
                System.out.println("Client started.");
                System.out.println("Pls type here: ");
                while (scanner.hasNextLine()) {
                    String input = scanner.nextLine();
                    try (SocketChannel socketChannel = SocketChannel.open()) {
                        socketChannel.configureBlocking(false);
                        socketChannel.connect(new InetSocketAddress("localhost", 8080));
                        while (!socketChannel.finishConnect()) {
                            System.out.println("Connection to server, pls wait ...");
                        }
                        // At least 32 bytes as before (short input stays zero-padded), but
                        // large enough for longer input, which used to overflow the buffer.
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Math.max(32, input.length() * 2));
                        Utils.printBufferInfo(byteBuffer);
                        // The char view does not move the byte buffer's position, so the whole
                        // buffer (position 0 .. limit) is written below.
                        byteBuffer.asCharBuffer().put(input);
                        Utils.printBufferInfo(byteBuffer);
                        while (byteBuffer.hasRemaining()) {
                            int writeCount = socketChannel.write(byteBuffer);
                            Utils.printBufferInfo(byteBuffer);
                            System.out.println("Sent " + writeCount + " bytes to server.");
                        }

                        byteBuffer.clear();
                        Utils.printBufferInfo(byteBuffer);

                        StringBuilder serverMessage = new StringBuilder();
                        int readCount;
                        // -1 (end-of-stream) always ends the loop, even when the server closes
                        // without sending anything; waiting on "<= 0" would never terminate.
                        while (-1 != (readCount = socketChannel.read(byteBuffer))) {
                            Utils.printBufferInfo(byteBuffer);
                            if (0 == readCount) {
                                // Nothing available yet: back off instead of busy-spinning.
                                System.out.println("Waiting for data from server ...");
                                Thread.sleep(500);
                                continue;
                            }
                            byteBuffer.flip();
                            while (byteBuffer.remaining() >= 2) {
                                serverMessage.append(byteBuffer.getChar());
                                Utils.printBufferInfo(byteBuffer);
                            }
                            // Keeps an odd trailing byte so it pairs with the next read.
                            byteBuffer.compact();
                        }

                        System.out.println("From socket server: " + serverMessage.toString());

                        byteBuffer.clear();
                        Utils.printBufferInfo(byteBuffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Pls type here: ");
                }
            }

        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69

    1.6 Java NIO 基于多路复用的服务端,客户端通信(非阻塞,单线程)

    package com.qupeng.io.nio.multiplex.singlethread;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * Single-threaded, selector-based (multiplexed) NIO server on port 8080.
     *
     * <p>Per client the key alternates between two states: waiting for a request (OP_READ) and
     * sending the pending reply (OP_WRITE). The pending reply is kept as the key's attachment.
     * A client channel is closed when it reaches end-of-stream or fails with an I/O error.
     */
    public class SocketServer {
        private static final String REPLY_PREFIX = "Hello client. Got your message: ";

        public static void main(String[] args) {
            try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                 Selector selector = Selector.open()) {
                // Bind inside the try so the channel is closed even if bind() fails.
                serverSocketChannel.bind(new InetSocketAddress(8080));
                serverSocketChannel.configureBlocking(false);
                System.out.println("Server started.");
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                while (true) {
                    selector.select();
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        if (!selectionKey.isValid()) {
                            continue;
                        }
                        if (selectionKey.isAcceptable()) {
                            // Listener failures stay fatal and reach the outer catch.
                            acceptClient(selectionKey, selector);
                            continue;
                        }
                        try {
                            if (selectionKey.isReadable()) {
                                readRequest(selectionKey, byteBuffer);
                            } else if (selectionKey.isWritable()) {
                                writeReply(selectionKey);
                            }
                        } catch (IOException e) {
                            // Only this client failed; drop it and keep serving the others.
                            e.printStackTrace();
                            closeClient(selectionKey);
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /** Accepts a pending connection, if any, and registers it for reading. */
        private static void acceptClient(SelectionKey serverKey, Selector selector) throws IOException {
            SocketChannel channel = ((ServerSocketChannel) serverKey.channel()).accept();
            if (null == channel) {
                // A non-blocking accept() returns null when no connection is pending.
                return;
            }
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        }

        /**
         * Reads one request into the shared buffer, logs it and prepares the reply.
         * Closes the client when it has reached end-of-stream.
         */
        private static void readRequest(SelectionKey selectionKey, ByteBuffer byteBuffer) throws IOException {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            // The buffer is shared by all clients, so reset it before every read.
            byteBuffer.clear();
            int readCount = channel.read(byteBuffer);
            if (-1 == readCount) {
                closeClient(selectionKey);
                return;
            }
            if (0 == readCount) {
                return;
            }
            byteBuffer.flip();
            StringBuilder clientMessage = new StringBuilder();
            // getChar() needs two bytes; ignore an odd trailing byte instead of underflowing.
            while (byteBuffer.remaining() >= 2) {
                clientMessage.append(byteBuffer.getChar());
            }
            System.out.println("From socket client: " + clientMessage);

            // A reply-sized buffer cannot overflow and carries no padding.
            String reply = REPLY_PREFIX + clientMessage.toString().trim();
            ByteBuffer pendingReply = ByteBuffer.allocate(reply.length() * 2);
            pendingReply.asCharBuffer().put(reply);
            selectionKey.attach(pendingReply);
            selectionKey.interestOps((selectionKey.interestOps() | SelectionKey.OP_WRITE) & ~SelectionKey.OP_READ);
        }

        /**
         * Writes as much of the pending reply as the socket accepts. Once it is fully sent the
         * key goes back to OP_READ, so the client's end-of-stream is noticed and the channel
         * is closed.
         */
        private static void writeReply(SelectionKey selectionKey) throws IOException {
            SocketChannel channel = (SocketChannel) selectionKey.channel();
            ByteBuffer pendingReply = (ByteBuffer) selectionKey.attachment();
            channel.write(pendingReply);
            if (!pendingReply.hasRemaining()) {
                selectionKey.attach(null);
                selectionKey.interestOps((selectionKey.interestOps() & ~SelectionKey.OP_WRITE) | SelectionKey.OP_READ);
            }
        }

        /** Cancels the key and closes its channel; a failure while closing is only reported. */
        private static void closeClient(SelectionKey selectionKey) {
            selectionKey.cancel();
            try {
                selectionKey.channel().close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    package com.qupeng.io.nio.multiplex.singlethread;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    /**
     * Selector-based (multiplexed) NIO console client for the server on localhost:8080.
     *
     * <p>For every input line a new non-blocking connection walks through three phases driven
     * by the selector: finish connecting, write the request, read one response.
     */
    public class SocketClient {
        private static final int BUFFER_SIZE = 1024;

        public static void main(String[] args) {
            try (Scanner scanner = new Scanner(System.in)) {
                System.out.println("Client started.");
                System.out.println("Pls type here: ");
                while (scanner.hasNextLine()) {
                    String input = scanner.nextLine();
                    // Each char takes two bytes; longer input used to crash the client with
                    // BufferOverflowException while filling the buffer.
                    if (input.length() * 2 > BUFFER_SIZE) {
                        System.out.println("Message too long, at most " + (BUFFER_SIZE / 2) + " characters are supported.");
                        System.out.println("Pls type here: ");
                        continue;
                    }
                    ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
                    try (SocketChannel socketChannel = SocketChannel.open();
                         Selector selector = Selector.open()) {
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_CONNECT);
                        socketChannel.connect(new InetSocketAddress("localhost", 8080));

                        boolean isNotGetResponse = true;
                        while (isNotGetResponse) {
                            selector.select();
                            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                            while (iterator.hasNext()) {
                                SelectionKey selectionKey = iterator.next();
                                iterator.remove();
                                SocketChannel channel = (SocketChannel) selectionKey.channel();
                                if (selectionKey.isConnectable()) {
                                    if (channel.finishConnect()) {
                                        // The char view leaves position/limit untouched, so the
                                        // whole zero-padded buffer is what gets sent.
                                        byteBuffer.asCharBuffer().put(input);
                                        selectionKey.attach(byteBuffer);
                                        selectionKey.interestOps((selectionKey.interestOps() & ~SelectionKey.OP_CONNECT) | SelectionKey.OP_WRITE);
                                    }
                                } else if (selectionKey.isWritable()) {
                                    ByteBuffer pending = (ByteBuffer) selectionKey.attachment();
                                    channel.write(pending);
                                    // A non-blocking write may be partial: stay in the write
                                    // phase until the request is completely sent.
                                    if (!pending.hasRemaining()) {
                                        pending.clear();
                                        selectionKey.interestOps((selectionKey.interestOps() & ~SelectionKey.OP_WRITE) | SelectionKey.OP_READ);
                                    }
                                } else if (selectionKey.isReadable()) {
                                    StringBuilder serverMessage = new StringBuilder("From socket server: ");
                                    int count = channel.read(byteBuffer);
                                    if (-1 != count) {
                                        byteBuffer.flip();
                                        // getChar() needs two bytes; skip an odd trailing byte.
                                        while (byteBuffer.remaining() >= 2) {
                                            serverMessage.append(byteBuffer.getChar());
                                        }
                                        System.out.println(serverMessage.toString());
                                    }
                                    isNotGetResponse = false;
                                    // Signal end-of-stream to the server before closing.
                                    channel.shutdownOutput();
                                }
                            }
                        }

                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Pls type here: ");
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71

    1.7 Java NIO 基于多路复用的服务端,客户端通信(非阻塞,多线程)

    package com.qupeng.io.nio.multiplex.multithread;
    
    import com.qupeng.concurrent.thread.MyThreadFactory;
    import com.qupeng.concurrent.thread.ThreadUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Selector-based chat server that hands the actual socket reads and writes to a
     * two-thread pool.
     *
     * <p>The selector thread only accepts connections and dispatches ready keys. Before a
     * key is handed to a worker its triggering interest bit is switched off, and the
     * worker switches the next interest bit on when it is done, so a key is never
     * processed by two workers at once.
     */
    public class SocketServer {
        /**
         * Binds to port 8080 and runs the accept/dispatch loop until an I/O error occurs.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                 Selector selector = Selector.open()) {
                serverSocketChannel.bind(new InetSocketAddress(8080));
                serverSocketChannel.configureBlocking(false);
                System.out.println("Server started.");
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2, new MyThreadFactory());
                try {
                    while (true) {
                        ThreadUtils.printThreadMessage("Waiting for client connecting ...");
                        selector.select();
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            iterator.remove();
                            if (!selectionKey.isValid()) {
                                // The channel was closed by a worker; the ready-op tests would throw CancelledKeyException.
                                continue;
                            }
                            if (selectionKey.isAcceptable()) {
                                SocketChannel channel = ((ServerSocketChannel) selectionKey.channel()).accept();
                                // accept() returns null on a non-blocking channel when no connection is pending.
                                if (channel != null) {
                                    channel.configureBlocking(false);
                                    channel.register(selector, SelectionKey.OP_READ);
                                }
                            } else if (selectionKey.isReadable()) {
                                // Only disarm OP_READ here. OP_WRITE is armed by ReadRunnable once the
                                // message is attached; arming it earlier lets the write race the read.
                                selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ);
                                threadPoolExecutor.execute(new ReadRunnable(selectionKey));
                            } else if (selectionKey.isWritable()) {
                                selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
                                threadPoolExecutor.execute(new WriteRunnable(selectionKey));
                            }
                        }
                    }
                } finally {
                    // Stop accepting new tasks so the pool does not outlive the selector loop.
                    threadPoolExecutor.shutdown();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    /**
     * Worker task that reads one client message from the key's channel, attaches the
     * trimmed text to the key and arms the key for writing.
     */
    class ReadRunnable implements Runnable {
    
        private final SelectionKey selectionKey;
        private final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    
        public ReadRunnable (SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
        }
    
        @Override
        public void run() {
            SocketChannel channel = (SocketChannel)selectionKey.channel();
            int readCount;
            try {
                readCount = channel.read(byteBuffer);
            } catch (IOException e) {
                e.printStackTrace();
                close(channel);
                return;
            }
            if (-1 == readCount) {
                // Peer closed the connection: release the channel instead of leaving it registered forever.
                close(channel);
                return;
            }
            if (0 == readCount) {
                // Nothing arrived after all; wait for the next readiness event.
                selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_READ);
                selectionKey.selector().wakeup();
                return;
            }
            byteBuffer.flip();
            StringBuilder clientMessage = new StringBuilder();
            // Only decode complete two-byte chars; a trailing odd byte would underflow getChar().
            while (byteBuffer.remaining() >= 2) {
                clientMessage.append(byteBuffer.getChar());
            }
            ThreadUtils.printThreadMessage("From socket client: " + clientMessage.toString());
            selectionKey.attach(clientMessage.toString().trim());
            selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
            // The selector thread may be blocked in select(); wake it so it sees the new interest set.
            selectionKey.selector().wakeup();
        }
    
        /** Closes the channel (which cancels its key) and wakes the selector so the key is flushed. */
        private void close(SocketChannel channel) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            selectionKey.selector().wakeup();
        }
    }
    
    /**
     * Worker task that sends the reply for the message attached to the key and then
     * arms the key for reading again.
     */
    class WriteRunnable implements Runnable {
    
        private final SelectionKey selectionKey;
    
        public WriteRunnable (SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
        }
    
        @Override
        public void run() {
            SocketChannel channel = (SocketChannel)selectionKey.channel();
            Object attachment = selectionKey.attachment();
            ByteBuffer byteBuffer;
            if (attachment instanceof ByteBuffer) {
                // Remainder of a reply that a previous, partial write could not send.
                byteBuffer = (ByteBuffer) attachment;
            } else {
                ThreadUtils.printThreadMessage(String.valueOf(attachment));
                String reply = "Hello client. Got your message: " + attachment;
                // Keep the 1024-byte frame, but grow it when the reply would not fit (two bytes per char).
                byteBuffer = ByteBuffer.allocate(Math.max(1024, reply.length() * 2));
                byteBuffer.asCharBuffer().put(reply);
            }
            try {
                channel.write(byteBuffer);
            } catch (IOException e) {
                e.printStackTrace();
                try {
                    channel.close();
                } catch (IOException closeFailure) {
                    closeFailure.printStackTrace();
                }
                selectionKey.selector().wakeup();
                return;
            }
            if (byteBuffer.hasRemaining()) {
                // Non-blocking write was partial: keep the buffer and finish on the next OP_WRITE.
                selectionKey.attach(byteBuffer);
                selectionKey.interestOps(selectionKey.interestOps() | SelectionKey.OP_WRITE);
            } else {
                selectionKey.attach(null);
                // Listen for the next message, or for end-of-stream so the channel gets closed.
                selectionKey.interestOps((selectionKey.interestOps() & ~SelectionKey.OP_WRITE) | SelectionKey.OP_READ);
            }
            selectionKey.selector().wakeup();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    package com.qupeng.io.nio.multiplex.multithread;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    
    /**
     * Console chat client built on a non-blocking {@link SocketChannel} driven by a {@link Selector}.
     *
     * <p>Every line typed on standard input is sent over a fresh connection to
     * {@code localhost:8080} as one 1024-byte frame (two bytes per character, the
     * unused tail of the frame stays zero-filled). The client then prints whatever
     * the server answers with and closes the connection.
     */
    public class SocketClient {
        public static void main(String[] args) {
            SocketClient.run();
        }
    
        /**
         * Reads lines from standard input until end of input and performs one
         * connect / write / read exchange per line.
         */
        public static void run () {
            try (Scanner scanner = new Scanner(System.in)) {
                System.out.println("Client started.");
                System.out.println("Pls type here: ");
                while (scanner.hasNextLine()) {
                    String input = scanner.nextLine();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    // Two bytes per char: anything longer would make CharBuffer.put throw BufferOverflowException.
                    int maxChars = byteBuffer.capacity() / 2;
                    if (input.length() > maxChars) {
                        System.out.println("Message truncated to " + maxChars + " characters.");
                        input = input.substring(0, maxChars);
                    }
                    // The char view has its own position, so byteBuffer still spans the whole 1024-byte frame.
                    byteBuffer.asCharBuffer().put(input);
                    try (SocketChannel socketChannel = SocketChannel.open();
                         Selector selector = Selector.open()) {
                        socketChannel.configureBlocking(false);
                        SelectionKey registration = socketChannel.register(selector, SelectionKey.OP_CONNECT);
                        if (socketChannel.connect(new InetSocketAddress("localhost", 8080))) {
                            // Connected immediately (possible for local connections): no OP_CONNECT event will follow.
                            registration.interestOps(SelectionKey.OP_WRITE);
                        }
    
                        boolean isNotGetResponse = true;
                        while (isNotGetResponse) {
                            selector.select();
                            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                            while (iterator.hasNext()) {
                                SelectionKey selectionKey = iterator.next();
                                iterator.remove();
                                SocketChannel channel = (SocketChannel) selectionKey.channel();
                                if (selectionKey.isConnectable()) {
                                    if (channel.finishConnect()) {
                                        selectionKey.interestOps(SelectionKey.OP_WRITE);
                                    }
                                } else if (selectionKey.isWritable()) {
                                    channel.write(byteBuffer);
                                    // A non-blocking write may be partial; stay in write mode until the frame is drained.
                                    if (!byteBuffer.hasRemaining()) {
                                        byteBuffer.clear();
                                        selectionKey.interestOps(SelectionKey.OP_READ);
                                    }
                                } else if (selectionKey.isReadable()) {
                                    StringBuilder serverMessage = new StringBuilder("From socket server: ");
                                    int count = channel.read(byteBuffer);
                                    if (-1 != count) {
                                        byteBuffer.flip();
                                        // Only decode complete two-byte chars; a trailing odd byte would underflow getChar().
                                        while (byteBuffer.remaining() >= 2) {
                                            serverMessage.append(byteBuffer.getChar());
                                        }
                                        System.out.println(serverMessage.toString());
                                    }
                                    isNotGetResponse = false;
                                    // Signal end-of-stream to the server before the channel is closed.
                                    socketChannel.shutdownOutput();
                                }
                            }
                        }
    
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    System.out.println("Pls type here: ");
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    package com.qupeng.io.nio.multiplex.multithread;
    
    /**
     * Additional entry point that runs the same client loop as {@code SocketClient.run()}.
     * Presumably exists so a second client process can be started alongside
     * {@code SocketClient} when trying out the multi-threaded server.
     */
    public class SocketClient1 {
        public static void main(String[] args) {
            // Delegate to the shared client implementation.
            SocketClient.run();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    package com.qupeng.concurrent.thread;
    
    import java.util.Arrays;
    
    /**
     * Console helpers for the threading examples: messages are indented by a column
     * derived from the numeric suffix of the current thread's name, so the output of
     * different threads lines up in separate columns.
     */
    public class ThreadUtils {
        /** Number of columns the output is shifted per thread number. */
        private static final int COLUMN_WIDTH = 60;
    
        /**
         * Prints {@code message} on standard output, indented by {@code 60 * n} columns
         * when the current thread's name ends in {@code -n} with {@code n > 0}.
         * Threads without a numeric suffix, and thread number 0, print without indent.
         *
         * @param message text to print
         */
        public static void printThreadMessage(String message) {
            String threadName = Thread.currentThread().getName();
            int threadNumber;
            try {
                threadNumber = Integer.parseInt(threadName.substring(threadName.lastIndexOf('-') + 1));
            } catch (NumberFormatException exception) {
                threadNumber = 0;
            }
            // A width of 0 would produce the format "%0s", which Formatter rejects with an
            // IllegalFormatException, so only pad for positive thread numbers.
            if (threadNumber > 0) {
                System.out.println(String.format("%" + threadNumber * COLUMN_WIDTH + "s%s", " ", message));
            } else {
                System.out.println(message);
            }
        }
    
        /** Prints a set of diagnostic properties of the current thread, one per line. */
        public static void printThreadInfo() {
            Thread thread = Thread.currentThread();
            printThreadMessage("Thread " + thread.getName() + " is running.");
            printThreadMessage("Thread group: " + thread.getThreadGroup().getName());
            printThreadMessage("Group active count: " + thread.getThreadGroup().activeCount());
            printThreadMessage("Priority: " + thread.getPriority());
            printThreadMessage("Id: " + thread.getId());
            printThreadMessage("State: " + thread.getState());
            // Arrays.toString prints the frames; toString() on the array would only print its identity.
            printThreadMessage("Stack trace: " + Arrays.toString(thread.getStackTrace()));
            printThreadMessage("Active count: " + Thread.activeCount());
            printThreadMessage("Is demon: " + thread.isDaemon());
            Object object = new Object();
            synchronized (object) {
                printThreadMessage("Holds lock: " + Thread.holdsLock(object));
            }
            printThreadMessage("Uncaught exception handler: " + thread.getUncaughtExceptionHandler());
            printThreadMessage("Context class loader: " + thread.getContextClassLoader());
            Thread[] tArray = new Thread[10];
            Thread.enumerate(tArray);
            printThreadMessage("enumerate: " + Arrays.toString(tArray));
            printThreadMessage("Is alive: " + thread.isAlive());
        }
        
        public static void printStartingMessage() {
            ThreadUtils.printThreadMessage(String.format("Thread %s started ...", Thread.currentThread().getName()));
        }
    
        public static void printRunningMessage() {
            ThreadUtils.printThreadMessage(String.format("Thread %s is running ...", Thread.currentThread().getName()));
        }
    
        public static void printInterruptedMessage() {
            ThreadUtils.printThreadMessage(String.format("Thread %s is interrupted ...", Thread.currentThread().getName()));
        }
    
        public static void printEndingMessage() {
            ThreadUtils.printThreadMessage(String.format("Thread %s ended ...", Thread.currentThread().getName()));
        }
    
        public static void printExecuingMessage() {
            ThreadUtils.printThreadMessage(String.format("Thread %s is executing ...", Thread.currentThread().getName()));
        }
    
        public static void printCanceledMessage() {
            ThreadUtils.printThreadMessage(String.format("Thread %s is canceled ...", Thread.currentThread().getName()));
        }
    
        public static void printWaitingMessage() {
            ThreadUtils.printThreadMessage(String.format("Thread %s is waiting ...", Thread.currentThread().getName()));
        }
    
        public static void printNotifideMessage() {
            ThreadUtils.printThreadMessage(String.format("Thread %s is notified ...", Thread.currentThread().getName()));
        }
    
        /**
         * Sleeps for the given number of seconds. If the thread is interrupted the
         * stack trace is printed and the interrupt status is restored for the caller.
         *
         * @param seconds how long to sleep, in seconds
         */
        public static void sleep(int seconds) {
            try {
                // long arithmetic: seconds * 1000 overflows int for large values.
                Thread.sleep(seconds * 1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79

    1.8 Java NIO 基于AIO的服务端,客户端通信(Future方式,调用get()阻塞等待结果)

    package com.qupeng.io.nio.asynchrous.singlethread.future;
    
    import com.qupeng.concurrent.thread.ThreadUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.concurrent.ExecutionException;
    
    /**
     * Chat server on {@link AsynchronousServerSocketChannel} that drives every
     * operation through the returned {@code Future} and waits for it with {@code get()},
     * so connections are served strictly one after another on the main thread.
     */
    public class SocketServer {
        /**
         * Binds to port 8080 and serves clients in an endless loop: accept, read one
         * message, send one reply, close.
         *
         * @param args unused
         * @throws InterruptedException if the thread is interrupted while waiting for an operation
         */
        public static void main(String[] args) throws InterruptedException {
            try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
                System.out.println("Server started.");
                serverSocketChannel.bind(new InetSocketAddress(8080));
                while (true) {
                    try (AsynchronousSocketChannel socketChannel = serverSocketChannel.accept().get()) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        Integer readCount =  socketChannel.read(byteBuffer).get();
                        if (-1 != readCount) {
                            StringBuilder clientMessage = new StringBuilder();
                            byteBuffer.flip();
                            // Only decode complete two-byte chars; a trailing odd byte would underflow getChar().
                            while (byteBuffer.remaining() >= 2) {
                                clientMessage.append(byteBuffer.getChar());
                            }
                            ThreadUtils.printThreadMessage("From socket client: " + clientMessage.toString());
    
                            String reply = "Hello client. Got your message: " + clientMessage.toString().trim();
                            // Keep the 1024-byte frame, but grow it when the reply would not fit (two bytes per char).
                            ByteBuffer replyBuffer = ByteBuffer.allocate(Math.max(1024, reply.length() * 2));
                            replyBuffer.asCharBuffer().put(reply);
                            // A single write may transfer only part of the buffer.
                            while (replyBuffer.hasRemaining()) {
                                socketChannel.write(replyBuffer).get();
                            }
                            ThreadUtils.printThreadMessage("Sent message to client");
                        }
                    } catch (ExecutionException | IOException e) {
                        // A failure on one connection (including a failed close) must not stop the server.
                        e.printStackTrace();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    package com.qupeng.io.nio.asynchrous.singlethread.future;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.util.Scanner;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * Console chat client on {@link AsynchronousSocketChannel} that waits for each
     * operation through its {@code Future}.
     *
     * <p>Every input line is sent over a fresh connection to {@code localhost:8080}
     * as one 1024-byte frame (two bytes per character, zero-filled tail), then the
     * server's answer is read and printed.
     */
    public class SocketClient {
        /**
         * Reads lines from standard input until end of input and performs one
         * connect / write / read exchange per line.
         *
         * @param args unused
         * @throws InterruptedException if the thread is interrupted while waiting for an operation
         */
        public static void main(String[] args) throws InterruptedException {
            try (Scanner scanner = new Scanner(System.in)) {
                System.out.println("Client started.");
                System.out.println("Please type here: ");
                while (scanner.hasNextLine()) {
                    String input = scanner.nextLine();
                    try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        // Two bytes per char: anything longer would make CharBuffer.put throw BufferOverflowException.
                        int maxChars = byteBuffer.capacity() / 2;
                        if (input.length() > maxChars) {
                            System.out.println("Message truncated to " + maxChars + " characters.");
                            input = input.substring(0, maxChars);
                        }
                        socketChannel.connect(new InetSocketAddress("localhost", 8080)).get();
                        byteBuffer.asCharBuffer().put(input);
                        // A single write may transfer only part of the buffer.
                        while (byteBuffer.hasRemaining()) {
                            socketChannel.write(byteBuffer).get();
                        }
                        byteBuffer.clear();
    
                        Future<Integer> future = socketChannel.read(byteBuffer);
                        if (-1 != future.get()) {
                            StringBuilder serverMessage = new StringBuilder();
                            byteBuffer.flip();
                            // Only decode complete two-byte chars; a trailing odd byte would underflow getChar().
                            while (byteBuffer.remaining() >= 2) {
                                serverMessage.append(byteBuffer.getChar());
                            }
                            System.out.println("From socket server: " + serverMessage.toString());
                        }
                    } catch (IOException | ExecutionException e) {
                        e.printStackTrace();
                    }
    
                    System.out.println("Please type here: ");
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    1.9 Java NIO 基于AIO的服务端,客户端通信(CompletionHandler回调方式,非阻塞)

    package com.qupeng.io.nio.asynchrous.singlethread.callback;
    
    import com.qupeng.concurrent.thread.ThreadUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    
    /**
     * Chat server on {@link AsynchronousServerSocketChannel} driven entirely by
     * {@link CompletionHandler} callbacks.
     *
     * <p>The main thread only reports the current phase every three seconds; accepting,
     * reading and replying happen in the completion handlers. Clients are served one at
     * a time: the next accept is issued once the current exchange has finished or failed.
     */
    public class SocketServer {
        /**
         * Binds to port 8080, starts the first accept and then periodically prints the
         * phase the server is in.
         *
         * @param args unused
         * @throws InterruptedException if the status loop is interrupted while sleeping
         */
        public static void main(String[] args) throws InterruptedException {
            try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
                System.out.println("Server started.");
                serverSocketChannel.bind(new InetSocketAddress(8080));
                final State state = new State();
                serverSocketChannel.accept("", new AsynchronousSocketChannelCompletionHandler(serverSocketChannel, state));
    
                while (true) {
                    switch (state.state) {
                        case "waiting":
                            System.out.println("Waiting for client connecting ...");
                            break;
                        case "connected":
                            System.out.println("Waiting for client message ...");
                            break;
                        case "reading":
                            System.out.println("Reading client message ...");
                            break;
                        case "sending":
                            System.out.println("Sending message to client ...");
                            break;
                        case "sent":
                            System.out.println("Sent finished ...");
                            break;
                    }
                    Thread.sleep(3000);
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /** Current phase of the server; written by handler threads and read by the main thread. */
        static class State {
            // volatile so the main thread's status loop sees updates made by handler threads.
            volatile String state = "waiting";
        }
    
        /** Closes the finished client connection and starts waiting for the next client. */
        private static void closeAndAcceptNext(AsynchronousServerSocketChannel serverSocketChannel, AsynchronousSocketChannel socketChannel, State state) {
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            state.state = "waiting";
            serverSocketChannel.accept("", new AsynchronousSocketChannelCompletionHandler(serverSocketChannel, state));
        }
    
        /** Accept handler: starts reading the client's message as soon as a connection arrives. */
        private static class AsynchronousSocketChannelCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, String> {
            private final AsynchronousServerSocketChannel serverSocketChannel;
            private final State state;
    
            public AsynchronousSocketChannelCompletionHandler(AsynchronousServerSocketChannel serverSocketChannel, State state) {
                this.serverSocketChannel = serverSocketChannel;
                this.state = state;
            }
    
            @Override
            public void completed(AsynchronousSocketChannel socketChannel, String attachment) {
                state.state = "connected";
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                socketChannel.read(byteBuffer, "", new ReadingCompletionHandler(serverSocketChannel, socketChannel, state, byteBuffer));
            }
    
            @Override
            public void failed(Throwable exc, String attachment) {
                // Report the failure; accepting is not retried here to avoid spinning on a persistent error.
                exc.printStackTrace();
            }
        }
    
        /** Read handler: decodes the client's message and sends the reply. */
        private static class ReadingCompletionHandler implements CompletionHandler<Integer, String> {
            private final AsynchronousServerSocketChannel serverSocketChannel;
            private final AsynchronousSocketChannel socketChannel;
            private final State state;
            private final ByteBuffer byteBuffer;
    
            public ReadingCompletionHandler(AsynchronousServerSocketChannel serverSocketChannel, AsynchronousSocketChannel socketChannel, State state, ByteBuffer byteBuffer) {
                this.serverSocketChannel = serverSocketChannel;
                this.socketChannel = socketChannel;
                this.state = state;
                this.byteBuffer = byteBuffer;
            }
    
            @Override
            public void completed(Integer count, String attachment) {
                state.state = "reading";
                if (-1 == count) {
                    // Client closed the connection without sending anything: move on to the next client.
                    closeAndAcceptNext(serverSocketChannel, socketChannel, state);
                    return;
                }
                StringBuilder clientMessage = new StringBuilder();
                byteBuffer.flip();
                // Only decode complete two-byte chars; a trailing odd byte would underflow getChar().
                while (byteBuffer.remaining() >= 2) {
                    clientMessage.append(byteBuffer.getChar());
                }
                ThreadUtils.printThreadMessage("From socket client: " + clientMessage.toString());
    
                state.state = "sending";
                String reply = "Hello client. Got your message: " + clientMessage.toString().trim();
                // Keep the 1024-byte frame, but grow it when the reply would not fit (two bytes per char).
                final ByteBuffer replyBuffer = ByteBuffer.allocate(Math.max(1024, reply.length() * 2));
                replyBuffer.asCharBuffer().put(reply);
                // The buffer must not be touched again until the write has completed.
                socketChannel.write(replyBuffer, "", new CompletionHandler<Integer, String>() {
                    @Override
                    public void completed(Integer result, String attachment) {
                        if (replyBuffer.hasRemaining()) {
                            // Partial write: send the rest with the same handler.
                            socketChannel.write(replyBuffer, "", this);
                            return;
                        }
                        state.state = "sent";
                        ThreadUtils.printThreadMessage("Sent message to client");
                        closeAndAcceptNext(serverSocketChannel, socketChannel, state);
                    }
    
                    @Override
                    public void failed(Throwable exc, String attachment) {
                        exc.printStackTrace();
                        closeAndAcceptNext(serverSocketChannel, socketChannel, state);
                    }
                });
            }
    
            @Override
            public void failed(Throwable exc, String attachment) {
                exc.printStackTrace();
                closeAndAcceptNext(serverSocketChannel, socketChannel, state);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    package com.qupeng.io.nio.asynchrous.singlethread.callback;
    
    import com.qupeng.concurrent.thread.ThreadUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.CompletionHandler;
    import java.util.Scanner;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * Console chat client on {@link AsynchronousSocketChannel} driven by
     * {@link CompletionHandler} callbacks.
     *
     * <p>For every input line the client connects to {@code localhost:8080}, and each
     * step (connect, write, read) is started from the completion callback of the
     * previous one. The main thread waits on a latch until the exchange has finished
     * or failed before it closes the channel and prompts again.
     */
    public class SocketClient {
        /**
         * Reads lines from standard input until end of input and performs one
         * connect / write / read exchange per line.
         *
         * @param args unused
         * @throws InterruptedException if the thread is interrupted while waiting for an exchange
         */
        public static void main(String[] args) throws InterruptedException {
            try (Scanner scanner = new Scanner(System.in)) {
                System.out.println("Client started.");
                System.out.println("Please type here: ");
                while (scanner.hasNextLine()) {
                    String input = scanner.nextLine();
                    try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
                        final ByteBuffer outgoing = ByteBuffer.allocate(1024);
                        // Separate buffer for the reply so the read never touches a buffer that is still being written.
                        final ByteBuffer incoming = ByteBuffer.allocate(1024);
                        // Two bytes per char: anything longer would make CharBuffer.put throw BufferOverflowException.
                        int maxChars = outgoing.capacity() / 2;
                        if (input.length() > maxChars) {
                            System.out.println("Message truncated to " + maxChars + " characters.");
                            input = input.substring(0, maxChars);
                        }
                        outgoing.asCharBuffer().put(input);
                        // Counted down exactly once, when the exchange has completed or failed.
                        final java.util.concurrent.CountDownLatch done = new java.util.concurrent.CountDownLatch(1);
    
                        socketChannel.connect(new InetSocketAddress("localhost", 8080), "", new CompletionHandler<Void, String>() {
    
                            @Override
                            public void completed(Void result, String attachment) {
                                // Connected: only now is it safe to start writing.
                                socketChannel.write(outgoing, "", new CompletionHandler<Integer, String>() {
                                    @Override
                                    public void completed(Integer result, String attachment) {
                                        if (outgoing.hasRemaining()) {
                                            // Partial write: send the rest with the same handler.
                                            socketChannel.write(outgoing, "", this);
                                            return;
                                        }
                                        socketChannel.read(incoming, "", new CompletionHandler<Integer, String>() {
                                            @Override
                                            public void completed(Integer count, String attachment) {
                                                if (-1 != count) {
                                                    StringBuilder serverMessage = new StringBuilder();
                                                    incoming.flip();
                                                    // Only decode complete two-byte chars.
                                                    while (incoming.remaining() >= 2) {
                                                        serverMessage.append(incoming.getChar());
                                                    }
                                                    System.out.println("From socket server: " + serverMessage.toString());
                                                }
                                                done.countDown();
                                            }
    
                                            @Override
                                            public void failed(Throwable exc, String attachment) {
                                                exc.printStackTrace();
                                                done.countDown();
                                            }
                                        });
                                    }
    
                                    @Override
                                    public void failed(Throwable exc, String attachment) {
                                        exc.printStackTrace();
                                        done.countDown();
                                    }
                                });
                            }
    
                            @Override
                            public void failed(Throwable exc, String attachment) {
                                exc.printStackTrace();
                                done.countDown();
                            }
                        });
    
                        // Keep the channel open until the callback chain has finished.
                        done.await();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
    
                    System.out.println("Please type here: ");
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    2 文件I/O

    2.1 使用FileChannel读写500MB文件

    package com.qupeng.io.nio.file;
    
    import java.io.IOException;
    import java.net.URI;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.*;
    import java.util.Arrays;
    import java.util.EnumSet;
    
    /**
     * Demo that times writing a large file through a {@link FileChannel}, once with a direct and
     * once with a heap {@link ByteBuffer}, and then times reading the file back.
     */
    public class TestFileChannel {

        private static final int FILE_SIZE_MB = 0X100000; // 1 MiB: size of one chunk

        /** Chunks written by default; 512 chunks of 1 MiB give a 512 MiB file. */
        private static final int DEFAULT_CHUNK_COUNT = 512;

        /**
         * Runs the write benchmark with a direct and a heap buffer, then the read benchmark.
         *
         * @param args optional: {@code args[0]} is the file to use; without it the original
         *             hard-coded location is used
         * @throws IOException if the file cannot be created, written or read
         */
        public static void main(String[] args) throws IOException {
            Path filePath = args.length > 0
                    ? Paths.get(args[0])
                    : Paths.get(URI.create("file:/C:/Users/Administrator/IdeaProjects/java-demo/resources/test-file-channel.txt"));
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(FILE_SIZE_MB);
            crateLargeFile(filePath, byteBuffer);
            byteBuffer = ByteBuffer.allocate(FILE_SIZE_MB);
            crateLargeFile(filePath, byteBuffer);
            readLargeFile(filePath);
        }

        /**
         * Writes the default 512 chunks of 1 MiB to {@code filePath} and prints the elapsed time.
         *
         * @param filePath file to create or overwrite
         * @param byteBuffer staging buffer handed to the channel; its previous content is discarded
         * @throws IOException if the file cannot be opened or written
         */
        public static void crateLargeFile(Path filePath, ByteBuffer byteBuffer) throws IOException {
            System.out.println("Start to write a 500MB file.");
            crateLargeFile(filePath, byteBuffer, DEFAULT_CHUNK_COUNT);
        }

        /**
         * Writes {@code chunkCount} chunks of 1 MiB, each byte being {@code '1'}, to {@code filePath}
         * and prints the elapsed time.
         *
         * @param filePath file to create or overwrite
         * @param byteBuffer staging buffer; may be smaller than a chunk, in which case a chunk is
         *                   written in several pieces
         * @param chunkCount number of 1 MiB chunks to write, not negative
         * @throws IOException if the file cannot be opened or written
         * @throws IllegalArgumentException if {@code chunkCount} is negative or the buffer has no capacity
         */
        public static void crateLargeFile(Path filePath, ByteBuffer byteBuffer, int chunkCount) throws IOException {
            if (chunkCount < 0) {
                throw new IllegalArgumentException("chunkCount must not be negative: " + chunkCount);
            }
            if (byteBuffer.capacity() == 0) {
                throw new IllegalArgumentException("byteBuffer must have a non-zero capacity");
            }

            // CREATE replaces the former notExists()/createFile() pair, which could race with
            // another process. POSIX file permissions are not passed: Windows does not support them.
            try (FileChannel fileChannel = FileChannel.open(filePath,
                    EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING))) {
                long startTime = System.currentTimeMillis();
                byte[] bytes = new byte[Math.min(byteBuffer.capacity(), FILE_SIZE_MB)];
                Arrays.fill(bytes, (byte)'1');
                long bytesLeft = (long) chunkCount * FILE_SIZE_MB;
                while (bytesLeft > 0) {
                    int length = (int) Math.min(bytes.length, bytesLeft);
                    byteBuffer.clear();
                    byteBuffer.put(bytes, 0, length);
                    byteBuffer.flip();
                    // write() may consume only part of the buffer; repeat until it is drained.
                    while (byteBuffer.hasRemaining()) {
                        fileChannel.write(byteBuffer);
                    }
                    bytesLeft -= length;
                }
                long endTime = System.currentTimeMillis();
                System.out.println("Total time: " + (endTime - startTime) + "ms");
            }
        }

        /**
         * Reads {@code filePath} from start to end in pieces of up to 1 MiB, copying every piece
         * into a byte array, and prints the elapsed time.
         *
         * @param filePath file to read
         * @throws IOException if the file cannot be opened or read
         */
        public static void readLargeFile(Path filePath) throws IOException {
            System.out.println("Start to read a 500MB file.");
            try (FileChannel fileChannel = FileChannel.open(filePath,
                    EnumSet.of(StandardOpenOption.READ))) {
                long startTime = System.currentTimeMillis();
                ByteBuffer byteBuffer = ByteBuffer.allocate(FILE_SIZE_MB);
                byte[] bytes = new byte[FILE_SIZE_MB];
                while (-1 != fileChannel.read(byteBuffer)) {
                    byteBuffer.flip();
                    // Copy exactly what was read; a short final read must not underflow.
                    byteBuffer.get(bytes, 0, byteBuffer.remaining());
                    byteBuffer.clear();
                }
                long endTime = System.currentTimeMillis();
                System.out.println("Total time: " + (endTime - startTime) + "ms");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67

    输出为:

    Start to write a 500MB file.
    Total time: 5114ms
    Start to write a 500MB file.
    Total time: 6482ms
    Start to read a 500MB file.
    Total time: 1032ms
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    2.2 使用MappedByteBuffer读写500MB文件

    package com.qupeng.io.nio.file;
    
    import java.io.IOException;
    import java.net.URI;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.*;
    import java.util.Arrays;
    import java.util.EnumSet;
    import java.util.stream.Collectors;
    
    /**
     * Demo that times writing and reading a large file through memory-mapped buffers obtained
     * from {@link FileChannel#map}.
     */
    public class TestMappedByteBuffer {
        private static final int FILE_SIZE_MB = 0X100000; // 1 MiB: size of one chunk

        /** Largest region mapped at once when reading: 0x20000000 bytes = 512 MiB. */
        private static final int MAX_WINDOW_BYTES = 0X20000000;

        /** Chunks written by default; 512 chunks of 1 MiB give a 512 MiB file. */
        private static final int DEFAULT_CHUNK_COUNT = 512;

        /** A single mapping is limited to Integer.MAX_VALUE bytes, which caps the chunk count. */
        private static final int MAX_CHUNK_COUNT = Integer.MAX_VALUE / FILE_SIZE_MB;

        /**
         * Runs the write benchmark followed by the read benchmark.
         *
         * @param args optional: {@code args[0]} is the file to use; without it the original
         *             hard-coded location is used
         * @throws IOException if the file cannot be created, mapped or read
         */
        public static void main(String[] args) throws IOException {
            Path filePath = args.length > 0
                    ? Paths.get(args[0])
                    : Paths.get(URI.create("file:/C:/Users/Administrator/IdeaProjects/java-demo/resources/test-file-channel.txt"));
            crateLargeFile(filePath);
            readLargeFile(filePath);
        }

        /**
         * Reads the whole file through read-only mappings, copying it into a byte array in pieces
         * of up to 1 MiB, and prints the elapsed time.
         *
         * <p>The mapped region follows the actual file size; files larger than 512 MiB are mapped
         * window by window.
         *
         * @param filePath file to read
         * @throws IOException if the file cannot be opened or mapped
         */
        public static void readLargeFile(Path filePath) throws IOException {
            System.out.println("Start to read a 500MB file.");
            try (FileChannel fileChannel = FileChannel.open(filePath,
                    EnumSet.of(StandardOpenOption.READ))) {
                long startTime = System.currentTimeMillis();
                long fileSize = fileChannel.size();
                byte[] bytes = new byte[FILE_SIZE_MB];
                long position = 0;
                while (position < fileSize) {
                    // Never map past the end of the file: a read-only channel cannot extend it.
                    long windowSize = Math.min(fileSize - position, MAX_WINDOW_BYTES);
                    MappedByteBuffer mappedByteBuffer =
                            fileChannel.map(FileChannel.MapMode.READ_ONLY, position, windowSize);
                    while (mappedByteBuffer.hasRemaining()) {
                        mappedByteBuffer.get(bytes, 0, Math.min(bytes.length, mappedByteBuffer.remaining()));
                    }
                    position += windowSize;
                }
                long endTime = System.currentTimeMillis();
                System.out.println("Total time: " + (endTime - startTime) + "ms");
            }
        }

        /**
         * Writes the default 512 chunks of 1 MiB to {@code filePath} through a read-write mapping
         * and prints the elapsed time.
         *
         * @param filePath file to create or overwrite
         * @throws IOException if the file cannot be opened or mapped
         */
        public static void crateLargeFile(Path filePath) throws IOException {
            System.out.println("Start to write a 500MB file.");
            crateLargeFile(filePath, DEFAULT_CHUNK_COUNT);
        }

        /**
         * Writes {@code chunkCount} chunks of 1 MiB, each byte being {@code 'x'}, to
         * {@code filePath} through one read-write mapping and prints the elapsed time.
         *
         * @param filePath file to create or overwrite
         * @param chunkCount number of 1 MiB chunks, between 0 and 2047 inclusive
         * @throws IOException if the file cannot be opened or mapped
         * @throws IllegalArgumentException if {@code chunkCount} is out of range
         */
        public static void crateLargeFile(Path filePath, int chunkCount) throws IOException {
            if (chunkCount < 0 || chunkCount > MAX_CHUNK_COUNT) {
                throw new IllegalArgumentException(
                        "chunkCount must be between 0 and " + MAX_CHUNK_COUNT + ": " + chunkCount);
            }
            // CREATE replaces the former notExists()/createFile() pair, which could race with
            // another process. READ is required in addition to WRITE for a READ_WRITE mapping.
            try (FileChannel fileChannel = FileChannel.open(filePath,
                    EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.READ,
                            StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING))) {
                byte[] bytes = new byte[FILE_SIZE_MB];
                Arrays.fill(bytes, (byte)'x');
                long startTime = System.currentTimeMillis();
                if (chunkCount > 0) {
                    // Mapping beyond the current end of a writable channel grows the file.
                    MappedByteBuffer mappedByteBuffer = fileChannel.map(
                            FileChannel.MapMode.READ_WRITE, 0, (long) chunkCount * FILE_SIZE_MB);
                    for (int i = 0; i < chunkCount; i++) {
                        mappedByteBuffer.put(bytes);
                    }
                }
                long endTime = System.currentTimeMillis();
                System.out.println("Total time: " + (endTime - startTime) + "ms");
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    输出为:

    Start to write a 500MB file.
    Total time: 1406ms
    Start to read a 500MB file.
    Total time: 1519ms
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    P1025 [NOIP2001 提高组] 数的划分 题解
    linux 压缩webfile文件夹 webfile.tar.gz和webfile.tar的区别
    linux上部署python环境
    OpenCV | 直线拟合fitline函数(Python)
    few shot目标检测survey paper笔记(整体概念)
    【HMS Core】定位服务无法获取街道信息问题
    数据挖掘与分析应用:分类算法:k近邻KNN,决策树CART,贝叶斯,支持向量机SVM
    Redis实战篇(1)
    数据结构与算法(C语言版)P4---顺序表、链表总结
    MySQL 8.0数据库主从搭建和问题处理
  • 原文地址:https://blog.csdn.net/yunyun1886358/article/details/127780636