Netty启动流程可以简化成如下代码
// netty 中使用 NioEventLoopGroup (简称 nio boss 线程)来封装线程和 selector
Selector selector = Selector.open();
// 创建 NioServerSocketChannel,同时会初始化它关联的 handler,以及为原生 ssc 存储 config
NioServerSocketChannel attachment = new NioServerSocketChannel();
// 创建 NioServerSocketChannel 时,创建了 java 原生的 ServerSocketChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
// 启动 nio boss 线程执行接下来的操作
//注册(仅关联 selector 和 NioServerSocketChannel),未关注事件
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, attachment);
// head -> 初始化器 -> ServerBootstrapAcceptor -> tail,初始化器是一次性的,只为添加 acceptor
// 绑定端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 触发 channel active 事件,在 head 中关注 op_accept 事件
selectionKey.interestOps(SelectionKey.OP_ACCEPT);
ssc : 其实就是 ServerSocketChannel
向nio ssc 加入了accept handler (在accept事件发生后建立连接)
bind
选择器Selector的创建是在NioEventLoopGroup中完成的。
NioServerSocketChannel 与 ServerSocketChannel的创建、ServerSocketChannel注册到Seletor中以及绑定操作都是由 bind 方法完成
**服务器入口: ** io.netty.bootstrap.AbstractBootstrap#bind(int)
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
validate();
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
真正完成初始化、注册以及绑定的方法是
io.netty.bootstrap.AbstractBootstrap#doBind
注意: dobind方法在主线程中执行
private ChannelFuture doBind(final SocketAddress localAddress) {
// 负责NioServerSocketChannel和ServerSocketChannel的创建
// ServerSocketChannel的注册工作
// init由main线程完成,regisetr由NIO线程完成
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 因为register操作是异步的
// 所以要判断主线程执行到这里时,register操作是否已经执行完毕
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
// 执行doBind0绑定操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
// 如果register操作还没执行完,就会到这个分支中来
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 添加监听器,NIO线程异步进行doBind0操作
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
initAndRegisterd
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// register promise.channel().unsafe().register(this, promise)方法
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
//promise.channel().unsafe().register(this, promise)方法
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
// 此处完成了 main 到nio 线程的切换
// 当前线程是否是nio线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 向nio线程中添加任务
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 该方法中会执行doRegister
// 执行真正的注册操作
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// 调用init中的initChannel方法 其实就是第一点中未执行的加handler
pipeline.invokeHandlerAddedIfNeeded();
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// javaChannel()即为ServerSocketChannel
// eventLoop().unwrappedSelector()获取eventLoop中的Selector
// this为NIOServerSocketChannel,作为附件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
在doRegister和invokeHandlerAddedIfNeeded操作中的完成后,会调用safeSetSuccess(promise)方法,向Promise中设置执行成功的结果。此时doBind方法中由initAndRegister返回的ChannelFuture对象regFuture便会由NIO线程异步执行doBind0绑定操作
// initAndRegister为异步方法,会返回ChannelFuture对象
final ChannelFuture regFuture = initAndRegister();
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 如果没有异常,则执行绑定操作
doBind0(regFuture, channel, localAddress, promise);
}
}
});
doBind0最底层调用的是ServerSocketChannel的bind方法
NioServerSocketChannel.doBind方法
通过该方法,绑定了对应的端口
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
// 调用ServerSocketChannel的bind方法,绑定端口
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
在绑定端口操作完成后,会判断各种所有初始化操作是否已经完成,若完成,则会添加ServerSocketChannel感兴趣的事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
最终在AbstractNioChannel.doBeginRead方法中,会添加ServerSocketChannel添加Accept事件
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
// 如果ServerSocketChannel没有关注Accept事件
if ((interestOps & readInterestOp) == 0) {
// 则让其关注Accepet事件
// readInterestOp 取值是 16
// 在 NioServerSocketChannel 创建时初始化
selectionKey.interestOps(interestOps | readInterestOp);
}
}
通过上述步骤,完成了
NioEventLoop 的重要组成部分有三个
public final class NioEventLoop extends SingleThreadEventLoop {
...
// selector中的selectedKeys是基于数组的
// unwrappedSelector中的selectedKeys是基于HashSet的
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
...
}
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
// 任务队列
private final Queue<Runnable> taskQueue;
// 线程
private volatile Thread thread;
}
Selector是在NioEventLoop的构造方法中被创建的
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
...
// 初始化selector,初始化过程在openSelector中
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 此处等同于 Selector.open()方法
// 创建了unwrappedSelector对象
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
}
NioEventLoop 中有selector 与 unwrappedSelector 两个selector,它们的区别主要在于SelectorKeys的数据结构.
这样做的主要目的是,数组的遍历效率要高于HashSet
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
...
// 获得基于数组的selectedKeySet实现
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// 通过反射拿到unwrappedSelector中的selectedKeys属性
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
...
// 暴力反射,修改私有属性
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
// 替换为基于数组的selectedKeys实现
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
selectedKeys = selectedKeySet;
// 调用构造函数,创建unwrappedSelector与selector
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
NioEventLoop中的线程,在首次执行任务时,才会被创建,且只会被创建一次.
代码;
public class TestNioEventLoop {
public static void main(String[] args) {
EventLoop eventLoop = new NioEventLoopGroup().next();
// 使用NioEventLoop执行任务
eventLoop.execute(()->{
System.out.println("hello");
});
}
}
进入 execute 执行任务
@Override
public void execute(Runnable task) {
// 检测传入的任务是否为空,为空会抛出NullPointerException
ObjectUtil.checkNotNull(task, "task");
// 执行任务
// 此处判断了任务是否为懒加载任务,wakesUpForTask的返回值只会为true
execute(task, !(task instanceof LazyRunnable) && wakesUpForTask(task));
}
进入上述代码的 execute 方法
private void execute(Runnable task, boolean immediate) {
// 判断当前线程是否为NIO线程
// 判断方法为 return thread == this.thread;
// this.thread即为NIO线程,首次执行任务时,其为null
boolean inEventLoop = inEventLoop();
// 向任务队列taskQueue中添加任务
addTask(task);
// 当前线程不是NIO线程,则进入if语句
if (!inEventLoop) {
// 启动NIO线程的核心方法
startThread();
...
}
// 有任务需要被执行时,唤醒阻塞的NIO线程
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
进入startThread方法
private void startThread() {
// 查看NIO线程状态是否为未启动
// 该if代码块只会执行一次
// state一开始的值就是ST_NOT_STARTED
// private volatile int state = ST_NOT_STARTED;
if (state == ST_NOT_STARTED) {
// 通过原子属性更新器将状态更新为启动(ST_STARTED)
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 执行启动线程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
进入 doStartThread ,真正创建NIO线程并执行任务
private void doStartThread() {
assert thread == null;
// 创建NIO线程并执行任务
executor.execute(new Runnable() {
@Override
public void run() {
// thread即为NIO线程
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 执行内部run方法
SingleThreadEventExecutor.this.run();
success = true;
}
...
});
}
通过执行 SingleThreadEventExecutor.this.run(); 执行传入的任务 (task)
该run方法是** NioEventLoop的run方法**
@Override
protected void run() {
int selectCnt = 0;
// 死循环,不断地从任务队列中获取各种任务来执行
for (;;) {
// 执行各种任务
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
}
}
}
NioEventLoop需要IO事件、普通任务以及定时任务,任务在run方法的for 循环中
@Override
protected void run() {
int selectCnt = 0;
// 死循环,不断地从任务队列中获取各种任务来执行
for (;;) {
// 执行各种任务
...
}
}
被执行,但该循环不会空转,执行到某些代码时,会被阻塞
run方法中有 Select 分支
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
会执行NioEventLoop 的 select 方法
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
long normalizedDeadlineNanos = selectDeadLineNanos - initialNanoTime();
if (nextWakeupTime != normalizedDeadlineNanos) {
nextWakeupTime = normalizedDeadlineNanos;
}
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
但需要注意的是,select方法是会阻塞线程的,当没有IO事件,但有其他任务需要执行时,需要唤醒线程
唤醒是通过execute最后的if代码块来完成的
// 有任务需要被执行时,唤醒阻塞的NIO线程
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
NioEventLoop.wakeup唤醒被selector.select方法阻塞的NIO线程
@Override
protected void wakeup(boolean inEventLoop) {
// 只有当其他线程给当前NIO线程提交任务时(如执行execute),才会被唤醒
// 通过AtomicLong进行更新,保证每次只能有一个线程唤醒成功
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
// 唤醒被selector.select方法阻塞的NIO线程
selector.wakeup();
}
}
run方法的switch语句有多条分支,具体执行分支的代码由strategy变量控制
int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
...
}
strategy 的值 由 cacluateStrategy 方法确定
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
// selectSupplier.get() 底层是 selector.selectNow();
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
该方法会根据hasTaks变量判断任务队列中是否有任务
private final IntSupplier selectNowSupplier = new IntSupplier() {
public int get() throws Exception {
return NioEventLoop.this.selectNow();
}
};
int selectNow() throws IOException {
return this.selector.selectNow();
}
也就说,当任务队列中没有任务时,才会进入SELECT分支,让NIO线程阻塞,而不是空转。若有任务,则会通过get方法调用selector.selectNow方法,顺便拿到IO事件
Java NIO空轮询BUG也就是JavaNIO在Linux系统下的epoll空轮询问题
在NioEventLoop中,因为run方法中存在一个死循环,需要通过selector.select方法来阻塞线程。但是select方法因为BUG,可能无法阻塞线程,导致循环一直执行,使得CPU负载升高
@Override
protected void run() {
...
for(;;){
...
// 可能发生空轮询,无法阻塞NIO线程
strategy = select(curDeadlineNanos);
...
if(...) {
...
} else if (unexpectedSelectorWakeup(selectCnt) ){
// 通过unexpectedSelectorWakeup方法中的rebuildSelector重建selector
// 并将selectCnt重置为0
selectCnt = 0;
}
}
}
Netty中通过selectCnt变量来检测select方法是否发生空轮询BUG
若发生空轮询BUG,那么selectCnt的值会增长是十分迅速。当selectCnt的值大于等于SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)时,Netty则判断其出现了空轮询BUG,进行如下处理
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);
// 重建selector,将原selector的配置信息传给新selector
// 再用新selector覆盖旧selector
rebuildSelector();
return true;
}
通过rebuildSelector方法重建selector,将原selector的配置信息传给新selector,再用新selector覆盖旧selector。同时将selectCnt的值设置为0
NioEventLoop可以处理IO事件和其他任务。不同的操作所耗费的时间是不同的,想要控制NioEventLoop处理IO事件花费时间占执行所有操作的总时间的比例,需要通过ioRatio来控制
NioEventLoop.run方法
// 处理IO事件时间比例,默认为50%
final int ioRatio = this.ioRatio;
// 如果IO事件时间比例设置为100%
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 计算出处理其他任务的事件
// 超过设定的时间后,将会停止任务的执行,会在下一次循环中再继续执行
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
IO事件是通过NioEventLoop.processSelectedKeys()方法处理的
private void processSelectedKeys() {
// 如果selectedKeys是基于数组的
// 一般情况下都走这个分支
if (selectedKeys != null) {
// 处理各种IO事件
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
processSelectedKeysOptimized方法
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
// 拿到SelectionKeyec
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
// 获取SelectionKey上的附件,即NioServerSocketChannel
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// 处理事件,传入附件NioServerSocketChannel
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
NIO中处理Accept事件主要有以下六步
代码如下
// 阻塞直到事件发生
selector.select();
Iterator<SelectionKey> iter = selector.selectionKeys().iterator();
while (iter.hasNext()) {
// 拿到一个事件
SelectionKey key = iter.next();
// 如果是 accept 事件
if (key.isAcceptable()) {
// 执行accept,获得SocketChannel
SocketChannel channel = serverSocketChannel.accept();
channel.configureBlocking(false);
// 将SocketChannel注册到selector中,并关注read事件
channel.register(selector, SelectionKey.OP_READ);
}
// ...
}
其中前三步,在NioEventLoop剖析中已经分析过了,所以接下来主要分析后三步
发生Accept事件后,会执行NioEventLoop.run方法的如下if分支
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
NioMessageUnsafe.read方法
public void read() {
...
try {
try {
do {
// doReadMessages中执行了accept获得了SocketChannel
// 并创建NioSocketChannel作为消息放入readBuf
// readBuf是一个ArrayList用来缓存消息
// private final List<Object> readBuf = new ArrayList<Object>();
int localRead = doReadMessages(readBuf);
...
// localRead值为1,就一条消息,即接收一个客户端连接
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 触发read事件,让pipeline上的handler处理
// ServerBootstrapAcceptor.channelRead
pipeline.fireChannelRead(readBuf.get(i));
}
...
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
NioSocketChannel.doReadMessages方法
该方法中处理accpet事件,获得SocketChannel,同时创建了NioSocketChannel,作为消息放在了readBuf中
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 处理accpet事件,获得SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 创建了NioSocketChannel,作为消息放在了readBuf中
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
...
}
return 0;
}
ServerBootstrapAcceptor.channelRead
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 这时的msg是NioSocketChannel
final Channel child = (Channel) msg;
// NioSocketChannel添加childHandler,即初始化器
child.pipeline().addLast(childHandler);
// 设置选项
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
// 注册 NioSocketChannel到nio worker线程,接下来的处理也移交至nio worker线程
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
通过AbstractUnsafe.register 方法,将SocketChannel注册到了Selector中,过程与启动流程中的Register过程类似
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这行代码完成的是nio boss -> nio worker线程的切换
eventLoop.execute(new Runnable() {
@Override
public void run() {
// 真正的注册操作
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
AbstractChannel.AbstractUnsafe.register0
private void register0(ChannelPromise promise) {
try {
...
// 该方法将SocketChannel注册到Selector中
doRegister();
// 执行初始化器,执行前 pipeline 中只有 head -> 初始化器 -> tail
pipeline.invokeHandlerAddedIfNeeded();
// 执行后就是 head -> logging handler -> my handler -> tail
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
// 触发pipeline上active事件
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
AbstractNioChannel.doRegister将SocketChannel注册到Selector中
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将Selector注册到Selector中
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
...
}
}
}
HeadContext.channelActive
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
// 触发read(NioSocketChannel这里read只是为了触发channel的事件注册,还未涉及数据读取)
readIfIsAutoRead();
}
AbstractNioChannel.doBeginRead,通过该方法,SocketChannel关注了read事件
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// 这时候 interestOps是0
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
// 关注read事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
read事件的处理也是在
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
分支中,通过unsafe.read()方法处理的,不过此处调用的方法在AbstractNioByteChannel.NioByteUnsafe类中
@Override
public final void read() {
// 获得Channel的配置
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// 根据配置创建ByteBufAllocator(池化非池化、直接非直接内存)
final ByteBufAllocator allocator = config.getAllocator();
// 用来分配 byteBuf,确定单次读取大小
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 创建ByteBuf
byteBuf = allocHandle.allocate(allocator);
// 读取内容,放入ByteBUf中
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
// 触发read 事件,让pipeline上的handler处理
// 这时是处理NioSocketChannel上的handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
}
// 是否要继续循环
while (allocHandle.continueReading());
allocHandle.readComplete();
// 触发 read complete事件
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
DefaultMaxMessagesRecvByteBufAllocator.MaxMessageHandle.continueReading(io.netty.util.UncheckedBooleanSupplier)
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return
// 一般为true
config.isAutoRead() &&
// respectMaybeMoreData默认为true
// maybeMoreDataSupplier的逻辑是如果预期读取字节与实际读取字节相等,返回true
(!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
// 小于最大次数,maxMessagePerRead默认16
totalMessages < maxMessagePerRead &&
// 实际读到了数据
totalBytesRead > 0;
}