- package com.zoo.nio;
-
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Date;
- import java.util.Iterator;
-
-
- public class ClientTest {
-
- public static void main(String[] args) throws Exception{
- //1.获取通道
- SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",4201));
- //2.切换非阻塞模式
- sChannel.configureBlocking(false);
- //3.分配指定大小缓冲区
- ByteBuffer buf = ByteBuffer.allocate(1024);
- //4.发送数据给服务端
- buf.put(new Date().toString().getBytes());
- buf.flip();
- sChannel.write(buf);
- buf.clear();
- //5.关闭通道
- sChannel.close();
- }
- }
- package com.zoo.nio;
-
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Date;
- import java.util.Iterator;
-
-
- public class ServiceTest {
- public static void main(String[] args) throws Exception{
- //1.获取通道
- ServerSocketChannel ssChannel = ServerSocketChannel.open();
- //2.切换非阻塞模式
- ssChannel.configureBlocking(false);
- //3.绑定连接
- ssChannel.bind(new InetSocketAddress(4201));
- //4.获取选择器
- Selector selector = Selector.open();
- //5.将通道注册到选择器上,并指定监听事件
- ssChannel.register(selector, SelectionKey.OP_ACCEPT);
- //6.轮询式获取选择器的准备就绪事件
- while(selector.select() > 0){
- //7.获取当前选择器中所有注册的选择键(已就绪的监听事件)
- Iterator
iterator = selector.selectedKeys().iterator(); - //8.获取准备就绪的事件
- while(iterator.hasNext()){
- SelectionKey sk = iterator.next();
- //9.判断什么事件准备就绪
- if (sk.isAcceptable()){
- //10.获取客户端连接
- SocketChannel sChannel = ssChannel.accept();
- //11.切换非阻塞模式
- sChannel.configureBlocking(false);
- //12.将该通道注册到选择器上
- sChannel.register(selector,SelectionKey.OP_READ);
- }else if(sk.isReadable()){//读就绪
- //13.获取当前选择器上读就绪状态的通道
- SocketChannel sChannel = (SocketChannel) sk.channel();
-
- //14.读取数据
- ByteBuffer buf = ByteBuffer.allocate(1024);
- int len = 0;
- while ((len=sChannel.read(buf))>0){
- buf.flip();
- System.out.println(new String(buf.array(),0,len));
- buf.clear();
- }
- }
- //15.取消选择键
- iterator.remove();
- }
- }
- }
- }
- //KQueueSelectorImpl.java
- protected int doSelect(long var1) throws IOException {
- boolean var3 = false;
- if (this.closed) {
- throw new ClosedSelectorException();
- } else {
- //处理注销的selectionKey队列
- this.processDeregisterQueue();
-
- int var7;
- try {
- this.begin();
- // 调用了poll方法,底层调用了native的epollCtl和epollWait方法
- var7 = this.kqueueWrapper.poll(var1);
- } finally {
- this.end();
- }
-
- this.processDeregisterQueue();
- return this.updateSelectedKeys(var7);
- }
- }
-
-
- //KQueueArrayWrapper.java
- // 这里抛出个问题,var1=timeout=-1 为什么超时是-1?
- int poll(long var1) {
- this.updateRegistrations();
- //Mac系统基于UNIX,window可能不一样,阻塞调用等待事件返回结果
- int var3 = this.kevent0(this.kq, this.keventArrayAddress, 128, var1);
- return var3;
- }
-
- //kqueue是在UNIX上比较高效IO复用技术。
- //所谓的IO复用,就是同时等待多个文件描述符就绪,以系统调用的形式提供。如果所有文件描述符都没有就绪的话,该系统调用阻塞,否则调用返回,允许用户进行后续的操作。
- //常见的IO复用技术有select, poll, epoll以及kqueue等等。其中epoll为Linux独占,而kqueue则在许多UNIX系统上存在
- //kevent() 是阻塞调用,等到有事件才返回。阻塞时线程处于sleep状态,有事件时系统激活kqueue,kevent()返回
- private native int kevent0(int var1, long var2, int var4, long var5);
-
-
-
- //从cancellledKeys集合中取出注销的SelectionKey,执行注销操作。
- //将处理后的SelectionKey从cancelledKeys集合中移除。执行processDeregisterQueue()
- //后cancelledKeys集合会为空
- void processDeregisterQueue() throws IOException {
- // Precondition: Synchronized on this, keys, and selectedKeys
- Set
cks = cancelledKeys(); - synchronized (cks) {
- if (!cks.isEmpty()) {
- Iterator
i = cks.iterator(); - while (i.hasNext()) {
- SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
- try {
- implDereg(ski);
- } catch (SocketException se) {
- throw new IOException("Error deregistering key", se);
- } finally {
- i.remove();
- }
- }
- }
- }
- }
-
-
-
- //执行完该方法后,注销的SelectionKey就不会出现在keys、selectedKeys以及cancelledKeys这三个集合中
- protected void implDereg(SelectionKeyImpl ski) throws IOException {
- assert (ski.getIndex() >= 0);
- SelChImpl ch = ski.channel;
- int fd = ch.getFDVal();
-
- // 将已经注销的selectionKey从fdToKey(文件描述符与SelectionKeyImpl的映射表)中移除
- fdToKey.remove(Integer.valueOf(fd));
-
- // 将selectionKey所代表的channel的文件描述符从EPollArrayWrapper中移除
- pollWrapper.remove(fd);
- ski.setIndex(-1);
-
- //将selectionKey从keys集合中移除
- keys.remove(ski);
- selectedKeys.remove(ski);
- deregister((AbstractSelectionKey)ski);
- SelectableChannel selch = ski.channel();
-
- // 如果对应的频道已经关闭并且没有注册其他的选择了,则将该信道关闭
- if (!selch.isOpen() && !selch.isRegistered())
- ((SelChImpl)selch).kill();
- }
-
-
-
- private int updateSelectedKeys() {
- //更新了的keys的个数,或在说是产生的事件的个数
- int entries = pollWrapper.updated;
- int numKeysUpdated = 0;
- for (int i=0; i
- //对应的channel的fd
- int nextFD = pollWrapper.getDescriptor(i);
- //通过fd找到对应的SelectionKey
- SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
- if (ski != null) {
- int rOps = pollWrapper.getEventOps(i);
- //更新selectedKey变量,并通知响应的channel来做响应的处理
- if (selectedKeys.contains(ski)) {
- if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
- numKeysUpdated++;
- }
- } else {
- ski.channel.translateAndSetReadyOps(rOps, ski);
- if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
- selectedKeys.add(ski);
- numKeysUpdated++;
- }
- }
- }
- }
- return numKeysUpdated;
- }
四、通道Channel
Channel是一个通道,它就像自来水管一样,网络数据通过Channel读取和写入。通 道 与 流 的 不 同 之 处 在 于 通 道 是 双 向 的,流只是在一个方向上移 动( 一 个 流 必 须 是 Inputstream或者OutpulStream的子类),而通道可以用于读、写或者一者同时进行。因为Channel是全双工的,所以它可以比流更好地映射底层操作系统的API。特别是在UNIX网络编程模型中,底层操作系统的通道都是全双工的,同时支持读写操作。
五、多路复用器Selector
我们将探索多路夏用器Selector,它是Java NIO编程的基础,熟练地掌握 Selector对于NIO编程至关重要。多路复用器提供选择已经就绪的任务的能力。简单来讲, Selector会不断地轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件, 这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的IO操作。 一个多路复用器Selector可以同时轮询多个Channel,由JDK使用了epoll。代替传 统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一 个线程负责Selector的轮询,就可以接入成千上万的客户端,这确实是个非常巨大的进步。
六、nio读写流程
1、nio读写流程
1、selector.select()循环获取已经可读的SelectionKey
2、迭代SelectionKey获取SocketChannel通道
3、判断通道是否准备就绪
4、开始读写
5、抛出个问题,SelectionKey list如何和内核映射?我调试没有调试出结果
2、获取linux内核select状态
1、这篇文章有介绍kevent函数使用,其实就是linux的select模型
2、什么是linux文件描述符
3、kevent依赖文件描述符的就绪状态,这篇文章介绍文件描述符的就绪条件