多路复用
Selector 是 NIO 中的选择器,也称多路复用器Channel 的 状态ChannelChannel 都可以被多路复用,需要继承 SelectableChannel
SocketChannel 都继承了FileChannel 没有继承注册
Channel.register(Selector selector, int ops) 完成
selector 是一个指定的选择器ops 是 selector 的关注行为Selector 上注册 Channel,二者的数量关系是多对多的
Selector 上可以注册多个 Channel(废话,否则选择个屁)Channel 可以注册到多个选择器,并指定选择器的 关注行为(比如读或写)Channel 不能在同一个 Selector 上注册多次Channel 一旦注册就会 一直在选择器中存在取消注册
通过选择键的 cancel()
Channel 的状态与 Selector 的关注行为
Selector 的关注行为一一对应,反映了 Channel 当前可以执行的行为Selector 的关注行为就是 选择器关注 Channel 是否处于对应的状态SelectionKey.Key 中定义,包括
SelectionKey.Key.OP_READSelectionKey.Key.OP_WRITE,通道关闭也会触发此行为SelectionKey.Key.OP_CONNECTSelectionKey.Key.OP_ACCEPT位或 进行,如 SelectionKey.Key.OP_READ | SelectionKey.Key.OP_WRITE监听/选择
select() 进行选择Channel 时,只对注册的 关注行为 进行反应Channel 处于某状态,若此状态不被当前选择器关注,就不会进行任何动作Channel 放入选择键集合选择键
Selector 与 Channel 的 注册关系 与 关注行为cancel() 方法注销选择器上的 Channel
cancel() 只会将当前选择键 加入需要被注销的选择键队列,随后自动在下一次 select() 时注销continue; 以触发下一轮选择CancelledKeyException创建
Selector.open()
关闭
close()
select() 而阻塞额线程ChannelChannel
注册
Channel.register(Selector selector, int ops)
Channel 必须处于非阻塞模式
IllegalBlockingModeExceptionFileChannel 不能向选择器注册,因为没有非阻塞模式validOps() 获取 Channel 在指定选择器上注册的行为列表选择
select()
Channel 的数量select() 一共有三种变种,横向对比见下表
select() 阻塞至至少一个 Channel 就绪select(timeout) 限制最长阻塞时间selectNow() ,不阻塞,立即获取停止选择
wakeup()
select() 立即返回
select() 阻塞,此方法可以使它立即返回select() 阻塞,此方法可以使它的下一次 select() 立即返回选择键
selectKeys()
获取现在所有已经就绪的选择键
案例说明
ServerSocketChannel 多路复用监听端口
SocketChannel 绑定 ServerSocketChannel 并发送信息
代码
server 端
@Component
public class SelectorHandlerDemo {
public void handle(){
try (
ServerSocketChannel server = ServerSocketChannel.open();
Selector selector = Selector.open();
){
server.configureBlocking(false);
server.bind(new InetSocketAddress(9999));
server.register(selector, SelectionKey.OP_ACCEPT);
SelectionKey key = null; // 每轮获取到的 keys 中的一个
while(true){
if(0 == selector.selectNow()){
TimeUnit.MILLISECONDS.sleep(200);
continue;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while(keys.hasNext()){
key = keys.next();
keys.remove();
if(!key.isValid())
key.cancel();
if(key.isAcceptable()) {
doAccept(key, selector);
continue;
}
if(key.isConnectable()){
doConnect(key,selector);
continue;
}
if(key.isReadable()){
doRead(key,selector);
continue;
}
if(key.isWritable()) {
doWrite(key, selector);
continue;
}
}
}
}catch (Exception e){
e.printStackTrace();
}
}
private void doValid(SelectionKey key, Selector selector) {}
private void doWrite(SelectionKey key, Selector selector) {
try {
key.channel().close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void doRead(SelectionKey key, Selector selector) {
try {
SelectableChannel selected = key.channel();
if(!(selected instanceof SocketChannel)){
System.out.println("类型异常:" + key);
return;
}
SocketChannel socket = (SocketChannel) selected;
ByteBuffer buf = ByteBuffer.allocate(10);
List<byte[]> bufs = new ArrayList<>();
for(int length=socket.read(buf);-1!=length;buf.clear(),length=socket.read(buf)){
buf.flip();
bufs.add(ArrayUtil.sub(buf.array(),0,Math.min(length,10)));
}
key.cancel();
System.out.println(new String(ArrayUtil.addAll(bufs.toArray(new byte[][]{})), StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
}
}
private void doConnect(SelectionKey key, Selector selector) {}
private void doAccept(SelectionKey key, Selector selector) {
try {
SelectableChannel selected = key.channel();
if(!(selected instanceof ServerSocketChannel)){
System.out.println("类型异常:" + key);
return;
}
ServerSocketChannel server = (ServerSocketChannel) selected;
SocketChannel socket = null;
socket = server.accept();
socket.configureBlocking(false);
socket.register(selector,SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
}
client 端
@Component
public class SelectorCallerDemo {
public void call(){
try (
SocketChannel channel = SocketChannel.open(new InetSocketAddress("192.168.3.7",9999))
){
channel.configureBlocking(false);
ByteBuffer buf = ByteBuffer.allocate(1024);
buf.put(String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
buf.flip();
channel.write(buf);
} catch (Exception e) {
e.printStackTrace();
}
}
}