NIO有三大核心部分:Channel(通道),Buffer(缓冲区),Selector(选择器)。
NIO是面向缓冲区编程的。数据被读取到一个可以稍后处理的缓冲区中,待需要时再进行处理。
缓冲区本质上是一个可以读写数据的内存块。Buffer类的几个属性如下所示:
public abstract class Buffer {
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
long address;
}
Buffer中常用方法以及期间position,limit,capacity的变化:
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
System.out.println("allocate: " + buffer);
// 放入数据
buffer.put("12".getBytes(StandardCharsets.UTF_8));
System.out.println("put: " + buffer);
System.out.println(buffer.get());
// 直接get会移位
System.out.println("get1: " + buffer);
// 输出1的asci码
System.out.println(buffer.get(0));
// 切换读写模式
buffer.flip();
System.out.println("flip: " + buffer);
// 再去读就能读到内容了
System.out.println(buffer.get());
System.out.println("get2: " + buffer);
// 把还未读的内容放到缓冲区最前面
buffer.compact();
System.out.println("compact: " + buffer);
// 重置一下position的位置
buffer.rewind();
System.out.println("rewind: " + buffer);
buffer.clear();
System.out.println("clear: " + buffer);
}
输出结果:
allocate: java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]
put: java.nio.HeapByteBuffer[pos=2 lim=10 cap=10]
0
get1: java.nio.HeapByteBuffer[pos=3 lim=10 cap=10]
49
flip: java.nio.HeapByteBuffer[pos=0 lim=3 cap=10]
49
get2: java.nio.HeapByteBuffer[pos=1 lim=3 cap=10]
compact: java.nio.HeapByteBuffer[pos=2 lim=10 cap=10]
rewind: java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]
clear: java.nio.HeapByteBuffer[pos=0 lim=10 cap=10]
Direct Buffer
: 堆外内存时分配在C Heap上的Buffer, 不属于JVM堆。DirectByteBuffer本身在java堆内,但真正存放数据的Buffer在堆外本地内存中,这是malloc()
分配出来的内存,是用户态的。
它可以减少一次从HeapByteBuffer
-> DirectByteBuffer
的拷贝,正常的young gc 或者 mark and compact 的时候,如果需要将java里的byte[]对象的引用传给native代码时需要将HeapByteBuffer中的内容拷贝到一个DirectByteBuffer,然后再发送DirectByteBuffer中的数据,如果直接使用DirectByteBuffer的话可以减少一次拷贝。
常用Channel :
FileChannel
: 文件读取DatagramChannel
: UDP通信ServerSocketChannel
: 监听TCP连接SocketChannel
:TCP通信,同时处于服务端和客户端SocketChannel
获取private void socketChannelDemo() throws IOException {
// 开启一个socketChannel
SocketChannel socketChannel = SocketChannel.open();
// 设置socketChannel为非阻塞模式
socketChannel.configureBlocking(false);
// 设置连接地址
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
while (!socketChannel.finishConnect()) {
// 其他操作
}
}
SocketChannel
读取数据:// 读取SocketChannel
// 准备一个buffer,作为承载内容
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 当返回值为-1时表示读取操作结束
int read = socketChannel.read(buffer);
SocketChannel
写入数据:// 写入SocketChannel
buffer.flip();
socketChannel.write(buffer);
Buffer.flip()
在此处所做的操作是:
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
即把缓冲区设为读模式。
SocketChannel
:// 关闭SocketChannel
socketChannel.shutdownOutput();
socketChannel.close();
在关闭SocketChannel前,如果通道用于写入数据,则建议调用一次shutdownOutput()
方法,向对方发送一个输出的结束标志,再调用close()
方法进行关闭。
读操作同理,socketChannel
存在 shutdownInput()
方法。
FileChannel 只能为阻塞模式,不能设置为非阻塞模式。
FileChannel
的获取// 文件输入流 只能读
FileInputStream inputStream = new FileInputStream("文件路径");
// 获取文件流输入流的通道
FileChannel inputStreamChannel = inputStream.getChannel();
// 创建一个文件输出流 只能写
FileOutputStream fileOutputStream = new FileOutputStream("文件输出路径");
// 创建一个文件输出流通道
FileChannel outputStreamChannel = fileOutputStream.getChannel();
// 通过RandomAccessFile(文件随机访问)类来获取FileChannel实例,可读可写
RandomAccessFile randomAccessFile = new RandomAccessFile("文件路径","rw");
FileChannel accessFileChannel = randomAccessFile.getChannel();
// 进行读写操作后,数据是存在操作系统的缓存里,如果需要强制刷盘,则需要调用
accessFileChannel.force(true);
private void writeFileChannelDemo() throws IOException {
String filePath = "targetFile.txt";
RandomAccessFile randomAccessFile = new RandomAccessFile(filePath, "rw");
FileChannel fileChannel = randomAccessFile.getChannel();
ByteBuffer allocate = ByteBuffer.allocate(1024);
allocate.put("asdfqwerzxvd".getBytes(StandardCharsets.UTF_8));
allocate.flip();
fileChannel.write(allocate);
fileChannel.close();
}
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
public class ChatServer {
private int port = 8080;
private ServerSocketChannel serverSocketChannel;
private Selector selector;
// 相当于一个标志位
private volatile boolean isClosed;
private List<SocketChannel> socketChannelList = new ArrayList<>();
private void dealWithMessage() {
// nio编程步骤:
try {
// 1. 创建ServerSocketChannel通道,绑定监听端口
serverSocketChannel = ServerSocketChannel.open();
// 此处绑定的是channel里面的socket
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 2. 设置通道为非阻塞模式
serverSocketChannel.configureBlocking(false);
// 3. 创建Selector选择器
selector = Selector.open();
// 4. 把channel注册到选择器上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器初始化工作结束");
// 开始正式的数据处理
while (!isClosed) {
// 5. 调用Selector的select方法,监测通道的就绪状态
selector.select();
// 6. 调用SelectedKeys获取到选择键集合
Set<SelectionKey> selectionKeys = selector.selectedKeys();
// 7. 遍历就绪channel集合,获取其事件类型,实现具体业务
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
handlerChannel(key);
// 8. 删除处理完的事件
keyIterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void handlerChannel(SelectionKey key) throws IOException {
if(key.isAcceptable()) {
// 连接事件处理
acceptChannel(key);
} else if(key.isReadable()) {
// 读事件处理
readChannel(key);
}
}
private void acceptChannel(SelectionKey key) throws IOException {
System.out.println("欢迎新客户端上线");
// 有新客户端上线的时候需要将其注册到selector
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel acceptChannel = serverSocketChannel.accept();
channel.configureBlocking(false);
acceptChannel.register(selector, SelectionKey.OP_ACCEPT);
// 这是客户端的channel list
socketChannelList.add(acceptChannel);
System.out.println("来自" + acceptChannel.getRemoteAddress() + "的客户端上线了");
}
private void readChannel(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readLen = socketChannel.read(byteBuffer);
if(readLen != -1) {
String content = new String(byteBuffer.array(), 0, readLen);
System.out.println(content);
}
// 改变监听事件
key.interestOps(SelectionKey.OP_READ);
}
}