• RocketMQ实战之同城机房负载消费


    同城机房

    1. 将RocketMQ部署到两个同城机房,一个用于对外流量,一个用于灾备切换(比如入口网络故障,导致所有系统不可用)。两个机房共同承担消息的写入和消费。 每个机房都有Produer,每个机房都有Consumer

    2. 目标:实现本机房的消费者优先消费本机房中的消息,避免跨机房消费

    3. 使用docker模拟多机房场景

      机房主机名brokerName端口状态
      机房Arocketmq-master1rocketmq-master2-MachineRoomA10911正常
      机房Brocketmq-master2rocketmq-master2-MachineRoomB10911不正常
      机房Arocketmq-slave1rocketmq-master2-MachineRoomA10911正常
      机房Brocketmq-slave2rocketmq-master2-MachineRoomB10911正常
      共用rocketmq-nameserver19876正常
      共用rocketmq-nameserver29876正常

    操作步骤

    1. 查看集群
      在这里插入图片描述

    2. 消费者关键代码

      //修改ClientIP(模拟A机房的consumer),便于区分
      consumer.setClientIP("MachineRoomB-" + RemotingUtil.getLocalAddress());
      
      // 机房就近算法
      AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely();
      // 多机房解析器
      AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new
              AllocateMachineRoomNearby.MachineRoomResolver() {
                  @Override
                  public String brokerDeployIn(MessageQueue messageQueue) {
                      // brokerName约定是rocketmq-master1-MachineRoomA格式,所以取第三个
                      return messageQueue.getBrokerName().split("-")[2];
                  }
                  @Override
                  public String consumerDeployIn(String clientID) {
                    	//我们修改的clientIp前缀是MachineRoomB
                      return clientID.split("-")[0];
                  }
              };
      consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(averagely, machineRoomResolver));
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
    3. 完整代码

      public class MachineRootTest {
          private static final Logger logger = LoggerFactory.getLogger("rocketmq-producer");
      
      
          /**
           * 模拟一直生产消息
           */
          @Test
          public void testSync() {
              String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
              String producerGroup = "ProducerGroupName";
              final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
              try {
                  defaultMQProducer.setInstanceName("producer");
                  defaultMQProducer.setSendMsgTimeout(20000);
                  defaultMQProducer.setVipChannelEnabled(false);
                  defaultMQProducer.setNamesrvAddr(namesrvAddr);
                  defaultMQProducer.setRetryTimesWhenSendFailed(3);
                  defaultMQProducer.start();
      
                  String topic = "MachineTopic";
                  String tag = "TagA";
                  String keys = "keys";
                  for (int i = 0; i < 1000000; i++) {
                      String msg = "hello world " + i;
                      Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                      SendResult sendResult = defaultMQProducer.send(message);
                      logger.info("第{}条消息:返回状态{}", i, sendResult.getSendStatus());
                      TimeUnit.MICROSECONDS.sleep(1000);
                  }
              } catch (Exception e) {
                  logger.error(e.getMessage(), e);
              } finally {
                  defaultMQProducer.shutdown();
              }
          }
      
          @Test
          public void testPushConsumerMachineRoomA() {
              String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
              String consumerGroup = "ConsumerGroupNameMachineRoom";
      
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
              try {
                  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                  consumer.setNamesrvAddr(namesrvAddr);
                  consumer.setInstanceName("ConsumberMachineRoomA");
                  //修改ClientIP(模拟A机房的consumer),便于区分
                  consumer.setClientIP("MachineRoomA-" + RemotingUtil.getLocalAddress());
                  consumer.setMessageModel(MessageModel.CLUSTERING);
                  consumer.setConsumeMessageBatchMaxSize(20);
                  consumer.setVipChannelEnabled(false);
      
                  String topic = "MachineTopic";
                  String tag = "TagA";
                  consumer.subscribe(topic, tag);
                  // 机房就近算法
                  AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely();
                  // 多机房解析器
                  AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new
                          AllocateMachineRoomNearby.MachineRoomResolver() {
                              @Override
                              public String brokerDeployIn(MessageQueue messageQueue) {
                                  // brokerName约定是rocketmq-master1-MachineRoomA格式,所以取第三个
                                  return messageQueue.getBrokerName().split("-")[2];
                              }
                              @Override
                              public String consumerDeployIn(String clientID) {
                                  return clientID.split("-")[0];
                              }
                          };
                  consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(averagely, machineRoomResolver));
      
                  consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                      for (MessageExt messageExt : msgs) {
                          try {
      
                              String topic1 = messageExt.getTopic();
                              String msg = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                              String tags = messageExt.getTags();
                              logger.info("threadName:{},topic:{},tag:{},msg:{}", Thread.currentThread().getName(), topic1, tags, msg);
                          } catch (Exception e) {
                              logger.error(e.getMessage(), e);
                              // 重试消费,重发到Broker的RETRY TOPIC。 10s后Broker默认重新投递
                              return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                          }
                      }
                      //表示此批消息消费完成
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  });
                  //Consumer对象在使用之前必须要调用start初始化,初始化一次即可
                  consumer.start();
                  LockSupport.park();
              } catch (MQClientException e) {
                  logger.error(e.getMessage(), e);
              } finally {
                  consumer.shutdown();
              }
      
          }
      
          @Test
          public void testPushConsumerMachineRoomB() {
              String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
              String consumerGroup = "ConsumerGroupNameMachineRoom";
      
              DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
              try {
                  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
                  consumer.setNamesrvAddr(namesrvAddr);
                  consumer.setInstanceName("ConsumberMachineRoomB");
                  //修改ClientIP(模拟A机房的consumer),便于区分
                  consumer.setClientIP("MachineRoomB-" + RemotingUtil.getLocalAddress());
                  consumer.setMessageModel(MessageModel.CLUSTERING);
                  consumer.setConsumeMessageBatchMaxSize(20);
                  consumer.setVipChannelEnabled(false);
      
                  String topic = "MachineTopic";
                  String tag = "TagA";
                  consumer.subscribe(topic, tag);
                  // 机房就近算法
                  AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely();
                  // 多机房解析器
                  AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new
                          AllocateMachineRoomNearby.MachineRoomResolver() {
                              @Override
                              public String brokerDeployIn(MessageQueue messageQueue) {
                                  // brokerName约定是rocketmq-master1-MachineRoomA格式,所以取第三个
                                  return messageQueue.getBrokerName().split("-")[2];
                              }
                              @Override
                              public String consumerDeployIn(String clientID) {
                                  return clientID.split("-")[0];
                              }
                          };
                  consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(averagely, machineRoomResolver));
      
                  consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                      for (MessageExt messageExt : msgs) {
                          try {
      
                              String topic1 = messageExt.getTopic();
                              String msg = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
                              String tags = messageExt.getTags();
                              logger.info("threadName:{},topic:{},tag:{},msg:{}", Thread.currentThread().getName(), topic1, tags, msg);
                          } catch (Exception e) {
                              logger.error(e.getMessage(), e);
                              // 重试消费,重发到Broker的RETRY TOPIC。 10s后Broker默认重新投递
                              return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                          }
                      }
                      //表示此批消息消费完成
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                  });
                  //Consumer对象在使用之前必须要调用start初始化,初始化一次即可
                  consumer.start();
                  LockSupport.park();
              } catch (MQClientException e) {
                  logger.error(e.getMessage(), e);
              } finally {
                  consumer.shutdown();
              }
      
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86
      • 87
      • 88
      • 89
      • 90
      • 91
      • 92
      • 93
      • 94
      • 95
      • 96
      • 97
      • 98
      • 99
      • 100
      • 101
      • 102
      • 103
      • 104
      • 105
      • 106
      • 107
      • 108
      • 109
      • 110
      • 111
      • 112
      • 113
      • 114
      • 115
      • 116
      • 117
      • 118
      • 119
      • 120
      • 121
      • 122
      • 123
      • 124
      • 125
      • 126
      • 127
      • 128
      • 129
      • 130
      • 131
      • 132
      • 133
      • 134
      • 135
      • 136
      • 137
      • 138
      • 139
      • 140
      • 141
      • 142
      • 143
      • 144
      • 145
      • 146
      • 147
      • 148
      • 149
      • 150
      • 151
      • 152
      • 153
      • 154
      • 155
      • 156
      • 157
      • 158
      • 159
      • 160
      • 161
      • 162
      • 163
      • 164
      • 165
      • 166
    4. 查看消费情况 在这里插入图片描述

    5. 将B机房的消费者停掉,可以看到A机房的消费者可以正常消费
      在这里插入图片描述

  • 相关阅读:
    微信小程序登录获取不到头像和昵称解决办法!
    Codeforces Round 902 Div 1 (CF 1876)
    java-net-php-python-jsp人事管理系统计算机毕业设计程序
    漏洞补丁:漏洞命名(CVE和CNNVD)及补丁查找
    Real-Time Rendering——9.8.2 Multiple-Bounce Surface Reflection多次反射表面反射
    C++基础语法
    【GA-ACO-BP预测】基于混合遗传算法-蚁群算法优化BP神经网络回归预测研究(Matlab代码实现)
    Centos7系统重装报错“ /dev/root does not exist“解决办法
    中国电子云数据库 Mesh 项目 DBPack 的实践
    Unity中UI Shader遮罩RectMask2D
  • 原文地址:https://blog.csdn.net/usagoole/article/details/126355376