我们知道netty的启动是需要实力话两个EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
EventLoopGroup 继承EventExecutorGroup , EventExecutorGroup 就是一个线程池,继承了线程包中的ScheduledExecutorService,我们都知道,改接口有定时执行的功能
它本身继承了4个重要接口,以及它们各自的功能:
它优化了JDK提供的关于中断的方法:标记过时(shutdown,shutdownNow),并且新加了几个方法
boolean isShuttingDown();
Future<?> shutdownGracefully();
Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);
增加了一个中断状态监听的方法
Future<?> terminationFuture()。注意这个Future是netty的.
可以在这个future上面增加一些监听器(addListener),然后当中断的时候就会触发。
增加了next方法。返回的是由它管理的EventExecutor。这个组的味道已经出现了。这个也是后面实现轮询内部成员的一个关键方法。
这个接口的功能就是:线程池,管理一组EventExecutor,调度任务,中断监听。
public interface EventLoopGroup extends EventExecutorGroup {
/**
* Return the next {@link EventLoop} to use
* next 方法是把指针指向下一个线程,因为netty基于NIO,NIO之所以非阻塞,是因为用到了ForkJoinPool这种线程池,这种线程池
* 支持线程任务分发然后合并,任务盗取等任务,有兴趣的同学可以去专门了解下java的并非变成ForkJoinPool
* next 就是本线程线程绝对该任务可以发给下一个线程处理,这个过程就是netty的事件循环组的意义
*/
@Override
EventLoop next();
/**
* Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
* will get notified once the registration was complete.
*/
ChannelFuture register(Channel channel);
/**
* Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
* {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
*/
ChannelFuture register(ChannelPromise promise);
/**
* Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
* will get notified once the registration was complete and also will get returned.
*
* @deprecated Use {@link #register(ChannelPromise)} instead.
*/
@Deprecated
ChannelFuture register(Channel channel, ChannelPromise promise);
}
这个接口负责提供EventExecutor (事件执行器) ,用什么来提供,用next方法提供(指向下一个线程去执行),并且可以优雅的关闭.
这个类没有任何的实现,仅仅只是把所有的提交任务的方法做了一下默认实现:就是调用一些next()方法轮询出来一个Executor然后去执行,比如:
@Override
public void execute(Runnable command) {
next().execute(command);
}
其它的所有方法都没有实现,也确实挺抽象的。
不过这个抽象类的作用还是非常大的,之前在分析组和成员关系的时候,组也具备了一些成员关系的功能,但是它的执行就是通过成员去完成的,而且这些功能本身也确实不需要组去关心的。而netty就是把这些功能放在了这个类里面,也放的特别合适,它没有实现任何有关于的组的管理功能,实际上就是交给子类去完成了,而子类在实现管理功能的时候,也就不需要关心这些功能了(提交任务),父类都已经实现了。
听名字就可以听出来一些味道了:多线程事件执行器组。
3个关键词:多线程,事件执行器,组。
这个里面才是真正实现了组的管理功能,看一下它内部所有的属性和方法:
private final EventExecutor[] children;
private final Set<EventExecutor> readonlyChildren;
private final AtomicInteger terminatedChildren;
private final Promise<?> terminationFuture;
private final EventExecutorChooser chooser;
先看一下内部属性吧:
关于它的构造器和一个抽象方法,我觉得也是比较有趣的点。
它的所有的构造器都是protected的,说明不是对外开放的,由子类去调用的。
最有趣的在于构造器的最后一个参数居然是:Object… args。在构造器里面,这种写法确实还是比较少见的。
这个类仅有一个抽象方法:
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
就是在初始化成员的时候需要调用的方法,而且它的最后一个参数也是 Object… args。其实构造器的对象数组的入参和这个入参是相呼应的,就是同一个
这个构造器当时在定义的时候,开发人员估计费了不少的心思啊,哈哈。
那为什么要这样写呢?那肯定是为了复用,复用这些组的管理功能。它所管理的Executor到底需要哪些参数,以及如何来创建。它不关心,它也没法关心;所以留到了子类,同时它希望对于具体的子类而已,那个对象数组不能暴露给框架的使用人员,希望他们具体化,不能这么抽象的使用,所以构造器全部protected。
这个类非常简单,提供了3个构造器,实现了newChild方法,并且明确了成员就是DefaultEventExecutor。
当我学习到这边的时候,就基本非常明确了,它就是一个线程池,我第一个想到的就是和ThreadPoolExecutor进行比较,之前的文章以及比较过了。但是在比较之前,想了一下,我好像还并不知道任务是怎么执行的,也就是EventExecutor的具体实现了。
事件执行器EventExecutor 事件执行器,也是一个借口,其中一个抽象类
AbstractScheduledEventExecutor ,支持了scheuduling(线程的定时调度,或者定时执行)
SingleThreadEventExecutor 继承AbstractScheduledEventExecutor ,实现OrderedEventExecutor接口,任务可以顺心执行,
其实我们看源码发现OrderedEventExecutor 就是一个空的接口,但是怎么保证线程能按照顺序执行,‘
private final Queue taskQueue; 定义了一个taskQueue,
netty提供的一个默认实现DefaultEventExecutor ,继承了SingleThreadEventExecutor, 里面的任务执行方法run
它继承了EventExecutorGroup,说明它也拥有上面说的所有功能。
重点关注一下,它新定义的方法吧:
它基本上新定义了3类接口:
@Override
protected void run() {
for (;;) {
Runnable task = takeTask();
if (task != null) {
task.run();
updateLastExecutionTime();
}
if (confirmShutdown()) {
break;
}
}
}
执行方式,循环一直跑,只要taskTask()里面有任务,就拿出来执行,如果confirmShutdown()就跳出循环,这个地方大家一定理解是单线程,因为是run方法
这个类一方面实现了EventExecutor,另一方也继承了JDK提供的AbstractExecutorService。后面这个类更多的是提供提交一些任务的默认实现,也没有做具体的业务实现。
AbstractEventExecutor它本身也没有做太多事情,只是把接口和抽象类整合在了一起。稍微梳理一下吧:
parent:定义了父对象。
selfCollection:返回一个属于自己的迭代器。
promise,future的方法都做了实现,如下:
关于定时任务的,都直接抛了异常,不支持。
提供了一个EventExecutorGroup的一个构造器。
看这个名字基本也能猜出来,它实现了调度任务:
内部有一个优先级队列,最早执行的会放在队首,每次提交任务的时候会进行调整。它这个不是线程安全的队列。因此它在添加调度任务的时候,如果不是事件循环线程的话,会提交一个新的普通任务取提交任务,保证线程安全:
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduleFromEventLoop(task);
} else {
final long deadlineNanos = task.deadlineNanos();
// task will add itself to scheduled task queue when run if not expired
if (beforeScheduledTaskSubmitted(deadlineNanos)) {
execute(task);
} else {
lazyExecute(task);
// Second hook after scheduling to facilitate race-avoidance
if (afterScheduledTaskSubmitted(deadlineNanos)) {
execute(WAKEUP_TASK);
}
}
}
return task;
}
所有被提交的调度任务都会被封装成ScheduledFutureTask,这个对象里面有几个关键点:
当这个类被内存加载的时候,会生成当前时间的一个纳秒时间戳。以后所有的时间计算都会以它作为开始时间的标准,比如获取当前纳米戳=获取当前纳秒时间戳的然后减去开始时间就是当前时间:
private static final long START_TIME = System.nanoTime();
static long nanoTime() {
return System.nanoTime() - START_TIME;
}
deadlineNanos:这个任务应该被执行的纳秒时间戳。都是通过比较这个属性值来判断当前任务是不是到时间去执行了。
periodNanos:调度任务的执行周期。为0的话,代表只执行一次。否则,就认为是周期不断执行的任务,间隔就是periodNanos。
关键在于它的run方法,解决了如何执行周期任务。当一个周期任务执行完了,它会把periodNanos加到deadlineNanos上面取,作为新的执行时间,然后重新加入队列。然后就可以再执行了:
@Override
public void run() {
assert executor().inEventLoop();
try {
if (delayNanos() > 0L) {
// Not yet expired, need to add or remove from queue
if (isCancelled()) {
scheduledExecutor().scheduledTaskQueue().removeTyped(this);
} else {
scheduledExecutor().scheduleFromEventLoop(this);
}
return;
}
if (periodNanos == 0) {
if (setUncancellableInternal()) {
V result = runTask();
setSuccessInternal(result);
}
} else {
// check if is done as it may was cancelled
if (!isCancelled()) {
runTask();
if (!executor().isShutdown()) {
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
deadlineNanos = nanoTime() - periodNanos;
}
if (!isCancelled()) {
scheduledExecutor().scheduledTaskQueue().add(this);
}
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}
单线程事件执行器,意思以及很明确了,用一个线程去执行所有的事件。
这个类的属性和方法有点多,生成的图不好看,IDEA也不能删除其中几个。那就没有图了,调几个重要的:
private final Queue<Runnable> taskQueue;
private volatile Thread thread;
private final boolean addTaskWakesUp;
private final int maxPendingTasks;
简单说一下这几个属性吧:
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
重点可以看一下后面两行,它必须为true,才会去执行wakeUp方法。所以对于它的描述,实现是看不明白。不解释了,有明白的可以分享一下。
maxPendingTasks:队列的最大容量。
从功能上来讲,这个类主要做了这么几件事情:
任务队的创建。
循环线程的启动,当提交第一个任务的时候就会启动线程:
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
}
//其它的省略一下
}
});
}
启动以后,它重点就做了一个事情,执行run方法。这是个抽象方法。也就是真正的循环体这边也没有实现。
任务的提交。实现了execute方法,这也是非常关键的方法。
提供了一些从队列里面取任务的方法,供子类用,毕竟它定义队列成private了,比如:
非常简单不分析了
组和成员之间的关系有如下特点:
线程池就不多说大家比较熟悉,可以查看这个线程池文章《08_线程池,居然帮助帮朋友面试过了》
图看不清看下面的连接:
https://www.processon.com/view/link/62b698da5653bb10419639fb