文章是基于netty-4.1.95.Final版本讲解,更多是解析源码,挖掘设计思路,学习代码灵活运用。
如果你阅读完本文,你会发现很多细节让人为之拍案叫绝~~
本文共五万多字为第一篇Netty文章(后续还有哦),全网最详细的Netty源码解析
官网地址:https://netty.io/
Netty的描述:
Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
关键字:asynchronous、event-driven
IO模式分为:同步阻塞、同步非阻塞、异步阻塞、异步非阻塞
一般情况下异步操作都是异步非阻塞模式。在开发程序时,主线程按顺序处理操作,一般只有当前流程执行完,下一个才能执行,这种都是同步操作。那么如何改成异步?把每个流程改成回调即可。主线程将回调注册完成后继续处理自己的事情,而回调操作在特定事件完成后被调用,此时需要用一个其他线程来完成,那么线程池就非常重要。
在Java中有哪些回调类?
Netty使用Future类在任务完成时做回调操作,如果回调的方法不止一个怎么办?在juc的Future基础上扩展,增加监听器的功能。当任务完成时回调一组监听器即可。这就是Promise类的作用。
简单看一下Promise类的方法:
Promise使用了什么设计模式?观察者模式。通过注册回调函数(Listener)来监听事件。当事件完成时,Promise会通知所有注册的Listener
Java标准库的CompletableFuture类跟Promise类似,都有注册回调,然后在任务完成时通知的功能。
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Test1 {
public static void main(String[] args) {
// 为何不使用这种方式创建线程池?因为创建LinkedBlockingQueue工作队列时没有传入capacity,默认是Integer.MAX_VALUE,
// 那么队列可以认为是没有边界的,在资源受限的情况下不可取,所以可以直接自己修改
// ExecutorService es = Executors.newFixedThreadPool(100);
// 核心线程数:保持活跃的线程数,即使线程是空闲的,也不会被回收,除非设置了allowCoreThreadTimeout为true
// 最大线程数:线程池允许创建的最大线程数,当工作队列已满且活动线程数达到最大线程数时,新任务会根据拒绝策略去阻塞或拒绝
// 线程空闲时间:超过这个空闲时间后,空闲线程会被销毁,直到线程数等于核心线程数
// 工作队列:暂时存储未执行的任务。有多个队列可以使用,LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue等
// 线程工厂:可以自定义线程的创建方式,例如给线程命名(实际在业务中非常重要,寻找问题的得力帮手)等
// 拒绝策略:当线程池和工作队列都已满时,决定如何处理新提交的任务。CallerRunsPolicy(让调用线程池execute的线程去执行)、AbortPolicy(抛出异常)、DiscardPolicy(直接拒绝)、DiscardOldestPolicy(拒绝等待时间最久的任务)
ExecutorService executorService = new ThreadPoolExecutor(5, 20,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadFactory() {
final AtomicInteger atomicInteger = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("业务线程-" + atomicInteger.incrementAndGet());
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy());
// CompletableFuture异步回调类,支持链式调用
// supplyAsync: 有返回值异步任务,可以采用定制的线程池,默认采用ForkJoinPool线程池
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + ":supplyAsync方法");
return "hello";
}, executorService);
// thenAccept: 在上一个任务完成后执行,可以处理其结果,执行给定的consumer,无返回值
// thenRun: 在上一个任务完成后执行,不处理其结果,执行给定的Runnable,无返回值
// join: 等待上一个任务执行完成,获取最终的结果值
completableFuture.thenAccept(str -> System.out.println(Thread.currentThread().getName() + ":" + str))
.thenRun(() -> System.out.println(Thread.currentThread().getName() + ":exit")).join();
// 打印结果:
// 业务线程-1:supplyAsync方法
// 业务线程-1:hello
// 业务线程-1:exit
// main:主线程结束
System.out.println(Thread.currentThread().getName() + ":主线程结束");
}
}
thenAccept:注册Consumer任务
thenRun:注册Runnable任务
…
CompletableFuture支持很多函数式编程的接口类,功能更加丰富,为何Netty不使用此类作为回调类?
Netty给出一些例子,我们看个简单的:
创建两个对象:bossGroup和workerGroup。类是NioEventLoopGroup,我们就从这个类入手,深入了解。
从继承树上可以看到NioEventLoopGroup继承了两条路线:EventExecutor和EventLoop
EventExecutorGroup:事件执行器组。执行器是线程或线程池。
EventExecutor:继承EventExecutorGroup。一个人也可以是一个组,一个事件执行器也可以是一个组
AbstractEventExecutorGroup:继承EventExecutorGroup。使用模板方法设计模式,定义算法的骨架,将某些步骤延迟到子类实现。
MultithreadEventExecutorGroup:多线程事件执行器组。使用多个工作线程(EventExecutor)来执行任务和处理事件
既然有多线程的,有没有单线程的?
SingleThreadEventExecutor:继承AbstractScheduledEventExecutor,支持延迟任务和周期性任务。实现OrderedEventExecutor,表示该执行器是有顺序的,确保相同的任务或事件按照提交的顺序执行。
小结:事件执行器是一个处理任务或事件的组件,支持多线程和单线程模式,分别实现EventExecutorGroup和EventExecutor
EventLoopGroup:事件循环组。继承EventExecutorGroup,表示要使用多线程模式。提供注册Channel和ChannelPromise的功能。next方法重写EventExecutorGroup的方法,用于获取下一个可用的EventLoop对象。
EventLoop:事件循环。
小结:事件循环器是将事件注册,按一定顺序获取事件,交由事件执行器循环处理的。EventLoop使用单线程模式,EventLoopGroup使用多线程模式。
类之间的关系如下,EventLoop使用EventExecutor执行事件,EventLoopGroup由EventExecutorGroup执行一组事件。
关键继承的类:SingleThreadEventLoop、SingleThreadEventExecutor、EventLoop
SingleThreadEventLoop:继承SingleThreadEventExecutor,拥有单线程执行任务的功能
NioEventLoop:从命名来看是Nio + EventLoop。EventLoop是管理(注册)事件(Channel)的。Nio是Java标准库中的New IO,使用Selector + ByteBuffer + Channel的架构实现高性能网络编程,由Selector管理Channel。那么NioEventLoop也要集成Selector,管理自己的channel对象。
NioEventLoopGroup:继承MultithreadEventLoopGroup,拥有多线程执行的功能。
小结:NioEventLoopGroup管理一组NioEventLoop,交由事件执行器组(EventExecutorGroup)处理。NioEventLoop由selector管理事件,交由事件执行器(EventExecutor)处理。
bossGroup的线程数为1,workerGroup使用的是空的构造方法,线程数为0
SelectorProvider顾名思义是selector提供者,如果你是Windows系统provider变成WEPollSelectorProvider。要从Hotspot源码中寻找到Linux包,创建的是EPollSelectorProvider对象。最后会创建EPollSelectorImpl(Selector子类)对象返回。
注:epoll是非常重要的功能,在后面会详细解释,现在只要记住Linux肯定会使用epoll参与网络事件
SelectorProvider使用了单例模式,Holder.INSTANCE静态final变量确保只有一个SelectorProvider对象存在
DefaultSelectStrategyFactory是默认的SelectStrategyFactory的子类,从类名来看用于select决策的工厂类,会决定是否调用selector.select的类。
RejectedExecutionHandlers.reject():拒绝执行的处理器。默认是抛出RejectedExecutionException异常。
如果nThreads为0,使用默认的线程数为 CPU的核心数 * 2。为啥要使用这个值?
程序的性能瓶颈是网络、磁盘、CPU等等,可以归类为两种情况:IO密集型、CPU密集型
两种性能瓶颈的解决方法:
那么Netty用于网络编程中,明显属于IO密集型。
DefaultEventExecutorChooserFactory是默认的选择器工厂类
为何要判断EventExecutor的数量是否是二次幂?
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// 执行器数量是否是二次幂
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
// PowerOfTwoEventExecutorChooser
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
// GenericEventExecutorChooser
public EventExecutor next() {
return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
}
继续调用super来到父类:MultithreadEventExecutorGroup。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// nThreads不能小于0
checkPositive(nThreads, "nThreads");
// 从目前链路来看,executor为空
if (executor == null) {
// ThreadPerTaskExecutor是给每个任务创建本地线程,不使用线程池
// DefaultThreadFactory是默认的线程工厂,newThread方法创建Thread对象(FastThreadLocalThread对象)
// 此方法没有限制线程数,可能是因为线程用完即弃,每次任务都需要创建新线程执行。
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// children是group中的执行器数组
// 每个执行器只使用一个线程,保证TLS机制,线程安全且高效
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建执行器对象。newChild是抽象方法,必须由子类实现。此处使用了模板设计模式
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// success为false,说明创建对象失败
if (!success) {
// 将已经创建的执行器依次关闭
for (int j = 0; j < i; j ++) {
// 优雅地关闭执行器。就是先将任务执行完,再关闭
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
// 等待执行器关闭
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
// 如果被中断,就设置中断标志位,退出循环
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 根据执行器数量决定使用哪种选择器
chooser = chooserFactory.newChooser(children);
// 监听执行器停止的监听器对象
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
// 当所有执行器结束后,将回调对象terminationFuture设置为success,然后会唤醒所有挂在此Future后的监听器
if (terminatedChildren.incrementAndGet() == children.length) {
// terminationFuture属于当前执行器组的
terminationFuture.setSuccess(null);
}
}
};
// 给每个执行器添加监听器
for (EventExecutor e: children) {
// e.terminationFuture()是属于各自执行器的terminationFuture
e.terminationFuture().addListener(terminationListener);
}
// 将执行器数组改成只读不可修改的Set集合。
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
这个构造方法初始化一些操作:创建执行器、获取chooser、添加监听器等
小结:EventExecutorGroup管理一组EventExecutor,每个EventExecutor独享一个线程,通过chooser选择EventExecutor执行任务。
在上面可以看到newChild是创建EventExecutor的核心方法,在MultithreadEventExecutorGroup中是抽象方法,就要由子类实现。此处使用了模板设计模式。
子类是NioEventLoopGroup,看下newChild实现
创建NioEventLoop:初始化参数
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory taskQueueFactory, EventLoopTaskQueueFactory tailTaskQueueFactory) {
// 调用父类构造器,传入必要参数
// newTaskQueue:生成 无界的多生产者-单消费者队列(MPSC)
super(parent, executor, false, newTaskQueue(taskQueueFactory), newTaskQueue(tailTaskQueueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// 调用provider.openSelector()获取selector对象,若是Linux系统,就是EPollSelectorImpl对象。
final SelectorTuple selectorTuple = openSelector();
// 两个对象是一样的
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
// parent:NioEventLoopGroup对象
// executor:ThreadPerTaskExecutor对象,每个任务起一个线程
// addTaskWakesUp:此变量的意思是添加任务时唤醒EventLoop,但根据上下文意思相反。true不唤醒,false唤醒
// taskQueue:任务队列,存放等待执行的任务
// tailTaskQueue:尾部任务队列,在事件处理之后、准备下一次轮询之前的任务。通常是周期性或延迟的任务
// rejectedExecutionHandler:如果任务添加到任务队列失败,就会采取拒绝操作。默认是RejectedExecutionHandlers.reject()拒绝策略
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Queue<Runnable> tailTaskQueue,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
// 默认等待的任务数为16
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
// 当前EventExecutor设置到当前Thread的本地变量中(TLS),此处在后面可以详细说一下
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
// EventExecutor的父类是EventExecutorGroup。从上下文来看parent是NioEventLoopGroup对象。
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
小结:NioEventLoop创建过程中,主要做了变量初始化
很多人会对这段代码有点疑问,不知道啥意思。实际这里是使用TLS机制将执行器与线程进行绑定。
Netty使用FastThreadLocal来代替Java原生的ThreadLocal,为啥要这样做呢?
在Java线程中,都有一个ThreadLocalMap实例变量(如果不使用ThreadLocal,不会创建map变量)。ThreadLocal只有一个变量,设置TLS数据时,给每个线程创建ThreadLocalMap变量。该map使用线性探测的方式解决hash冲突的问题,如果没有空闲的位置,就不断往后尝试,找到空闲的位置,插入entry。那么在发生hash冲突时就会影响效率。
FastThreadLocal解决了hash冲突的问题,下面根据源码看看如何解决的
返回新的Executor,将eventExecutor映射到当前线程中。
// executor:ThreadPerTaskExecutor对象,是Java基础库的Executor接口的实现类
// eventExecutor:SingleThreadEventExecutor对象
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(executor, "executor");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 创建新的执行器
return new Executor() {
@Override
public void execute(final Runnable command) {
// execute会创建新线程来执行任务
// apply:eventExecutor绑定到当前线程的本地变量中。此方法要返回一个Runnable对象。
executor.execute(apply(command, eventExecutor));
}
};
}
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
ObjectUtil.checkNotNull(command, "command");
ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
// 返回新的Runnable对象
return new Runnable() {
@Override
public void run() {
// 设置eventExecutor到当前线程的本地变量中
setCurrentEventExecutor(eventExecutor);
try {
// 运行command任务
command.run();
} finally {
// 将线程的本地变量情况,help GC
setCurrentEventExecutor(null);
}
}
};
}
private static void setCurrentEventExecutor(EventExecutor executor) {
// mappings:FastThreadLocal对象,直接使用static final修饰。此变量会被多线程共享。
mappings.set(executor);
}
// 泛型是EventExecutor,线程本地变量存放的是EventExecutor对象
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();
FastThreadLocal是ThreadLocal的变种,具有更高的性能。作用也类似,ThreadLocal也是全局变量,只不过Thread有ThreadLocalMap的变量副本,ThreadLocal就是管理这些变量增删改查的作用。那么FastThreadLocal也是全局变量,管理线程本地变量。所以mappings变量是全局变量。
public final void set(V value) {
// UNSET是默认值,如果不是这个值就添加,否则就删除
if (value != InternalThreadLocalMap.UNSET) {
// 若当前线程是FastThreadLocalThread对象,就返回自定义高性能的map,反之就返回ThreadLocal中的变量
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// 将value设置到threadLocalMap中。
setKnownNotUnset(threadLocalMap, value);
} else {
remove();
}
}
InternalThreadLocalMap是代替ThreadLocalMap的。若线程属于FastThreadLocalThread就返回快的map,否则就从ThreadLocal中获取慢的map。
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
// 获取快的map
return fastGet((FastThreadLocalThread) thread);
} else {
// 获取慢的map
return slowGet();
}
}
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
// 从FastThreadLocalThread中获取threadLocalMap变量,跟Thread一样,有threadLocalMap变量
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();
// 若为空,就创建InternalThreadLocalMap对象传入
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
// 将InternalThreadLocalMap放到ThreadLocalMap中
private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =
new ThreadLocal<InternalThreadLocalMap>();
private static InternalThreadLocalMap slowGet() {
// 如果ThreadLocalMap没有InternalThreadLocalMap对象,就设置新的对象
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
setKnownNotUnset方法:将值设置到FastThreadLocalMap的indexedVariables数组中
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
// 在indexedVariables数组的index位置,设置value
if (threadLocalMap.setIndexedVariable(index, value)) {
// 将当前FastThreadLocal添加到remove列表
addToVariablesToRemove(threadLocalMap, this);
}
}
private final int index;
public FastThreadLocal() {
// index是InternalThreadLocalMap中的nextIndex的数值,每调用一次nextVariableIndex,数值就加1
index = InternalThreadLocalMap.nextVariableIndex();
}
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
// VARIABLES_TO_REMOVE_INDEX:数组下一个空闲位置,由于是常量,此位置保持不变
// 从indexedVariables数组中获取index位置的元素
Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
// remove列表,set集合用于去重
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET || v == null) {
// 若元素为空,设置新的对象。
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariable(VARIABLES_TO_REMOVE_INDEX, variablesToRemove);
} else {
// 若元素不为空,强转为Set>类
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
// 往列表中添加FastThreadLocal对象
variablesToRemove.add(variable);
}
// 获取数组下一个空闲位置
public static final int VARIABLES_TO_REMOVE_INDEX = nextVariableIndex();
小结:
我们还没了解bossGroup和workerGroup是干啥的,就通过ServerBootstrap深入了解。
AbstractBootstrap:这是个泛型类定义。泛型B继承AbstractBootstrap,B是AbstractBootstrap的子类。泛型C是Channel的子类
作为Netty的引导基类,提供了一些通用的方法和属性,可以用于配置和启动网络程序。
AbstractBootstrap支持链式调用,怎么实现的呢?很简单,方法返回当前对象即可。
为何AbstractBootstrap定义了bind方法,这不是ServerBootstrap才有的吗?实际上Bootstrap也即客户端也有bind操作。如果调用bind方法,客户端会指定端口号,否则由内核指定端口号。客户端也不常用。
ServerBootstrap是对服务端的配置和启动。服务端有bind、listen和accept操作,分别是绑定IP和端口号、监听端口号、接收客户端连接。bind已经在AbstractBootstrap实现,listen实际已经在nio的bind方法中调用,那么只剩下accept操作。
创建内部类:ServerBootstrapAcceptor。用于处理accept操作。当然此类实现ChannelInboundHandlerAdapter,表示accept是入站操作,调用channelRead给新连接做一些统一处理
group方法:
channel方法:
handler和childHandler方法:
小结:ServerBootstrap是对服务端以及新连接的客户端进行配置。Bootstrap就是对客户端配置以及启动。每个channel都有一个pipeline,pipeline有多个处理器,在事件完成时被按顺序调用。
我们说的事件有哪些呢?
我们就以server端的bind切入,关于Java标准库中的知识点在Channel篇中详解。
bind方法支持多种传参方式,最终都是IP + port的方式,然后封装成InetScoketAddress对象。
doBind方法
private ChannelFuture doBind(final SocketAddress localAddress) {
// 初始化channel,并将channel注册到selector,返回Future对象。Future包含channel以及事件完成与否。
final ChannelFuture regFuture = initAndRegister();
// 从Future获取channel对象
final Channel channel = regFuture.channel();
// 是否有异常,如果有就立即返回。
if (regFuture.cause() != null) {
return regFuture;
}
// 注册操作是否完成,成功或失败都算完成
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
// 这里我们知道注册已经完成,创建新的Promise,用于下一步操作的回调
ChannelPromise promise = channel.newPromise();
// 注册成功就注册到selector,否则就获取异常对象。
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else { // 表示注册未完成
// Registration future is almost always fulfilled already, but just in case it's not.
// 等待注册的promise
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 为regFuture注册监听器。当注册完成时会调用监听器
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
// 是否注册失败
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
// 注册成功,执行bind操作
doBind0(regFuture, channel, localAddress, promise);
}
}
});
// 返回bind操作的promise
return promise;
}
}
initAndRegister方法
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 通过channel工厂创建对象,在之前看过这个工厂类是ReflectiveChannelFactory(通过反射创建对象)
channel = channelFactory.newChannel();
// 此方法在AbstractBootstrap中是抽象方法,由子类实现。此处使用了模板方法模式。
// 对channel做初始化操作,比如在pipeline中添加设置的处理器
init(channel);
} catch (Throwable t) {
// 出现异常,要将channel关闭
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
// promise设置为failure
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
// 如果channel创建失败,就给一个FailedChannel对象
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// config():由于我们是从ServerBootstrap进来的,所以是ServerBootstrapConfig对象
// group():bossGroup
// 将channel注册到selector,也会将EventLoop注册到channel中。
ChannelFuture regFuture = config().group().register(channel);
// 如果注册失败,将channel关闭
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
doBind0方法:doBind是逻辑入口点,而doBind0是具体实现
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
// 将绑定任务添加到channel绑定的EventLoop的任务队列(taskQueue)中,不会直接执行
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 注册操作是否成功
if (regFuture.isSuccess()) {
// 若成功,就执行绑定操作,在返回的Future中添加监听器。这个监听器判断是否成功,失败就关闭channel
// 既然将channel绑定到指定的EventLoop中,在执行bind操作时也要考虑当前线程是不是绑定的EventLoop的线程
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
// 失败的话就传递异常对象
promise.setFailure(regFuture.cause());
}
}
});
}
register方法:config().group().register(channel)
现在需要知道EventExecutor是哪个类?在NioEventLoopGroup创建时,newChild是创建EventLoop的核心方法
NioEventLoopGroup ==> MultithreadEventExecutorGroup
NioEventLoop ==> SingleThreadEventExecutor
结论:EventExecutor是SingleThreadEventExecutor类
register是EventLoopGroup定义的方法,SingleThreadEventLoop同时继承EventLoopGroup和SingleThreadEventExecutor,方法实现就在这个类中
为啥Channel要有Unsafe类?在Java标准库中也有Unsafe类,涉及CAS、屏障之类的native方法,要与JVM交互,设计人员认为开发者不懂JVM和内核底层,无法正确使用这些功能,遂将此类命名为Unsafe,表示此类是不安全的,不要盲目使用。那么Netty设计思路也是一样,Channel最终要与Java标准库的Channel交互,他们认为Java库是不安全的,不要盲目使用。所以Netty的Unsafe类是与Java的Nio进行交互的。
public ChannelFuture register(Channel channel) {
// DefaultChannelPromise顶级父类接口是ChannelFuture,相比于Future增加了channel方法,返回绑定的channel
// 专属于Channel异步结果操作。promise是register结果
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
// 根据上下文,channel是NioServerSocketChannel
promise.channel().unsafe().register(this, promise);
return promise;
}
由于register、bind、read、write是客户端和服务端通用的事件,所以他们会在父类实现。那么register方法在AbstractUnsafe类中实现的。AbstractUnsafe是AbstractChannel的内部类。
在注册事件的前后做一些额外的操作
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
// 是否已经注册。若已注册,设置异常信息返回
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
// eventLoop是否属于NioEventLoop类。若不是,设置异常信息返回
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// this.eventLoop是AbstractChannel中volatile修饰的属性,保证属性在读写时都是最新值。
// 由于此时是store操作,JVM会加上store load屏障,既保证写缓存立即更新到内存,又保证读缓存从内存读取新值
// 注意eventLoop是此时被赋值,后续会通过channel.eventLoop()获取到此对象
AbstractChannel.this.eventLoop = eventLoop;
// eventLoop父类是SingleThreadEventExecutor,有volatile修饰的thread属性,存放第一次运行时的线程.
// inEventLoop是判断当前线程与EventExecutor映射的线程是否同一个。若是,就直接调用register0执行注册功能,
// 否则放到eventLoop的任务队列中,等待被调度执行。
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 将任务添加到eventLoop专属的任务队列
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) { // 出现异常,关闭Channel
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
// 关闭channel
closeForcibly();
// closeFuture设置为success,处理已注册的监听器
closeFuture.setClosed();
// 将register的promise设置为failure
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 采用模板设计模式,doRegister是抽象方法,由子类实现
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 在通知promise之前,触发执行ChannelHandler的handlerAdded方法
pipeline.invokeHandlerAddedIfNeeded();
// 将ChannelPromise标记为成功,顺便通知已经注册的监听器
safeSetSuccess(promise);
// 触发通道注册完成事件,执行ChannelInboundInvoker.fireChannelRegistered方法
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
// channel是否活跃。
// NioServerSocketChannel是确保channel开启,且已经绑定端口
// NioSocketChannel是确保channel开启,且已经与服务器建立连接
// 若从bind之前来看,此处是false。bind之后就是true了
if (isActive()) {
// 是否是第一次注册
if (firstRegistration) {
// 若通道是第一次注册,触发通道激活事件,执行ChannelInboundInvoker.fireChannelActive方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
// 若通道不是第一次注册,并且之前已经设置自动读取的选项(config.setAutoRead(true),默认autoRead=1),
// 则开始读取操作且执行入站操作。
// server的read事件是OP_ACCEPT,client的read事件是OP_READ
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
doRegister方法由子类实现,既然都是Nio的channel,而且server和client都有注册功能,所以此方法在他们共同的父类AbstractNioChannel中被实现
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// 将当前保存的Java的channel对象注册到selector中,但不注册感兴趣的事件。为何这样设计?注册channel和事件都要额外的处理
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {// 异常说明因特定情况取消该channel的监听
if (!selected) { // 只执行一次selectNow
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow(); // 执行selectNow,不等待执行从epoll返回可用事件
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
register过程已经详解,现在了解bind过程
由于bind是server和client共有的功能,所以此方法实现在父类
为何事件要在pipeline中操作?pipeline可以注册handler,handler有netty原生的,也可以由开发者自定义。在操作事件的过程中,可以触发handler。那么pipeline提供高可扩展性,并且能够灵活定制事件处理流程。
// localAddress:bind肯定是绑定本地IP和端口。
// promise:当注册channel完成时,就是DefaultChannelPromise。若未注册完成,就是PendingRegistrationPromise
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 在pipeline中处理事件
return pipeline.bind(localAddress, promise);
}
pipeline在AbstractChannel的构造器中创建,默认是DefaultChannelPipeline类
在pipeline中有两个特殊的上下文(context):HeadContext和TailContext。处于管道的头部和尾部,分别处理入站和出站的操作。他们父类都是AbstractChannelHandlerContext
出站入站怎么定义的?消息从应用程序发送到网络的叫出站事件,反之消息从网络发送到应用程序的叫做入站事件
server:
client:
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// bind是出站事件,自然由tail调用
return tail.bind(localAddress, promise);
}
AbstractChannelHandlerContext实现这个bind方法
headContext的bind是实际的操作,之前的handler是对bind的补充。
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
// promise是否被取消
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// 1、出站事件,由tail往head遍历
// 2、找到没有被@Skip注解修饰的handlerContext的bind方法
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
// 获取处理器中的事件执行器
EventExecutor executor = next.executor();
// 当前线程是否与事件执行器中的线程一致
if (executor.inEventLoop()) {
// 若一样,直接执行invokeBind方法
next.invokeBind(localAddress, promise);
} else {
// 否则放入执行器的任务队列中,等待被调度执行
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
// 是否执行了ChannelHandler的handlerAdded(ChannelHandlerContext)方法
if (invokeHandler()) {
try {
// DON'T CHANGE
// Duplex handlers implements both out/in interfaces causing a scalability issue
// see https://bugs.openjdk.org/browse/JDK-8180450
// 获取当前Context中的handler
final ChannelHandler handler = handler();
// 用局部变量接收全局变量,防止head在操作时发生改变,相当于是一个变量副本
final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
// 是否等于headContext
if (handler == headContext) {
// 执行headContext的bind方法。若从tail找到head,就会执行head的bind方法。实际的绑定操作在headContext中完成,之前都是对bind的补充。
headContext.bind(this, localAddress, promise);
} else if (handler instanceof ChannelDuplexHandler) {
// ChannelDuplexHandler既实现出站又实现入站
((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
} else {
// 普通的出站handler
((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
}
} catch (Throwable t) {
// promise设置异常信息
notifyOutboundHandlerException(t, promise);
}
} else {
// 未执行handlerAdded方法
// 这又会回到之前的bind方法中,为啥要这样做呢?总体是递归的过程,目的是执行完符合条件的handler
bind(localAddress, promise);
}
}
目前没有自定义的handler实现bind方法,所以直接查看headContext的源码
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
// bind方法是客户端和服务端共有的事件,所以在父类实现
unsafe.bind(localAddress, promise);
}
unsafe的父类是AbstractUnsafe抽象类,是AbstractChannel的内部类。由于子类的实现细节不同,使用模板设计模式先定义好执行步骤,实际的绑定操作doBind方法由子类实现。
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 确保当前线程属于当前的eventLoop
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
// isActive由子类实现,怎样才算激活?
// NioServerSocketChannel:channel开启,并且已经绑定端口
// NioSocketChannel:channel开启,并且已经与服务器建立连接
boolean wasActive = isActive();
try {
// 子类实现绑定细节
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
// 第一次激活要触发响应的操作,调用ChannelInboundHandler.channelActive方法
if (!wasActive && isActive()) {
// 放入执行器的任务队列,等待被调度执行
invokeLater(new Runnable() {
@Override
public void run() {
// 此方法必须在第一次激活时调用
pipeline.fireChannelActive();
}
});
}
// 由于没有放入eventLoop的任务队列,bind操作又是阻塞操作,所以此处直接设置promise为success,并触发监听器
safeSetSuccess(promise);
}
doBind方法就是给socket绑定IP和端口号,详细的内容在后续解析。这里着重看下fireChannelActive实现过程
public final ChannelPipeline fireChannelActive() {
// 调用handlerContext的invokeChannelActive静态方法,传入headContext,很明显是一个入站方法
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
// 为啥变量名是next?因为此方法与其他一些方法组成递归操作
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
// 是否在事件循环中?
if (executor.inEventLoop()) {
// 若是,就直接执行方法
next.invokeChannelActive();
} else {
// 否则放入任务队列等待执行
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
private void invokeChannelActive() {
// 已经执行handlerAdded方法
if (invokeHandler()) {
try {
// DON'T CHANGE
// Duplex handlers implements both out/in interfaces causing a scalability issue
// see https://bugs.openjdk.org/browse/JDK-8180450
final ChannelHandler handler = handler();
final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
if (handler == headContext) {
// 若handler为headContext,执行此方法
headContext.channelActive(this);
} else if (handler instanceof ChannelDuplexHandler) {
// 双工通信的处理器
((ChannelDuplexHandler) handler).channelActive(this);
} else {
// 入站处理器
((ChannelInboundHandler) handler).channelActive(this);
}
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
// 找到合适的处理器来执行
fireChannelActive();
}
}
还是跟之前的一样,此方法也由headContext实现
ctx.fireChannelActive会将事件传递给下一个ChannelInboundHandler进行处理,若下一个处理器没有继续往下传递,那么此方法只会传递一次。否则会一直传递下去
public void channelActive(ChannelHandlerContext ctx) {
// 由于一开始传入的是head,在DefaultChannelPipeline的构造器中,所以从head的下一个找到合适的处理器执行。
ctx.fireChannelActive();
// 若已经配置autoRead为true,则执行读取操作
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
// 默认配置为true
if (channel.config().isAutoRead()) {
// 服务端的read是accept事件,客户端的read是read事件
channel.read();
}
}
根据上下文,我们还在server的bind流程中,那么read方法要在server的channel中查看源码。又由于server和client都有read操作,所以此方法的算法逻辑还是在父类AbstractChannel中实现。
public Channel read() {
// read操作在pipeline中完成
pipeline.read();
return this;
}
public final ChannelPipeline read() {
// read是出站操作,要从后往前执行
tail.read();
return this;
}
public ChannelHandlerContext read() {
// 遍历寻找read方法未被@Skip修饰的处理器
final AbstractChannelHandlerContext next = findContextOutbound(MASK_READ);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 执行invokeRead方法
next.invokeRead();
} else {
// 使用这种封装Runnable的方式,避免重复添加任务
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
// 将next.invokeRead()方法放到Runnable中,添加到对应的eventLoop的等待队列
executor.execute(tasks.invokeReadTask);
}
return this;
}
private void invokeRead() {
// 是否具备执行handler的条件
if (invokeHandler()) {
try {
// DON'T CHANGE
// Duplex handlers implements both out/in interfaces causing a scalability issue
// see https://bugs.openjdk.org/browse/JDK-8180450
final ChannelHandler handler = handler();
final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
if (handler == headContext) {
// 执行headContext的read方法,这是实际读取事件的实现。
headContext.read(this);
} else if (handler instanceof ChannelDuplexHandler) {
// 双工处理器
((ChannelDuplexHandler) handler).read(this);
} else {
// 出站处理器
((ChannelOutboundHandler) handler).read(this);
}
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
// 未符合要求,寻找下一个handler
read();
}
}
headContext的read方法实现读取事件的操作
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
Unsafe对象对于服务端和客户端是不同的,对象的创建是在newUnsafe方法中,此方法是子类实现。方法的实现方是下面两个类中,分别对应客户端和服务端。为何会这样定义?服务端就是接收客户端连接,将fd 封装成channel对象。客户端就是与服务端进行读写操作,都是字节流操作。Netty认为服务端是channel交互,是个对象,也即message。而客户端是字节流操作,就是byte。
对于服务端来说,newUnsafe的实现在AbstractNioMessageChannel中,下图中unsafe是NioMessageUnsafe对象。
但是方法逻辑还是在AbstractUnsafe中
public final void beginRead() {
// 确保当前线程属于当前的事件循环的执行线程
assertEventLoop();
try {
// 抽象方法,由子类实现
doBeginRead();
} catch (final Exception e) {
// 出现异常,添加任务:触发异常捕捉操作,执行ChannelHandler的exceptionCaught方法
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
服务端的unsafe类是AbstractNioMessageChannel类
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
// 调用父类的doBeginRead方法
super.doBeginRead();
}
注册accept事件,epoll会对server的channel进行监听。若有事件准备完毕,epoll会将已经准备好的事件返回。
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
// SelectionKey是channel感兴趣的事件集合
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
// 表示read进行中
readPending = true;
// 获取感兴趣的事件集
final int interestOps = selectionKey.interestOps();
// 是否已经注册读取事件
if ((interestOps & readInterestOp) == 0) {
// 若没有注册,就往事件集注册read事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}
服务端没有read操作,那这个readInterestOp是啥?从AbstractNioChannel溯源。此类的构造器中对readInterestOp赋值,由构造器传入。
子类AbstractNioMessageChannel也是传入的参数,继续往子类查找
子类NioServerSocketChannel传入的是ACCEPT事件,事实上服务端的读取事件就是ACCEPT事件。
小结:
在bind操作时,一直都是主线程执行代码,直到在AbstractUnsafe.register方法中,执行注册操作时判断是否在事件循环中。inEventLoop是判断eventLoop的工作线程与当前线程是否一致,若一致直接执行register0操作,反之调用eventLoop的execute方法传入Runnable对象。
execute方法在SingleThreadEventExecutor实现,因为这个方法是JUC的Executor的接口方法,而只有STEE实现了方法。
wakesUpForTask:STEE中为恒定值true。
private void execute0(@Schedule Runnable task) {
ObjectUtil.checkNotNull(task, "task");
execute(task, wakesUpForTask(task));
}
immediate:根据上下文为true
private void execute(Runnable task, boolean immediate) {
// 由于当前是主线程,所以inEventLoop为false
boolean inEventLoop = inEventLoop();
// 往taskQueue添加任务
addTask(task);
// 此时为false
if (!inEventLoop) {
// 启动线程,核心方法
startThread();
// 当前事件执行器的状态为ST_SHUTDOWN、ST_TERMINATED时,说明执行器已经关闭
if (isShutdown()) {
boolean reject = false;
try {
// 从taskQueue中删除任务
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
// 任务删除成功,可以执行拒绝策略
if (reject) {
reject();
}
}
}
// 根据之前的源码得知addTaskWakesUp默认为false,所以if判断为true
if (!addTaskWakesUp && immediate) {
// eventLoop是NioEventLoop的对象,wakeup在此类重写。唤醒epoll返回准备好的事件
wakeup(inEventLoop);
}
}
STEE事件执行器使用state表示执行状态,有五种状态:未启动、已启动、关闭中、已关闭、已终止。state变量使用volatile修饰是为了保证变量在读写时都能获取到最新值。从JMM角度来说,volatile写操作使用store load屏障,也即全屏障。CPU先清空invalidate queue,让缓存行对应的变量值失效,然后更新变量值,放入store buffer,立即将buffer数据刷新到内存,同时将最新值发送给其他CPU的invalidate queue中。
startThread是核心方法,使用executor启动了事件循环的任务
private void startThread() {
// state是否为未启动状态
if (state == ST_NOT_STARTED) {
// 通过CAS将旧值ST_NOT_STARTED,改成新值ST_STARTED。若旧值不是未启动状态,更新失败返回false
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
// 启动线程
doStartThread();
success = true;
} finally {
if (!success) {
// 启动失败,将状态改成ST_NOT_STARTED
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
executor是juc的Executor对象,让当前的EventExecutor与当前线程映射
private void doStartThread() {
// 确保thread变量为空
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
// 将当前执行线程对象赋给thread变量。inEventLoop方法就是判断thread变量与当前执行线程是否相同
thread = Thread.currentThread();
// 线程是否被中断
if (interrupted) {
// 若被中断,就在当前执行线程中添加中断标志位。
thread.interrupt();
}
boolean success = false;
// 更新最近执行时间
updateLastExecutionTime();
try {
// 若使用this.run(),会调用到当前Runnable对象的run方法,造成无限递归调用,导致栈溢出异常
// 使用SingleThreadEventExecutor.this.run()明确调用SingleThreadEventExecutor对象的run方法
// run是抽象方法,由子类实现。
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally { // run方法执行结束,当前事件执行器同样结束
// for循环确保状态流转成功
for (;;) {
int oldState = state;
// state转成ST_SHUTTING_DOWN
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks. At this point the event loop
// is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
// graceful shutdown with quietPeriod.
for (;;) {
// 清空三个任务队列:tailTasks、taskQueue、scheduledTaskQueue
if (confirmShutdown()) {
break;
}
}
// Now we want to make sure no more tasks can be added from this point. This is
// achieved by switching the state. Any new tasks beyond this point will be rejected.
// state状态流转为ST_SHUTDOWN
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
break;
}
}
// We have the final set of tasks in the queue now, no more can be added, run all remaining.
// No need to loop here, this is the final pass.
// 再调用一次,清空队列中剩余任务
confirmShutdown();
} finally {
try {
// STEE是空方法,NioEventLoop重写方法,将selector关闭
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
// 清空ThreadLocal本地变量,防止内存溢出风险
FastThreadLocal.removeAll();
// state状态流转为ST_TERMINATED。本质上还是putIntVolatile方法原子性修改状态值
// 为啥不使用CAS操作?此状态是最后一个状态值,无论旧值是啥,直接设置成ST_TERMINATED即可,所以CAS没有必要
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
// threadLock是CountDownLatch对象,若调用了awaitTermination,当前执行器的线程会被阻塞
// 唤醒等待的线程
threadLock.countDown();
// 丢弃taskQueue中的任务,记录任务数,打印日志
int numUserTasks = drainTasks();
if (numUserTasks > 0 && logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + numUserTasks + ')');
}
// terminationFuture设置成功,触发监听器
terminationFuture.setSuccess(null);
}
}
}
}
});
}
run方法是子类实现的,那么子类是NioEventLoop
protected void run() {
// 记录调用select的次数
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
// selectStrategy默认是DefaultSelectStrategy类,判断taskQueue或tailTasks是否有任务,若有执行selectNowSupplier.get()方法
// 此方法返回selectNow(),也即调用selector.selectNow(),epoll_wait不阻塞立即返回。
// 若没有任务,就返回SelectStrategy.SELECT
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE: // 跳过
continue;
case SelectStrategy.BUSY_WAIT: // 繁忙等待,无实现
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // 阻塞等待一定时间
// 获取scheduledTaskQueue下一个任务,得到其截至时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
// 若没有任务,返回-1
if (curDeadlineNanos == -1L) {
// 任务基本都是事件处理过程中添加的,若没有任务说明没有事件,可以无限期等待
curDeadlineNanos = NONE; // nothing on the calendar
}
// 设置下一次唤醒时间
nextWakeupNanos.set(curDeadlineNanos);
try {
// 确保taskQueue和tailTasks中没有任务,否则不会让select进入阻塞,下个轮次再执行select
if (!hasTasks()) {
// 若没有任务,调用select进入阻塞。若一直阻塞怎么办?其他线程给当前EventLoop添加任务时会唤醒epoll中断返回
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
// 表明已经唤醒,设置AWAKE。单线程运行,不会有多线程竞争nextWakeupNanos的情况。
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
// select出现异常,获取新的selector
rebuildSelector0();
// 重置次数清零
selectCnt = 0;
// Thread.sleep(1000)让线程暂停一段时间,避免for循环连续操作select时异常依然存在,导致CPU资源被过多占用
handleLoopException(e);
continue;
}
// select轮次增加
selectCnt++;
// 新轮次将cancelledKeys清空
cancelledKeys = 0;
needsToSelectAgain = false;
// 表示 I/O 操作和非 I/O 操作之间比例的参数
final int ioRatio = this.ioRatio;
// 是否已经运行任务
boolean ranTasks;
// 若ioRatio为100,表示所有时间片都给I/O操作
if (ioRatio == 100) {
try {
// strategy是已经准备好的事件数
if (strategy > 0) {
// 处理已经准备的事件
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
// 若按照ioRatio的定义,所有时间片都给IO操作,此处不会调用。那么非IO操作一直无法执行。
// 但是一直执行task,任务队列始终都有任务,IO操作就无法处理。所以非必要不讲ratio设置为true
ranTasks = runAllTasks();
}
} else if (strategy > 0) { // ioRatio非100,且有准备好的事件
// 记录IO操作开始时间
final long ioStartTime = System.nanoTime();
try {
// 处理IO操作
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// 计算IO操作处理时间
final long ioTime = System.nanoTime() - ioStartTime;
// 通过IO操作时间片和ioRatio计算非IO操作运行执行时间
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
// 无IO操作,只执行最小的任务数
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
// 若未执行任务,且准备好的事件数为0,且次数已经达到三次,记录日志
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
// 轮次清零
selectCnt = 0;
// 当select过早返回的次数达到一定阈值时,会重建selector对象,清楚无效的key,提高selector执行IO操作的效率
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) { // SelectionKey被取消
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
} finally {
// Always handle shutdown even if the loop processing threw an exception.
try {
// 若处于关闭状态,关闭selector
if (isShuttingDown()) {
closeAll();
// 将任务队列剩下的任务执行完
if (confirmShutdown()) {
return;
}
}
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
}
}
}
有个疑问:调用select如果一直阻塞怎么办?其他线程调用EventLoop的execute添加任务时,会顺便调用selector.select唤醒epoll返回。
run方法代码比较多,但核心代码就那些,更多的是异常处理。
我们先着重看下IO操作处理过程
SelectorImpl(Linux系统是EPollSelectorImpl)用HashSet存放epoll返回的事件集。HashSet本质上用数组存储元素,通过hash计算索引位置插入元素(HashSet维护HashMap,HashMap本质上就是数组)。哈希不冲突时添加和删除的时间复杂度都是O(1),但出现哈希冲突时复杂度变差,最坏情况下为O(N)也即需要遍历数组。Netty追求高性能,觉得可以对其优化。
如果SelectionKey被cancel,但Set未删除,在处理key会不会有问题?cancel时会把SelectionKey的valid标识设置为false,处理key时跳过key即可。
结合优化操作,再看run方法中的代码逻辑有更深一层的理解
private void processSelectedKeys() {
// SelectedSelectionKeySet对象,优化的Set集合,若此对象不为空,说明可以遍历优化的集合处理
if (selectedKeys != null) {
// 遍历优化的Set集合处理IO事件
processSelectedKeysOptimized();
} else {
// 使用正常的方式处理事件,获取selector内置的Set集合(HashSet对象)
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
// 遍历自定义的SelectedSelectionKeySet
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// help GC
selectedKeys.keys[i] = null;
// SelectionKey允许存放一个Object对象,好处在于不需要使用Map维护SelectionKey与附件的关系
final Object a = k.attachment();
// channel是Java原生的(SelectableChannel)还是 Netty自定义的(AbstractNioChannel)
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 当取消的SelectionKey达到256个时,needsToSelectAgain设为true
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// 未遍历的全部清空
selectedKeys.reset(i + 1);
// 重新调用selector.select()获取事件集
selectAgain();
// 从头开始遍历
i = -1;
}
}
}
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
// Set为空时,调用iterator方法会创建Iterator对象,但此对象无用,增加内存和垃圾回收的负担,所以要先判断是否为空
if (selectedKeys.isEmpty()) {
return;
}
// 获取Iterator对象
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
// 删除key是为了避免重复处理。若使用for循环遍历集合,再做删除操作会出现ConcurrentModificationException异常
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// 没有key跳出循环
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
// 无法从头遍历,需要重新创建Iterator对象
i = selectedKeys.iterator();
}
}
}
}
NioTask是给开发者扩展的类,这里不阐述此类情况。只详细阐述AbstractNioChannel的事件如何处理?对事件分发
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// 获取channel绑定的Unsafe对象
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
// 获取channel绑定的EventLoop对象
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
// 事件被取消,直接跳过
return;
}
try {
// 获取准备好的事件
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
// 是否有CONNECT事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
// 由于已经完成连接,此事件只会触发一次,又由于select在不阻塞时一直返回null,所以要删除此事件
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
// 完成connect后续操作
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
unsafe.forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// server端是ACCEPT事件,client端是READ事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
上面是IO操作的处理过程,非IO操作的过程如下,我们只看有超时时间的方法
protected boolean runAllTasks(long timeoutNanos) {
// 将schedule队列中已经超时的任务放入到taskQueue队列中
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
// 若任务为空,直接执行tailTasks中的任务
if (task == null) {
afterRunningAllTasks();
// 返回false说明没有执行全部的任务队列
return false;
}
// 计算截止时间
final long deadline = timeoutNanos > 0 ? getCurrentTimeNanos() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
// 执行任务
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
// 每执行64个任务判断是否到了截止时间,若已经超时就结束执行。为啥这样操作?因为频繁调用nanoTime()影响性能
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = getCurrentTimeNanos();
if (lastExecutionTime >= deadline) {
break;
}
}
// 从taskQueue中取出任务
task = pollTask();
// 任务为空结束执行
if (task == null) {
lastExecutionTime = getCurrentTimeNanos();
break;
}
}
// 执行tailTasks中的任务
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
// 返回true说明执行了全部队列中的任务
return true;
}
小结:事件循环就是循环调用select并处理事件集