NIO 也叫 Non-Blocking IO 是同步非阻塞的 IO 模型。线程发起 IO 请求后,立即返回。同步指的是必须等待 IO 缓冲区内的数据就绪,而非阻塞指的是,用户线程不原地等待 IO 缓冲区,可以先做一些其他操作,但是要定时轮询检查 IO 缓冲区数据是否就绪。
~
本篇内容包括:Java NIO 介绍、Java NIO 核心组件、NIO 代码示例。
NIO 也叫 Non-Blocking IO 是同步非阻塞的 IO 模型。线程发起 IO 请求后,立即返回。同步指的是必须等待 IO 缓冲区内的数据就绪,而非阻塞指的是,用户线程不原地等待 IO 缓冲区,可以先做一些其他操作,但是要定时轮询检查 IO 缓冲区数据是否就绪。
Java 中的 NIO 是 new IO的意思。其实是 NIO 加上 IO 多路复用技术。普通的 NIO 是线程轮询查看一个 IO 缓冲区是否就绪,而 Java 中的 new IO 指的是线程轮询地去查看一堆 IO 缓冲区中哪些就绪,这是一种 IO 多路复用的思想。IO多路复用模型中,将检查 IO 数据是否就绪的任务,交给系统级别的 select 或 epoll 模型,由系统进行监控,减轻用户线程负担。
NIO 与原来的 IO 有同样的作用和目的,但是使用的方式完全不同,NIO 支持面向缓冲区的、基于通道的 IO 操作。NIO 将以更加高效的方式进行文件的读写操作。NIO 可以理解为非阻塞 IO,传统的 IO 的 read 和 write 只能阻塞执行,线程在读写 IO 期间不能干其他事情,比如调用 socket.read() 时,如果服务器一直没有数据传输过来,线程就一直阻塞,而 NIO 中可以配置 socket 为非阻塞模式。
NIO主要有 buffer、channel、selector 三种技术的整合,通过零拷贝的 buffer 取得数据,每一个客户端通过 channel 在 selector(多路复用器)上进行注册。服务端不断轮询 channel 来获取客户端的信息。channel 上有 connect、accept(阻塞)、read(可读)、write(可写)四种状态标识。根据标识来进行后续操作。所以一个服务端可接收无限多的 channel。不需要新开一个线程。大大提升了性能。
NIO 通信模型图:
NIO 有三大核心部分:Channel(通道) ,Buffer( 缓冲区),Selector(选择器)
缓冲区本质上是一块可以写入数据,然后可以从中读取数据的内存。这块内存被包装成 NIO Buffer 对象,并提供了一组方法,用来方便的访问该块内存。相比较直接对数组的操作,Buffer API 更加容易操作和管理。Channel 提供从文件、网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer
在一般的 Java IO 操作中,我们以流式的方式,顺序的从一个 Stream 中读取一个或者多个字节,直至读取所有字节。因为它没有缓存区,所以我们就不能随意改变读取指针的位置。
我们在从 Channel 中读取数据到 Buffer 中,这样 Buffer 中就有了数据后,我们就可以对这些数据进行操作了。并且不同于一般的 Java IO 操作那样是顺序操作,NIO 中我们可以随意的读取任意位置的数据,这样大大增加了处理过程中的灵活性。
在NIO中,Buffer是一个顶层父类,它是一个抽象类,常用的Buffer子类有:
对于Java中的基本数据类型,都有一个具体 Buffer 类型与之相对应,最常用的自然是 ByteBuffer 类(二进制数据),该类的主要方法如下所示 :
public abstract ByteBuffer put(byte[] b);
:存储字节数据到缓冲区public abstract byte[] get();
:从缓冲区获得字节数据public final byte[] array();
:把缓冲区数据转换成字节数组public static ByteBuffer allocate(int capacity);
:设置缓冲区的初始容量public static ByteBuffer wrap(byte[] array);
:把一个现成的数组放到缓冲区中使用public final Buffer flip();
:翻转缓冲区,将缓冲区进行读写切换。Java NIO 的通道类似流,但又有些不同:既可以从通道中读取数据,又可以写数据到通道。但流的(input 或 output)读写通常是单向的。通道可以非阻塞读取和写入通道,通道可以支持读取或写入缓冲区,也支持异步地读写。
Java IO 的各种流是阻塞的 IO 操作。这就意味着,当一个线程执行读或写 IO 操作时,该线程会被阻塞,直到有一些数据被读取,或者数据完全写入。
Java NIO 可以让我们非阻塞的使用 IO 操作,例如:
也就是说,线程可以将非阻塞 IO 的空闲时间用于在其他 Channel 上执行 IO 操作。所以,一个单独的线程,可以管理多个 Channel 的读取和写入 IO 操作。
常用的Channel类有:FileChannel、DatagramChannel、ServerSocketChannel和SocketChannel。
FileChannel类,该类主要用来对本地文件进行IO操作,主要方法如下所示 :
public int read(ByteBuffer dst)
:读取数据并放到缓冲区中public int write(ByteBuffer src)
:把缓冲区的数据写到通道中public long transferFrom(ReadableByteChannel src,long position,long count)
:从目标通道中复制数据public long transferTo(long position,long count,WritableByteChannel target)
:把数据从当前通道复制给目标通道import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class FileWriteTest {
public static void main(String[] args) throws IOException {
String str = "HELLO,NIO,我是我";
// 创建输出流
FileOutputStream fileOutputStream = new FileOutputStream("/Users/lizhengi/test/iodemo/demo.txt");
// 从流中得到一个通道
FileChannel fileChannel = fileOutputStream.getChannel();
// 提供一个缓冲区
ByteBuffer allocate = ByteBuffer.allocate(1024);
// 往缓冲区中存入数据
allocate.put(str.getBytes());
// 缓冲区进行读写切换。
// 当数据写入到缓冲区中时,指针指向数据最后一行,那么缓冲区写入通道中输出时,是从最后一行数据开始写入,
// 这样就会导致写入1024的剩余没有数据的空缓冲区。所以需要翻转缓冲区,重置位置到初始位置。
allocate.flip();
// 把缓冲区写到通道中,通道负责把数据写入到文件中
fileChannel.write(allocate);
// 关闭输出流,因为通道是输出流创建的,所以会一起关闭
fileOutputStream.close();
}
}
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
public class FileReadTest {
public static void main(String[] args) throws IOException {
File file = new File("/Users/lizhengi/test/iodemo/demo.txt");
// 1. 创建输入流
FileInputStream fis = new FileInputStream(file);
// 2. 得到一个通道
FileChannel fc = fis.getChannel();
// 3. 准备一个缓冲区
ByteBuffer buffer = ByteBuffer.allocate((int)file.length());
// 4. 从通道里读取数据并存到缓冲区中
fc.read(buffer);
System.out.println(new String(buffer.array()));
// 5.关闭
fis.close();
}
}
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.channels.FileChannel;
public class FileCopyTest {
public static void main(String[] args) throws IOException {
//1. 创建两个流
FileInputStream fis = new FileInputStream("/Users/lizhengi/test/iodemo/demo.txt");
FileOutputStream fos = new FileOutputStream("/Users/lizhengi/test/iodemo/temp.txt");
// 2. 得到两个通道
FileChannel sourceFc = fis.getChannel();
FileChannel destFc = fos.getChannel();
//3. 复制
destFc.transferFrom(sourceFc,0,sourceFc.size());
//4.关闭
fis.close();
fos.close();
}
}
Selector 是一个 Java NIO 组件,能够检测多个注册的 NIO 通道上是否有事件发生,便获取事件然后针对每个事件进行相应的响应处理。这样就可以只用一个线程去管理多个通道,也就是管理多个连接。这样使得只用在连接真正有读写事件发生时,才会调用函数来进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程,并且避免了多线程之间的上下文切换导致的开销。
选择器(Selector)是 NIO 能实现非阻塞的基础
程序切换到哪个 Channel 是由事件决定的,每个 Channel 都会对应一个 Buffer。
Selector 会根据不同的事件,在各个通道上切换,一个线程对应一个 Selector,一个 Selector 对应多个 Channel(连接)。
该类的常用方法如下所示 :
public static Selector open()
:得到一个选择器对象public int select(long timeout)
:监控所有注册的 Channel,当其中有注册的 IO 操作可以进行时,将对应的 SelectionKey 加入到内部集团中并返回,参数用来设置超时时间public Set selectedKeys()
:从内部集合中得到所有的 SelectionKeySelectionKey,代表了 Selector 和 serverSocketChannel 的注册关系,一共四种 :
该类的常用方法如下所示 :
public abstract Selector selector()
,得到与之关联的 Selector 对象public abstract SelectorChannel channel()
,得到与之关联的通道public final Object attachment()
,得到与之关联的共享数据public abstract SelectionKey interestOps(int ops)
,设置或改变监听事件public final boolean isAcceptable()
,是否可以 acceptpublic final boolean isReadable()
,是否可以读public final boolean isWritable()
,是否可以写ServerSocketChannel,用来在服务器端监听新的客户端 Socket 连接,常用方法如下所示 :
public static ServerSocketChannel open()
,得到一个 ServerSocketChannel 通道public final ServerSocketChannel bind(SocketAddress local)
,设置服务器端端口号public final SelectableChannel configureBlocking(boolean block)
,设置阻塞或非阻塞模式,取值 false 表示采用非阻塞模式public SocketChannel accept()
,接受一个连接,返回代表这个连接的通道对象public final SelectionKey register(Selector sel,int ops)
,注册一个选择器并设置监听事件SocketChannel,网络IO通道,具体负责进行读写操作。NIO总是把缓冲区的数据写入通道,或者把通道里的数据读出到缓冲区(buffer)。常用方法如下所示 :
public static SocketChannel open()
,得到一个SocketChannel通道public final SelectableChannel configureBlocking(boolean block)
,设置阻塞或非阻塞模式,取值false表示采用非阻塞模式public boolean connect(SocketAddress remote)
,连接服务器public boolean finishConnect()
,如果上面的方法连接失败,接下来就要通过该方法完成连接操作public int write(ByteBuffer src)
,往通道里写数据public int read(ByteBuffer dst)
,从通道里读数据public final SelectionKey register(Selector sel,int ops,Object att)
,注册一个选择器并设置监听事件,最后一个参数可以设置共享数据public final void close()
,关闭通道服务端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
/**
* 选择器
*/
private Selector selector;
/**
* 默认服务绑定端口
*/
private static final int DEFAULT_BIND_PORT = 9000;
public NIOServer(int port) {
initServer(port);
}
private void initServer(int port) {
try {
// 开启一个服务通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 将通道绑定到指定端口
serverChannel.bind((port < 1 || port > 65535) ?
new InetSocketAddress(DEFAULT_BIND_PORT) :
new InetSocketAddress(port));
// 将通道设置为非阻塞模式
serverChannel.configureBlocking(false);
// 打开一个 IO 监视器:Selector
this.selector = Selector.open();
// 将服务通道注册到 Selector 上,并在服务端通道注册 OP_ACCEPT 事件
serverChannel.register(this.selector, SelectionKey.OP_ACCEPT);
} catch (IOException ioException) {
ioException.printStackTrace();
System.out.println("init exception: " + ioException);
}
}
public void startServer() throws InterruptedException {
while (true) {
System.out.println("Selector 巡查 IO 事件---------------开始");
try {
int ioEventCount = this.selector.select(); // 此处以收集到所有 IO 事件
System.out.println("Selector 检测到:" + ioEventCount);
} catch (IOException ioException) {
ioException.printStackTrace();
break;
}
// 对各个 IO 事件做出对应的响应
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove(); //通过调用迭代器的 remove() 方法将这个键 key 从已选择键的集合中删除
try {
// 可接收连接 能注册SelectionKey.OP_ACCEPT事件的只有 ServerSocketChannel通道
if (key.isAcceptable()) {
System.out.println("监控到 OP_ACCEPT 连接事件");
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 接受客户端连接
SocketChannel client = server.accept();
System.out.println("Accept connection from " + client);
client.configureBlocking(false); // 设置客户端通道非阻塞
// 为客户端通道注册 OP_WRITE 和 OP_READ 事件
SelectionKey clientKey = client.register(selector,
SelectionKey.OP_WRITE |
SelectionKey.OP_READ);
// 为客户端通道添加一个数据缓存区
ByteBuffer buffer = ByteBuffer.allocate(100);
clientKey.attach(buffer);
}
// 可读数据
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
int read = client.read(output);
System.out.println("Read data from client: " + client);
System.out.println("------------MSG : " + output.toString());
System.out.println(read);
}
// 可写数据
if (key.isWritable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
output.flip();
client.write(output);
output.compact();
System.out.println("Write data to " + client);
}
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
Thread.sleep(2000);// 为了观察控制台打印数据
System.out.println("Selector 巡查 IO 事件---------------完成");
}
}
public static void main(String[] args) throws InterruptedException {
NIOServer nioServer = new NIOServer(DEFAULT_BIND_PORT);
nioServer.startServer();
}
}