• RocketMQ相关概念


    目录

    1、什么是RocketMQ

    2、为什么要使用MQ

    3、RocketMQ相关概念

    4、消费模式

    5、RocketMq的刷盘机制

    6、同步复制和异步复制

    7、什么情况下会出现堆积

    8、如何确保消息不丢失?

    1、什么是RocketMQ

            RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,RocketMQ 是一款开源的布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

    具有以下特点:

    1. 能够保证严格的消息顺序
    2. 提供丰富的消息拉取模式
    3. 高效的订阅者水平扩展能力
    4. 实时的消息订阅机制
    5. 亿级消息堆积能力

    2、为什么要使用MQ

    1、要做到系统解耦,当新的模块进来时,可以做到代码改动最小;  能够解耦

    2、设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰,限流

    3、强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

    Mq的作用  削峰限流 异步 解耦合

    定义:

    中间件(缓存中间件  redis memcache  数据库中间件 mycat  canal   消息中间件mq )

    面向消息的中间件(message-oriented middleware) MOM能够很好的解决以上的问题。

    是指利用高效可靠的消息传递机制进行与平台无关(跨平台)的数据交流,并基于数据通信来进行分布式系统的集成。

    通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储,流量削峰,异步通信,数据同步等

    流程:

            发送者(生产者)把消息发给消息服务器[MQ],消息服务器把消息存放在若干队列/主题中,在合适的时候,消息服务器会把消息转发给接收者(消费者)。在这个过程中,发送和接收是异步的,也就是发送无需等待,发送者和接受者的生命周期也没有必然关系在发布pub/订阅sub模式下,也可以完成一对多的通信,可以让一个消息有多个接收者[微信订阅号就是这样的]。

    异步处理模式:

            消息发送者可以发送一个消息而无需等待响应。消息发送者把消息发送到一条虚拟的通道(主题或队列)上;

            消息接收者则订阅或监听该通道。一条信息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出回应。整个过程都是异步的。

    3、RocketMQ相关概念

    Producer:消息的发送者,生产者;举例:发件人

    Consumer:消息接收者,消费者;举例:收件人

    Broker:暂存和传输消息的通道;举例:快递

    NameServer:管理Broker;举例:各个快递公司的管理机构 相当于broker的注册中心,保留了broker的信息

    Queue:队列,消息存放的位置,一个Broker中可以有多个队列

    Topic:主题,消息的分类

    ProducerGroup:生产者组

    ConsumerGroup:消费者组,多个消费者组可以订阅通过一个主题,同时消费一个主题的消息(同一个消息会同时发到不同的消费者组例如:A组收到消息进行发短信,B组收到消息进行记录补偿数据。不同消费者组内的消费者可以采用负载均衡的策略进行消费),同一个消费者组下的消费者的订阅关系必须一致(topic、tag),不同的就要放到不同的组。

    订阅关系一致指的是同一个消费者组下所有Consumer实例所订阅的Topic、Tag必须完全一致。如果订阅关系不一致,消息消费的逻辑就会混乱,甚至导致消息丢失。

    Tag:消息标签,二级消息类型,用来进一步区分某个Topic下的消息分类。Topic 与 Tag 都是业务上用来归类的标识,区分在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。

    Message Key:Key 一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很 方便。RocketMQ 会创建专门的索引文件,用来存储 Key 与消息的映射,由于是 Hash 索引,应尽量使 Key 唯一,避免潜在的哈希冲突。

    Tag和key的用:

    RocketMQ消息的过滤(TAG过滤、SQL过滤)解析_rocketmq 消息过滤-CSDN博客

    消息发送的流程是,Producer询问NameServer,NameServer分配一个broker 然后Consumer也要询问NameServer,得到一个具体的broker,然后消费消息。

    一个topic如果有四个队列,集群模式下监听同一个topic的集群消费者最多只能有4个,不同的消费者监听不同的队列,如果有第五个,它就没有监听的队列,就永远都不会收到消息。集群消费者数量<=topic的队列数量

    如果是广播模式则不影响,所有的消费者都可以收到消息。

    4、消费模式

    MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

            Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

            Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

            Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式。

    5、RocketMq的刷盘机制

     RocketMQ的刷盘机制是一种确保消息可靠性的机制,简单来说就是Broker收到消息后,将消息存储到磁盘上,对消息进行持久化处理。这样可以解决几个问题:

    1. 存储空间问题。内存空间有限,存入磁盘可以维护更多消息。
    2. 消息可靠性问题。消息存入磁盘后,即使断电了,重启后便可恢复消息。

    同步刷盘:

            消息从Producder端发送出去后,被Broker接收,Broker接收到消息后将消息写入内存的PageCache后,立即通知刷盘线程进行刷盘,当前线程等待刷盘线程的通知。刷盘线程开始进行刷盘操作,刷盘完毕后唤醒之前等待的线程,再返回写成功状态,最后Producer会收到消息发送成功的ACK

    异步刷盘:

            消息从Producder端发送出去后,被Broker接收到,Broker端接收到消息后,消息被写入PageCache后立即返回写成功给Producer端。然后另一个异步线程专门会将PageCache中的数据写到磁盘里,确保消息的持久化。

    同步和异步刷盘的比较:

            从上图的比较可以发现同步和异步刷盘的主要区别在于消息写入PageCache后是否立即返回写状态。可以从几个维度分析两者的差异:

    • 吞吐量。由于异步刷盘在写入PageCache后立即返回,没有经历IO操作,因此吞吐量比同步刷盘的高很多。
    • 可靠性。同步刷盘是在完全写磁盘成功后才算成功,而异步刷盘是写入PageCache就返回,PageCache本质就是内存,假如在异步线程写磁盘之前机器断电了,消息还是可能丢失的,因此可靠性方面同步刷盘较高。
    • 性能方面。同步刷盘写入磁盘后才算成功,而异步刷盘只需要写入内存就算成功,因此异步刷盘性能高于同步刷盘。
    • 适用场景。同步刷盘可靠性高,因此适用金融等对数据要求较高的场景。异步刷盘可靠性相对来说低一些,但是性能好,因此适合要求高吞吐和高性能的场景。

    刷盘机制配置:

            刷盘方式可以通过Broker配置文件里的flushDiskType参数设置,这个参数有两种值:

    • SYNC_FLUSH (同步刷盘)。
    • ASYNC_FLUSH (异步刷盘)。

    主流mq的刷盘机制:

    6、同步复制和异步复制

    上面的同步刷盘和异步刷盘是在单个结点层面的,而同步复制和异步复制主要是指的 Borker 主从模式下,主节点返回消息给客户端的时候是否需要同步从节点。

    同步复制: 也叫 “同步双写”,也就是说,只有消息同步双写到主从节点上时才返回写入成功 。
    异步复制: 消息写入主节点之后就直接返回写入成功 。

    7、什么情况下会出现堆积

    1、生产太快了

                a.生产方可以做业务限流

             b.增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n+1))

               c.动态扩容队列数量,从而增加消费者数量

    2、消费者消费出现问题

            排查消费者程序的问题

    8、如何确保消息不丢失?

    1. 生产者使用同步发送模式 ,收到mq的返回确认以后  顺便往自己的数据库里面写msgId status(0) time
    2. 消费者消费以后 修改数据这条消息的状态 = 1
    3. 写一个定时任务 间隔两天去查询数据  如果有status = 0 and time < day-2
    4. 将mq的刷盘机制设置为同步刷盘
    5. 使用集群模式 ,搞主备模式,将消息持久化在不同的硬件上
    6. 可以开启mq的trace机制,消息跟踪机制

  • 相关阅读:
    现代软件为什么要采用微服架构
    FPGA+MCU+SDRAM方案,用于服装厂生产过程中以及设计过程中制作样板,剪裁布料
    医药流通企业如何安全访问医药ERP?无需公网IP和改变现有IT架构
    SpringBoot工程模板
    js中HTMLCollection如何循环
    JAVA并发编程之原子性、可见性与有序性
    Self-Supervised Visual Feature Learning With Deep Neural Networks: A Survey
    一文理解TS泛型
    Uni-app 命令行创建项目 多端运行
    Python中高效的爬虫框架
  • 原文地址:https://blog.csdn.net/m0_46979453/article/details/134253860