Java I/O涉及的内部包括三部分:
java本身提供的IO操作库、进行IO操作涉及的计算机操作系统的原理 、进行网络I\O操作涉及的计算机网络知识
。先有知识储备,对IO的操作才会有更好的理解,当然知识怎么用,可以扩展一下它的底层原因。
TCP三次握手过程
- 第一次握手:主机A通过向主机B 发送一个含有同步序列号的标志位的数据段给主机B,向主机B 请求建立连接,通过这个数据段, 主机A告诉主机B 两件事:我想要和你通信;你可以用哪个序列号作为起始数据段来回应我。
- 第二次握手:主机B 收到主机A的请求后,用一个带有确认应答(ACK)和同步序列号(SYN)标志位的数据段响应主机A,也告诉主机A两件事:我已经收到你的请求了,你可以传输数据了;你要用那个序列号作为起始数据段来回应我
- 第三次握手:主机A收到这个数据段后,再发送一个确认应答,确认已收到主机B 的数据段:"我已收到回复,我现在要开始传输实际数据了,这样3次握手就完成了,主机A和主机B 就可以传输数据了。
TCP建立连接要进行3次握手,而断开连接要进行4次
- 第一次: 当主机A完成数据传输后,将控制位FIN置1,提出停止TCP连接的请求 ;
- 第二次: 主机B收到FIN后对其作出响应,确认这一方向上的TCP连接将关闭,将ACK置1;
- 第三次: 由B 端再提出反方向的关闭请求,将FIN置1 ;
- 第四次: 主机A对主机B的请求进行确认,将ACK置1,双方向的关闭结束.。
简单的说操作系统分为两部分:
内核态和用户态
,用户态执行用户的程序,内核态执行用户程序执行过程中涉及对操作系统资源访问的操作(比如:I/O访问数据资源,进程切换等)
我们所说的 socket 编程,是站在传输层的基础上,所以可以使用 TCP/UDP 协议,可以在这个协议的基础上自定义一些应用层协议比如RPC
NIO是一种非阻塞式的IO,核心组件是
缓冲区
,通道
,selector
缓冲区
是用来保存要进行读写的数据的
通道
是用来传输缓冲区数据的,需要把缓冲区数据放到channel里
selector
是通道的注册中心,用来注册通道,然后监听通道的事件的。
selector是非阻塞IO的核心,通过一个注册中心对事件的监听获得发生指定事件的所有通道,然后对这些通道进行处理,完成了高并发的事件处理,而且还是单线程的
服务端
public class NIOServer {
public static void main(String[] args) throws Exception{
//创建ServerSocketChannel -> ServerSocket
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//得到一个Selecor对象
Selector selector = Selector.open();
//绑定一个端口6666, 在服务器端监听
serverSocketChannel.socket().bind(new InetSocketAddress(6666));
//设置为非阻塞
serverSocketChannel.configureBlocking(false);
//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1
//循环等待客户端连接
while (true) {
//这里我们等待1秒,如果没有事件发生, 返回
if(selector.select(1000) == 0) { //没有事件发生
System.out.println("服务器等待了1秒,无连接");
continue;
}
//如果返回的>0, 就获取到相关的 selectionKey集合
//1.如果返回的>0, 表示已经获取到关注的事件
//2. selector.selectedKeys() 返回关注事件的集合
// 通过 selectionKeys 反向获取通道
Set<SelectionKey> selectionKeys = selector.selectedKeys();
System.out.println("selectionKeys 数量 = " + selectionKeys.size());
//遍历 Set, 使用迭代器遍历
Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
while (keyIterator.hasNext()) {
//获取到SelectionKey
SelectionKey key = keyIterator.next();
//根据key 对应的通道发生的事件做相应处理
if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接
//该该客户端生成一个 SocketChannel
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());
//将 SocketChannel 设置为非阻塞
socketChannel.configureBlocking(false);
//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel
//关联一个Buffer
socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..
}
if(key.isReadable()) { //发生 OP_READ
//通过key 反向获取到对应channel
SocketChannel channel = (SocketChannel)key.channel();
//获取到该channel关联的buffer
ByteBuffer buffer = (ByteBuffer)key.attachment();
channel.read(buffer);
System.out.println("form 客户端 " + new String(buffer.array()));
}
//手动从集合中移动当前的selectionKey, 防止重复操作
keyIterator.remove();
}
}
}
}
客户端
public class NIOClient {
public static void main(String[] args) throws Exception{
//得到一个网络通道
SocketChannel socketChannel = SocketChannel.open();
//设置非阻塞
socketChannel.configureBlocking(false);
//提供服务器端的ip 和 端口
InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
//连接服务器
if (!socketChannel.connect(inetSocketAddress)) {
while (!socketChannel.finishConnect()) {
System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");
}
}
//...如果连接成功,就发送数据
String str = "hello, 尚硅谷~";
//Wraps a byte array into a buffer
ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
//发送数据,将 buffer 数据写入 channel
socketChannel.write(buffer);
System.in.read();
}
}
selector功能是把对象注册进来,然后对对象的事件进行监听。底层实现是调用了系统的select,poll.epoll对象来完成的。
netty的服务端框架结构图如下:可看出来BossGroup和WorkerGroup基本结构是一样的都是一个NioEventGroup,名不一样罢了
1
为社么声明两个一样的Group
,因为服务端采用了主从多线程的架构,Boss线程负责接收连接过来的socket对象,这里称之为
NioSocketChannel,这个channel随后会被注册到Woker的一个线程的selector中,在那个线程里完成对这个channel的事件监听处理。
2
的结构
group内部放置了两个重要的对象selector
和TaskQueue
和一个重要的方法NiOevenLoop
selector用注册需要被监听的channel对象
TaskQueue
用来保存一些耗时操作的线程对象,等待启动一个线程单独处理
NiOevenLoop
事件循环,就是不停的循环,获得事件触发的对象
,处理这些对象
,runalltask
,处理那些在对象处理过程中加载到Task队列的线程’
ChannelHandler
用来给用户自定义处理事件激活后的channel对象的业务逻辑的
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。Netty的channel是对NIOchannel的扩展
ChannelHandler分为两类:ChannelInboundHandler
和ChannelOutboundHandler
,其中in的表示接收数据进行处理,也就是读数据,而out就是发送数据进行处理。in的处理过程是从头到尾,而out是从尾部到头部,因此添加自定义handler的时候要注意顺序
每个ChannelHandler
都会和一个ChannelHandlerContext
关联,也就是实现ChannelHandler接口的所有方法里会有ChannelHandlerContext参数,这个参数的重要性在于可以获得当前channel对象,调用channel对象的相关方法
。
另外服务端还有一个引导启动类
ServerBootstrap
,用来把定义的boss.work和handler组装起来完成一个服务端对象构建。
启动类的启动是一个异步操作bind(port).sync()
,因此会立马返回结果,在Netty中异步返回的结果被定义为了ChannelFuture
在返回的ChannelFuture基础上定义一些操作就是所谓的回调操作比如:channelFuture.channel().closeFuture().sync();
public class NettyServer {
public static void main(String[] args) throws Exception {
//创建BossGroup 和 WorkerGroup
//说明
//1. 创建两个线程组 bossGroup 和 workerGroup
//2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
//3. 两个都是无限循环
//4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
// 默认实际 cpu核数 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
try {
//创建服务器端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
//使用链式编程来进行设置
bootstrap.group(bossGroup, workerGroup) //设置两个线程组
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
// .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
//给pipeline 设置处理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
ch.pipeline().addLast(new NettyServerHandler());
}
}); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
System.out.println(".....服务器 is ready...");
//绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
//启动服务器(并绑定端口)
ChannelFuture cf = bootstrap.bind(6668).sync();
//给cf 注册监听器,监控我们关心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
} else {
System.out.println("监听端口 6668 失败");
}
}
});
//对关闭通道进行监听
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
客户端就需要定义一个boss处理所有的IO操作即可,因为客户端不需要接收多个channel对象,只需要对自己连接产生是channel对象进行处理即可,它的启动类
Bootstrap
,会自动给selector配置接收channel对象的IO操作的事件,(服务端也是自动给boss配置aceept事件,worker配置IO事件)
public class NettyClient {
public static void main(String[] args) throws Exception {
//客户端需要一个事件循环组
EventLoopGroup group = new NioEventLoopGroup();
try {
//创建客户端启动对象
//注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//设置相关参数
bootstrap.group(group) //设置线程组
.channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
}
});
System.out.println("客户端 ok..");
//启动客户端去连接服务器端
//关于 ChannelFuture 要分析,涉及到netty的异步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//给关闭通道进行监听
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
总结:netty的重要类
- 重写了NIO的
ByteBuffer
和Channel
以及它的实现(NioServerSocketChannel,NioSocketChannel,SocketChannel)Bootstrap
和ServerBootstrap
启动类EventLoopGroup
事件循环组类,用来持续获得selector得到的监听对象,然后进行处理的ChannelPipeline
一个处理对象的管道,用来集中管理处理过程(处理过程包括自定义handler,编解码处理器,心跳处理器
)ChannelInboundHandler
和ChannelOutboundHandler
自定义处理过程ChannelHandlerContext
获得当前channel的对象ChannelFuture
,Channel操作的返回结果,用来进行回调操作schedule
待执行的任务队列,存在于eventLoop里。
netty的执行流程
1.server启动,netty从parentGroup中选出一个NioEventLoop对指定的port进行监听
2.client启动,netty从eventLoopGroup中选出一个NioEventLoop连接server,并处理server发来的数据(客户端)
3.client连接指定server的port,并创建channel
4.netty从childGroup中选出一个NioEventLoop与该channel绑定,用于处理该channel中的所有操作
5.client通过channel发送数据包
6.pipeline中的处理器依次对channel中的数据包进行处理
7.server如果需要向client发送数据,则需要将数据通过pipeline中的处理器处理形成ByteBuf数据包
8.server通过channel向client发送数据包
9.pipeline中的处理器依次对channel中的数据包进行处理(客户端)
数据拷贝在操作系统层面上看,就是CPU从IO设备上读取数据给用户的过程,这个过程发生了如下几步的演变:
初期,由CPU单独完成整个工作,通过磁盘进行数据拷贝,磁盘拷贝到内核态的内存中(内核态内存和用户态内存分离的),通知CPU然后拷贝到用户的内存中进行操作,操作完成后需要通知CPU将数据保存起来或发送出去,这时候就反向执行这个过程,CPU复制数据到内核态,内核态进行数据保存或者发送
如果什么都给CPU做,CPU会很累,上面的方式CPU要完成数据读写到内核态,然后从内核态读写到用户态,导致程序的执行效率低
因此产生了DMA模块,CPU中分离出一个小弟,单独负责数据I/O处理,CPU只需发送命令给DMA,DMA就会进行数据读写,读写完成后通知cpu,CPU就把数据复制到用户内存就可以了
上面操作DMA帮助分担了一部分数据拷贝工作,但是CPU还是需要来会把数据复制给用户,十分的消耗资源,
因此提出了MMAP(内存映射的概念),使得用户通过一个虚拟的内存映射直接可以操作内核缓冲区数据,这样就不用把数据再拷贝到用户内存了
.> 上述过程CPU不需要来回从内核和用户两种状态之间复制数据了,但是写数据的时候,仍然需要CPU在内核中拷贝数据到一个新的区域,用来保存修改后的数据,因此只减少了一次CPU拷贝
为了再次减少CPU的复制操作,产生了SendFile技术,只将文件描述符发送到新的缓冲区,便于表示数据的来源,方便下次数据读取,而数据直接使用DMA发送到IO设备,
(每个进程,当打开一个文件后,内核会为其建立一个打开文件的数组 (数组的前三个为stdin,stdout,stderr),然后返回打开文件位于数组的索引值(下标),该所以只即为文件描述符,只要文件不关闭,用户便可以根据该描述符对文件进行访问和操作。)
总结:所谓的零拷贝指定是CPU的拷贝次数为0.
sendfile+DMA技术可以实现真正的零拷贝
零拷贝的优化过程经历了从两次拷贝到mmap的依次拷贝,然后到sendfile的0次拷贝。mmap适合小规模数据拷贝,sendfile适合大规模数据的拷贝
tcp的粘包拆包问题:TCP在数据传输的过程中因为TCP是面向流,没有边界,而操作系统在发送TCP数据时,会通过缓冲区来进行优化,导致每次传输的数据可能是完整的,残缺的或者多的。
虽然这样获得的流对象是不完整的没法立马进行解析
UDP则是面向消息传输的,是有保护消息边界的,接收方一次只接受一条独立的信息,所以不存在粘包问题。
解决办法:
- Netty中
定义一个新的类,对传输数据进行封装,给这个类设置字符流长度属性
。 自定义编解码器,在编码解码的过程中获得一个整形这个整形肯定是字符长度,根据这个长度从缓冲数据里获取数据,保证读取的对象是一个完整可解析对象(这种情况要保证缓冲区够长,至少能方下一个完整的对象,所以得设置一下缓冲区大小,设置代码如下
。)
.option(ChannelOption.SO_RCVBUF, 1024*4)
.option(ChannelOption.RCVBUF_ALLOCATOR,new FixedRecvByteBufAllocator(4096))
//FixedRecvByteBufAllocator也可以换为AdaptiveRecvByteBufAllocator
管道里添加HTTP编解码器
public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//向管道加入处理器
//得到管道
ChannelPipeline pipeline = ch.pipeline();
//加入一个netty 提供的httpServerCodec codec =>[coder - decoder]
//HttpServerCodec 说明
//1. HttpServerCodec 是netty 提供的处理http的 编-解码器
pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
//2. 增加一个自定义的handler
pipeline.addLast("MyTestHttpServerHandler", new TestHttpServerHandler());
System.out.println("ok~~~~");
}
}
配置websocket相关的编解码器
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//因为基于http协议,使用http的编码和解码器
pipeline.addLast(new HttpServerCodec());
//是以块方式写,添加ChunkedWriteHandler处理器
pipeline.addLast(new ChunkedWriteHandler());
/*
说明
1. http数据在传输过程中是分段, HttpObjectAggregator ,就是可以将多个段聚合
2. 这就就是为什么,当浏览器发送大量数据时,就会发出多次http请求
*/
pipeline.addLast(new HttpObjectAggregator(8192));
/*
说明
1. 对应websocket ,它的数据是以 帧(frame) 形式传递
2. 可以看到WebSocketFrame 下面有六个子类
3. 浏览器请求时 ws://localhost:7000/hello 表示请求的uri
4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 保持长连接
5. 是通过一个 状态码 101
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/hello2"));
//自定义的handler ,处理业务逻辑
pipeline.addLast(new MyTextWebSocketFrameHandler());
}
});
RPC远程调用,就是通过
动态代理的方法,直接代理接口生成一个代理对象
,在代理对象内部对方法进行扩展,扩展的内容就是连接服务器,发送请求调用的函数名。(服务端会接收到名字后完成调用返回结果)
检测远程端是否存活,或者活跃
添加心跳处理器
public class MyServer {
public static void main(String[] args) throws Exception{
//创建两个线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//加入一个netty 提供 IdleStateHandler
/*
说明
1. IdleStateHandler 是netty 提供的处理空闲状态的处理器
2. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
3. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
4. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
5. 文档说明
triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
* read, write, or both operation for a while.
* 6. 当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理
* 通过调用(触发)下一个handler 的 userEventTiggered , 在该方法中去处理 IdleStateEvent(读空闲,写空闲,读写空闲)
*/
pipeline.addLast(new IdleStateHandler(7000,7000,10, TimeUnit.SECONDS));
//加入一个对空闲检测进一步处理的handler(自定义)
pipeline.addLast(new MyServerHandler());
}
});
//启动服务器
ChannelFuture channelFuture = serverBootstrap.bind(7000).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
进一步处理心跳事件的检查结果
public class MyServerHandler extends ChannelInboundHandlerAdapter {
/**
*
* @param ctx 上下文
* @param evt 事件
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent) {
//将 evt 向下转型 IdleStateEvent
IdleStateEvent event = (IdleStateEvent) evt;
String eventType = null;
switch (event.state()) {
case READER_IDLE:
eventType = "读空闲";
break;
case WRITER_IDLE:
eventType = "写空闲";
break;
case ALL_IDLE:
eventType = "读写空闲";
break;
}
System.out.println(ctx.channel().remoteAddress() + "--超时时间--" + eventType);
System.out.println("服务器做相应处理..");
//如果发生空闲,我们关闭通道
// ctx.channel().close();
}
}
}