流程
1、首先初始化ServerSocketChannel 并将建立连接的事件Accept,注册到BoosGroup的一个事件循环的Selector上
2、接着事件循环就会轮询Channel上的建立连接事件
3、一个客户端 发来建立连接请求后,Seletor通过轮询可以发现此请求,并通过processSeleterKeys 处理处理连接请求
4、怎么处理连接请求呢?首先是为这个连接分配一个SocketChannel,并将这个Channel的读写事件注册到一个WorkerGroup的事件循环的selector上,这时连接就建立好了,并且WorkerGroup会轮询SocketChannel的读写事件。
5、当这个客户端再发送消息时,事件循环会轮询到写事件,并通过processSeleterKeys处理消息
6、processSeleterKeys通过刚刚讲的数据处理链过ChannelPipline来进行处理,可能包含先解码、再进行业务处理,再编码,再发送到SocketChannel中。
以上是服务端的具体流程,客户端也会建立一个Channel ,也有一个Seletor轮询IO事件,当消息到达时,也可以通过客户端的ChannelPipline进行处理。
Java IO的各种流是阻塞的!!
字节流 inputsteam outputstream fileInput bufferedinput
转换流 inputsteamReader outputstreamWriter
字符流 reader writer fileReader bufferReader
inputsteam每次读取多个字节就要访问一次硬盘
buffer一次性从底层输入流中读取多个字节来填充byte数组,当程序读取一个或多个字节时,可直接从byte数组中获取,当内存中的byte读取完后,会再次用底层输入流填充缓冲区数组。这种从直接内存中读取数据的方式要比每次都访问磁盘的效率高很多。!!!
socket tcp udp
服务端 serverSocket = new ServerSocket(port);来监听一个指定的端口
Socket server = serverSocket.accept(); 输入输出 accept()方法从队列中取出连接请求 阻塞b
socket bind listen accept read/write connect read/write
bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen) listen(int sockfd, int backlog); backlog最大连接个数。
客户端socket连接到服务器并发送一个请求,然后等待一个响应
Socket client = new Socket(serverName, port); client.getOutputStream() client.getInputStream() 输入输出
DatagramSocket socket = new DatagramSocket(PORT);//一个对象,代表一个Socket
DatagramPacket receivePacket = new DatagramPacket(receiveBuffer, receiveBuffer.length);
socket.receive(receivePacket);
客户端
DatagramSocket socket = new DatagramSocket(LOCAL_PORT);
socket.send(DatagramPacket);
同步阻塞的BIO、同步非阻塞的NIO、异步非阻塞的AIO
一旦有高并发的大量请求 线程池也挡不住
NIO 的非阻塞模式 面向缓冲区,无数据 可以做其他事情!!! 同一个连接并发处理多个请求
BIO 以流的方式处理数据,而 NIO以块(缓存)的方式处理数据,块 I/O 的效率比流 I/O 高很多 BIO是阻塞的,NIO则是非阻塞的。而NIO基于 Channel(通道)和 Buffer(缓冲区)
每个channel 都会对应一个Buffer Selector 对应一个线程, 一个线程对应多个channel(连接)。 thread->Selector!!->多个channel->buffer
通道channel 是双向的, NIO的Buffer 是可以读也可以写, 需要 flip 方法切换 BIO 中要么是输入流,或者是输出流,不能双向
buffer缓冲区本质上是一个可以读写数据的内存块 容量 - Capacity 写的位置 - Position 位置 - limit
clear方法清空缓冲区;compact方法只会清空已读取的数据
异步非阻塞无需一个线程去轮询所有IO操作的状态改变,在相应的状态改变后,系统会通知对应的线程来处理!!! 重操作)的架构,比如相册服务器
原生很难写 ServerSocketChannel.open().bind SocketChannel buffer各种基础类型 intbuffer ss.configureBlocking(false)默认阻塞 Selector
netty
IO复用模型有三种:select,poll,epoll 本质上都是同步I/O!!
select==>时间复杂度O(n) 无差别轮询所有流 poll==>时间复杂度O(n) 大连接数的限制,原因是它是基于链表来存储的.
epoll==>时间复杂度O(1) epoll会把哪个流发生了怎样的I/O事件通知我们
Java NIO Epoll 会导致 Selector 空轮询,最终导致 CPU 100%
while(true)循环,循环往复,不断的轮询,直到linux系统出现100%的CPU情况 select方法就应该是阻塞的,没有key事件过来,那么就不应该返回!!! 结果没有返回0
netty解决 每完成一次空的select操作进行一次计数; 若在某个周期内连续发生N次空轮询,则触发了epoll死循环bug, netty默认是512次 新建Selector替换问题Selector
Reactor 单线程 多线程(接受请求一个,处理问题多个) 主从多线程(接受请求多个,处理问题多个)
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
粘包:包大小<缓存 缓存存多个包,读取多个包 半包:包大小>缓存 需要分包MTU(最大传输单元) 多次传输
处理 1.封装成帧,固定长,浪费空间 2.分隔符进行切分 3.固定长度字段存个内容的长度信息 4.利用特定的数据格式 json xml 边界清楚
client ->mainReactor-> accept->subReactor(加入队列监听)->hander监听各种事件->worker线程池->send返回
两组线程池:BossGroup专门负责和客户端建立连接 和 WorkerGroup负责处理连接上的读写
client->BossGroup-> BossNioEventLoop(处理连接accept,注册channel到work)->WorkerGroup->WorkerNioEventLoop(处理read write)->pipeline
每个事件循环就是一个 NioEventLoop ,每个NioEventLoop都包含一个Selector,用于监听注册在其上的 Socket 网络连接(Channel)。
零拷贝技术 数据在内存中的拷贝次数为0次 正常的是2次,磁盘->内核缓存区->应用程序内存->socket缓冲区
将内核缓冲区 与 应用程序内存 和Socket缓冲区建立了地址映射,这样数据在内存中的拷贝次数就是0次,减少了拷贝次数,可以大幅提升IO性能。
同步非阻塞 IO多路复用 thread->selector->轮询->多个channel->buffer->client
一个事件循环内维护了一个多路复用器,selector,和一个任务队列taskQueue。
Bootstrap 引导类Netty应用由一个Bootstrap开始,主要是用来配置整个 Netty 程序、设置业务处理类(Handler)、绑定端口、发起连接等
ChannelFuture对象作为 异步操作结果的占位符 可确定异步执行的结果
netty demo
EventLoopGroup group = new NioEventLoopGroup();
try {
ServerBootstrap sb = new ServerBootstrap();
sb.group(group) // 绑定线程池
.channel(NioServerSocketChannel.class) // 指定使用的channel
.localAddress(this.port)// 绑定监听端口
.childHandler(new ChannelInitializer() { // 绑定客户端连接时候触发操作
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("connected...; Client:" + ch.remoteAddress());
// ByteBuf byteBuf= Unpooled.copiedBuffer(“$”.getBytes());//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
// ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));//防止粘包处理在消息末尾使用换行符对消息进行分割,或者使用其他特殊字符来对消息进行分割;
ch.pipeline().addLast(new EchoServerHandler()); // 客户端触发操作
}
});
ChannelFuture cf = sb.bind().sync(); // 服务器异步创建绑定
System.out.println(NettyServerHandler.class + " started and listen on " + cf.channel().localAddress());
cf.channel().closeFuture().sync(); // 关闭服务器通道
} finally {
group.shutdownGracefully().sync(); // 释放线程池资源
}
}
利用ByteBuf作为缓冲区 利用Channel进行读写都要经过缓冲区
写索引 读索引 当writerIndex==readerIndex时 :代表无数据可以读
线程数Netty 默认是 CPU 处理器数的两倍,bind 完之后启动
断线重连测试 当客户端发现无法连接到服务器端,所以一直尝试重连 时间超过阈值(60s)
实现心跳机制 户端和服务器之间定期发送的一种特殊的数据包