• 解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-下


    接上文解决Netty那些事儿之Reactor在Netty中的实现(创建篇)-上

    Netty对JDK NIO 原生Selector的优化

    首先在NioEventLoop中有一个Selector优化开关DISABLE_KEY_SET_OPTIMIZATION,通过系统变量-D io.netty.noKeySetOptimization指定,默认是开启的,表示需要对JDK NIO原生Selector进行优化。

    1. public final class NioEventLoop extends SingleThreadEventLoop {
    2.    //Selector优化开关 默认开启 为了遍历的效率 会对Selector中的SelectedKeys进行数据结构优化
    3.     private static final boolean DISABLE_KEY_SET_OPTIMIZATION =
    4.             SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization"false);
    5. }

    如果优化开关DISABLE_KEY_SET_OPTIMIZATION是关闭的,那么直接返回JDK NIO原生的Selector

    1. private SelectorTuple openSelector() {
    2.         ..........SelectorProvider创建JDK NIO  原生Selector..............
    3.         if (DISABLE_KEY_SET_OPTIMIZATION) {
    4.             //JDK NIO原生Selector ,Selector优化开关 默认开启需要对Selector进行优化
    5.             return new SelectorTuple(unwrappedSelector);
    6.         }
    7. }

    下面为Netty对JDK NIO原生的Selector的优化过程:

    1. 获取JDK NIO原生Selector的抽象实现类sun.nio.ch.SelectorImplJDK NIO原生Selector的实现均继承于该抽象类。用于判断由SelectorProvider创建出来的Selector是否为JDK默认实现SelectorProvider第三种加载方式)。因为SelectorProvider可以是自定义加载,所以它创建出来的Selector并不一定是JDK NIO 原生的。

    1.        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
    2.             @Override
    3.             public Object run() {
    4.                 try {
    5.                     return Class.forName(
    6.                             "sun.nio.ch.SelectorImpl",
    7.                             false,
    8.                             PlatformDependent.getSystemClassLoader());
    9.                 } catch (Throwable cause) {
    10.                     return cause;
    11.                 }
    12.             }
    13.         });

    JDK NIO Selector的抽象类sun.nio.ch.SelectorImpl

    1. public abstract class SelectorImpl extends AbstractSelector {
    2.     // The set of keys with data ready for an operation
    3.     // //IO就绪的SelectionKey(里面包裹着channel)
    4.     protected Set<SelectionKey> selectedKeys;
    5.     // The set of keys registered with this Selector
    6.     //注册在该Selector上的所有SelectionKey(里面包裹着channel)
    7.     protected HashSet<SelectionKey> keys;
    8.     // Public views of the key sets
    9.     //用于向调用线程返回的keys,不可变
    10.     private Set<SelectionKey> publicKeys;             // Immutable
    11.     //当有IO就绪的SelectionKey时,向调用线程返回。只可删除其中元素,不可增加
    12.     private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
    13.     protected SelectorImpl(SelectorProvider sp) {
    14.         super(sp);
    15.         keys = new HashSet<SelectionKey>();
    16.         selectedKeys = new HashSet<SelectionKey>();
    17.         if (Util.atBugLevel("1.4")) {
    18.             publicKeys = keys;
    19.             publicSelectedKeys = selectedKeys;
    20.         } else {
    21.             //不可变
    22.             publicKeys = Collections.unmodifiableSet(keys);
    23.             //只可删除其中元素,不可增加
    24.             publicSelectedKeys = Util.ungrowableSet(selectedKeys);
    25.         }
    26.     }
    27. }

    这里笔者来简单介绍下JDK NIO中的Selector中这几个字段的含义,我们可以和上篇文章讲到的epoll在内核中的结构做类比,方便大家后续的理解:

    image

    • Set selectedKeys 类似于我们上篇文章讲解Epoll时提到的就绪队列eventpoll->rdllistSelector这里大家可以理解为EpollSelector会将自己监听到的IO就绪Channel放到selectedKeys中。

    这里的SelectionKey暂且可以理解为ChannelSelector中的表示,类比上图中epitem结构里的epoll_event,封装IO就绪Socket的信息。其实SelectionKey里包含的信息不止是Channel还有很多IO相关的信息。后面我们在详细介绍。

    • HashSet keys:这里存放的是所有注册到该Selector上的Channel。类比epoll中的红黑树结构rb_root

    SelectionKeyChannel注册到Selector中后生成。

    • Set publicSelectedKeys 相当于是selectedKeys的视图,用于向外部线程返回IO就绪SelectionKey。这个集合在外部线程中只能做删除操作不可增加元素,并且不是线程安全的

    • Set publicKeys相当于keys的不可变视图,用于向外部线程返回所有注册在该Selector上的SelectionKey

    这里需要重点关注抽象类sun.nio.ch.SelectorImpl中的selectedKeyspublicSelectedKeys这两个字段,注意它们的类型都是HashSet,一会优化的就是这里!!!!

    1. 判断由SelectorProvider创建出来的Selector是否是JDK NIO原生的Selector实现。因为Netty优化针对的是JDK NIO 原生Selector。判断标准为sun.nio.ch.SelectorImpl类是否为SelectorProvider创建出Selector的父类。如果不是则直接返回。不在继续下面的优化过程。

    1.         //判断是否可以对Selector进行优化,这里主要针对JDK NIO原生Selector的实现类进行优化,因为SelectorProvider可以加载的是自定义Selector实现
    2.         //如果SelectorProvider创建的Selector不是JDK原生sun.nio.ch.SelectorImpl的实现类,那么无法进行优化,直接返回
    3.         if (!(maybeSelectorImplClass instanceof Class) ||
    4.             !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
    5.             if (maybeSelectorImplClass instanceof Throwable) {
    6.                 Throwable t = (Throwable) maybeSelectorImplClass;
    7.                 logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
    8.             }
    9.             return new SelectorTuple(unwrappedSelector);
    10.         }

    通过前面对SelectorProvider的介绍我们知道,这里通过provider.openSelector()创建出来的Selector实现类为KQueueSelectorImpl类,它继承实现了sun.nio.ch.SelectorImpl,所以它是JDK NIO 原生的Selector实现

    1. class KQueueSelectorImpl extends SelectorImpl {
    2. }
    1. 创建SelectedSelectionKeySet通过反射替换掉sun.nio.ch.SelectorImpl类selectedKeyspublicSelectedKeys的默认HashSet实现。

    为什么要用SelectedSelectionKeySet替换掉原来的HashSet呢??

    因为这里涉及到对HashSet类型sun.nio.ch.SelectorImpl#selectedKeys集合的两种操作:

    • 插入操作: 通过前边对sun.nio.ch.SelectorImpl类中字段的介绍我们知道,在Selector监听到IO就绪SelectionKey后,会将IO就绪SelectionKey插入sun.nio.ch.SelectorImpl#selectedKeys集合中,这时Reactor线程会从java.nio.channels.Selector#select(long)阻塞调用中返回(类似上篇文章提到的epoll_wait)。

    • 遍历操作:Reactor线程返回后,会从Selector中获取IO就绪SelectionKey集合(也就是sun.nio.ch.SelectorImpl#selectedKeys),Reactor线程遍历selectedKeys,获取IO就绪SocketChannel,并处理SocketChannel上的IO事件

    我们都知道HashSet底层数据结构是一个哈希表,由于Hash冲突这种情况的存在,所以导致对哈希表进行插入遍历操作的性能不如对数组进行插入遍历操作的性能好。

    还有一个重要原因是,数组可以利用CPU缓存的优势来提高遍历的效率。后面笔者会有一篇专门的文章来讲述利用CPU缓存行如何为我们带来性能优势。

    所以Netty为了优化对sun.nio.ch.SelectorImpl#selectedKeys集合的插入,遍历性能,自己用数组这种数据结构实现了SelectedSelectionKeySet,用它来替换原来的HashSet实现。

      资料直通车:Linux内核源码技术学习路线+视频教程内核源码

    学习直通车:Linux内核源码内存调优文件系统进程管理设备驱动/网络协议栈

    SelectedSelectionKeySet

    • 初始化SelectionKey[] keys数组大小为1024,当数组容量不够时,扩容为原来的两倍大小。

    • 通过数组尾部指针size,在向数组插入元素的时候可以直接定位到插入位置keys[size++]。操作一步到位,不用像哈希表那样还需要解决Hash冲突

    • 对数组的遍历操作也是如丝般顺滑,CPU直接可以在缓存行中遍历读取数组元素无需访问内存。比HashSet的迭代器java.util.HashMap.KeyIterator 遍历方式性能不知高到哪里去了。

    1. final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {
    2.     //采用数组替换到JDK中的HashSet,这样add操作和遍历操作效率更高,不需要考虑hash冲突
    3.     SelectionKey[] keys;
    4.     //数组尾部指针
    5.     int size;
    6.     SelectedSelectionKeySet() {
    7.         keys = new SelectionKey[1024];
    8.     }
    9.     /**
    10.      * 数组的添加效率高于 HashSet 因为不需要考虑hash冲突
    11.      * */
    12.     @Override
    13.     public boolean add(SelectionKey o) {
    14.         if (o == null) {
    15.             return false;
    16.         }
    17.         //时间复杂度O(1
    18.         keys[size++= o;
    19.         if (size == keys.length) {
    20.             //扩容为原来的两倍大小
    21.             increaseCapacity();
    22.         }
    23.         return true;
    24.     }
    25.     private void increaseCapacity() {
    26.         SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
    27.         System.arraycopy(keys, 0, newKeys, 0size);
    28.         keys = newKeys;
    29.     }
    30.     /**
    31.      * 采用数组的遍历效率 高于 HashSet
    32.      * */
    33.     @Override
    34.     public Iterator<SelectionKey> iterator() {
    35.         return new Iterator<SelectionKey>() {
    36.             private int idx;
    37.             @Override
    38.             public boolean hasNext() {
    39.                 return idx < size;
    40.             }
    41.             @Override
    42.             public SelectionKey next() {
    43.                 if (!hasNext()) {
    44.                     throw new NoSuchElementException();
    45.                 }
    46.                 return keys[idx++];
    47.             }
    48.             @Override
    49.             public void remove() {
    50.                 throw new UnsupportedOperationException();
    51.             }
    52.         };
    53.     }
    54. }

    看到这里不禁感叹,从各种小的细节可以看出Netty对性能的优化简直淋漓尽致,对性能的追求令人发指。细节真的是魔鬼。

    1. Netty通过反射的方式用SelectedSelectionKeySet替换掉sun.nio.ch.SelectorImpl#selectedKeyssun.nio.ch.SelectorImpl#publicSelectedKeys这两个集合中原来HashSet的实现。

    • 反射获取sun.nio.ch.SelectorImpl类中selectedKeyspublicSelectedKeys

    1.   Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    2.   Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    • Java9版本以上通过sun.misc.Unsafe设置字段值的方式

    1.        if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
    2.                         long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
    3.                         long publicSelectedKeysFieldOffset =
    4.                                 PlatformDependent.objectFieldOffset(publicSelectedKeysField);
    5.                         if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
    6.                             PlatformDependent.putObject(
    7.                                     unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
    8.                             PlatformDependent.putObject(
    9.                                     unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
    10.                             return null;
    11.                         }
    12.                         
    13.                     }
    • 通过反射的方式用SelectedSelectionKeySet替换掉hashSet实现的sun.nio.ch.SelectorImpl#selectedKeys,sun.nio.ch.SelectorImpl#publicSelectedKeys

    1.           Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
    2.           if (cause != null) {
    3.                 return cause;
    4.           }
    5.           cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
    6.           if (cause != null) {
    7.                 return cause;
    8.           }
    9.           //Java8反射替换字段
    10.           selectedKeysField.set(unwrappedSelector, selectedKeySet);
    11.           publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
    1. 将与sun.nio.ch.SelectorImpl类中selectedKeyspublicSelectedKeys关联好的Netty优化实现SelectedSelectionKeySet,设置到io.netty.channel.nio.NioEventLoop#selectedKeys字段中保存。

    1.    //会通过反射替换selector对象中的selectedKeySet保存就绪的selectKey
    2.     //该字段为持有selector对象selectedKeys的引用,当IO事件就绪时,直接从这里获取
    3.     private SelectedSelectionKeySet selectedKeys;

    后续Reactor线程就会直接从io.netty.channel.nio.NioEventLoop#selectedKeys中获取IO就绪SocketChannel

    1. SelectorTuple封装unwrappedSelectorwrappedSelector返回给NioEventLoop构造函数。到此Reactor中的Selector就创建完毕了。

    1. return new SelectorTuple(unwrappedSelector,
    2.                       new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    1.     private static final class SelectorTuple {
    2.         final Selector unwrappedSelector;
    3.         final Selector selector;
    4.         SelectorTuple(Selector unwrappedSelector) {
    5.             this.unwrappedSelector = unwrappedSelector;
    6.             this.selector = unwrappedSelector;
    7.         }
    8.         SelectorTuple(Selector unwrappedSelector, Selector selector) {
    9.             this.unwrappedSelector = unwrappedSelector;
    10.             this.selector = selector;
    11.         }
    12.     }
    • 所谓的unwrappedSelector是指被Netty优化过的JDK NIO原生Selector。

    • 所谓的wrappedSelector就是用SelectedSelectionKeySetSelector装饰类将unwrappedSelector和与sun.nio.ch.SelectorImpl类关联好的Netty优化实现SelectedSelectionKeySet封装装饰起来。

    wrappedSelector会将所有对Selector的操作全部代理给unwrappedSelector,并在发起轮询IO事件的相关操作中,重置SelectedSelectionKeySet清空上一次的轮询结果。

    1. final class SelectedSelectionKeySetSelector extends Selector {
    2.     //Netty优化后的 SelectedKey就绪集合
    3.     private final SelectedSelectionKeySet selectionKeys;
    4.     //优化后的JDK NIO 原生Selector
    5.     private final Selector delegate;
    6.     SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
    7.         this.delegate = delegate;
    8.         this.selectionKeys = selectionKeys;
    9.     }
    10.     @Override
    11.     public boolean isOpen() {
    12.         return delegate.isOpen();
    13.     }
    14.     @Override
    15.     public SelectorProvider provider() {
    16.         return delegate.provider();
    17.     }
    18.     @Override
    19.     public Set keys() {
    20.         return delegate.keys();
    21.     }
    22.     @Override
    23.     public Set selectedKeys() {
    24.         return delegate.selectedKeys();
    25.     }
    26.     @Override
    27.     public int selectNow() throws IOException {
    28.         //重置SelectedKeys集合
    29.         selectionKeys.reset();
    30.         return delegate.selectNow();
    31.     }
    32.     @Override
    33.     public int select(long timeout) throws IOException {
    34.         //重置SelectedKeys集合
    35.         selectionKeys.reset();
    36.         return delegate.select(timeout);
    37.     }
    38.     @Override
    39.     public int select() throws IOException {
    40.         //重置SelectedKeys集合
    41.         selectionKeys.reset();
    42.         return delegate.select();
    43.     }
    44.     @Override
    45.     public Selector wakeup() {
    46.         return delegate.wakeup();
    47.     }
    48.     @Override
    49.     public void close() throws IOException {
    50.         delegate.close();
    51.     }
    52. }

    到这里Reactor的核心Selector就创建好了,下面我们来看下用于保存异步任务的队列是如何创建出来的。

     

    newTaskQueue

    1.     NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
    2.                  SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
    3.                  EventLoopTaskQueueFactory queueFactory) {
    4.         super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
    5.                 rejectedExecutionHandler);
    6.         this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
    7.         this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
    8.         final SelectorTuple selectorTuple = openSelector();
    9.         //通过用SelectedSelectionKeySet装饰后的unwrappedSelector
    10.         this.selector = selectorTuple.selector;
    11.         //Netty优化过的JDK NIO远程Selector
    12.         this.unwrappedSelector = selectorTuple.unwrappedSelector;
    13.     }

    我们继续回到创建Reactor的主线上,到目前为止Reactor的核心Selector就创建好了,前边我们提到Reactor除了需要监听IO就绪事件以及处理IO就绪事件外,还需要执行一些异步任务,当外部线程向Reactor提交异步任务后,Reactor就需要一个队列来保存这些异步任务,等待Reactor线程执行。

    下面我们来看下Reactor中任务队列的创建过程:

    1.     //任务队列大小,默认是无界队列
    2.     protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
    3.             SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    4.     private static Queue<Runnable> newTaskQueue(
    5.             EventLoopTaskQueueFactory queueFactory) {
    6.         if (queueFactory == null) {
    7.             return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
    8.         }
    9.         return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    10.     }
    11.     private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
    12.         // This event loop never calls takeTask()
    13.         return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
    14.                 : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
    15.     }  
    • NioEventLoop的父类SingleThreadEventLoop中提供了一个静态变量DEFAULT_MAX_PENDING_TASKS用来指定Reactor任务队列的大小。可以通过系统变量-D io.netty.eventLoop.maxPendingTasks进行设置,默认为Integer.MAX_VALUE,表示任务队列默认为无界队列

    • 根据DEFAULT_MAX_PENDING_TASKS变量的设定,来决定创建无界任务队列还是有界任务队列。

    1.     //创建无界任务队列
    2.     PlatformDependent.<Runnable>newMpscQueue()
    3.     //创建有界任务队列
    4.     PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks)
    5.     public static <T> Queue<T> newMpscQueue() {
    6.         return Mpsc.newMpscQueue();
    7.     }
    8.     public static <T> Queue<T> newMpscQueue(final int maxCapacity) {
    9.         return Mpsc.newMpscQueue(maxCapacity);
    10.     }

    Reactor内的异步任务队列的类型为MpscQueue,它是由JCTools提供的一个高性能无锁队列,从命名前缀Mpsc可以看出,它适用于多生产者单消费者的场景,它支持多个生产者线程安全的访问队列,同一时刻只允许一个消费者线程读取队列中的元素。

    我们知道Netty中的Reactor可以线程安全的处理注册其上的多个SocketChannel上的IO数据,保证Reactor线程安全的核心原因正是因为这个MpscQueue,它可以支持多个业务线程在处理完业务逻辑后,线程安全的向MpscQueue添加异步写任务,然后由单个Reactor线程来执行这些写任务。既然是单线程执行,那肯定是线程安全的了。

    Reactor对应的NioEventLoop类型继承结构

    image.png

    NioEventLoop的继承结构也是比较复杂,这里我们只关注在Reactor创建过程中涉及的到两个父类SingleThreadEventLoop,SingleThreadEventExecutor

    剩下的继承体系,我们在后边随着Netty源码的深入在慢慢介绍。

    前边我们提到,其实Reactor我们可以看作是一个单线程的线程池,只有一个线程用来执行IO就绪事件的监听IO事件的处理异步任务的执行。用MpscQueue来存储待执行的异步任务。

    命名前缀为SingleThread的父类都是对Reactor这些行为的分层定义。也是本小节要介绍的对象

    SingleThreadEventLoop

    Reactor负责执行的异步任务分为三类:

    • 普通任务:这是Netty最主要执行的异步任务,存放在普通任务队列taskQueue中。在NioEventLoop构造函数中创建。

    • 定时任务: 存放在优先级队列中。后续我们介绍。

    • 尾部任务: 存放于尾部任务队列tailTasks中,尾部任务一般不常用,在普通任务执行完后 Reactor线程会执行尾部任务。**使用场景:**比如对Netty 的运行状态做一些统计数据,例如任务循环的耗时、占用物理内存的大小等等都可以向尾部队列添加一个收尾任务完成统计数据的实时更新。

    SingleThreadEventLoop负责对尾部任务队列tailTasks进行管理。并且提供ChannelReactor注册的行为。

    1. public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {
    2.     //任务队列大小,默认是无界队列
    3.     protected static final int DEFAULT_MAX_PENDING_TASKS = Math.max(16,
    4.             SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE));
    5.     
    6.     //尾部任务队列
    7.     private final Queue tailTasks;
    8.     protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
    9.                                     boolean addTaskWakesUp, Queue taskQueue, Queue tailTaskQueue,
    10.                                     RejectedExecutionHandler rejectedExecutionHandler) {
    11.         super(parent, executor, addTaskWakesUp, taskQueue, rejectedExecutionHandler);
    12.         //尾部队列 执行一些统计任务 不常用
    13.         tailTasks = ObjectUtil.checkNotNull(tailTaskQueue, "tailTaskQueue");
    14.     }
    15.     @Override
    16.     public ChannelFuture register(Channel channel) {
    17.         //注册channel到绑定的Reactor上
    18.         return register(new DefaultChannelPromise(channel, this));
    19.     }
    20. }

    SingleThreadEventExecutor

    SingleThreadEventExecutor主要负责对普通任务队列的管理,以及异步任务的执行Reactor线程的启停

    1. public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
    2.     protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
    3.                                         boolean addTaskWakesUp, Queue taskQueue, RejectedExecutionHandler rejectedHandler) {
    4.         //parent为Reactor所属的NioEventLoopGroup Reactor线程组
    5.         super(parent);
    6.         //向Reactor添加任务时,是否唤醒Selector停止轮询IO就绪事件,马上执行异步任务
    7.         this.addTaskWakesUp = addTaskWakesUp;
    8.         //Reactor异步任务队列的大小
    9.         this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
    10.         //用于启动Reactor线程的executor -> ThreadPerTaskExecutor
    11.         this.executor = ThreadExecutorMap.apply(executor, this);
    12.         //普通任务队列
    13.         this.taskQueue = ObjectUtil.checkNotNull(taskQueue, "taskQueue");
    14.         //任务队列满时的拒绝策略
    15.         this.rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
    16.     }
    17. }

    到现在为止,一个完整的Reactor架构就被创建出来了。

    Reactor结构.png

    3. 创建Channel到Reactor的绑定策略

    到这一步,Reactor线程组NioEventLoopGroup里边的所有Reactor就已经全部创建完毕。

    无论是Netty服务端NioServerSocketChannel关注的OP_ACCEPT事件也好,还是Netty客户端NioSocketChannel关注的OP_READOP_WRITE事件也好,都需要先注册到Reactor上,Reactor才能监听Channel上关注的IO事件实现IO多路复用

    NioEventLoopGroup(Reactor线程组)里边有众多的Reactor,那么以上提到的这些Channel究竟应该注册到哪个Reactor上呢?这就需要一个绑定的策略来平均分配。

    还记得我们前边介绍MultithreadEventExecutorGroup类的时候提到的构造器参数EventExecutorChooserFactory吗?

    这时候它就派上用场了,它主要用来创建ChannelReactor的绑定策略。默认为DefaultEventExecutorChooserFactory.INSTANCE

    1. public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
    2.    //从Reactor集合中选择一个特定的Reactor的绑定策略 用于channel注册绑定到一个固定的Reactor上
    3.     private final EventExecutorChooserFactory.EventExecutorChooser chooser;
    4.     chooser = chooserFactory.newChooser(children);
    5. }

    下面我们来看下具体的绑定策略:

    DefaultEventExecutorChooserFactory

    1. public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    2.     public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
    3.     private DefaultEventExecutorChooserFactory() { }
    4.     @Override
    5.     public EventExecutorChooser newChooser(EventExecutor[] executors) {
    6.         if (isPowerOfTwo(executors.length)) {
    7.             return new PowerOfTwoEventExecutorChooser(executors);
    8.         } else {
    9.             return new GenericEventExecutorChooser(executors);
    10.         }
    11.     }
    12.     private static boolean isPowerOfTwo(int val) {
    13.         return (val & -val) == val;
    14.     }
    15.     ...................省略.................
    16. }

    我们看到在newChooser方法绑定策略有两个分支,不同之处在于需要判断Reactor线程组中的Reactor个数是否为2的次幂

    Netty中的绑定策略就是采用round-robin轮询的方式来挨个选择Reactor进行绑定。

    采用round-robin的方式进行负载均衡,我们一般会用round % reactor.length取余的方式来挨个平均的定位到对应的Reactor上。

    如果Reactor的个数reactor.length恰好是2的次幂,那么就可以用位操作&运算round & reactor.length -1来代替%运算round % reactor.length,因为位运算的性能更高。具体为什么&运算能够代替%运算,笔者会在后面讲述时间轮的时候为大家详细证明,这里大家只需记住这个公式,我们还是聚焦本文的主线。

    了解了优化原理,我们在看代码实现就很容易理解了。

    利用%运算的方式Math.abs(idx.getAndIncrement() % executors.length)来进行绑定。

    1.     private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    2.         private final AtomicLong idx = new AtomicLong();
    3.         private final EventExecutor[] executors;
    4.         GenericEventExecutorChooser(EventExecutor[] executors) {
    5.             this.executors = executors;
    6.         }
    7.         @Override
    8.         public EventExecutor next() {
    9.             return executors[(int) Math.abs(idx.getAndIncrement() % executors.length)];
    10.         }
    11.     }

    利用&运算的方式idx.getAndIncrement() & executors.length - 1来进行绑定。

    1.     private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    2.         private final AtomicInteger idx = new AtomicInteger();
    3.         private final EventExecutor[] executors;
    4.         PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
    5.             this.executors = executors;
    6.         }
    7.         @Override
    8.         public EventExecutor next() {
    9.             return executors[idx.getAndIncrement() & executors.length - 1];
    10.         }
    11.     }

    又一次被Netty对性能的极致追求所折服~~~~

    4. 向Reactor线程组中所有的Reactor注册terminated回调函数

    当Reactor线程组NioEventLoopGroup中所有的Reactor已经创建完毕,ChannelReactor的绑定策略也创建完毕后,我们就来到了创建NioEventGroup的最后一步。

    俗话说的好,有创建就有启动,有启动就有关闭,这里会创建Reactor关闭的回调函数terminationListener,在Reactor关闭时回调。

    terminationListener回调的逻辑很简单:

    • 通过AtomicInteger terminatedChildren变量记录已经关闭的Reactor个数,用来判断NioEventLoopGroup中的Reactor是否已经全部关闭。

    • 如果所有Reactor均已关闭,设置NioEventLoopGroup中的terminationFuturesuccess。表示Reactor线程组关闭成功。

    1.        //记录关闭的Reactor个数,当Reactor全部关闭后,才可以认为关闭成功
    2.         private final AtomicInteger terminatedChildren = new AtomicInteger();
    3.         //关闭future
    4.         private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    5.         final FutureListener<Object> terminationListener = new FutureListener<Object>() {
    6.             @Override
    7.             public void operationComplete(Future<Object> future) throws Exception {
    8.                 if (terminatedChildren.incrementAndGet() == children.length) {
    9.                     //当所有Reactor关闭后 才认为是关闭成功
    10.                     terminationFuture.setSuccess(null);
    11.                 }
    12.             }
    13.         };
    14.         
    15.         //为所有Reactor添加terminationListener
    16.         for (EventExecutor e: children) {
    17.             e.terminationFuture().addListener(terminationListener);
    18.         }

    我们在回到文章开头Netty服务端代码模板

    1. public final class EchoServer {
    2.     static final int PORT = Integer.parseInt(System.getProperty("port""8007"));
    3.     public static void main(String[] args) throws Exception {
    4.         // Configure the server.
    5.         //创建主从Reactor线程组
    6.         EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    7.         EventLoopGroup workerGroup = new NioEventLoopGroup();
    8.         ...........省略............
    9.     }
    10. }

    现在Netty的主从Reactor线程组就已经创建完毕,此时Netty服务端的骨架已经搭建完毕,骨架如下:

    主从Reactor线程组.png


    总结

    本文介绍了首先介绍了Netty对各种IO模型的支持以及如何轻松切换各种IO模型

    还花了大量的篇幅介绍Netty服务端的核心引擎主从Reactor线程组的创建过程。在这个过程中,我们还提到了Netty对各种细节进行的优化,展现了Netty对性能极致的追求。

    好了,Netty服务端的骨架已经搭好,剩下的事情就该绑定端口地址然后接收连接了

     

  • 相关阅读:
    电子器件系列44:环形线圈电感
    NewStarCTF2023week4-R通大残(RGB通道隐写)
    springboot 配置kafka批量消费,并发消费
    git stash暂存-详细命令使用
    boss:整个卡尔曼滤波器的简单案例——估计机器人位置
    【学习笔记】线程的生命周期和状态
    (MSFT.O)微软2024财年Q3营收619亿美元
    html菜单的基本制作
    android studio 修改图标
    C++&QT day6
  • 原文地址:https://blog.csdn.net/youzhangjing_/article/details/128154146