以下示例分别使用 Java 中的 BIO、NIO 和 AIO 实现了一个简单的客户端/服务端聊天系统。
package com.qupeng.io.bio.tcp.singlethread;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
/**
 * Single-threaded blocking (BIO) TCP server listening on port 8080.
 *
 * <p>Clients are served strictly one after another: the server accepts a connection, reads
 * everything the client sends until end of stream, prints it, and answers with a greeting
 * that repeats the received text.
 */
public class SocketServer {
    /**
     * Binds the listening socket and serves clients until the listening socket fails.
     *
     * <p>A failure to bind or to accept is printed and ends the server. A failure while
     * talking to an already accepted client is handled inside {@link #serveClient(Socket)}
     * so that the next client can still be served.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try (ServerSocket serverSocket = new ServerSocket(8080)) {
            System.out.println("Server started.");
            while (true) {
                serveClient(serverSocket.accept());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the whole request from one client, prints it and writes the greeting back.
     *
     * <p>The socket, its reader and its writer are closed when this method returns. Closing
     * the writer flushes the response. Any {@link IOException} is printed and not rethrown.
     *
     * @param socket the accepted client connection; closed by this method
     */
    private static void serveClient(Socket socket) {
        try (Socket client = socket;
             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
             PrintWriter printWriter = new PrintWriter(client.getOutputStream())) {
            StringBuilder clientMessage = new StringBuilder();
            // readLine() reports read errors as IOException, which the catch below handles;
            // lines are concatenated without separators.
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                clientMessage.append(line);
            }
            System.out.println("From socket client: " + clientMessage);
            printWriter.write("Hello, socket client. Got your message: " + clientMessage);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.qupeng.io.bio.tcp.singlethread;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
/**
 * Blocking (BIO) TCP chat client.
 *
 * <p>Every line typed on standard input is sent over its own connection to
 * {@code localhost:8080}; the response is printed line by line.
 */
public class SocketClient {
    /**
     * Runs the prompt loop until standard input is exhausted.
     *
     * <p>The typed line is consumed before the connection is opened. This way a failed
     * connection attempt is reported once and the loop waits for the next line instead of
     * retrying the same, still unread, line forever.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("Client start.");
        System.out.println("Pls type here: ");
        try (Scanner scanner = new Scanner(System.in)) {
            while (scanner.hasNextLine()) {
                String input = scanner.nextLine();
                try (Socket socket = new Socket("localhost", 8080);
                     PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {
                    printWriter.write(input);
                    printWriter.flush();
                    // Half-close so the peer sees end of stream after the message.
                    socket.shutdownOutput();
                    try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                        String line;
                        while ((line = bufferedReader.readLine()) != null) {
                            System.out.println("From socket server: " + line);
                        }
                        socket.shutdownInput();
                    }
                } catch (IOException e) {
                    // Also covers UnknownHostException, which is an IOException.
                    e.printStackTrace();
                }
                System.out.println("Pls type here: ");
            }
        }
    }
}
package com.qupeng.io.bio.tcp.multithread;
import com.qupeng.concurrent.thread.MyThreadFactory;
import com.qupeng.concurrent.thread.ThreadUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Blocking (BIO) TCP server on port 8080 whose connections are accepted and served by two
 * pooled {@code ServerSocketRunnable} workers sharing one {@link ServerSocket}.
 */
public class SocketServer {
    /**
     * Starts the two workers and then only prints a heartbeat every five seconds.
     *
     * <p>The pool is shut down in a {@code finally} block, so it is also released when the
     * heartbeat sleep is interrupted and {@link InterruptedException} leaves this method,
     * not only when opening the server socket fails.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2, new MyThreadFactory());
        try (ServerSocket serverSocket = new ServerSocket(8080)) {
            System.out.println("Server started.");
            threadPoolExecutor.execute(new ServerSocketRunnable(serverSocket));
            threadPoolExecutor.execute(new ServerSocketRunnable(serverSocket));
            while (true) {
                Thread.sleep(5000);
                ThreadUtils.printThreadMessage("Waiting for client connecting...");
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // The server socket is already closed at this point (try-with-resources);
            // the worker tasks never finish on their own, so interrupt them.
            threadPoolExecutor.shutdownNow();
        }
    }
}
/**
 * Worker that repeatedly accepts one client on a shared {@link ServerSocket}, reads the
 * whole request, prints it and answers with a greeting that repeats the received text.
 */
class ServerSocketRunnable implements Runnable {
    ServerSocket serverSocket;
    /** Set to {@code true} to make {@link #run()} stop before the next accept. */
    volatile boolean interrupted = false;

    /**
     * @param serverSocket the listening socket to accept clients from; not closed by this class
     */
    public ServerSocketRunnable(ServerSocket serverSocket) {
        this.serverSocket = serverSocket;
    }

    /**
     * Serves clients until the stop flag is set, the running thread is interrupted, or the
     * server socket has been closed.
     *
     * <p>Without the closed-socket check every further {@code accept()} would fail
     * immediately and the loop would spin printing stack traces.
     */
    @Override
    public void run() {
        while (!interrupted && !Thread.currentThread().isInterrupted() && !serverSocket.isClosed()) {
            try (Socket socket = serverSocket.accept();
                 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                 PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {
                ThreadUtils.printThreadMessage("Thread id: " + Thread.currentThread().getId());
                StringBuilder clientMessage = new StringBuilder();
                // readLine() reports read errors as IOException, which is handled below and
                // keeps this worker alive; lines are concatenated without separators.
                String line;
                while ((line = bufferedReader.readLine()) != null) {
                    clientMessage.append(line);
                }
                ThreadUtils.printThreadMessage("From socket client: " + clientMessage);
                // Closing the writer at the end of this try block flushes the response.
                printWriter.write("Hello, socket client. Got your message: " + clientMessage);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
package com.qupeng.io.bio.tcp.multithread;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;
/**
 * Blocking (BIO) TCP chat client for the multi-threaded server.
 *
 * <p>Every line typed on standard input is sent over its own connection to
 * {@code localhost:8080}; the response is printed line by line.
 */
public class SocketClient {
    /**
     * Program entry point; delegates to {@link #run()}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SocketClient.run();
    }

    /**
     * Runs the prompt loop until standard input is exhausted.
     *
     * <p>The typed line is consumed before the connection is opened. This way a failed
     * connection attempt is reported once and the loop waits for the next line instead of
     * retrying the same, still unread, line forever.
     */
    public static void run() {
        System.out.println("Client start.");
        System.out.println("Pls type here: ");
        try (Scanner scanner = new Scanner(System.in)) {
            while (scanner.hasNextLine()) {
                String input = scanner.nextLine();
                try (Socket socket = new Socket("localhost", 8080);
                     PrintWriter printWriter = new PrintWriter(socket.getOutputStream())) {
                    printWriter.write(input);
                    printWriter.flush();
                    // Half-close so the peer sees end of stream after the message.
                    socket.shutdownOutput();
                    try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()))) {
                        String line;
                        while ((line = bufferedReader.readLine()) != null) {
                            System.out.println("From socket server: " + line);
                        }
                        socket.shutdownInput();
                    }
                } catch (IOException e) {
                    // Also covers UnknownHostException, which is an IOException.
                    e.printStackTrace();
                }
                System.out.println("Pls type here: ");
            }
        }
    }
}
package com.qupeng.io.bio.tcp.multithread;
/**
 * Second launcher for the BIO chat client, so that two client processes can be started
 * side by side against the multi-threaded server.
 */
public class SocketClient1 {
    /**
     * Starts the same client as {@code SocketClient} by delegating to its entry point.
     *
     * @param args passed through unchanged
     */
    public static void main(String[] args) {
        SocketClient.main(args);
    }
}
package com.qupeng.io.bio.udp;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;
/**
 * UDP server that prints the text of every datagram received on port 8088.
 */
public class SocketServer {
    /**
     * Binds the datagram socket once and then prints each received datagram.
     *
     * <p>The socket stays bound for the whole lifetime of the loop, so the port is not
     * released and re-bound between two datagrams. A failure to bind is printed and ends
     * the server; a failure while receiving is printed and the loop continues.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("Server start.");
        try (DatagramSocket datagramSocket = new DatagramSocket(8088)) {
            while (true) {
                DatagramPacket datagramPacket = new DatagramPacket(new byte[1024], 1024);
                try {
                    datagramSocket.receive(datagramPacket);
                    System.out.println(decode(datagramPacket));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } catch (SocketException e) {
            e.printStackTrace();
        }
    }

    /**
     * Converts the received part of a datagram to text using the platform default charset.
     *
     * <p>Only the bytes between the packet's offset and its received length are decoded;
     * the unused remainder of the backing buffer is not part of the message.
     *
     * @param datagramPacket a packet filled by {@link DatagramSocket#receive(DatagramPacket)}
     * @return the payload as text
     */
    static String decode(DatagramPacket datagramPacket) {
        return new String(datagramPacket.getData(), datagramPacket.getOffset(), datagramPacket.getLength());
    }
}
package com.qupeng.io.bio.udp;
import java.io.IOException;
import java.net.*;
import java.util.Scanner;
/**
 * UDP client that sends every line typed on standard input as one datagram to
 * {@code localhost:8088}.
 */
public class SocketClient {
    /**
     * Reads lines from standard input until it is exhausted and sends each of them.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.out.println("Client start.");
        try (Scanner console = new Scanner(System.in)) {
            while (console.hasNextLine()) {
                sendDatagram(console.nextLine());
            }
        }
    }

    /**
     * Sends {@code text}, encoded with the platform default charset, through a fresh
     * datagram socket that is closed right after sending.
     *
     * <p>Resolution, socket and send failures are all printed and not rethrown.
     *
     * @param text the line to send
     */
    private static void sendDatagram(String text) {
        try {
            InetAddress serverAddress = InetAddress.getByName("localhost");
            byte[] payload = text.getBytes();
            DatagramPacket outgoing = new DatagramPacket(payload, payload.length, serverAddress, 8088);
            try (DatagramSocket sender = new DatagramSocket()) {
                sender.send(outgoing);
            }
        } catch (IOException e) {
            // UnknownHostException and SocketException are IOExceptions as well.
            e.printStackTrace();
        }
    }
}
package com.qupeng.io.nio.blocking;
import com.qupeng.io.nio.Utils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
 * Blocking NIO server on port 8080 that serves one client at a time.
 *
 * <p>For each connection it performs one read, prints the received bytes decoded as
 * two-byte chars, sends a fixed reply and closes the connection.
 */
public class SocketServer {
    /**
     * Binds the server channel and serves clients until an {@link IOException} occurs.
     *
     * <p>The reply is limited to the bytes of the reply text, so nothing left in the shared
     * buffer from an earlier request is sent to the client.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8080))
        ) {
            System.out.println("Server started.");
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            Utils.printBufferInfo(byteBuffer);
            while (true) {
                try (SocketChannel socketChannel = serverSocketChannel.accept()) {
                    int readCount = socketChannel.read(byteBuffer);
                    Utils.printBufferInfo(byteBuffer);
                    if (-1 != readCount) {
                        System.out.println("Got " + readCount + " bytes from client.");
                        StringBuilder clientMessage = new StringBuilder();
                        byteBuffer.flip();
                        Utils.printBufferInfo(byteBuffer);
                        // Decode whole chars only; a trailing single byte cannot form a
                        // char and would make getChar() underflow.
                        while (byteBuffer.remaining() >= Character.BYTES) {
                            clientMessage.append(byteBuffer.getChar());
                            Utils.printBufferInfo(byteBuffer);
                        }
                        System.out.println("From socket client: " + clientMessage.toString());
                    }
                    byteBuffer.clear();
                    Utils.printBufferInfo(byteBuffer);
                    String reply = "0123456789-0123456789-0123456789-0123456789";
                    // Writing through the char view does not move this buffer's position,
                    // so the limit has to be set to the end of the reply by hand.
                    byteBuffer.asCharBuffer().put(reply);
                    byteBuffer.limit(reply.length() * Character.BYTES);
                    while (byteBuffer.hasRemaining()) {
                        int writeCount = socketChannel.write(byteBuffer);
                        Utils.printBufferInfo(byteBuffer);
                        System.out.println("Sent " + writeCount + " bytes to client.");
                    }
                    byteBuffer.clear();
                    Utils.printBufferInfo(byteBuffer);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.qupeng.io.nio.blocking;
import com.qupeng.io.nio.Utils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
 * Blocking NIO chat client.
 *
 * <p>Every line typed on standard input is sent over its own connection to
 * {@code localhost:8080} as two-byte chars; everything the server sends until it closes
 * the connection is decoded the same way and printed.
 */
public class SocketClient {
    /**
     * Runs the prompt loop until standard input is exhausted.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try (Scanner scanner = new Scanner(System.in)) {
            System.out.println("Client started.");
            System.out.println("Pls type here: ");
            while (scanner.hasNextLine()) {
                String input = scanner.nextLine();
                try (SocketChannel socketChannel = SocketChannel.open()) {
                    if (socketChannel.connect(new InetSocketAddress("localhost", 8080))) {
                        // The whole buffer is sent, zero padded. It is at least 32 bytes so
                        // an empty line still puts bytes on the wire, and it grows with the
                        // input so long lines no longer overflow it.
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Math.max(32, input.length() * Character.BYTES));
                        Utils.printBufferInfo(byteBuffer);
                        byteBuffer.asCharBuffer().put(input);
                        Utils.printBufferInfo(byteBuffer);
                        while (byteBuffer.hasRemaining()) {
                            int writeCount = socketChannel.write(byteBuffer);
                            Utils.printBufferInfo(byteBuffer);
                            System.out.println("Sent " + writeCount + " bytes to server.");
                        }
                        byteBuffer.clear();
                        Utils.printBufferInfo(byteBuffer);
                        StringBuilder serverMessage = new StringBuilder();
                        int readCount;
                        while (-1 != (readCount = socketChannel.read(byteBuffer))) {
                            System.out.println("Got " + readCount + " bytes from server.");
                            byteBuffer.flip();
                            while (byteBuffer.remaining() >= Character.BYTES) {
                                serverMessage.append(byteBuffer.getChar());
                                Utils.printBufferInfo(byteBuffer);
                            }
                            // Keeps a trailing half char, if any, for the next read.
                            byteBuffer.compact();
                        }
                        System.out.println("From socket server: " + serverMessage.toString());
                        byteBuffer.clear();
                        Utils.printBufferInfo(byteBuffer);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println("Pls type here: ");
            }
        }
    }
}
package com.qupeng.io.nio;
import java.nio.ByteBuffer;
/**
 * Debug helpers shared by the NIO examples.
 */
public class Utils {
    /** Switch for {@link #printBufferInfo(ByteBuffer)}; nothing is printed while it is {@code false}. */
    static boolean isPrintBufferInfo = false;

    /**
     * Prints the position, limit and capacity of {@code byteBuffer} on one line of standard
     * output, but only when {@link #isPrintBufferInfo} is {@code true}.
     *
     * @param byteBuffer the buffer to describe
     */
    public static void printBufferInfo(ByteBuffer byteBuffer) {
        if (!isPrintBufferInfo) {
            return;
        }
        String description = String.format(
                "ByteBuffer: {position: %d, limit: %d, capacity: %d}",
                byteBuffer.position(),
                byteBuffer.limit(),
                byteBuffer.capacity());
        System.out.println(description);
    }
}
package com.qupeng.io.nio.nonblocking;
import com.qupeng.io.nio.Utils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
/**
 * NIO server on port 8080 whose server channel is non-blocking: while no client is
 * waiting it polls {@code accept()} once per second.
 *
 * <p>For each connection it performs one read, prints the received bytes decoded as
 * two-byte chars, waits a second, sends a fixed reply and closes the connection.
 */
public class SocketServer {
    /**
     * Binds the server channel and serves clients until an {@link IOException} occurs.
     *
     * <p>Every accepted channel is closed once its reply has been sent, and the reply is
     * limited to the bytes of the reply text, so nothing left in the shared buffer from an
     * earlier request is sent to the client.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while polling for clients
     */
    public static void main(String[] args) throws InterruptedException {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open().bind(new InetSocketAddress(8080))
        ) {
            serverSocketChannel.configureBlocking(false);
            System.out.println("Server started.");
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            Utils.printBufferInfo(byteBuffer);
            while (true) {
                SocketChannel accepted = serverSocketChannel.accept();
                while (null == accepted) {
                    System.out.println("Waiting for client connecting ...");
                    Thread.sleep(1000);
                    accepted = serverSocketChannel.accept();
                }
                try (SocketChannel socketChannel = accepted) {
                    int readCount = socketChannel.read(byteBuffer);
                    Utils.printBufferInfo(byteBuffer);
                    if (-1 != readCount) {
                        System.out.println("Got " + readCount + " bytes from client.");
                        StringBuilder clientMessage = new StringBuilder();
                        byteBuffer.flip();
                        Utils.printBufferInfo(byteBuffer);
                        // Decode whole chars only; a trailing single byte cannot form a
                        // char and would make getChar() underflow.
                        while (byteBuffer.remaining() >= Character.BYTES) {
                            clientMessage.append(byteBuffer.getChar());
                            Utils.printBufferInfo(byteBuffer);
                        }
                        System.out.println("From socket client: " + clientMessage.toString());
                    }
                    byteBuffer.clear();
                    Utils.printBufferInfo(byteBuffer);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    String reply = "0123456789-0123456789-0123456789-0123456789";
                    // Writing through the char view does not move this buffer's position,
                    // so the limit has to be set to the end of the reply by hand.
                    byteBuffer.asCharBuffer().put(reply);
                    byteBuffer.limit(reply.length() * Character.BYTES);
                    while (byteBuffer.hasRemaining()) {
                        int writeCount = socketChannel.write(byteBuffer);
                        Utils.printBufferInfo(byteBuffer);
                        System.out.println("Sent " + writeCount + " bytes to client.");
                    }
                    socketChannel.shutdownOutput();
                    byteBuffer.clear();
                    Utils.printBufferInfo(byteBuffer);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.qupeng.io.nio.nonblocking;
import com.qupeng.io.nio.Utils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
/**
 * Non-blocking NIO chat client.
 *
 * <p>Every line typed on standard input is sent over its own non-blocking connection to
 * {@code localhost:8080} as two-byte chars; the client then polls for the reply and prints
 * everything received until the server signals end of stream.
 */
public class SocketClient {
    /**
     * Runs the prompt loop until standard input is exhausted.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for the reply
     */
    public static void main(String[] args) throws InterruptedException {
        try (Scanner scanner = new Scanner(System.in)) {
            System.out.println("Client started.");
            System.out.println("Pls type here: ");
            while (scanner.hasNextLine()) {
                String input = scanner.nextLine();
                try (SocketChannel socketChannel = SocketChannel.open()) {
                    socketChannel.configureBlocking(false);
                    socketChannel.connect(new InetSocketAddress("localhost", 8080));
                    while (!socketChannel.finishConnect()) {
                        System.out.println("Connection to server, pls wait ...");
                    }
                    // The whole buffer is sent, zero padded. It is at least 32 bytes so an
                    // empty line still puts bytes on the wire, and it grows with the input
                    // so long lines no longer overflow it.
                    ByteBuffer byteBuffer = ByteBuffer.allocate(Math.max(32, input.length() * Character.BYTES));
                    Utils.printBufferInfo(byteBuffer);
                    byteBuffer.asCharBuffer().put(input);
                    Utils.printBufferInfo(byteBuffer);
                    while (byteBuffer.hasRemaining()) {
                        int writeCount = socketChannel.write(byteBuffer);
                        Utils.printBufferInfo(byteBuffer);
                        System.out.println("Sent " + writeCount + " bytes to server.");
                    }
                    byteBuffer.clear();
                    Utils.printBufferInfo(byteBuffer);
                    StringBuilder serverMessage = new StringBuilder();
                    int readCount = socketChannel.read(byteBuffer);
                    // Wait only while nothing has arrived yet; end of stream (-1) must end
                    // the wait as well, otherwise a server that closes without replying
                    // keeps this loop running forever.
                    while (readCount == 0) {
                        System.out.println("Waiting for data from server ...");
                        Thread.sleep(500);
                        readCount = socketChannel.read(byteBuffer);
                        Utils.printBufferInfo(byteBuffer);
                    }
                    while (-1 != readCount) {
                        byteBuffer.flip();
                        while (byteBuffer.remaining() >= Character.BYTES) {
                            serverMessage.append(byteBuffer.getChar());
                            Utils.printBufferInfo(byteBuffer);
                        }
                        // Keeps a trailing half char, if any, for the next read.
                        byteBuffer.compact();
                        readCount = socketChannel.read(byteBuffer);
                    }
                    System.out.println("From socket server: " + serverMessage.toString());
                    byteBuffer.clear();
                    Utils.printBufferInfo(byteBuffer);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println("Pls type here: ");
            }
        }
    }
}
package com.qupeng.io.nio.multiplex.singlethread;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
/**
 * Single-threaded, selector-driven (multiplexed NIO) server on port 8080.
 *
 * <p>Per client the server reads a request (two-byte chars), prints it, answers with
 * {@code "Hello client. Got your message: "} followed by the trimmed request, and then
 * waits for further data or end of stream on that connection.
 */
public class SocketServer {
    private static final String REPLY_PREFIX = "Hello client. Got your message: ";

    /**
     * Opens the server channel and runs the selector loop until the server channel or the
     * selector fails.
     *
     * <p>Failures on a single client connection are handled in
     * {@link #serveClient(SelectionKey)} and do not stop the server.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.bind(new InetSocketAddress(8080));
            serverSocketChannel.configureBlocking(false);
            System.out.println("Server started.");
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (!selectionKey.isValid()) {
                        continue;
                    }
                    if (selectionKey.isAcceptable()) {
                        acceptClient(serverSocketChannel, selector);
                    } else {
                        serveClient(selectionKey);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Accepts one pending connection, if there is one, and registers it for reading.
     *
     * @param serverSocketChannel the non-blocking listening channel
     * @param selector the selector the new connection is registered with
     * @throws IOException if accepting or registering fails
     */
    private static void acceptClient(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
        SocketChannel channel = serverSocketChannel.accept();
        if (channel == null) {
            // A non-blocking accept returns null when no connection is pending.
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, SelectionKey.OP_READ);
    }

    /**
     * Handles a readable or writable client key; on an I/O error the error is printed and
     * only this client's connection is closed.
     *
     * @param selectionKey a valid key of a client channel
     */
    private static void serveClient(SelectionKey selectionKey) {
        try {
            if (selectionKey.isReadable()) {
                readRequest(selectionKey);
            } else if (selectionKey.isWritable()) {
                writeReply(selectionKey);
            }
        } catch (IOException e) {
            e.printStackTrace();
            closeClient(selectionKey);
        }
    }

    /**
     * Reads once from the client. End of stream closes the connection; otherwise the
     * request is printed, the encoded reply is attached to the key and the key is switched
     * from read to write interest.
     *
     * <p>Each request uses its own buffer, so clients cannot see each other's data.
     *
     * @param selectionKey the readable client key
     * @throws IOException if reading fails
     */
    private static void readRequest(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer request = ByteBuffer.allocate(1024);
        if (-1 == channel.read(request)) {
            closeClient(selectionKey);
            return;
        }
        request.flip();
        StringBuilder clientMessage = new StringBuilder();
        // Decode whole chars only; a trailing single byte is dropped.
        while (request.remaining() >= Character.BYTES) {
            clientMessage.append(request.getChar());
        }
        System.out.println("From socket client: " + clientMessage);
        String reply = REPLY_PREFIX + clientMessage.toString().trim();
        // Sized to the reply, so the buffer's initial position/limit span exactly the text
        // written through the char view.
        ByteBuffer pendingReply = ByteBuffer.allocate(reply.length() * Character.BYTES);
        pendingReply.asCharBuffer().put(reply);
        selectionKey.attach(pendingReply);
        selectionKey.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * Writes as much of the attached reply as the channel takes; once it is complete the
     * key goes back to read interest so that the client's end of stream is noticed.
     *
     * @param selectionKey the writable client key with the reply buffer attached
     * @throws IOException if writing fails
     */
    private static void writeReply(SelectionKey selectionKey) throws IOException {
        SocketChannel channel = (SocketChannel) selectionKey.channel();
        ByteBuffer pendingReply = (ByteBuffer) selectionKey.attachment();
        channel.write(pendingReply);
        if (!pendingReply.hasRemaining()) {
            selectionKey.attach(null);
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }

    /**
     * Cancels the key and closes its channel.
     *
     * @param selectionKey the client key to drop
     */
    private static void closeClient(SelectionKey selectionKey) {
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException ignored) {
            // The connection is being discarded anyway; there is nothing left to recover.
        }
    }
}
package com.qupeng.io.nio.multiplex.singlethread;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
 * Selector-driven (multiplexed NIO) chat client.
 *
 * <p>Every line typed on standard input is exchanged over its own non-blocking connection
 * to {@code localhost:8080}: connect, write the line as two-byte chars, read once, print.
 */
public class SocketClient {
    /**
     * Runs the prompt loop until standard input is exhausted; I/O errors of an exchange are
     * printed and the loop continues with the next line.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        try (Scanner console = new Scanner(System.in)) {
            System.out.println("Client started.");
            System.out.println("Pls type here: ");
            while (console.hasNextLine()) {
                String typed = console.nextLine();
                try {
                    exchange(typed);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println("Pls type here: ");
            }
        }
    }

    /**
     * Performs one connect / write / read cycle driven by a private selector.
     *
     * <p>The same 1024-byte buffer carries the outgoing text and, after being cleared, the
     * incoming reply. The cycle ends after the first read event, whether it delivered data
     * or end of stream.
     *
     * @param text the line to send
     * @throws IOException if opening, connecting, writing or reading fails
     */
    private static void exchange(String text) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try (SocketChannel socketChannel = (SocketChannel) SocketChannel.open().configureBlocking(false);
             Selector selector = Selector.open()) {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress("localhost", 8080));
            boolean responsePending = true;
            while (responsePending) {
                selector.select();
                Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (key.isConnectable()) {
                        if (channel.finishConnect()) {
                            // Connected: stop watching for connect, start watching for write.
                            int withoutConnect = key.interestOps() & ~SelectionKey.OP_CONNECT;
                            key.interestOps(withoutConnect | SelectionKey.OP_WRITE);
                            byteBuffer.asCharBuffer().put(text);
                            key.attach(byteBuffer);
                        }
                    } else if (key.isWritable()) {
                        ByteBuffer outgoing = (ByteBuffer) key.attachment();
                        channel.write(outgoing);
                        byteBuffer.clear();
                        int withoutWrite = key.interestOps() & ~SelectionKey.OP_WRITE;
                        key.interestOps(withoutWrite | SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        StringBuilder serverMessage = new StringBuilder("From socket server: ");
                        int count = channel.read(byteBuffer);
                        if (count != -1) {
                            byteBuffer.flip();
                            while (byteBuffer.hasRemaining()) {
                                serverMessage.append(byteBuffer.getChar());
                            }
                            System.out.println(serverMessage.toString());
                        }
                        responsePending = false;
                        socketChannel.shutdownOutput();
                        socketChannel.finishConnect();
                    }
                    ready.remove();
                }
            }
        }
    }
}
package com.qupeng.io.nio.multiplex.multithread;
import com.qupeng.concurrent.thread.MyThreadFactory;
import com.qupeng.concurrent.thread.ThreadUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Selector-driven (multiplexed NIO) server on port 8080 that hands the actual reading and
 * writing to a pool of two worker threads.
 *
 * <p>The selector thread only accepts connections and dispatches ready keys. A key's
 * interest in the dispatched operation is switched off before the task is submitted, and
 * the task switches the next interest on (and wakes the selector) only when it has
 * finished, so a key is never handled by two tasks at once.
 */
public class SocketServer {
    /**
     * Opens the server channel and runs the selector loop until the server channel or the
     * selector fails. The worker pool is shut down when the loop ends.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(2, new MyThreadFactory());
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
             Selector selector = Selector.open()) {
            serverSocketChannel.bind(new InetSocketAddress(8080));
            serverSocketChannel.configureBlocking(false);
            System.out.println("Server started.");
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                ThreadUtils.printThreadMessage("Waiting for client connecting ...");
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    try {
                        if (!selectionKey.isValid()) {
                            continue;
                        }
                        if (selectionKey.isAcceptable()) {
                            SocketChannel channel = serverSocketChannel.accept();
                            // A non-blocking accept returns null when nothing is pending.
                            if (channel != null) {
                                channel.configureBlocking(false);
                                channel.register(selector, SelectionKey.OP_READ);
                            }
                        } else if (selectionKey.isReadable()) {
                            // Write interest is enabled by ReadRunnable once the message
                            // is attached, not here, so the reply cannot run ahead of it.
                            selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_READ);
                            threadPoolExecutor.execute(new ReadRunnable(selectionKey));
                        } else if (selectionKey.isWritable()) {
                            selectionKey.interestOps(selectionKey.interestOps() & ~SelectionKey.OP_WRITE);
                            threadPoolExecutor.execute(new WriteRunnable(selectionKey));
                        }
                    } catch (CancelledKeyException ignored) {
                        // The connection was closed in the meantime; nothing to dispatch.
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }

    /**
     * Cancels the key and closes its channel.
     *
     * @param selectionKey the client key to drop
     */
    static void closeQuietly(SelectionKey selectionKey) {
        selectionKey.cancel();
        try {
            selectionKey.channel().close();
        } catch (IOException ignored) {
            // The connection is being discarded anyway; there is nothing left to recover.
        }
    }
}

/**
 * Task that performs one read on a client channel, prints the request (two-byte chars),
 * attaches the trimmed text to the key and then enables write interest.
 *
 * <p>End of stream or a read error closes the connection instead.
 */
class ReadRunnable implements Runnable {
    private SelectionKey selectionKey;
    private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    /**
     * @param selectionKey key of the readable client channel; its read interest must
     *                     already be switched off
     */
    public ReadRunnable (SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }

    @Override
    public void run() {
        SocketChannel channel = (SocketChannel)selectionKey.channel();
        try {
            int readCount = channel.read(byteBuffer);
            if (-1 == readCount) {
                SocketServer.closeQuietly(selectionKey);
                return;
            }
            byteBuffer.flip();
            StringBuilder clientMessage = new StringBuilder();
            // Decode whole chars only; a trailing single byte is dropped.
            while (byteBuffer.remaining() >= Character.BYTES) {
                clientMessage.append(byteBuffer.getChar());
            }
            ThreadUtils.printThreadMessage("From socket client: " + clientMessage.toString());
            selectionKey.attach(clientMessage.toString().trim());
            selectionKey.interestOps(SelectionKey.OP_WRITE);
            // The interest set was changed from a worker thread; wake the selector so the
            // change is picked up by a new selection right away.
            selectionKey.selector().wakeup();
        } catch (IOException e) {
            e.printStackTrace();
            SocketServer.closeQuietly(selectionKey);
        }
    }
}

/**
 * Task that writes {@code "Hello client. Got your message: "} followed by the key's
 * attachment to the client channel and then enables read interest again, so that the
 * client's end of stream is noticed and the connection gets closed.
 */
class WriteRunnable implements Runnable {
    private SelectionKey selectionKey;

    /**
     * @param selectionKey key of the writable client channel with the request text
     *                     attached; its write interest must already be switched off
     */
    public WriteRunnable (SelectionKey selectionKey) {
        this.selectionKey = selectionKey;
    }

    @Override
    public void run() {
        SocketChannel channel = (SocketChannel)selectionKey.channel();
        ThreadUtils.printThreadMessage(String.valueOf(selectionKey.attachment()));
        String reply = "Hello client. Got your message: " + selectionKey.attachment();
        // Sized to the reply, so the buffer's initial position/limit span exactly the text
        // written through the char view.
        ByteBuffer byteBuffer = ByteBuffer.allocate(reply.length() * Character.BYTES);
        byteBuffer.asCharBuffer().put(reply);
        try {
            // NOTE(review): a single non-blocking write is assumed to take the whole
            // (small) reply; a partial write is not retried.
            channel.write(byteBuffer);
            selectionKey.interestOps(SelectionKey.OP_READ);
            selectionKey.selector().wakeup();
        } catch (IOException e) {
            e.printStackTrace();
            SocketServer.closeQuietly(selectionKey);
        }
    }
}
package com.qupeng.io.nio.multiplex.multithread;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
 * Selector-driven (multiplexed NIO) chat client for the multi-threaded server.
 *
 * <p>Every line typed on standard input is exchanged over its own non-blocking connection
 * to {@code localhost:8080}: connect, write the line as two-byte chars, read once, print.
 */
public class SocketClient {
    /**
     * Program entry point; delegates to {@link #run()}.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        SocketClient.run();
    }

    /**
     * Runs the prompt loop until standard input is exhausted; I/O errors of an exchange are
     * printed and the loop continues with the next line.
     */
    public static void run () {
        try (Scanner console = new Scanner(System.in)) {
            System.out.println("Client started.");
            System.out.println("Pls type here: ");
            while (console.hasNextLine()) {
                String typed = console.nextLine();
                try {
                    exchange(typed);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                System.out.println("Pls type here: ");
            }
        }
    }

    /**
     * Performs one connect / write / read cycle driven by a private selector.
     *
     * <p>The same 1024-byte buffer carries the outgoing text and, after being cleared, the
     * incoming reply. The cycle ends after the first read event, whether it delivered data
     * or end of stream.
     *
     * @param text the line to send
     * @throws IOException if opening, connecting, writing or reading fails
     */
    private static void exchange(String text) throws IOException {
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        try (SocketChannel socketChannel = (SocketChannel) SocketChannel.open().configureBlocking(false);
             Selector selector = Selector.open()) {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            socketChannel.connect(new InetSocketAddress("localhost", 8080));
            boolean responsePending = true;
            while (responsePending) {
                selector.select();
                Iterator<SelectionKey> ready = selector.selectedKeys().iterator();
                while (ready.hasNext()) {
                    SelectionKey key = ready.next();
                    SocketChannel channel = (SocketChannel) key.channel();
                    if (key.isConnectable()) {
                        if (channel.finishConnect()) {
                            // Connected: stop watching for connect, start watching for write.
                            int withoutConnect = key.interestOps() & ~SelectionKey.OP_CONNECT;
                            key.interestOps(withoutConnect | SelectionKey.OP_WRITE);
                            byteBuffer.asCharBuffer().put(text);
                            key.attach(byteBuffer);
                        }
                    } else if (key.isWritable()) {
                        ByteBuffer outgoing = (ByteBuffer) key.attachment();
                        channel.write(outgoing);
                        byteBuffer.clear();
                        int withoutWrite = key.interestOps() & ~SelectionKey.OP_WRITE;
                        key.interestOps(withoutWrite | SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        StringBuilder serverMessage = new StringBuilder("From socket server: ");
                        int count = channel.read(byteBuffer);
                        if (count != -1) {
                            byteBuffer.flip();
                            while (byteBuffer.hasRemaining()) {
                                serverMessage.append(byteBuffer.getChar());
                            }
                            System.out.println(serverMessage.toString());
                        }
                        responsePending = false;
                        socketChannel.shutdownOutput();
                        socketChannel.finishConnect();
                    }
                    ready.remove();
                }
            }
        }
    }
}
package com.qupeng.io.nio.multiplex.multithread;
/**
 * Second launcher for the multiplexed NIO chat client, so that two client processes can be
 * started side by side against the multi-threaded server.
 */
public class SocketClient1 {
    /**
     * Starts the same client as {@code SocketClient} by delegating to its entry point.
     *
     * @param args passed through unchanged
     */
    public static void main(String[] args) {
        SocketClient.main(args);
    }
}
package com.qupeng.concurrent.thread;
import java.util.Arrays;
/**
 * Console helpers for the threading and I/O examples.
 *
 * <p>Messages are indented by 60 columns per thread number, where the thread number is the
 * numeric suffix after the last {@code '-'} of the current thread's name.
 */
public class ThreadUtils {
    /**
     * Prints {@code message} on standard output, indented according to the current
     * thread's number.
     *
     * <p>When the thread name has no numeric suffix, or the suffix is zero, the message is
     * printed without indentation. A zero suffix would otherwise build the format string
     * {@code "%0s%s"}, which {@link String#format} rejects with an exception.
     *
     * @param message the text to print
     */
    public static void printThreadMessage(String message) {
        String threadName = Thread.currentThread().getName();
        int threadNumber;
        try {
            threadNumber = Integer.parseInt(threadName.substring(threadName.lastIndexOf('-') + 1));
        } catch (NumberFormatException exception) {
            System.out.println(message);
            return;
        }
        if (threadNumber <= 0) {
            System.out.println(message);
            return;
        }
        System.out.println(String.format("%" + threadNumber * 60 + "s%s", " ", message));
    }

    /**
     * Prints a set of properties of the current thread (name, group, priority, id, state,
     * stack trace, daemon flag, handlers, class loader, enumerated threads, liveness), each
     * through {@link #printThreadMessage(String)}.
     */
    public static void printThreadInfo() {
        Thread thread = Thread.currentThread();
        printThreadMessage("Thread " + thread.getName() + " is running.");
        printThreadMessage("Thread group: " + thread.getThreadGroup().getName());
        printThreadMessage("Group active count: " + thread.getThreadGroup().activeCount());
        printThreadMessage("Priority: " + thread.getPriority());
        printThreadMessage("Id: " + thread.getId());
        printThreadMessage("State: " + thread.getState());
        // Arrays.toString prints the frames; toString() on the array only prints its identity.
        printThreadMessage("Stack trace: " + Arrays.toString(thread.getStackTrace()));
        printThreadMessage("Active count: " + Thread.activeCount());
        printThreadMessage("Is demon: " + thread.isDaemon());
        Object object = new Object();
        synchronized (object) {
            printThreadMessage("Holds lock: " + Thread.holdsLock(object));
        }
        printThreadMessage("Uncaught exception handler: " + thread.getUncaughtExceptionHandler());
        printThreadMessage("Context class loader: " + thread.getContextClassLoader());
        Thread[] tArray = new Thread[10];
        Thread.enumerate(tArray);
        printThreadMessage("enumerate: " + Arrays.toString(tArray));
        printThreadMessage("Is alive: " + thread.isAlive());
    }

    /** Prints that the current thread has started. */
    public static void printStartingMessage() {
        ThreadUtils.printThreadMessage(String.format("Thread %s started ...", Thread.currentThread().getName()));
    }

    /** Prints that the current thread is running. */
    public static void printRunningMessage() {
        ThreadUtils.printThreadMessage(String.format("Thread %s is running ...", Thread.currentThread().getName()));
    }

    /** Prints that the current thread has been interrupted. */
    public static void printInterruptedMessage() {
        ThreadUtils.printThreadMessage(String.format("Thread %s is interrupted ...", Thread.currentThread().getName()));
    }

    /** Prints that the current thread has ended. */
    public static void printEndingMessage() {
        ThreadUtils.printThreadMessage(String.format("Thread %s ended ...", Thread.currentThread().getName()));
    }

    /** Prints that the current thread is executing. */
    public static void printExecuingMessage() {
        ThreadUtils.printThreadMessage(String.format("Thread %s is executing ...", Thread.currentThread().getName()));
    }

    /** Prints that the current thread has been canceled. */
    public static void printCanceledMessage() {
        ThreadUtils.printThreadMessage(String.format("Thread %s is canceled ...", Thread.currentThread().getName()));
    }

    /** Prints that the current thread is waiting. */
    public static void printWaitingMessage() {
        ThreadUtils.printThreadMessage(String.format("Thread %s is waiting ...", Thread.currentThread().getName()));
    }

    /** Prints that the current thread has been notified. */
    public static void printNotifideMessage() {
        ThreadUtils.printThreadMessage(String.format("Thread %s is notified ...", Thread.currentThread().getName()));
    }

    /**
     * Sleeps for the given number of seconds; an interruption is printed and ends the
     * sleep early.
     *
     * @param seconds how long to sleep; converted to milliseconds with {@code long}
     *                arithmetic so that large values do not overflow
     */
    public static void sleep(int seconds) {
        try {
            Thread.sleep(seconds * 1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
package com.qupeng.io.nio.asynchrous.singlethread.future;
import com.qupeng.concurrent.thread.ThreadUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
/**
 * Asynchronous (AIO) server on port 8080 that uses the {@code Future} style of the API and
 * waits for each operation to complete, so clients are served one at a time.
 *
 * <p>Per connection it performs one read, prints the request (two-byte chars) and answers
 * with {@code "Hello client. Got your message: "} followed by the trimmed request.
 */
public class SocketServer {
    /**
     * Binds the server channel and serves clients until an {@link IOException} occurs.
     *
     * <p>The reply is written from its own buffer, sized to the reply text: request bytes
     * are not sent back, long requests do not overflow the buffer, and the write is
     * repeated until the whole reply has been sent.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for an operation
     */
    public static void main(String[] args) throws InterruptedException {
        try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
            System.out.println("Server started.");
            serverSocketChannel.bind(new InetSocketAddress(8080));
            while (true) {
                try (AsynchronousSocketChannel socketChannel = serverSocketChannel.accept().get()) {
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    Integer readCount = socketChannel.read(byteBuffer).get();
                    if (-1 != readCount) {
                        StringBuilder clientMessage = new StringBuilder();
                        byteBuffer.flip();
                        // Decode whole chars only; a trailing single byte is dropped.
                        while (byteBuffer.remaining() >= Character.BYTES) {
                            clientMessage.append(byteBuffer.getChar());
                        }
                        ThreadUtils.printThreadMessage("From socket client: " + clientMessage.toString());
                        String reply = "Hello client. Got your message: " + clientMessage.toString().trim();
                        // Sized to the reply, so the buffer's initial position/limit span
                        // exactly the text written through the char view.
                        ByteBuffer replyBuffer = ByteBuffer.allocate(reply.length() * Character.BYTES);
                        replyBuffer.asCharBuffer().put(reply);
                        while (replyBuffer.hasRemaining()) {
                            socketChannel.write(replyBuffer).get();
                        }
                        ThreadUtils.printThreadMessage("Sent message to client");
                    }
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
package com.qupeng.io.nio.asynchrous.singlethread.future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.Scanner;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Asynchronous (AIO) chat client that uses the {@code Future} style of the API.
 *
 * <p>Every line typed on standard input is exchanged over its own connection to
 * {@code localhost:8080}: connect, write the line as two-byte chars, read once, print.
 */
public class SocketClient {
    /**
     * Runs the prompt loop until standard input is exhausted; I/O and execution errors of
     * an exchange are printed and the loop continues with the next line.
     *
     * @param args ignored
     * @throws InterruptedException if the thread is interrupted while waiting for an operation
     */
    public static void main(String[] args) throws InterruptedException {
        try (Scanner console = new Scanner(System.in)) {
            System.out.println("Client started.");
            System.out.println("Please type here: ");
            while (console.hasNextLine()) {
                String typed = console.nextLine();
                try {
                    exchange(typed);
                } catch (IOException | ExecutionException e) {
                    e.printStackTrace();
                }
                System.out.println("Please type here: ");
            }
        }
    }

    /**
     * Performs one connect / write / read cycle, waiting for each operation's future.
     *
     * <p>The same 1024-byte buffer carries the outgoing text and, after being cleared, the
     * incoming reply. Nothing is printed when the read reports end of stream.
     *
     * @param text the line to send
     * @throws IOException if opening or closing the channel fails
     * @throws ExecutionException if connecting, writing or reading fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private static void exchange(String text) throws IOException, ExecutionException, InterruptedException {
        try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            socketChannel.connect(new InetSocketAddress("localhost", 8080)).get();
            byteBuffer.asCharBuffer().put(text);
            socketChannel.write(byteBuffer).get();
            byteBuffer.clear();
            Future<Integer> pendingRead = socketChannel.read(byteBuffer);
            int received = pendingRead.get();
            if (received == -1) {
                return;
            }
            StringBuilder serverMessage = new StringBuilder();
            byteBuffer.flip();
            while (byteBuffer.hasRemaining()) {
                serverMessage.append(byteBuffer.getChar());
            }
            System.out.println("From socket server: " + serverMessage.toString());
        }
    }
}
package com.qupeng.io.nio.asynchrous.singlethread.callback;
import com.qupeng.concurrent.thread.ThreadUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
/**
 * Callback-style (CompletionHandler) asynchronous chat server that serves one client at a time.
 *
 * <p>Wire format, as used by the matching client in this package: each request and each reply is
 * one frame of {@link #FRAME_SIZE} bytes holding two-byte chars (the buffer's default byte order),
 * padded with NUL chars.
 *
 * <p>Flow per connection: accept, read one frame, print it, write one reply frame, close the
 * connection, accept the next client. The main thread only polls {@link State} and prints progress.
 */
public class SocketServer {
    /** Size in bytes of one request/reply frame. */
    private static final int FRAME_SIZE = 1024;

    /**
     * Starts the server on port 8080 and reports the current connection state every 3 seconds.
     *
     * @param args unused
     * @throws InterruptedException if the reporting loop is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        try (AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open()) {
            System.out.println("Server started.");
            serverSocketChannel.bind(new InetSocketAddress(8080));
            final State state = new State();
            serverSocketChannel.accept("", new AsynchronousSocketChannelCompletionHandler(serverSocketChannel, state));
            // The completion handlers do the real work; this loop only reports what they are doing.
            while (true) {
                switch (state.state) {
                    case "waiting":
                        System.out.println("Waiting for client connecting ...");
                        break;
                    case "connected":
                        System.out.println("Waiting for client message ...");
                        break;
                    case "reading":
                        System.out.println("Reading client message ...");
                        break;
                    case "sending":
                        System.out.println("Sending message to client ...");
                        break;
                    case "sent":
                        System.out.println("Sent finished ...");
                        break;
                }
                Thread.sleep(3000);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Progress marker shared between the completion handlers (writers) and the main thread (reader). */
    static class State {
        // volatile: written on handler threads, polled from the main thread.
        volatile String state = "waiting";
    }

    /** Handles a finished accept: starts reading the client's request frame. */
    private static class AsynchronousSocketChannelCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, String> {
        private final AsynchronousServerSocketChannel serverSocketChannel;
        private final State state;
        private final ByteBuffer byteBuffer;

        public AsynchronousSocketChannelCompletionHandler(AsynchronousServerSocketChannel serverSocketChannel, State state) {
            this.serverSocketChannel = serverSocketChannel;
            this.state = state;
            // A handler is created per accept, so every connection gets its own request buffer.
            this.byteBuffer = ByteBuffer.allocate(FRAME_SIZE);
        }

        @Override
        public void completed(AsynchronousSocketChannel socketChannel, String attachment) {
            state.state = "connected";
            socketChannel.read(byteBuffer, "", new ReadingCompletionHandler(serverSocketChannel, socketChannel, state, byteBuffer));
        }

        @Override
        public void failed(Throwable exc, String attachment) {
            // Report instead of silently dropping the error; no further accept is issued here.
            exc.printStackTrace();
        }
    }

    /** Collects one request frame from the client, prints it and answers with one reply frame. */
    private static class ReadingCompletionHandler implements CompletionHandler<Integer, String> {
        private final AsynchronousServerSocketChannel serverSocketChannel;
        private final AsynchronousSocketChannel socketChannel;
        private final State state;
        private final ByteBuffer byteBuffer;

        public ReadingCompletionHandler(AsynchronousServerSocketChannel serverSocketChannel, AsynchronousSocketChannel socketChannel, State state, ByteBuffer byteBuffer) {
            this.serverSocketChannel = serverSocketChannel;
            this.socketChannel = socketChannel;
            this.state = state;
            this.byteBuffer = byteBuffer;
        }

        @Override
        public void completed(Integer count, String attachment) {
            state.state = "reading";
            boolean endOfStream = count == -1;
            if (!endOfStream && byteBuffer.hasRemaining()) {
                // Only part of the frame has arrived so far; keep reading into the same buffer.
                socketChannel.read(byteBuffer, "", this);
                return;
            }
            if (byteBuffer.position() == 0) {
                // The client closed the connection without sending anything: nothing to answer.
                releaseConnection();
                return;
            }
            byteBuffer.flip();
            // Decodes every complete two-byte char; a trailing odd byte is ignored instead of throwing.
            String clientMessage = byteBuffer.asCharBuffer().toString();
            ThreadUtils.printThreadMessage("From socket client: " + clientMessage);
            state.state = "sending";
            // trim() also strips the NUL padding of the frame.
            sendReply("Hello client. Got your message: " + clientMessage.trim());
        }

        @Override
        public void failed(Throwable exc, String attachment) {
            exc.printStackTrace();
            releaseConnection();
        }

        /** Writes {@code reply} as one zero-padded frame, then closes the connection and accepts the next client. */
        private void sendReply(String reply) {
            // A fresh buffer is zero-filled, so the padding never contains stale request bytes.
            final ByteBuffer replyBuffer = ByteBuffer.allocate(FRAME_SIZE);
            int maxChars = FRAME_SIZE / 2;
            // Cut over-long replies to the frame size instead of failing with BufferOverflowException.
            replyBuffer.asCharBuffer().put(reply.length() > maxChars ? reply.substring(0, maxChars) : reply);
            socketChannel.write(replyBuffer, "", new CompletionHandler<Integer, String>() {
                @Override
                public void completed(Integer result, String attachment) {
                    if (replyBuffer.hasRemaining()) {
                        // A write may transfer only part of the buffer; send the rest.
                        socketChannel.write(replyBuffer, "", this);
                        return;
                    }
                    state.state = "sent";
                    ThreadUtils.printThreadMessage("Sent message to client");
                    releaseConnection();
                }

                @Override
                public void failed(Throwable exc, String attachment) {
                    exc.printStackTrace();
                    releaseConnection();
                }
            });
        }

        /** Closes the client connection and starts waiting for the next client. */
        private void releaseConnection() {
            try {
                socketChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
            state.state = "waiting";
            if (serverSocketChannel.isOpen()) {
                serverSocketChannel.accept("", new AsynchronousSocketChannelCompletionHandler(serverSocketChannel, state));
            }
        }
    }
}
package com.qupeng.io.nio.asynchrous.singlethread.callback;
import com.qupeng.concurrent.thread.ThreadUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Callback-style (CompletionHandler) asynchronous chat client.
 *
 * <p>For every line typed on standard input it opens a new connection to localhost:8080, sends
 * the line as one frame of {@link #FRAME_SIZE} bytes (two-byte chars, NUL padded), waits for the
 * server's reply frame and prints it.
 *
 * <p>Connect, write and read are chained through completion handlers; the main thread waits on a
 * {@link CompletableFuture} that the last handler completes, so no step starts before the
 * previous one has finished.
 */
public class SocketClient {
    /** Size in bytes of one request/reply frame. */
    private static final int FRAME_SIZE = 1024;

    /**
     * Reads lines from standard input and exchanges each one with the server.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting for a reply
     */
    public static void main(String[] args) throws InterruptedException {
        try (Scanner scanner = new Scanner(System.in)) {
            System.out.println("Client started.");
            System.out.println("Please type here: ");
            while (scanner.hasNextLine()) {
                String input = scanner.nextLine();
                try (AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open()) {
                    // Blocks until the handler chain has finished; the channel stays open meanwhile.
                    String serverMessage = exchange(socketChannel, input).get();
                    if (serverMessage != null) {
                        System.out.println("From socket server: " + serverMessage);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Connect, write or read failed; report it and let the user try again.
                    e.printStackTrace();
                }
                System.out.println("Please type here: ");
            }
        }
    }

    /**
     * Connects, sends {@code input} and receives the reply, all through completion handlers.
     *
     * @return a future completed with the reply text, with {@code null} if the server closed the
     *         connection without replying, or exceptionally if any step failed
     */
    private static CompletableFuture<String> exchange(final AsynchronousSocketChannel socketChannel, String input) {
        final CompletableFuture<String> reply = new CompletableFuture<>();
        final ByteBuffer request = ByteBuffer.allocate(FRAME_SIZE);
        int maxChars = FRAME_SIZE / 2;
        // Cut over-long input to the frame size instead of failing with BufferOverflowException.
        request.asCharBuffer().put(input.length() > maxChars ? input.substring(0, maxChars) : input);
        socketChannel.connect(new InetSocketAddress("localhost", 8080), "", new CompletionHandler<Void, String>() {
            @Override
            public void completed(Void result, String attachment) {
                sendRequest(socketChannel, request, reply);
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                reply.completeExceptionally(exc);
            }
        });
        return reply;
    }

    /** Writes the whole request frame, then starts receiving the reply. */
    private static void sendRequest(final AsynchronousSocketChannel socketChannel, final ByteBuffer request, final CompletableFuture<String> reply) {
        socketChannel.write(request, "", new CompletionHandler<Integer, String>() {
            @Override
            public void completed(Integer result, String attachment) {
                if (request.hasRemaining()) {
                    // A write may transfer only part of the buffer; send the rest.
                    socketChannel.write(request, "", this);
                    return;
                }
                // A separate buffer is used for the reply so the request buffer is never reused mid-flight.
                receiveReply(socketChannel, ByteBuffer.allocate(FRAME_SIZE), reply);
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                reply.completeExceptionally(exc);
            }
        });
    }

    /** Reads until one full frame has arrived or the server closes the connection, then completes {@code reply}. */
    private static void receiveReply(final AsynchronousSocketChannel socketChannel, final ByteBuffer response, final CompletableFuture<String> reply) {
        socketChannel.read(response, "", new CompletionHandler<Integer, String>() {
            @Override
            public void completed(Integer count, String attachment) {
                if (count != -1 && response.hasRemaining()) {
                    socketChannel.read(response, "", this);
                    return;
                }
                if (response.position() == 0) {
                    // End of stream before any reply byte arrived.
                    reply.complete(null);
                    return;
                }
                response.flip();
                // Decodes every complete two-byte char; a trailing odd byte is ignored instead of throwing.
                reply.complete(response.asCharBuffer().toString());
            }

            @Override
            public void failed(Throwable exc, String attachment) {
                reply.completeExceptionally(exc);
            }
        });
    }
}
package com.qupeng.io.nio.file;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.util.Arrays;
import java.util.EnumSet;
/**
 * Small benchmark that writes a large file through a {@link FileChannel} (once with a direct
 * buffer, once with a heap buffer) and then reads it back, printing the elapsed time of each step.
 */
public class TestFileChannel {
    private static final int FILE_SIZE_MB = 0X100000; // 1 MB
    /** Number of 1 MB chunks written by {@link #crateLargeFile}. */
    private static final int CHUNK_COUNT = 512;
    /** File used when no path is given on the command line. */
    private static final String DEFAULT_FILE_URI = "file:/C:/Users/Administrator/IdeaProjects/java-demo/resources/test-file-channel.txt";

    /**
     * Runs the write (direct buffer), write (heap buffer) and read steps.
     *
     * @param args optional: {@code args[0]} is the path of the test file; defaults to the original hard-coded location
     * @throws IOException if the file cannot be created, written or read
     */
    public static void main(String[] args) throws IOException {
        Path filePath = (args != null && args.length > 0)
                ? Paths.get(args[0])
                : Paths.get(URI.create(DEFAULT_FILE_URI));
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(FILE_SIZE_MB);
        crateLargeFile(filePath, byteBuffer);
        byteBuffer = ByteBuffer.allocate(FILE_SIZE_MB);
        crateLargeFile(filePath, byteBuffer);
        readLargeFile(filePath);
    }

    /**
     * Creates (or truncates) {@code filePath} and fills it with 512 chunks of 1 MB of the byte {@code '1'}.
     *
     * @param filePath   file to write
     * @param byteBuffer staging buffer used for the channel writes; its previous content is discarded
     * @throws IOException              if the file cannot be opened or written
     * @throws IllegalArgumentException if {@code byteBuffer} has no capacity
     */
    public static void crateLargeFile(Path filePath, ByteBuffer byteBuffer) throws IOException {
        System.out.println("Start to write a 500MB file.");
        if (byteBuffer.capacity() == 0) {
            throw new IllegalArgumentException("byteBuffer must have a non-zero capacity");
        }
        // CREATE makes the separate exists-check/createFile step unnecessary.
        try (FileChannel fileChannel = FileChannel.open(filePath,
                EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)
                // , PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwxrwx")) // not supported on Windows
        )) {
            long startTime = System.currentTimeMillis();
            byte[] bytes = new byte[FILE_SIZE_MB];
            Arrays.fill(bytes, (byte) '1');
            byteBuffer.clear();
            for (int i = 0; i < CHUNK_COUNT; i++) {
                // Stage the chunk in pieces so a buffer smaller than 1 MB works as well.
                int offset = 0;
                while (offset < bytes.length) {
                    int length = Math.min(byteBuffer.remaining(), bytes.length - offset);
                    byteBuffer.put(bytes, offset, length);
                    offset += length;
                    byteBuffer.flip();
                    // write() may consume only part of the buffer; repeat until it is drained.
                    while (byteBuffer.hasRemaining()) {
                        fileChannel.write(byteBuffer);
                    }
                    byteBuffer.clear();
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("Total time: " + (endTime - startTime) + "ms");
        }
    }

    /**
     * Reads {@code filePath} to the end in chunks of up to 1 MB, copying every chunk out of the
     * channel buffer, and prints the elapsed time.
     *
     * @param filePath file to read; may be of any size, including empty
     * @throws IOException if the file cannot be opened or read
     */
    public static void readLargeFile(Path filePath) throws IOException {
        System.out.println("Start to read a 500MB file.");
        try (FileChannel fileChannel = FileChannel.open(filePath,
                EnumSet.of(StandardOpenOption.READ))) {
            long startTime = System.currentTimeMillis();
            ByteBuffer byteBuffer = ByteBuffer.allocate(FILE_SIZE_MB);
            byte[] bytes = new byte[FILE_SIZE_MB];
            while (fileChannel.read(byteBuffer) != -1) {
                byteBuffer.flip();
                // Copy exactly what was read; a short (e.g. final) chunk must not underflow.
                byteBuffer.get(bytes, 0, byteBuffer.remaining());
                byteBuffer.clear();
            }
            long endTime = System.currentTimeMillis();
            System.out.println("Total time: " + (endTime - startTime) + "ms");
        }
    }
}
输出为:
Start to write a 500MB file.
Total time: 5114ms
Start to write a 500MB file.
Total time: 6482ms
Start to read a 500MB file.
Total time: 1032ms
package com.qupeng.io.nio.file;
import java.io.IOException;
import java.net.URI;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.stream.Collectors;
/**
 * Small benchmark that writes and reads a large file through a {@link MappedByteBuffer},
 * printing the elapsed time of each step.
 */
public class TestMappedByteBuffer {
    private static final int FILE_SIZE_MB = 0X100000; // 1 MB
    /** Size of one mapping and of the file written by {@link #crateLargeFile}: 512 MB (printed as "500MB"). */
    private static final int MAP_SIZE = 0X20000000;
    /** File used when no path is given on the command line. */
    private static final String DEFAULT_FILE_URI = "file:/C:/Users/Administrator/IdeaProjects/java-demo/resources/test-file-channel.txt";

    /**
     * Runs the write step followed by the read step.
     *
     * @param args optional: {@code args[0]} is the path of the test file; defaults to the original hard-coded location
     * @throws IOException if the file cannot be created, mapped, written or read
     */
    public static void main(String[] args) throws IOException {
        Path filePath = (args != null && args.length > 0)
                ? Paths.get(args[0])
                : Paths.get(URI.create(DEFAULT_FILE_URI));
        crateLargeFile(filePath);
        readLargeFile(filePath);
    }

    /**
     * Reads the whole of {@code filePath} through read-only mappings, copying it out in chunks of
     * up to 1 MB, and prints the elapsed time.
     *
     * <p>The mapping size follows the actual file size, so files smaller or larger than 512 MB
     * (including empty files) are handled; files above 512 MB are mapped window by window.
     *
     * @param filePath file to read
     * @throws IOException if the file cannot be opened or mapped
     */
    public static void readLargeFile(Path filePath) throws IOException {
        System.out.println("Start to read a 500MB file.");
        try (FileChannel fileChannel = FileChannel.open(filePath,
                EnumSet.of(StandardOpenOption.READ))) {
            long startTime = System.currentTimeMillis();
            long fileSize = fileChannel.size();
            byte[] bytes = new byte[FILE_SIZE_MB];
            for (long offset = 0; offset < fileSize; offset += MAP_SIZE) {
                // Never map past the end of the file: a read-only channel cannot extend it.
                long windowSize = Math.min((long) MAP_SIZE, fileSize - offset);
                MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, offset, windowSize);
                while (mappedByteBuffer.hasRemaining()) {
                    mappedByteBuffer.get(bytes, 0, Math.min(bytes.length, mappedByteBuffer.remaining()));
                }
            }
            long endTime = System.currentTimeMillis();
            System.out.println("Total time: " + (endTime - startTime) + "ms");
        }
    }

    /**
     * Creates (or truncates) {@code filePath} and fills it, through a read-write mapping, with
     * 512 chunks of 1 MB of the byte {@code 'x'}; prints the elapsed time.
     *
     * @param filePath file to write
     * @throws IOException if the file cannot be opened or mapped
     */
    public static void crateLargeFile(Path filePath) throws IOException {
        System.out.println("Start to write a 500MB file.");
        // CREATE makes the separate exists-check/createFile step unnecessary;
        // READ is required in addition to WRITE for a READ_WRITE mapping.
        try (FileChannel fileChannel = FileChannel.open(filePath,
                EnumSet.of(StandardOpenOption.CREATE, StandardOpenOption.READ,
                        StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)
        )) {
            byte[] bytes = new byte[FILE_SIZE_MB];
            Arrays.fill(bytes, (byte) 'x');
            long startTime = System.currentTimeMillis();
            MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, MAP_SIZE);
            for (int i = 0; i < MAP_SIZE / FILE_SIZE_MB; i++) {
                mappedByteBuffer.put(bytes);
            }
            long endTime = System.currentTimeMillis();
            System.out.println("Total time: " + (endTime - startTime) + "ms");
        }
    }
}
输出为:
Start to write a 500MB file.
Total time: 1406ms
Start to read a 500MB file.
Total time: 1519ms