• RocketMQ 系列(五)高可用与负载均衡


     


    RocketMQ 系列(五)高可用与负载均衡

    RocketMQ 前面系列文章如下:

    上一篇讲了 RocketMQ 的消息存储,这一篇来讲一下怎么实现 RocketMQ 的高可用与负载均衡。

    1、Broker 高可用

    RocketMQ 的高可用主要是通过 BrokerMasterSlave相互配合来实现的。

    4

    Broker 部署相对复杂,Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但是一个 Slave 只能对应一个 Master,Master 与 Slave 的对应关系通过指定相同的 BrokerName,不同的 BrokerId 来定义,BrokerId 为 0 表示 Master,非 0 表示 Slave。Master 也可以部署多个。每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。 注意:当前 RocketMQ 版本在部署架构上支持一 Master多 Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

    1.1、Broker 集群部署方式

    Broker的高可用通过集群部署的方式实现,而部署方式主要分成了以下 4 种:

    1. 单 master方式(不推荐)

      • 优点:除了配置简单没什么优点,适合个人学习使用。
      • 缺点:不可靠,该机器重启或宕机,将导致整个服务不可用。无 HA,测试环境玩玩就行。
    2. 多 master 方式(不推荐):多个 master 节点组成集群,单个 master 节点宕机或者重启对应用没有影响。

      • 优点:所有模式中性能最高
      • 缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。

      注意:使用同步刷盘可以保证消息不丢失,同时 Topic 相对应的 queue 应该分布在集群中各个节点,而不是只在某各节点上,否则,该节点宕机会对订阅该 topic 的应用造成影响。

    3. 多 master 多 slave 异步复制方式:在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。

      • 优点:在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
      • 缺点:使用异步复制的同步方式有可能会有消息丢失的问题。
    4. 多 master 多 slave 同步双写模式:同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式。

      • 优点:同步双写的同步模式能保证数据不丢失。
      • 缺点:发送单个消息 RT 会略长,性能相比异步复制低10%左右。
      • 刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)
      • 同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步)

      注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。

    这里提到了两个概念,刷盘是什么?复制又是什么?让我们带着疑问往下看。

    1.2、同步刷盘与异步刷盘

    RocketMQ 的消息是存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ 为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过 Producer 写入 RocketMQ 的时候,有两种写磁盘方式,分别为同步刷盘和异步刷盘。

    e29a44ed868e4978a8bb3db3a7be0eca.png
    1. 同步刷盘:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的 PAGECACHE 后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。
      • 优点:可以保持MQ的消息状态和生产者/消费者的消息状态一致
      • 缺点:性能比异步的低
    2. 异步刷盘:在返回写成功状态时,消息可能只是被写入了内存的 PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘动作,快速写入。
      • 优点:性能高。
      • 缺点:Master 宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致。

    同步刷盘和异步刷盘,都是通过 Broker 配置文件里的 flushDiskType 参数设置的,把这个参数被配置成 SYNC_FLUSH(同步)、ASYNC_FLUSH (异步)中的一个。

    1.3、同步复制与异步复制

    如果一个 Broker 组有 Master和 Slave,消息需要从 Master 复制到 Slave 上,有同步和异步两种复制方式。

    1. 同步复制:即同步双写,等 Master 和 Slave 均写成功后才反馈给客户端写成功状态。
      • 优点:如果 Master 出故障,Slave 上有全部的备份数据,容易恢复,消费者仍可以从 Slave 消费, 消息不丢失。
      • 缺点:增大数据写入延迟,降低系统吞吐量,性能比异步复制模式略低,大约低10%左右,发送单个 Master 的响应时间会略高。
    2. 异步复制:只要 Master 写成功即可反馈给客户端写成功状态。
      • 优点:系统拥有较低的延迟和较高的吞吐量,Master 宕机之后,消费者仍可以从 Slave 消费,此过程对应用透明,不需要人工干预,性能同多个 Master模式几乎一样。
      • 缺点:如果 Master 出了故障,有些数据因为没有被写入 Slave,而丢失少量消息。

    同步复制和异步复制是通过 Broker 配置文件里的 brokerRole 参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、SYNC_MASTER、SLAVE 三个值中的一个。三个值说明:

    • ASYNC_MASTER:异步复制主节点。
    • SYNC_MASTER:同步复制主节点。
    • SLAVE:从节点。

    1.4、小结

    实际应用中要结合业务场景,合理设置刷盘方式主从复制方式, 尤其是 SYNC_FLUSH (同步刷盘)方式,由于频繁地触发磁盘写动作,会明显降低性能。

    通常情况下,应该把 MasterSlave 配置成 ASYNC_FLUSH异步刷盘)的刷盘方式,主从之间配置成 SYNC_MASTER同步复制)的复制方式,这样即使有一台机器出故障,仍然能保证数据不丢,是个不错的选择。

    2、Producer 高可用

    在创建 Topic 的时候,把 Topic 的多个 Message Queue 创建在多个 Broker 组上(相同 Broker 名称,不同 brokerId 的机器组成一个 Broker 组),这样当一个Broker 组的 Master不可用后,其他组的 Master 仍然可用,Producer 仍然可以发送消息。
    如果Master挂掉了,那么如何选取slave成为新的Master,参考官方文档Controller部署和Broker部署方式,这里就不详讲了。

    大致流程如下:

    1
    1. 首先Topic 在两个Master节点的Broker上都有分别4个Message Queue。
    2. 默认使用轮询的方式进行队列和Broker的选择。例如选中了 Broker A 的 Q4。
    3. 如果发送成功,则正常返回,结束。
    4. 如果发送失败就会触发重试机制(消息最大重试次数是2次),并选择使用哪一种重试策略。
    5. 重试策略有2种:开启和不开启故障延迟机制。(默认不开启)
    6. 如果不开启故障延迟机制,那么重试发送就会轮询选择刚才失败的那个Broker的下一个队列,例如Broker A的Q1。(这种方式的缺点是有可能重试会再一次失败,因为如果第一次失败了大部分情况是这个Broker有问题了,所以当第二次选择这个Broker的其他队列时,大概率也会失败。)
    7. 如果开启了故障延迟机制,那么在消息第一次发送失败后就会将该Broker置为不可用列表,转而重新选择Broker。(这种方式的缺点是,一旦所有的Broker都失败了,那么这个客户端将无法发送消息。)

    3、Consumer 高可用

    Consumer 的高可用是依赖于 Master-Slave 配置的,由于 Master 能够支持读写消息,Slave 支持读消息,当 Master 不可用或繁忙时, Consumer 会被自动切换到从 Slave 读取(自动切换,无需配置)。

    故当 Master 的机器故障后,消息仍可从 Slave 中被消费。

    4、Producer 负载均衡

    对于非顺序消(普通消息、定时/延时消息、事务消息)场景,默认且只能使用轮询模式的负载均衡策略。

    Producer 端,每个实例在发消息的时候,默认会轮询所有的 message queue 发送,以达到让消息平均落在不同的 queue 上。而由于 queue 可以散落在不同的 broker,所以消息就发送到不同的 broker 下,如下图:

    生产者负载策略

    如上图所示,M1、M2表示生产者发送的第一条消息、第二条消息,Queue1、Queue2、Queue3 表示主题中的三个队列。

    生产者按照轮询方式分别将消息依次发送到这三个队列中,M1 发送至 Queue1 中、M2 发送至 Queue2 中、M3 发送至 Queue3 中,以此类推,第四条消息 M4又发送至 Queue1 中,循环往复。

    注意轮询模式只使用于非顺序消息(普通消息、定时/延时消息、事务消息)场景,而对于顺序消息,相同的shading key只会对应一个Queue发送消息(5.0以上版本发送顺序消息时可配置 MessageGroupHash 模式)。

    5、Consumer 负载均衡

    在讲ConSumer负载均衡之前,有必要先了解 RocketMQ 的两种消费模式

    RocketMQ 支持两种消息模式:集群消费( Clustering )和广播消费( Broadcasting )。

    5.1、消费模式

    集群消费同一 Topic 下的一条消息只会被同一消费组中的一个消费者消费。也就是说,消息被负载均衡到了同一个消费组的多个消费者实例上。

    2

    广播消费:当使用广播消费模式时,每条消息推送给集群内所有的消费者,保证消息至少被每个消费者消费一次。

    3

    广播模式其实不是负载均衡,由于每个消费者都能够拿到所有消息,故不能达到负载均衡的要求。

    注意:对于 Topic、ConsumerGroup、Consumer 三者的关系需要满足订阅关系一致

    5.2、Queue 分配策略

    Consumer 的负载均衡是指将 Broker 端中多个 Queue 按照某种策略分配给同一个消费组中的不同消费者,因此只有集群消费才能做到负载均衡

    一个 Topic 中的 Queue 只能由 Consumer Group 中的一个 Consumer 进行消费,而一个 Consumer 可以同时消费多个 Queue 中的消息。那么 Queue 与Consumer 间的配对关系是如何确定的,即 Queue 要分配给哪个 Consumer 进行消费,也是有算法策略的,这些策略是通过在创建 Consumer 时的构造器传进去的。

    常见的有四种策略:平均分配、环形分配策略、一致性hash策略、同机房策略。

    通过以下代码可设置策略:

    复制 consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
    
    5.2.1、平均分配策略(默认)

    该算法是要根据 avg = QueueCount / ConsumerCount 的计算结果进行分配的。如果能够整除,则按顺序将 avg 个 Queue 逐个分配 Consumer;如果不能整除,则将多余出的 Queue 按照 Consumer 顺序逐个分配。如下图:

    4

    上面有 5 个 Queue,3 个 Consumer,那么每个 Consumer 可以分配到 1 个 Queue,但是还有 2 个 Queue 是多余的,那么这 2 个 Queue 将依次按顺序分给 Consumer1,Consumer2。

    5.2.2、环形分配策略

    环形平均算法是指,根据消费者的顺序,依次由 Queue 队列组成的环形图逐个分配,该方法不需要提前计算。如下图:

    5

    同样以 5 个 Queue,3 个 Consumer 为例,Queue1、Queue2、Queue3 按照顺序分配给 3 个 Consumer, 剩下的Queue4、Queue5 继续按照顺序分配给 Consumer1,Consumer2。

    5.2.3、一致性 hash 策略

    该算法会将 Consumer 的 hash 值作为Node节点存放到 hash 环上,然后将 Queue 的 hash 值也放到 hash 环 上,通过顺时针方向,距离 Queue 最近的那个Qonsumer 就是该 Queue 要分配的 Consumer。

    6

    顺时针方向进行判断,Queue1 分配给 Consumer1, Queue2、Queue3 分配给 Consumer2, Queue4、Queue5 分配给 Consumer3。

    该算法存在的问题:分配不均,其可以有效减少由于消费者组扩容或缩容所带来的大量的 Rebalance。

    5.2.4、同机房策略

    该算法会根据 Queue 的部署机房位置和 Consumer 的位置,过滤出当前 Consumer 相同机房的 Queue。然后按照平均分配策略或环形平均策略对同机房 Queue进行分配。如果没有同机房 Queue,则按照平均分配策略或环形分配策略对所有 Queue 进行分配。如下图:

    7

    Queue1、Queue2、Queue3 与 Consumer1、Consumer2 同个机房,3个 Queue 按照平均分配策略或环形分配策略指定 Consumer1、Consumer2。而Queue4、Queue5 则直接分配给 Consumer3。

    5.2.5、负载均衡代码

    下面展示了按照主题负载均衡的代码片段:

     mqSet = this.topicSubscribeInfoTable.get(topic);
                //根据Topic和Consumer Group获取Broker端消费者id的rpc请求
                List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }
    
                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }
    
                if (mqSet != null && cidAll != null) {
                    List mqAll = new ArrayList();
                    mqAll.addAll(mqSet);
    
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
    
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
                    List allocateResult = null;
                    try {
                        //按照Queue策略分配队列
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                  e);
                        return;
                    }
    
                    Set allocateResultSet = new HashSet();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
    				
                    //在负载均衡中更新ProcessQueueTable
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
    ">复制1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            //广播消费    
            case BROADCASTING: {
                Set mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}",
                                 consumerGroup,
                                 topic,
                                 mqSet,
                                 mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            //集群消费    
            case CLUSTERING: {
                //从本地内存获取该Topic主题下的消息队列结合
                Set mqSet = this.topicSubscribeInfoTable.get(topic);
                //根据Topic和Consumer Group获取Broker端消费者id的rpc请求
                List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }
    
                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }
    
                if (mqSet != null && cidAll != null) {
                    List mqAll = new ArrayList();
                    mqAll.addAll(mqSet);
    
                    Collections.sort(mqAll);
                    Collections.sort(cidAll);
    
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
    
                    List allocateResult = null;
                    try {
                        //按照Queue策略分配队列
                        allocateResult = strategy.allocate(
                            this.consumerGroup,
                            this.mQClientFactory.getClientId(),
                            mqAll,
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                                  e);
                        return;
                    }
    
                    Set allocateResultSet = new HashSet();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
    				
                    //在负载均衡中更新ProcessQueueTable
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }
    

    5.3、消费类型

    消费者从 Broker 中获取消息的方式有两种:pull 方式和 push 方式。

    5.3.1、pull 消费

    Consumer 主动从 Broker 中拉取消息,主动权由 Consumer 控制。一旦获取了批量消息,就会启动消费过程。不过,该方式的实时性较弱,即 Broker 中有了新的消息时消费者并不能及时发现并消费。

    拉取时间间隔是由用户指定,所以在设置该间隔时需要注意:间隔太短,空请求比例会增加;间隔太长,消息的实时性太差.

    5.3.2、push 消费

    该模式下 Broker 收到数据后会主动推送给 Consumer,该获取方式一般实时性较高。

    该获取方式是典型的发布-订阅模式,即 Consumer 向其关联的 Queue 注册了监听器,一旦发现有新的消息到来就会触发回调,去 Queue 中拉取消息。而这些都是基于 Consumer 与 Broker 间的长连接的,长连接的维护是需要消耗系统资源的。

    5.3.3、对比
    • pull 需要应用去实现对关联 Queue 的遍历,实时性差;但便于应用控制消息的拉取。
    • push:封装了对关联 Queue 的遍历,实时性强,但会占用较多的系统资源。

    本篇主要以图解的方式讲解 Broker、Producer、Consumer 的高可用及负载均衡方式,Consumer 的负载均衡较为复杂便细化到其消费模式、Queue 分配策略、消费类型三个方面。高可用与负载均衡对于分布式系统是一个刚需,也是一道难关,道阻且长,小伙伴们加油吧!!!

    参考资料:

  • 相关阅读:
    C语言从头学21——函数
    gitee-快速设置
    软件性能测试分析与调优实践之路-Java应用程序的性能分析与调优-手稿节选
    Flink从入门到放弃—Stream API—常用算子(map和flatMap)
    Python3高级特性(四)之生成器(Generator)
    区块链(3):区块链去中心化
    Java基础练习题---类型转换、双分支、多分支、switch、for
    Android中单例模式正确实现方式
    【前端学习记录】neffos插件与控制台交互
    求A^B的最后三位数表示的整数。说明:A^B的含义是“A的B次方”(快速幂Java实现)
  • 原文地址:https://www.cnblogs.com/CF1314/p/17700900.html