• 消息发送机制梳理


    Netty的消息发送涉及线程切换、消息队列、高低水位和写半包消息,实现比较复杂。

    【AbstractChannelHandlerContext的write方法】

    当用户线程发起write操作时,Netty会进行判断,如果发现不是NioEventLoop (I/O线程),则将发送消息封装成WriteTask,放入 NioEventLoop的任务队列由NioEventLoop线程执行,代码如下(AbstractChannelHandlerContext类):

    【NioEventLoop的串行无锁化】

    NioEventLoop属于io线程,每个NioEventLoop都有一个队列, 这个队列是在创建NioEventLoop时被初始化。其他的线程, 也就是多个生产者, 如果需要进行读写操作, 就把读写操作封装成任务放在这个队列中, 然后由这个NioEventLoop封装的IO线程去消费.那么假如现在有一个线程, 它咋知道我是该主动去执行读写操作, 还是应该把自己的读写操作封装成任务放在队列中呢? 那么它就会去调用一下inEventLoop()方法, 这个方法会返回一个boolean值告诉它. 其实这个方法的实现很简单
    从源码中可以看出来, 它就是拿当前线程和之前创建NioEventLoop时绑定的那个IO线程进行判断, 如果是一样的, 说明此线程就是绑定的IO线程, 可以执行读写操作, 如果不一样, 那么说明是其他线程, 就要把读写操作封装成任务放在队列中, 由绑定的那个IO线程去执行.
    这也是Netty设计的异步串行无锁化. 在Netty中线程之间不用同步控制, 可以做到线程安全.
    Netty的NioEventLoop线程内部维护了一个Queue taskQueue,除了处理网络I/O读写操作,同时还负责执行网络读写相关的Task(包含用户自定义Task),代码如下( SingleThreadEventExecutor类):
    NioEventLoop遍历taskQueue,执行消息发送任务,代码如下(AbstractWriteTask类):
    经过一些系统处理操作,最终会调用ChannelOutboundBuffer的addMessage方法,将发送消息加入发送队列(数据结构为链表)。

    【ChannelOutboundBuffer】

    ChannelOutboundBuffer是Netty的发送缓冲队列,它基于链表来管理待发送的消息,定义如下(ChannelOutboundBuffer类):
    在消息发送时会调用ChannelOutboundBuffer 的addMessage方法,修改链表指针,将新加入的消息放到尾部,同时更新上一个尾部消息的next 指针,指向新加入的消息。
    消息发送原理图:
    当SocketChannel无法一次将所有待发送的ByteBuf/ByteBuffer写入网络时,需要决定是注册SelectionKey.OP_WRITE在下一次Selector轮询时继续发送,还是在当前位置循环发送,等到所有消息都发送完成再返回。如果频繁地注册 SelectionKey.OP_WRITE 并 wakeup Selector会影啊性能;但定如术TCP 的发送缓冲区已满,TCP处于KEEP-ALIVE状态,消息无法发送出去,如果不对循环发送次数进行控制,就会长时间处于发送状态,Reactor 线程无法及时读取其他消息和执行排队的Task。所以Netty采取了折中的方式,即如果本次发送的字节数大于0,但足消息尚未发送完,则循环发送,一旦发现 write字节数为0,说明TCP缓冲区已满,此的继续发送没有意义,注册SelectionKey.OP_WRITE并退出循环,在下一个SelectionKey轮询周期继续发送,代码如下(NioSocketChannel类)。

    【NioEventLoop的局部无锁化】

    Netty 的NioEventLoop并不是一个纯粹的IO线程,它除了负责I/O读写操作,还兼顾以下两类任务。
    (1)系统任务:通过调用NioEventLoop的execute(Runnable task)方法执行,Netty有很多系统任务,创建它们的主要原因是,当I/O线程和用户线程同时操作网络资源时,为了防止并发操作导致的锁竞争,将用户线程的操作封装成任务放入消息队列,由I/O线程负责执行,这样就实现了局部无锁化。
    (2)定时任务:通过调用NioEventLoop 的schedule(Runnable command,long delay,TimeUnit unit)系列方法实现。

    【总结】

    (1)多个业务线程并发调用write相关方法是线程安全的,Netty会将发送消息封装成Task,由1/O线程异步执行。
    (2)由于单个Channel由其对应的NioEventLoop线程(单个NioEventLoop)执行,如果并行调用某个Channel的write操作超过对应的NioEventLoop线程的执行能力,则会导致WriteTask积压。
    (3)NioEventLoop线程需要处理网络读写操作,以及注册到NioEventLoop 上的各种Task,两者相互影响,如果网络读写任务较重,或者注册的Task过多,都会导致对方延迟执行,引发性能问题。
  • 相关阅读:
    C语言——指针进阶
    商品销售管理系统java+mysql
    算法:(贪心算法)-独木舟问题
    AP8100 DC-DC 升压恒压电源管理芯片
    js、javascript中常见获取时间戳的方法
    umi3使用
    五、K8S之Service
    【.NET6+Avalonia】开发支持跨平台的仿WPF应用程序以及基于ubuntu系统的演示
    c# 的一些简单用法
    让我们第一次走进Java的世界
  • 原文地址:https://blog.csdn.net/qq_34448345/article/details/127440083