前言:2022年是我工作中最不稳定的一年,今年也在准备教资考试,以至于忽略了很久的技术学习;我司的游戏架构是基于Netty设计的异步网络通信架构,我之前对Netty的认知仅停留在应用层面,借此机会,连载两篇文章来深入学习下Netty4.0的模型;
第一篇: 本篇主要讲解Netty的线程模型,给一种直观的感受;
第二篇:源码分析,用流程图讲解Netty组件的工作流程;
本文基于Netty4,如有错误请指正
ByteBuffer支持类型化的put和get操作;若操作类型不一致会出现BufferUnderflowException异常;
可以将一个普通的Buffer转换成只读的Buffer;
Nio提供了MappedByteBuffer,可以让文件直接在对外内存中进行修改;
public static void main(String[] args) throws Exception {
RandomAccessFile randomAccessFile = new RandomAccessFile("1.txt", "rw");
FileChannel channel = randomAccessFile.getChannel();
/*
参数1:表示channel的操作,只读还是读写
参数2:操作的起始位置
参数3:操作的大小,如可操作5个字节
*/
MappedByteBuffer map = channel.map(FileChannel.MapMode.READ_WRITE, 0, 5);
map.put(0, (byte)'H');
map.put(3, (byte)'9');
randomAccessFile.close();
}
Nio支持通过多个Buffer(即Buffer数组)完成读写操作;即Scattering和Gathering;意思就是定义Buffer数组,现有API支持,直接写入数组中,若超过数组所有Buffer的容量则报错;
零拷贝:无需CPU拷贝;可以提升性能上的优势,可以更少的切换上下文,更少的CPU缓存等等;
传统IO:数据从硬盘上拷贝到内核缓冲区,再通过CPU拷贝到用户态内存空间,由用户态再拷贝到SocketBuffer中,最后写入网卡中;【4拷贝3切换】
mmap:通过内存映射,将文件映射到内核缓冲区,同时,用户空间可以攻下昂内核空间的数据;这样在进行文件网络传输时,就可以减少内核空间到用户空间的内存拷贝次数;但仍无法避免CPU拷贝;【3拷贝3切换】
sendFile(2.1):Linux2.1版本提供了系统调用函数,数据不经过用户态,直接从内核缓存区进入到SocketBuffer,同时,由于和用户态完全无关,就减少了一次上下文切换;【3拷贝2切换】
sendFile(2.4):Linux2.4版本做了修改,避免从内核缓冲区拷贝到SocketBuffer的操作,直到拷贝到网卡中,从而再减少一次数据拷贝;【3拷贝2切换】
其中mmap适合小数据量的读写,sendFile适合大文件传输;
public static void main(String[] args) throws Exception {
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("baidu.com", 9001));
String fileName = "1.txt";
FileChannel channel = new FileInputStream(fileName).getChannel();
// linux下,调用一次transferTo可以完成传输
// windows下,调用一次transferTo只能发送8mb数据,需要分段传输
channel.transferTo(0, channel.size(), socketChannel);
channel.close();
socketChannel.close();
}
Netty对JDK自带的NIO的API进行了封装,解决了上述问题;
通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动);
服务器端程序处理传入的多个请求并将他们同步分派到相应处理线程;
使用IO多路复用监听事件,当收到事件后就分发给某个线程;
采用select处理客户端的请求事件,然后通过Dispatch分发请求进行处理;如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象,由该handler对象进行后续业务处理;
如果不是建立连接的事件,则由Reactor将事件分发到连接对应的Handler来处理整个业务;
优点:
缺点:
使用场景:
客户端数量有限,业务处理非常快,例如Redis;
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
// 不能将同一个channel注册两次不同类型
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(80));
System.out.println("绑定成功");
Handler handler = new Handler();
// 循环处理selector
while (true) {
// 等待select
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 分派 dispatch
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
SocketChannel accept = channel.accept();
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
handler.doHandle(channel);
} else {
System.out.println("无法处理 channel op"+selectionKey.interestOps());
}
iterator.remove();
}
}
}
// 定义处理Handler
static class Handler {
public void doHandle(SocketChannel channel) throws Exception {
// 分配空间
ByteBuffer allocate = ByteBuffer.allocate(1024);
// 读取数据
channel.read(allocate);
// 业务处理-打印
System.out.println(new String(allocate.array()));
allocate.clear();
allocate.put("Hello".getBytes());
allocate.flip();
// 响应数据
channel.write(allocate);
}
}
采用select处理客户端的请求事件,然后通过Dispatch分发请求进行处理;如果是建立连接请求事件,则由Acceptor通过Accept处理连接请求,然后创建一个Handler对象,由该handler对象进行后续业务处理;
如果不是建立连接的事件,则由Reactor将事件分发到连接对应的Handler来处理,Handler只负责响应事件,具体的业务处理在read读取数据后交给后面的线程池去处理;
优点:可以充分利用CPU多核的处理能力;
缺点:多线程数据共享和访问比较复杂,reactor所有的事件监听与响应仍在单线程中运行,高并发下容易出现瓶颈;
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
// 不能将同一个channel注册两次不同类型
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(80));
System.out.println("绑定成功");
// SOCKET与HANDLE映射关系
Map<Integer, Handler> map = new HashMap<>();
ExecutorService workGroup = Executors.newFixedThreadPool(2);
// 循环处理selector
while (true) {
// 等待select
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 分派 dispatch
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
SocketChannel accept = channel.accept();
accept.configureBlocking(false);
accept.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
Handler handler = map.get(channel.hashCode());
if (handler == null) {
handler = new Handler(channel, workGroup);
}
handler.doHandle();
map.put(channel.hashCode(), handler);
} else {
System.out.println("无法处理 channel op"+selectionKey.interestOps());
}
iterator.remove();
}
}
}
// 定义处理Handler
static class Handler {
SocketChannel socketChannel;
ExecutorService workGroup;
public Handler(SocketChannel socketChannel, ExecutorService workGroup) {
this.socketChannel = socketChannel;
this.workGroup = workGroup;
System.out.println("创建了socket"+socketChannel);
}
public void doHandle() throws Exception {
// 分配空间
ByteBuffer allocate = ByteBuffer.allocate(1024);
// 读取数据
socketChannel.read(allocate);
// 业务处理-打印
workGroup.submit(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " === " +new String(allocate.array()));
allocate.clear();
allocate.put("Hello".getBytes());
allocate.flip();
// 响应数据
try {
socketChannel.write(allocate);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
Reactor主线程负责建立连接,当连接建立后,将其交给Reactor子线程去处理;其可以对应多个Reactor子线程;
Reactor子线程将连接加入队列中进行监听,并创建handler进行各种事件的处理;handler通过读取数据,将业务逻辑交给worker线程池处理,最终由handler去响应结果;
worker线程池分配独立的worker线程去处理业务,并返回结果;
优点:
缺点:编程复杂度高
public static void main(String[] args) throws Exception {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open();
// 不能将同一个channel注册两次不同类型
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
serverSocketChannel.bind(new InetSocketAddress(80));
System.out.println("绑定成功");
SubReactorFactory subReactorFactory = new SubReactorFactory(2, 8);
// 循环处理selector
while (true) {
// 等待select
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 分派 dispatch
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
SocketChannel accept = channel.accept();
accept.configureBlocking(false);
subReactorFactory.dispatch(accept);
} else {
System.out.println("无法处理 channel op" + selectionKey.interestOps());
}
iterator.remove();
}
}
}
// 工厂类,用于分配boss线程和SubReactor
static class SubReactorFactory {
ExecutorService bossGroup;
ExecutorService workGroup;
int thread;
SubReactor[] subReactors;
public SubReactorFactory(int thread, int workGroupNum) {
this.bossGroup = Executors.newFixedThreadPool(thread);
this.workGroup = Executors.newFixedThreadPool(workGroupNum);
this.thread = thread;
subReactors = new SubReactor[thread];
for (int i = 0; i < thread; i++) {
subReactors[i] = new SubReactor(workGroup);
}
}
public void dispatch(SocketChannel channel) {
SubReactor subReactor = subReactors[channel.hashCode() % thread];
bossGroup.submit(() -> {
try {
subReactor.register(channel);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
// 拥有独立的Selector
static class SubReactor {
Selector selector;
Map<Integer, Handler> map = new HashMap<>();
ExecutorService workGroup;
public SubReactor(ExecutorService workGroup) {
try {
selector = Selector.open();
this.workGroup = workGroup;
} catch (Exception e) {
e.printStackTrace();
}
new Thread(() -> {
try {
this.handler();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
public void register(SocketChannel channel) throws IOException {
channel.register(selector, SelectionKey.OP_READ);
selector.wakeup();
}
// 这里之所以用select(long timeout)是因为有消息过来并未出发select返回;
public void handler() throws Exception {
while (true) {
// 这里可能会出现性能瓶颈
if (selector.select(5000) == 0) {
System.out.println("selector无事件 "+ this);
continue;
}
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 分派 dispatch
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isReadable()) {
// 不卡主线程,提高并发
SocketChannel channel = (SocketChannel) selectionKey.channel();
Handler handler = map.get(channel.hashCode());
if (handler == null) {
handler = new Handler(channel, workGroup);
}
Handler finalHandler = handler;
map.put(channel.hashCode(), finalHandler);
finalHandler.doHandle();
} else {
System.out.println("无法处理 channel op" + selectionKey.interestOps());
}
iterator.remove();
}
}
}
}
// 定义处理Handler
static class Handler {
SocketChannel socketChannel;
ExecutorService workGroup;
public Handler(SocketChannel socketChannel, ExecutorService workGroup) {
this.socketChannel = socketChannel;
this.workGroup = workGroup;
System.out.println("创建了socket" + socketChannel);
}
public void doHandle() throws Exception {
// 分配空间
ByteBuffer allocate = ByteBuffer.allocate(1024);
// 读取数据
socketChannel.read(allocate);
// 业务处理-打印
workGroup.submit(() -> {
System.out.println(Thread.currentThread().getName() + " === " + new String(allocate.array()));
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
allocate.clear();
allocate.put("Hello".getBytes());
allocate.flip();
// 响应数据
try {
socketChannel.write(allocate);
} catch (IOException e) {
e.printStackTrace();
}
});
}
}
当不指定NioEventLoopGroup中的线程数时,则取java启动参数io.netty.eventLoopThreads
,若没有配置则是cpu数*2;
由于EventLoop是一个单线程线程池,所以它可以执行定时任务等其他线程任务;
NioEventLoop内部采用串行化设计,从消息的读取->解码->编码->发送,始终由IO线程NioEventLoop负责;
每个NioEventLoop的selector上可以注册监听多个NioChannel;
每个NioChannel只会绑定在唯一的NioEventLoop上;
每个NioChannel都绑定有一个自己的ChannelPipeline上;