RocketMQ 消息队列架构主要包括 NameServe、Broker(Master/Slave)、Producer、Consumer 4 个核心部件,基本执行流程如下:
点击查看大图
NameServer 优先启动。NameServer 是整个 RocketMQ 的“中央大脑” ,作为 RocketMQ 的服务注册中心,所以 RocketMQ 需要先启动 NameServer 再启动 Rocket 中的 Broker。
Broker 启动后,需要将自己注册至 NameServer 中,并 保持长连接,每 30s 发送一次发送心跳包,来确保 Broker 是否存活。并将 Broker 信息 ( IP+、端口等信息)以及 Broker 中存储的 Topic 信息上报。注册成功后,NameServer 集群中就有 Topic 跟 Broker 的映射关系。
NameServer 如果检测到 Broker 宕机(因为使用心跳机制, 如果检测超 120s(两分钟)无响应),则从路由注册表中将其移除。
生产者在发送某个主题的消息之前先从 NamerServer 获取 Broker 服务器地址列表(Broker 可能是 Cluster 模式),然后根据负载均衡算法从列表中选择 1 台 Broker ,建立连接通道,进行消息发送。
消费者在订阅某个 topic 的消息之前从 NamerServer 获取 Broker 服务器地址列表(Broker 可能是 Cluster 模式),包括关联的全部 Topic 队列信息。进而获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费数据。
生产者和消费者默认每 30s 从 NamerServer 获取 Broker 服务器地址列表,以及关联的所有 Topic 队列信息,更新到 Client 本地。
2 ~ 4 步骤实际上是 Producer、Broker 以及 NameServer 之间整个进行数据通信的过程,面对复杂的消息队列系统,一个性能优良,稳定性高的网络通信模块是非常重要的,它体现了 RocketMQ 集群消息的整体吞吐和负载能力。也是 RocketMQ 保证高性能、高稳定性的基石。
点击查看大图
通过上图可以看到,在整个 RocketMQ 队列系统中,rocketmq-remoting 这个 module 是专门用来负责网络通信职能的。并且从模块依赖关系中可以看出 ,rocketmq-client(client)、rocketmq-broker(broker)、rocketmq-namesrv(namesrc 命名服务) 等模块均依赖了它。
通信层是基于 Netty 进行扩展的,并自定义了通信协议,用于将消息传递给 Broker 进行存储。实现 Client 与 Server 之间高效的数据请求与接收。
RocketMQ 相关视频解析:https://www.bilibili.com/video/BV1kB4y1U7bk/?spm_id_from=333.999.0.0
因为是基于 Netty 进行扩展的,所以自定义了 RocketMQ 的消息协议,在传输过程的数据进行结构制定、封装、编解码的过程。在 RocketMQ 中,负责这个工作的就是 RemotingCommand 类,我们来看看这个类的几个重要属性:
传输的消息内容主要由一下几个部分组成:
点击查看大图
在 RocketMQ 消息队列中支持通信的模式主要有
sync 同步发送模式
async 异步发送模式
oneway 单向模式,无需关注 Response
下图从 NettyRemotingClient 初始化,NettyRemotingServer 初始化,基于 NettyRemotingClient 的消息发送,以及 Handler 处理过程来说明。
点击查看大图
Broker 和 NameServer 启动时同步调用 NettyRemotingServer.start() 方法, 初始化 Netty 服务器配置 BossGroup/WorkerGroup NioEventLoopGroup 线程组配置 Channel 添加 NettyServerHandler 调用 serverBootstrap.bind() 监听端口,等待 client 的 connection
Producer 和 Consumer 同样需要启动 Netty 的客户端,通过调用 NettyRemotingClient.start() 初始化 Netty 客户端配置客户端 NioEventLoopGroup 线程组配置 Channel 添加 NettyClientHandler
发送同步消息时,调用 NettyRemoteClient.invokeSync(),从 channelTables 缓存中获取或者创建用于通信的 Channel 通道。
创建完 Channel 后,生产者 Producer 调用 Channel.writeAndFlush() 发送数据
NettyRemotingServer 服务端线程组 处理可读事件,调用 NettyServerHandler 处理数据。
下一步,NettyServerHandler 调用 processMessageReceived 方法,接收并处理传送过来的数据。
根据请求码 RequestCode 区别不同的请求,来执行不同的 Processor。说明:Processor 在服务端初始化的时候,将 RequestCode 添加到 Processor 缓存中。消息的存、查、拉取都是不同的请求码。
processMessageReceived 从 ResponseTables(key 为 opaque) 缓存中取出 ResponseFuture,并将将返回结果设置到 ResponseFuture。同步模式下执行 responseFuture.putResponse()方法,异步调用执行回调方法。
NettyRemotingClient 收到可读事件,调用 NettyClientHandler 读取并处理返回事件。
上面我们说过了,RocketMQ 的通信是采用 Netty 组件作为底层通信库。同样的,它也遵循 Reactor 多线程模型,并在此基础上做了一些优化。
点击查看大图
上面图中四个图形可以大致说明 NettyRemotingServer 的 Reactor 多线程模型,在 RocketMQ 中的存在形式。
M:1 个 Reactor 主线程:eventLoopGroupBoss,它的职能是负责监听 TCP 网络连接请求,有连接请求过来时候,创建 SocketChannel,并注册到 selector 上。
S:RocketMQ 的源码中会选择 NIO 或 Epoll,来监听网络数据,当监听到网络数据过来时,读取数据并丢给 Worker 线程池:eventLoopGroupSelector,Rocket 源码中默认设置线程数为 3。
M1:执行业务之前的各种杂事(SSL 认证、空闲检查、网络连接检查、编解码、序列化反序列化 等等),交付给 这些工作交给 defaultEventExecutorGroup 去处理,RocketMQ 源码中默认线程数设置为 8。
M2:剩下处理业务的操作,就直接放在业务线程池中执行了。按照之前说的,依据 RequestCode 去 processorTable 本地缓存中找到对应的 processor,并封装成 task 任务,在丢给对应的业务 processor 线程池来处理。
完整的可以参照官网的这张图:
点击查看大图
上面介绍了 RocketMQ 消息通信的主要内容,我们用几句话总结下:
整个 RocketMQ 队列系统中,rocketmq-remoting Module 是专门用来负责网络通信职能的。
网络通信模块基于 Netty 进行扩展的,所以自定义了 RocketMQ 的消息协议,在传输过程的数据进行结构制定、封装、编解码的过程。
理解 NettyRemotingServer/NettyRemotingClient 的初始化过程,以及调用 NettyServerHandler/NettyClienthandler 进行处理的执行流程。
同步异步:同步和异步消核心区别是 同步消息通过 Netty 发送请求后会执行 ResponseFuture.waitResponse() 阻塞等待,异步的请求则 SendCallback 相应的方法进行回调处理。
多线程模式下会通过 1 个 Reactor 主线程(监听连接),以及 Reactor 线程池(监听数据)、Worker 线程池(处理前置工作)、Processor 线程池(处理业务逻辑) 来处理通信过程。
需要这份资料的朋友可以+文末wx名片免费获取