Selector 一般称 为选择器 ,当然你也可以翻译为 多路复用器 。它是Java NIO核心组件中的一个,用于检查一个或多个NIO Channel(通道)的状态是否处于可读、可写。如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。
上一篇进行文件IO时用到的FileChannel并不支持非阻塞操作,NIO主要就是进行网络IO的学习, Java NIO中的网络通道是非阻塞IO的实现,基于事件驱动,非常适用于服务器需要维持大量连接但数据交换量不大的情况,例如一些即时通信的服务等等。
在之前的BIO中我们已经编写过Socket服务器:
1、ServerSocket–BIO模式的服务器,一个客户端一个线程,虽然写起来简单,但是如果连接越多,线程就会越多,容易耗尽服务器资源而使其宕机。
2、使用线程池优化–让每个客户端的连接请求交给固定数量线程的连接池处理,写起来简单还能处理大量连接。但是线程的开销依然不低,如果请求过多,会出现排队现象。
如果使用java中的NIO,就可以用非堵塞的IO模式处理,这种模式下可以使用一个线程,处理大量的客户端连接请求。
只有在连接真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程, 不用去维护多个线程。避免了多线程之间的上下文切换导致的开销。
Selector:选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态。
Selector是一个抽象类,实际使用的时候用的是SelectorImpl。
1、无参的select():
Selector类的select()方法会无限阻塞等待,直到有信道准备好了IO操作,或另一个线程唤醒了它(调用了该选择器的wakeup())返回SelectionKey
2、带有超时参数的select(long time):
当需要限制线程等待通道就绪的时间时使用,如果在指定的超时时间(以毫秒计算)内没有通道就绪时,它将返回0。将超时参数设为0表示将无限期等待,那么它就等价于select( )方法了。
3、selectNow()是完全非阻塞的:
该方法执行就绪检查过程,但不阻塞。如果当前没有通道就绪,它将立即返回0
通过调用Selector.open()方法创建一个Selector对象,如下:
Selector selector = Selector.open();
SelectionKey:一个SelectionKey键表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。这种注册的关系共有四种:
常用方法:
1、configureBlocking()方法:设置阻塞或非阻塞模式
public abstract SelectableChannel configureBlocking(boolean block)
SelectableChannel抽象类的configureBlocking() 方法是由 AbstractSelectableChannel抽象类实现的,SocketChannel、ServerSocketChannel、DatagramChannel都是直接继承了
AbstractSelectableChannel抽象类 (上图明确展示了这些类关系)。
2、register()方法 注册一个选择器并设置监听事件
public abstract SelectionKey register(Selector sel, int ops)
register() 方法的第二个参数是一个interset集合 ,指通过Selector监听Channel时对什么事件感兴趣。
可以监听四种不同类型的事件:
Connect
Accept
Read
Write
通道触发了一个事件意思是该事件已经就绪。比如某个Channel成功连接到另一个服务器称为“ 连接就绪 ”。一个ServerSockeChannel准备好接收新进入的连接称为“ 接收就绪 ”。一个有数据可读的通道可以说是“ 读就绪 ”。等待写数据的通道可以说是“ 写就绪 ”。
这四种事件用SelectionKey的四个常量来表示:
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
如果你对不止一种事件感兴趣,使用或运算符即可,如下:
int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
ServerSocketChannel用来在服务器端监听新的客户端Socket连接.
常用的方法除了两个从SelectableChannel类中继承而来的两个方法configureBlocking()和register()方法外,还有以下要记住的方法:
SocketChannel,网络IO通道,具体负责读写操作。NIO总是把缓冲区的数据写入通道,或者把通道里的数据读出到缓冲区(buffer) 。
常用方法如下所示:
常用的方法除了两个从SelectableChannel类中继承而来的两个方法configureBlocking()和register()方法外,还有以下要记住的方法:
服务器端
package NIOTest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
public static void main(String[] args) throws IOException {
//创建一个 服务器
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
// 绑定服务器端口
serverSocketChannel.bind(new InetSocketAddress(10086));
// (重要)设置为非堵塞模式
serverSocketChannel.configureBlocking(false);
// 创建selector对象
Selector selector=Selector.open();
// 将通道注册到selector中, 并监听请求连接时间
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 循环等待客户端的链接
while(true){
if (selector.select(5000)==0){
System.out.println("服务器:怎么还没有人连接我,我生气了!!!");
/*
* 服务器干自己的事情,但是一直在监听客户端的连接
* */
continue;
}
// 获取Selectionkey 集合
Set <SelectionKey> selectionKeys=selector.selectedKeys();
Iterator <SelectionKey> iterator=selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey key=iterator.next();
if (key.isAcceptable()){
//获取连接的客户端 serverSocketChannel负责连接操作
SocketChannel socketChannel=serverSocketChannel.accept();
//设置为非堵塞
socketChannel.configureBlocking(false);
System.out.println("客户端进来了新的连接"+socketChannel.hashCode());
//注册
socketChannel.register(selector,SelectionKey.OP_READ, ByteBuffer.allocate(1024));
}
if (key.isReadable()){ //读 --》 客户端发送来的消息 SocketChannel 负责读写操作
SocketChannel socketChannel=(SocketChannel) key.channel();
ByteBuffer buffer =(ByteBuffer) key.attachment();
socketChannel.read(buffer);
// System.out.println(bytes.length); //1024
// String msg=new String(buffer.toString()); //java.nio.HeapByteBuffer[pos=11 lim=1024 cap=1024]
buffer.flip();
byte [] data =new byte[buffer.limit()];
buffer.get(data);
System.out.println("客户端发来消息了"+new String(data));
}
iterator.remove();
}
}
}
}
当没有客户端连接的时候:每五秒输出一次
客户端:
package NIOTest;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
public class NIOClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel=SocketChannel.open();
socketChannel.configureBlocking(false);
InetSocketAddress inetSocketAddress= new InetSocketAddress("127.0.0.1",10086);
if (!socketChannel.connect(inetSocketAddress)){
while (!socketChannel.finishConnect()){
System.out.println("没连接上我继续连接,我先干点其他的事");
}
}
String msg="hi,小李";
ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes());
socketChannel.write(buffer);
System.in.read();
}
}
客户端:
package chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
public class chatClient {
private final String HOSTNAME="127.0.0.1";
private final int PORT =10086;
private SocketChannel socketChannel;
private Selector selector;
private String userName;
public chatClient(){
//初始化
try {
socketChannel=SocketChannel.open(new InetSocketAddress(HOSTNAME,PORT));
socketChannel.configureBlocking(false);
selector=Selector.open();
socketChannel.register(selector, SelectionKey.OP_READ);
userName=socketChannel.getLocalAddress().toString();
System.out.println(userName+"准备好了!!");
} catch (Exception e) {
e.printStackTrace();
}
}
public void sendInfo(String info){
try {
info=userName+info;
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
public void readInfo(){
try {
int readyChannel =selector.select();
if (readyChannel>0){
Iterator <SelectionKey>iterator=selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if (key.isReadable()){
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len=socketChannel.read(buffer);
String msg =new String(buffer.array(),0,len);
System.out.println(msg);
}
iterator.remove();
}
}else {
System.out.println("没有准备就绪的通道");
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
chatClient chatClient=new chatClient();
new Thread(new Runnable() {
@Override
public void run() {
while (true){
chatClient.readInfo();
}
}
}).start();
Scanner scanner=new Scanner(System.in);
while (scanner.hasNextLine()){
String msg=scanner.nextLine();
chatClient.sendInfo(msg);
}
}
}
服务端:
package chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class chatServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private static final int PORT=10086;
public chatServer() {
//完成属性的初始化
try {
serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
selector=Selector.open();
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
//监听事件,监听客户端的连接
public void listening(){
try {
while (true){
int num=selector.select(5000);
if (num>0){
Iterator <SelectionKey>iterator=selector.selectedKeys().iterator();
while (iterator.hasNext()){
SelectionKey key = iterator.next();
if (key.isAcceptable()){
//连接到服务器的客户端
SocketChannel socketChannel=serverSocketChannel.accept();
socketChannel.configureBlocking(false);//非堵塞
socketChannel.register(selector,SelectionKey.OP_READ);
System.out.println("欢迎"+socketChannel.getRemoteAddress()+"连接成功 进入了聊天室内!");
}
if (key.isReadable()){ //处理客户端的通信请求
//处理数据的读取,和转发给除了自己之外的客户端
ReadData(key);
}
//避免 一直处理同一事件
iterator.remove();
}
}else {
System.out.println("server一直在等待客户端连接");
}
}
}catch (IOException e) {
e.printStackTrace();
}
}
//处理数据的读取
private void ReadData(SelectionKey key) {
SocketChannel socketChannel=null;
try {
//拿到了当前的客户端消息
socketChannel=(SocketChannel) key.channel();
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len= socketChannel.read(buffer);
if (len>0){
buffer.flip();
byte [] data =new byte[buffer.limit()];
buffer.get(data);
String msg=new String(data);
System.out.println("客户端发送来了消息"+msg);
//转发消息给其他客户端
System.out.println("服务器开始转发消息");
for (SelectionKey selectionKey : selector.keys()) {
SelectableChannel channel = selectionKey.channel();
//排除除了自己的其他客户端的连接
if (channel instanceof SocketChannel && channel!=socketChannel){
SocketChannel client=(SocketChannel) channel;
ByteBuffer buffer1=ByteBuffer.wrap(msg.getBytes());
client.write(buffer1);
}
}
}
} catch (IOException e) {
try {
System.out.println(socketChannel.getRemoteAddress()+"离开了聊天室");
key.cancel();//取消注册
socketChannel.close();//关闭通道
} catch (IOException ex) {
//ex.printStackTrace();
}
}
}
public static void main(String[] args) {
chatServer chatServer=new chatServer();
chatServer.listening();
}
}
BIO:Block IO 同步阻塞式 IO,就是我们平常使用的传统 IO,它的特点是模式简单使用方便,并发处理能力低。
NIO:Non IO 同步非阻塞 IO,是传统 IO 的升级,客户端和服务器端通过 Channel(通道)通讯,实现了多路复用。
BIO (Blocking I/O): 同步阻塞I/O模式,数据的读取写入必须阻塞在一个线程内等待其完成。在活动连接数不是特别高(小于单机1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的I/O 处理模型来应对更高的并发量。
NIO (New I/O): NIO是一种同步非阻塞的I/O模型,在Java 1.4 中引入了NIO框架,对应java.nio 包,提供了 Channel , Selector,Buffer等抽象。NIO中的N可以理解为Nonblocking,不单纯是New。它支持面向缓冲的,基于通道的I/O操作方法。 NIO提供了与传统BIO模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。