Java BIO:同步阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,对每个客户端的连接服务器都会新建一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。
Java BIO模型示意图:
Java NIO:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),服务器将客户端的连接注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理。
Java NIO模型示意图:
Java AIO:异步非阻塞,AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长的应用。
| BIO | NIO | AIO | |
|---|---|---|---|
| IO模型 | 同步阻塞 | 同步非阻塞(多路复用) | 异步非阻塞 |
| 可靠性 | 低 | 高 | 高 |
| 吞吐量 | 低 | 高 | 高 |
| 编程难度 | 简单 | 复杂 | 复杂 |
package com.atguigu.bio;
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author RedStar
* @date 2022/06/09 15:16
* @description
*/
public class BIOServer {
public static void main(String[] args) throws IOException {
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
// 创建ServerSocket
ServerSocket serverSocket = new ServerSocket(6666);
System.out.println("服务器启动");
while (true) {
// 监听等待客户端连接
final Socket socket = serverSocket.accept();
System.out.println("新客户端连接");
// 启动新线程
newCachedThreadPool.execute(new Runnable() {
@Override
public void run() {
handler(socket);
}
});
}
}
/**
* @author RedStar
* @date 2022/6/9 15:22
* @description socket的handler方法
*/
public static void handler(Socket socket) {
try {
byte[] bytes = new byte[1024];
// 通过socket获取输入流
InputStream inputStream = socket.getInputStream();
// 循环读取客户端发来的数据
while (true) {
int read = inputStream.read(bytes);
if (read != -1) {
System.out.println(new String(bytes, 0, read));
} else {
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("关闭socket连接");
try {
socket.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

缓冲区(Buffer):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个容器对象(含数组),该对象提供了一组方法,可以更轻松地使用内存块,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化。Channel提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer。
public abstract class Buffer {
// JDK1.4时,引入的api
public final int capacity( ); // 返回此缓冲区的容量
public final int position( ); // 返回此缓冲区的位置
public final Buffer position (int newPositio); // 设置此缓冲区的位置
public final int limit( ); // 返回此缓冲区的限制
public final Buffer limit (int newLimit); // 设置此缓冲区的限制
public final Buffer mark( ); // 在此缓冲区的位置设置标记
public final Buffer reset( ); // 将此缓冲区的位置重置为以前标记的位置
public final Buffer clear( ); // 清除此缓冲区, 即将各个标记恢复到初始状态,但是数据并没有真正擦除, 后面操作会覆盖
public final Buffer flip( ); // 反转此缓冲区
public final Buffer rewind( ); // 重绕此缓冲区
public final int remaining( ); // 返回当前位置与限制之间的元素数
public final boolean hasRemaining( ); // 告知在当前位置和限制之间是否有元素
public abstract boolean isReadOnly( ); // 告知此缓冲区是否为只读缓冲区
// JDK1.6时引入的api
public abstract boolean hasArray(); // 告知此缓冲区是否具有可访问的底层实现数组
public abstract Object array(); // 返回此缓冲区的底层实现数组
public abstract int arrayOffset(); // 返回此缓冲区的底层实现数组中第一个缓冲区元素的偏移量
public abstract boolean isDirect(); // 告知此缓冲区是否为直接缓冲区
}
Java的基本类型(除boolean)都有一个Buffer子类与之对应,最常用的是ByteBuffer类(二进制数据)
public abstract class ByteBuffer {
// 缓冲区创建相关api
public static ByteBuffer allocateDirect(int capacity) // 创建直接缓冲区
public static ByteBuffer allocate(int capacity) // 设置缓冲区的初始容量
public static ByteBuffer wrap(byte[] array) // 把一个数组放到缓冲区中使用
// 构造初始化位置offset和上界length的缓冲区
public static ByteBuffer wrap(byte[] array,int offset, int length)
// 缓存区存取相关API
public abstract byte get( ); // 从当前位置position上get,get之后,position会自动+1
public abstract byte get (int index); // 从绝对位置get
public abstract ByteBuffer put (byte b); // 从当前位置上添加,put之后,position会自动+1
public abstract ByteBuffer put (int index, byte b); // 从绝对位置上put
}
FileChannel 主要用来对本地文件进行 IO 操作
public int read(ByteBuffer dst); // 从通道读取数据并放到缓冲区中
public int write(ByteBuffer src); // 把缓冲区的数据写到通道中
public long transferFrom(ReadableByteChannel src, long position, long count); // 从目标通道中复制数据到当前通道
public long transferTo(long position, long count, WritableByteChannel target); // 把数据从当前通道复制给目标通道
package com.atguigu.channel;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @author RedStar
* @date 2022/06/10 16:27
* @description
*/
public class FileChannelRead {
public static void main(String[] args) throws IOException {
// 创建文件的输入流
File file = new File("d:\\file.txt");
FileInputStream fileInputStream = new FileInputStream(file);
// 获取流对象的channel
FileChannel channel = fileInputStream.getChannel();
// 创建buffer对象
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 将channel中的数据读到buffer
channel.read(byteBuffer);
// 将buffer中的字节数据转化为string并输出
System.out.println(new String(byteBuffer.array()));
// 关闭流对象
fileInputStream.close();
}
}
package com.atguigu.channel;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
/**
* @author RedStar
* @date 2022/06/10 16:12
* @description
*/
public class FileChannelWrite {
public static void main(String[] args) throws IOException {
String str = "hello world";
// 新建文件输出流对象
FileOutputStream fileOutputStream = new FileOutputStream("d:\\file.txt");
// 获取流对象的channel
FileChannel channel = fileOutputStream.getChannel();
// 新建buffer、存入数据并读写反转
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(str.getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
// 将buffer中的数据写到channel中
channel.write(byteBuffer);
// 关闭channel
fileOutputStream.close();
}
}
package com.atguigu.channel;
import java.io.*;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/**
* @author RedStar
* @date 2022/06/10 16:36
* @description
*/
public class FileChannelCopy {
public static void main(String[] args) throws IOException {
// 获取输入输出流对象
String filePathFrom = "d://file.txt";
String filePathTo = "d://file1.txt";
FileInputStream fileInputStream = new FileInputStream(filePathFrom);
FileOutputStream fileOutputStream = new FileOutputStream(filePathTo);
// 获取输入输出channel
FileChannel inputStreamChannel = fileInputStream.getChannel();
FileChannel outputStreamChannel = fileOutputStream.getChannel();
// 创建buffer
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while (true) {
// 清空buffer
byteBuffer.clear();
// read为本次读取的长度,-1为读取完毕
int read = inputStreamChannel.read(byteBuffer);
if (read == -1) {
break;
}
// buffer读写操作反转
byteBuffer.flip();
// 将buffer中的数据写入outStreamChannel
outputStreamChannel.write(byteBuffer);
}
// 关闭输入输出流
fileInputStream.close();
fileOutputStream.close();
}
}
package com.atguigu.channel;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
/**
* @author RedStar
* @date 2022/06/10 16:56
* @description
*/
public class FileChannelDirect {
public static void main(String[] args) throws IOException {
// 获取输入输出流对象
String filePathFrom = "d://file.txt";
String filePathTo = "d://file1.txt";
FileInputStream fileInputStream = new FileInputStream(filePathFrom);
FileOutputStream fileOutputStream = new FileOutputStream(filePathTo);
// 获取输入输出channel
FileChannel inputStreamChannel = fileInputStream.getChannel();
FileChannel outputStreamChannel = fileOutputStream.getChannel();
// 使用transferFrom完成拷贝
outputStreamChannel.transferFrom(inputStreamChannel, 0, inputStreamChannel.size());
// 关闭流对象
fileInputStream.close();
fileOutputStream.close();
}
}
Selector示意图:

Selector是一个抽象类,常见方法和说明如下:
public abstract class Selector implements Closeable {
public static Selector open(); // 得到一个选择器对象
public int select(long timeout); // 监控所有注册的通道,当其中有 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集合中并返回,参数用来 设置超时时间
public Set<SelectionKey> selectedKeys(); // 从内部集合中得到所有的SelectionKey
}
selector.select() // 阻塞
selector.select(1000); // 阻塞1000毫秒,在1000毫秒后返回
selector.wakeup(); // 唤醒selector
selector.selectNow(); // 不阻塞,立马返回
SelectionKey表示Selector和网络通道的注册关系,该关系共四种:
public static final int OP_READ = 1 << 0; // 代表读操作,值为 1
public static final int OP_WRITE = 1 << 2; // 代表写操作,值为 4
public static final int OP_CONNECT = 1 << 3;// 代表连接已经建立,值为 8
public static final int OP_ACCEPT = 1 << 4; // 有新的网络连接可以 accept,值为 16
public abstract class SelectionKey {
public abstract Selector selector(); // 得到与之关联的Selector对象
public abstract SelectableChannel channel(); // 得到与之关联的通道
public final Object attachment(); // 得到与之关联的共享数据
public abstract SelectionKey interestOps(int ops); // 设置或改变监听事件
public final boolean isAcceptable(); // 是否可以 accept
public final boolean isReadable(); // 是否可以读
public final boolean isWritable(); // 是否可以写
}
Channel注册到Selector需要指定自己感兴趣的事件,感兴趣就是该通道需要关注的事件,如果channel1只有在客户端发来数据的时候执行相应操作,那么它只需要注册SelectionKey.OP_READ, 当发生读事件时,此通道对应的selectionKey.isReadable()方法返回true。除此之外该通道上发生的其他事件并不会被感知。
channel.register(selector, SelectionKey.OP_READ);
// 可以通过|(或运算符)监听多个事件
// channel注册到同一个Selector两次, 那么第二次的注册其实就是相当于更新这个Channel的interestSet为SelectionKey.OP_READ|SelectionKey.OP_WRITE
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
Channel.register()方法实际上是得到第二个参数与运算后的结果,然后调用该通道对应的SelectionKey.interestOps()方法。也就是说SelectionKey的实现类有一个interestOps属性,该属性的值是对应通道想要监听的事件值与操作后的结果。
ServerSocketChannel是一个可以监听新建TCP连接的通道
public abstract class ServerSocketChannel extends AbstractSelectableChannel implements NetworkChannel{
public static ServerSocketChannel open(); // 得到一个 ServerSocketChannel 通道
public final ServerSocketChannel bind(SocketAddress local); // 设置服务器端端口号
public final SelectableChannel configureBlocking(boolean block); // 设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式
public SocketChannel accept(); // 接受一个连接,返回代表这个连接的通道对象SocketChannel
public final SelectionKey register(Selector sel, int ops); // 注册一个选择器并设置监听事件
}
通过ServerSocketChannel.accept()方法监听新进来的连接。当 accept()方法返回的时候,它返回一个包含新进来的连接的 SocketChannel。因此, accept()方法会一直阻塞到有新连接到达。
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(true);
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
// 业务处理
}
当然ServerSocketChannel也可以设置成非阻塞模式。在非阻塞模式下,accept() 方法会立刻返回,如果还没有新进来的连接,返回的将是null。 因此,需要检查返回的SocketChannel是否是null
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
if(socketChannel != null){
// 业务处理
}
}
SocketChannel是一个连接到TCP网络套接字的通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel{
public static SocketChannel open(); // 得到一个SocketChannel 通道
public final SelectableChannel configureBlocking(boolean block); // 设置阻塞或非阻塞模式,取值false表示采用非阻塞模式
public boolean connect(SocketAddress remote); // 连接服务器
public boolean finishConnect(); // 如果上面的方法连接失败,接下来就要通过该方法完成连接操作
public int write(ByteBuffer src); // 往通道里写数据
public int read(ByteBuffer dst); // 从通道里读数据
public final SelectionKey register(Selector sel, int ops, Object att); // 注册一个选择器并设置监听事件,最后一个参数可以设置共享数据
public final void close(); // 关闭通道
}
NIO核心组件关系图:

相关代码在群聊系统GroupChatServer中查看
ServerSocketChannel监听到连接accept事件SocketChannel并注册到一个Selector上,一个Selector可以注册多个 SocketChannelSelectionKey, 会和该Selector关联(集合)Selector进行监听select()方法, 返回有事件发生的通道的个数Selector.selectedKeys()方法返回所有有事件发生的通道对应的SelectionKey集合SelectionKeySelectionKey.channel()方法反向获取 SocketChannelpackage com.atguigu.nioGroupChat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
/**
* @author RedStar
* @date 2022/07/20 15:45
* @description NIO群聊系统服务端
*/
public class GroupChatServer {
private Selector selector;
private ServerSocketChannel listenChannel;
private static final int PORT = 9528;
/**
* @author RedStar
* @date 2022/7/20 15:47
* @description 构造器
*/
public GroupChatServer() {
try {
// 获取选择器
selector = Selector.open();
// 获取serverSocketChannel
listenChannel = ServerSocketChannel.open();
// 绑定端口
listenChannel.socket().bind(new InetSocketAddress(PORT));
// 设置非阻塞模式
listenChannel.configureBlocking(false);
// 将该listenChannel注册到selector
listenChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @author RedStar
* @date 2022/7/20 15:51
* @description 监听方法
*/
public void listen() {
try {
while (true) {
// selectCount为事件数量
int selectCount = selector.select(2000);
if (selectCount > 0) {
// 获取所有发生事件的SelectionKey集合
Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
while (keyIterator.hasNext()) {
// 取出selectionKey
SelectionKey key = keyIterator.next();
// 监听accept事件
if (key.isAcceptable()) {
SocketChannel sc = listenChannel.accept();
// 设置为非阻塞
sc.configureBlocking(false);
// 将socketChannel注册到seletor并监听读事件
sc.register(selector, SelectionKey.OP_READ);
System.out.println(sc.getRemoteAddress() + " 上线");
}
// 监听read事件
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
System.out.println("通道[" + channel.hashCode() + "] readable");
readData(key);
}
// 监听write事件
// if(key.isWritable()) {
// SocketChannel channel = (SocketChannel) key.channel();
// System.out.println("通道[" + channel.hashCode() + "] writeable" );
// }
// 监听connect事件
// if(key.isConnectable()) {
// SocketChannel channel = (SocketChannel) key.channel();
// System.out.println("通道[" + channel.hashCode() + "] connectable" );
// }
// 删除当前的事件key,防止重复处理
keyIterator.remove();
}
} else {
System.out.println("等待连接...");
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @author RedStar
* @date 2022/7/20 16:08
* @description 读取客户端消息
*/
public void readData(SelectionKey key) {
// 定义一个socketChannel
SocketChannel channel = null;
try {
// 取到关联的channel
channel = (SocketChannel) key.channel();
// 创建buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 获取数据长度
int len = channel.read(buffer);
if (len > 0) {
// 把缓冲区的数据转换成字符串
String msg = new String(buffer.array());
System.out.println("from 客户端:" + msg);
// 向其他客户端转发消息
sendInfoToOtherClients(msg, channel);
}
} catch (IOException e) {
try {
System.err.println(channel.getRemoteAddress() + " 离线了");
// 取消注册
key.cancel();
// 关闭通道
channel.close();
} catch (IOException e2) {
e2.printStackTrace();
}
}
}
/**
* @author RedStar
* @date 2022/7/20 16:17
* @description 消息转发方法
*/
private void sendInfoToOtherClients(String msg, SocketChannel selfChannel) throws IOException {
System.out.println("服务器转发消息...");
for (SelectionKey key : selector.keys()) {
// 通过key获取对应的socketChannel
SelectableChannel targetChannel = key.channel();
if (targetChannel instanceof SocketChannel && targetChannel != selfChannel) {
SocketChannel dest = (SocketChannel) targetChannel;
// 将msg存储到buffer中
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
// 将buffer数据写入到channel
dest.write(buffer);
}
}
}
public static void main(String[] args) {
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}
package com.atguigu.nioGroupChat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
/**
* @author RedStar
* @date 2022/07/20 16:30
* @description NIO群聊系统客户端
*/
public class GroupChatClient {
private final String HOST = "127.0.0.1";
private final int PORT = 9528;
private Selector selector;
private SocketChannel socketChannel;
private String userName;
public GroupChatClient() {
try {
selector = Selector.open();
// 连接服务器
socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
// 设置非阻塞
socketChannel.configureBlocking(false);
// 将socketChannel注册到selector
socketChannel.register(selector, SelectionKey.OP_READ);
// 得到userName
userName = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(userName + " is ok");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* @author RedStar
* @date 2022/7/20 16:35
* @description 向服务器发送消息
*/
public void sendMsg(String message) {
message = userName + ":" + message;
try {
socketChannel.write(ByteBuffer.wrap(message.getBytes()));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @author RedStar
* @date 2022/7/20 16:38
* @description 接收服务器发来的消息
*/
public void receiveMsg() {
try {
// 获取发生事件的数量,客户端只能是0或1
int readCount = selector.select();
if (readCount > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
// 得到相关SocketChannel
SocketChannel sc = (SocketChannel) key.channel();
// 申请buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 读取消息
sc.read(buffer);
//把缓冲区的数据转换成字符串
String msg = new String(buffer.array());
System.out.println(msg.trim());
}
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
GroupChatClient groupChatClient = new GroupChatClient();
// 启动新线程
new Thread(() -> {
while (true) {
groupChatClient.receiveMsg();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
// 获取键盘输入并发送到服务端
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String s = scanner.nextLine();
groupChatClient.sendMsg(s);
}
}
}
传统文件拷贝模式示意图:

由此可见,传统拷贝方式经过4次拷贝、4次状态切换才完成了文件的拷贝操作
mmap 即 memory map,也就是内存映射。mmap 是一种内存映射文件的方法,即将一个文件或者其它对象映射到进程的地址空间,实现文件磁盘地址和进程虚拟地址空间中一段虚拟地址的一一对映关系。
通过mmap方式,能够在系统内存中开辟一片空间和用户进程共享使用,这样就能够将应用缓冲区省掉,从而减少一次拷贝。
mmap拷贝示意图:

由此可见,mmap优化后的拷贝方式经过3次拷贝、4次状态切换完成了文件的拷贝操作
sendfile()系统调用利用DMA引擎将文件中的数据拷贝到操作系统的内核缓冲区中,然后又将数据被拷贝从内核文件缓冲区拷贝到协议栈(网卡)。其中有一次拷贝是从内核文件缓冲区到Socket缓冲区,但是由于只拷贝文件描述符(文件的描述信息),数据量很小忽略不计。
sendFile拷贝示意图:

由此可见,sendFile优化后的拷贝方式经过2次拷贝、2次状态切换完成了文件的拷贝操作
文章参考:尚硅谷韩顺平Netty视频教程
原文链接: https://www.jhxblog.cn/#/home/read?articleid=37