Java NIO 最重要的三大组件:Buffer、Channel、Selector
I/O传输数据存储的地方。
重要属性:
重要方法:
6. allocate:创建缓冲区
7. put:数据写入缓冲区
8. flip:写入模式转为读取模式
9. get:从缓冲区获取数据
10. rewind:重复读取缓冲区数据
11. mark() 和 reset():标记&恢复position
12. clear:清空缓冲区,缓冲区转为写入模式
13. compact:压缩缓冲区,缓冲区转为写入模式,未读数据往缓冲区最前放
一个连接就是一个通道,从更广泛的层面来说,一个通道可以表示一个底层的文件描述符,例如硬件设备、文件、网络连接等。
通道分类:
选择器完成I/O的多路复用,一个通道就是一个连接,通道可注册在选择器上(需指定选择器要监控的I/O事件),通过选择器可以同时监听多个通道的I/O(输入输出)状态。
通道I/O事件:
SelectedKey选择键:
SelectionKey选择键就是那些被选择器选中的IO事件。
选择键的功能是很强大的。通过SelectionKey选择键,不仅仅可以获得通道的IO事件类型,比方说SelectionKey.OP_READ;还可以获得发生IO事件所在的通道;另外,也可以获得选出选择键的选择器实例。
在Java的OIO编程中,最初和最原始的网络服务器程序,是用一个while循环,不断地监听端口是否有新的连接。如果有,那么就调用一个处理函数来处理。
问题:如果前一个网络连接的handle(socket)没有处理完,那么后面的连接请求没法被接收,于是后面的请求通通会被阻塞住,服务器的吞吐量就太低了。对于服务器来说,这是一个严重的问题。
为了解决这个严重的连接阻塞问题,出现了一个极为经典模式:Connection Per Thread(一个线程处理一个连接)模式。
对于每一个新的网络连接都分配给一个线程。每个线程都独自处理自己负责的输入和输出。当然,服务器的监听线程也是独立的,任何的socket连接的输入和输出处理,不会阻塞到后面新socket连接的监听和建立。早期版本的Tomcat服务器,就是这样实现的。
优点:新连接不会被I/O操作而阻塞住,提升服务器的吞吐量
缺点:一个连接一个线程,当并发量起来的时候,需要耗费大量的线程资源,并且会出现频繁的线程创建、销毁、切换等操作。
两个重要组件:Reactor反应器、Handler处理器
Reactor反应器:负责查询IO事件,当检测到一个IO事件,将其发送给相应的Handler处理器去处理。这里的IO事件,就是NIO中选择器监控的通道IO事件。
Handler处理器:与IO事件(或者选择键)绑定,负责IO事件的处理。完成真正的连接建立、通道的读取、处理业务逻辑、负责将结果写出到通道等。
需要结合attach和attachment两个方法结合使用:
Handler处理器多线程升级:将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听受到阻塞。
Reactor反应器多线程升级:如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器。这样,充分释放了系统资源的能力;也提高了反应器管理大量连接,提升选择大量通道的能力。
join方法的应用场景:A线程调用B线程的join方法,等待B线程执行完成;在B线程没有完成前,A线程阻塞。
问题:被合并的线程没有返回值。形象地说,join线程合并就是一像一个闷葫芦。只能发起合并线程,不能取到执行结果。
Callable接口:业务处理的方法命名为call,call方法有返回值。
Callable接口与Runnable接口相比,还有一个很大的不同:Callable接口的实例不能作为Thread线程实例的target来使用;而Runnable接口实例可以作为Thread线程实例的target构造参数,开启一个Thread线程。
FutureTask类代表一个未来执行的任务,表示新线程所执行的操作。FutureTask类也位于java.util.concurrent包中。FutureTask类的构造函数的参数为Callable类型,实际上是对Callable类型的二次封装,可以执行Callable的call方法。FutureTask类间接地继承了Runnable接口,从而可以作为Thread实例的target执行目标。
在Java语言中,将FutureTask类的一系列操作,抽象出来作为一个重要的接口——Future接口。当然,FutureTask类也实现了此接口。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
总体来说,FutureTask类首先是一个搭桥类的角色,FutureTask类能当作Thread线程去执行目标target,被异步执行;其次,如果要获取异步执行的结果,需要通过FutureTask类的方法去获取,在FutureTask类的内部,会将Callable的call方法的真正结果保存起来,以供外部获取。
问题:FutureTask 虽然比 join 方法要高明一点,可以获取到异步线程的执行结果,但是获取结果的get()方法也是一个阻塞方法。
上面提到,JDK提供的FutureTask虽然支持获取线程执行的结果,但是获取结果的get()还是一个阻塞方法。
Guava为了实现非阻塞获取异步线程的结果,对Java的异步回调机制进行了增强:
FutureCallback是一个新增的接口,用来填写异步任务执行完后的监听逻辑。
FutureCallback拥有两个回调方法:
(1)onSuccess方法,在异步任务执行成功后被回调;调用时,异步任务的执行结果,作为onSuccess方法的参数被传入。
(2)onFailure方法,在异步任务执行过程中,抛出异常时被回调;调用时,异步任务所抛出的异常,作为onFailure方法的参数被传入。
Guava引入了一个新接口ListenableFuture,它继承了Java的Future接口,增强了监控的能力。
ListenableFuture仅仅增加了一个方法——addListener方法。它的作用就是将前一小节的FutureCallback善后回调工作,封装成一个内部的Runnable异步回调任务,在Callable异步任务完成后,回调FutureCallback进行善后处理。
在实际编程中,如何将FutureCallback回调逻辑绑定到异步的ListenableFuture任务呢?
可以使用Guava的Futures工具类,它有一个addCallback静态方法,可以将FutureCallback的回调实例绑定到ListenableFuture异步任务。
其实Futures.addCallback 方法的原理也很简单,就是给ListenableFuture异步任务添加了一个CallbackListener,它实现了 Runnable 接口,里面的实现就是在死循环中,调用Future的get()方法来获取异步线程执行结果。获取到结果后,如果是有异常,则调用FutureCallback的onFailur方法,否则调用onSuccess方法。
Netty官方文档中指出Netty的网络操作都是异步的。在Netty源代码中,大量使用了异步回调处理模式。在Netty的业务开发层面,Netty应用的Handler处理器中的业务处理代码,也都是异步执行的。
Netty对JavaFuture异步任务的扩展如下:
Netty中的反应器有多个实现类,与Channel通道类有关系。而我们常用的TCP通信对应的反应器是NioEventLoop。
NioEventLoop类绑定了两个重要的Java成员属性:一个是Thread线程类的成员,一个是Java NIO选择器的成员属性。
NioEventLoop和前面讲到反应器,在思路上是一致的:一个NioEventLoop拥有一个Thread线程,负责一个Java NIO Selector选择器的IO事件轮询。
在Netty中,一个EventLoop相当于一个子反应器(SubReactor)。大家已经知道,一个NioEventLoop子反应器拥有了一个线程,同时拥有一个Java NIO选择器。Netty如何组织外层的反应器呢?答案是使用EventLoopGroup线程组。多个EventLoop线程组成一个EventLoopGroup线程组。
反过来说,Netty的EventLoopGroup线程组就是一个多线程版本的反应器。而其中的单个EventLoop线程对应于一个子反应器(SubReactor)。
默认的EventLoopGroup内部线程数为最大可用的CPU处理器数量的2倍。
在服务器端,一般有两个独立的反应器,一个反应器负责新连接的监听和接受,另一个反应器负责IO事件处理。对应到Netty服务器程序中,则是设置两个EventLoopGroup线程组,一个EventLoopGroup负责新连接的监听和接受,一个EventLoopGroup负责IO事件处理。
负责新连接的监听和接受的EventLoopGroup线程组,查询父通道(NioServerSocketChannel)的IO事件,称为Boss线程组。负责查询所有子通道(NioSocketChannel)的IO事件,并且执行Handler处理器中的业务逻辑,称为Worker线程组。
Netty中的每一种协议的通道,都有NIO(异步IO)和OIO(阻塞式IO)两个版本。
通道类型:
· NioSocketChannel:异步非阻塞TCP Socket传输通道。
· NioServerSocketChannel:异步非阻塞TCP Socket服务器端监听通道。
· NioDatagramChannel:异步非阻塞的UDP传输通道。
· NioSctpChannel:异步非阻塞Sctp传输通道。
· NioSctpServerChannel:异步非阻塞Sctp服务器端监听通道。
· OioSocketChannel:同步阻塞式TCP Socket传输通道。
· OioServerSocketChannel:同步阻塞式TCP Socket服务器端监听通道。
· OioDatagramChannel:同步阻塞式UDP传输通道。
· OioSctpChannel:同步阻塞式Sctp传输通道。
· OioSctpServerChannel:同步阻塞式Sctp服务器端监听通道。
TCP相关Chanel通道有四个,但是我们既然用到了Netty,必定是想利用它来简化Java NIO的编码。所以我们一般只使用 NioServerSocketChannel 和 NioSocketChannel。
原理:在Netty的NioSocketChannel内部封装了一个Java NIO的SelectableChannel成员。通过这个内部的Java NIO通道,Netty的NioSocketChannel通道上的IO操作,最终会落地到Java NIO的SelectableChannel底层通道。
理论上,操作系统底层的socket描述符分为两类:
在Netty中,异步非阻塞的服务器端监听通道NioServerSocketChannel,封装在Linux底层的描述符,是“连接监听类型”socket描述符,被称为父通道;而NioSocketChannel异步非阻塞TCP Socket传输通道,封装在底层Linux的描述符,是“数据传输类型”的socket描述符,被称为子通道。
Netty的Handler处理器分为两大类:第一类是ChannelInboundHandler通道入站处理器;第二类是ChannelOutboundHandler通道出站处理器。二者都继承了ChannelHandler处理器接口。
ChannelInboundHandler的默认实现为ChannelInboundHandlerAdapter,叫作通道入站处理适配器。ChanneOutboundHandler的默认实现为ChanneloutBoundHandlerAdapter,叫作通道出站处理适配器。
这两个默认的通道处理适配器,分别实现了入站操作和出站操作的基本功能。如果要实现自己的业务处理器,不需要从零开始去实现处理器的接口,只需要继承通道处理适配器即可。
和上面利用Java NIO实现的Reactor反应器模式不一样,上面的实现一个通道只绑定了一个Handler业务处理器。
在Netty中,Channel通道和Handler处理器实例是多对多的关系:一个通道的I/O事件可以被多个Handler实例处理。当然了,一个Handler处理器实例也可以被绑定多多个通道中,处理多个通道的I/O事件。
上面我们知道Channel通道和Handler处理器实例是多对多的关系,那他们之间是如何绑定的?
Netty设计了一个特殊的组件,叫作ChannelPipeline(通道流水线),它像一条管道,将绑定到一个通道的多个Handler处理器实例,串在一起,形成一条流水线。ChannelPipeline(通道流水线)的默认实现,实际上被设计成一个双向链表。所有的Handler处理器实例被包装成了双向链表的节点,被加入到了ChannelPipeline(通道流水线)中。
Netty是这样规定的:入站处理器Handler的执行次序,是从前到后;出站处理器Handler的执行次序,是从后到前。
IO事件在流水线上的执行次序,与IO事件的类型是有关系的。
除了流动的方向与IO操作的类型有关之外,流动过程中经过的处理器节点的类型,也是与IO操作的类型有关。入站的IO操作只会且只能从Inbound入站处理器类型的Handler流过;出站的IO操作只会且只能从Outbound出站处理器类型的Handler流过。
上面我们已经介绍了多个Netty中的重要组件,那么我们如何使用这些组件去启动一个服务端和一个客户端呢?
Netty给我们提供了一个非常便利的工厂类,就是ServerBootstrap(用于服务端)和Bootstrap(用于客户端)。
我们都知道,底层网络是以二进制字节报文的形式来传输数据的。在读数据时,我们需要将ByteBuf中的二进制数据转为Java程序可处理的POJO对象;而在写数据时,需要将POJO对象转为底层网络能够传输的二进制数据的ByteBuf缓冲区。
我们可以利用编码器和解码器解决上面的问题:
Encoder编码器:将一个Java类型的数据转换成底层能够传输的二进制ByteBuf缓冲数据。
Decoder解码器:将底层传递过来的二进制ByteBuf缓冲数据转换成Java能够处理的Java POJO对象。
Decoder编码器是一个InBound入站处理器,负责处理入站数据;它能将上一站Inbound入站处理器传过来的输入(Input)数据,进行数据的解码或者格式转换,然后输出(Output)到下一站Inbound入站处理器。
将二进制数据转为POJO对象,我们可以使用Netty提供的 ByteToMessageDecoder,但具体的解码逻辑需要我们自己实现;这个解码器一定要放在Pipeline流水线的第一个,因为Pipeline第一个入站处理器接收到的一定就是装着二进制数据的ByteBuf缓冲区。
如果我们在Pipeline中,需要继续对数据进行解码,例如将一个POJO对象转为另外一个POJO对象,我们可以使用Netty提供的 MessageToMessageDecoder,我们需要指定传入的POJO对象类型。当数据流向这个解码器时,如果传入对象类型不是指定的范型,那么此解码器会直接跳过不进行处理的。
1、固定长度数据包解码器——FixedLengthFrameDecoder
适用场景:每个接收到的数据包的长度,都是固定的,例如100个字节。
在这种场景下,只需要把这个解码器加到流水线中,它会把入站ByteBuf数据包拆分成一个个长度为100的数据包,然后发往下一个channelHandler入站处理器。
2、行分割数据包解码器——LineBasedFrameDecoder
适用场景:每个ByteBuf数据包,使用换行符(或者回车换行符)作为数据包的边界分割符。即 "\n"或“\r\n”
如果每个接收到的数据包,都以换行符/回车换行符作为分隔。在这种场景下,只需要把这个解码器加到流水线中,Netty会使用换行分隔符,把ByteBuf数据包分割成一个一个完整的应用层ByteBuf数据包,再发送到下一站。
3、自定义分隔符数据包解码器——DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder是LineBasedFrameDecoder按照行分割的通用版本。不同之处在于,这个解码器更加灵活,可以自定义分隔符,而不是局限于换行符。如果使用这个解码器,那么接收到的数据包,末尾必须带上对应的分隔符。
4、自定义长度数据包解码器——LengthFieldBasedFrameDecoder
这是一种基于灵活长度的解码器。在ByteBuf数据包中,加了一个长度字段,保存了原始数据包的长度。解码的时候,会按照这个长度进行原始数据包的提取。
Encoder编码器是一个OutBound出站处理器,负责处理出站数据;它将上一站OutBound出站处理器传过来的输入(Input)数据,进行编码或者格式转换,然后传递到下一站ChannelOutBoundHandler出站处理器。
注意:由于最后只有ByteBuf才能写入到通道中去,因此可以肯定通道流水线上装配的第一个编码器一定是把数据编码成了ByteBuf类型。
将一个POJO对象转为二进制数组,使用时我们需要指定传入的POJO对象类型。当数据流向这个编码器时,如果传入对象类型不是指定的范型,那么此编码器会直接跳过不进行处理的。
将一个POJO对象转为另一个POJO对象,使用时我们需要指定传入的POJO对象类型。当数据流向这个编码器时,如果传入对象类型不是指定的范型,那么此编码器会直接跳过不进行处理的。
等于ByteToMessageDecoder+MessageToByteEncoder
等于MessageToMessageDeCoder+MessageToMessageEncoder
前面的编码器和解码器相结合是通过继承完成的。继承的方式有其不足,在于:将编码器和解码器的逻辑强制性地放在同一个类中,在只需要编码或者解码单边操作的流水线上,逻辑上不大合适。
编码器和解码器如果要结合起来,除了继承的方法之外,还可以通过组合的方式实现。与继承相比,组合会带来更大的灵活性:编码器和解码器可以捆绑使用,也可以单独使用。