• 追求极致性能,RocketMQ 消息通信详解


    1 介绍

    RocketMQ 消息队列架构主要包括 NameServe、Broker(Master/Slave)、Producer、Consumer 4 个核心部件,基本执行流程如下:

    点击查看大图

    1. NameServer 优先启动。NameServer 是整个 RocketMQ 的“中央大脑” ,作为 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。

    2. Broker 启动后,需要将自己注册至 NameServer 中,并 保持长连接,每 30s 发送一次发送心跳包,来确保 Broker 是否存活。并将 Broker 信息 ( IP+、端口等信息)以及 Broker 中存储的 Topic 信息上报。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。

    3. NameServer 如果检测到 Broker 宕机(因为使用心跳机制, 如果检测超 120s(两分钟)无响应),则从路由注册表中将其移除。

    4. 生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(Broker 可能是 Cluster 模式),然后根据负载均衡算法从列表中选择 1 台 Broker ,建立连接通道,进行消息发送。

    5. 消费者在订阅某个 topic 的消息之前从 NamerServer 获取 Broker 服务器地址列表(Broker 可能是 Cluster 模式),包括关联的全部 Topic 队列信息。进而获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费数据。

    6. 生产者和消费者默认每 30s 从 NamerServer 获取 Broker 服务器地址列表,以及关联的所有 Topic 队列信息,更新到 Client 本地。

      2 ~ 4 步骤实际上是 Producer、Broker 以及 NameServer 之间整个进行数据通信的过程,面对复杂的消息队列系统,一个性能优良,稳定性高的网络通信模块是非常重要的,它体现了 RocketMQ 集群消息的整体吞吐和负载能力。也是 RocketMQ 保证高性能、高稳定性的基石。

    2 网络通信过程分析

    2.1 通信类(rocketmq-remoting )的结构解析

    点击查看大图

    通过上图可以看到,在整个 RocketMQ 队列系统中,rocketmq-remoting 这个 module 是专门用来负责网络通信职能的。并且从模块依赖关系中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服务) 等模块均依赖了它。

    通信层是基于 Netty 进行扩展的,并自定义了通信协议,用于将消息传递给 Broker 进行存储。实现 Client 与 Server 之间高效的数据请求与接收。

    RocketMQ 相关视频解析:https://www.bilibili.com/video/BV1kB4y1U7bk/?spm_id_from=333.999.0.0

    2.2 协议结构设计

    因为是基于 Netty 进行扩展的,所以自定义了 RocketMQ 的消息协议,在传输过程的数据进行结构制定、封装、编解码的过程。在 RocketMQ 中,负责这个工作的就是 RemotingCommand 类,我们来看看这个类的几个重要属性:

    2.3 消息内容的组成结构

    传输的消息内容主要由一下几个部分组成:

    点击查看大图

    2.4 RocketMQ 消息通信流程

    在 RocketMQ 消息队列中支持通信的模式主要有

    • sync 同步发送模式

    • async 异步发送模式

    • oneway 单向模式,无需关注 Response

    2.4.1 通信流程说明

    下图从 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基于 NettyRemotingClient 的消息发送,以及 Handler 处理过程来说明。

    点击查看大图

    • Broker 和 NameServer 启动时同步调用 NettyRemotingServer.start() 方法, 初始化 Netty 服务器配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组配置 Channel 添加 NettyServerHandler 调用 serverBootstrap.bind() 监听端口,等待 client 的 connection

    • Producer 和 Consumer 同样需要启动 Netty 的客户端,通过调用 NettyRemotingClient.start() 初始化 Netty 客户端配置客户端 NioEventLoopGroup 线程组配置 Channel 添加 NettyClientHandler

    • 发送同步消息时,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创建用于通信的 Channel 通道。

    • 创建完 Channel 后,生产者 Producer 调用 Channel.writeAndFlush() 发送数据

    • NettyRemotingServer 服务端线程组 处理可读事件,调用 NettyServerHandler 处理数据。

    • 下一步,NettyServerHandler 调用 processMessageReceived 方法,接收并处理传送过来的数据。

    • 根据请求码 RequestCode 区别不同的请求,来执行不同的 Processor。说明:Processor 在服务端初始化的时候,将 RequestCode 添加到 Processor 缓存中。消息的存、查、拉取都是不同的请求码。

    • processMessageReceived 从 ResponseTables(key 为 opaque) 缓存中取出 ResponseFuture,并将将返回结果设置到 ResponseFuture。同步模式下执行 responseFuture.putResponse()方法,异步调用执行回调方法。

    • NettyRemotingClient 收到可读事件,调用 NettyClientHandler 读取并处理返回事件。

    2.4.2 Reactor 多线程设计

    上面我们说过了,RocketMQ 的通信是采用 Netty 组件作为底层通信库。同样的,它也遵循 Reactor 多线程模型,并在此基础上做了一些优化。

    点击查看大图

    上面图中四个图形可以大致说明 NettyRemotingServer 的 Reactor 多线程模型,在 RocketMQ 中的存在形式。

    • M:1 个 Reactor 主线程:eventLoopGroupBoss,它的职能是负责监听 TCP 网络连接请求,有连接请求过来时候,创建 SocketChannel,并注册到 selector 上。

    • S:RocketMQ 的源码中会选择 NIO 或 Epoll,来监听网络数据,当监听到网络数据过来时,读取数据并丢给 Worker 线程池:eventLoopGroupSelector,Rocket 源码中默认设置线程数为 3。

    • M1:执行业务之前的各种杂事(SSL 认证、空闲检查、网络连接检查、编解码、序列化反序列化 等等),交付给 这些工作交给 defaultEventExecutorGroup 去处理,RocketMQ 源码中默认线程数设置为 8。

    • M2:剩下处理业务的操作,就直接放在业务线程池中执行了。按照之前说的,依据 RequestCode 去 processorTable 本地缓存中找到对应的 processor,并封装成 task 任务,在丢给对应的业务 processor 线程池来处理。

    完整的可以参照官网的这张图:

    点击查看大图

    总结

    上面介绍了 RocketMQ 消息通信的主要内容,我们用几句话总结下:

    • 整个 RocketMQ 队列系统中,rocketmq-remoting Module 是专门用来负责网络通信职能的。

    • 网络通信模块基于 Netty 进行扩展的,所以自定义了 RocketMQ 的消息协议,在传输过程的数据进行结构制定、封装、编解码的过程。

    • 理解 NettyRemotingServer/NettyRemotingClient 的初始化过程,以及调用 NettyServerHandler/NettyClienthandler 进行处理的执行流程。

    • 同步异步:同步和异步消核心区别是 同步消息通过 Netty 发送请求后会执行 ResponseFuture.waitResponse() 阻塞等待,异步的请求则 SendCallback 相应的方法进行回调处理。

    • 多线程模式下会通过 1 个 Reactor 主线程(监听连接),以及 Reactor 线程池(监听数据)、Worker 线程池(处理前置工作)、Processor 线程池(处理业务逻辑) 来处理通信过程。

    需要这份资料的朋友可以+文末wx名片免费获取

  • 相关阅读:
    从SpringBoot启动,阅读源码设计
    codesys【按钮】
    单链表经典OJ题
    考虑阶梯式碳交易机制与电制氢的综合能源系统热电优化(Matlab代码实现)
    c语言程序范例
    Unity之UI、模型跟随鼠标移动(自适应屏幕分辨率、锚点、pivot中心点)
    CMakeLists.txt新手教程
    软件工程师都应该知道的10个定律
    Java全栈解密:从JVM内存管理到Spring框架,揭秘垃圾回收、类加载机制与Web开发精髓的全方位旅程
    百度之星-新的阶乘提问
  • 原文地址:https://blog.csdn.net/Q54665642ljf/article/details/127921872