将RocketMQ部署到两个同城机房,一个用于对外流量,一个用于灾备切换(比如入口网络故障,导致所有系统不可用)。两个机房共同承担消息的写入和消费。 每个机房都有Produer,每个机房都有Consumer
目标:实现本机房的消费者优先消费本机房中的消息,避免跨机房消费
使用docker模拟多机房场景
机房 | 主机名 | brokerName | 端口 | 状态 |
---|---|---|---|---|
机房A | rocketmq-master1 | rocketmq-master2-MachineRoomA | 10911 | 正常 |
机房B | rocketmq-master2 | rocketmq-master2-MachineRoomB | 10911 | 不正常 |
机房A | rocketmq-slave1 | rocketmq-master2-MachineRoomA | 10911 | 正常 |
机房B | rocketmq-slave2 | rocketmq-master2-MachineRoomB | 10911 | 正常 |
共用 | rocketmq-nameserver1 | 9876 | 正常 | |
共用 | rocketmq-nameserver2 | 9876 | 正常 |
查看集群
消费者关键代码
//修改ClientIP(模拟A机房的consumer),便于区分
consumer.setClientIP("MachineRoomB-" + RemotingUtil.getLocalAddress());
// 机房就近算法
AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely();
// 多机房解析器
AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new
AllocateMachineRoomNearby.MachineRoomResolver() {
@Override
public String brokerDeployIn(MessageQueue messageQueue) {
// brokerName约定是rocketmq-master1-MachineRoomA格式,所以取第三个
return messageQueue.getBrokerName().split("-")[2];
}
@Override
public String consumerDeployIn(String clientID) {
//我们修改的clientIp前缀是MachineRoomB
return clientID.split("-")[0];
}
};
consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(averagely, machineRoomResolver));
完整代码
public class MachineRootTest {
private static final Logger logger = LoggerFactory.getLogger("rocketmq-producer");
/**
* 模拟一直生产消息
*/
@Test
public void testSync() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
String producerGroup = "ProducerGroupName";
final DefaultMQProducer defaultMQProducer = new DefaultMQProducer(producerGroup);
try {
defaultMQProducer.setInstanceName("producer");
defaultMQProducer.setSendMsgTimeout(20000);
defaultMQProducer.setVipChannelEnabled(false);
defaultMQProducer.setNamesrvAddr(namesrvAddr);
defaultMQProducer.setRetryTimesWhenSendFailed(3);
defaultMQProducer.start();
String topic = "MachineTopic";
String tag = "TagA";
String keys = "keys";
for (int i = 0; i < 1000000; i++) {
String msg = "hello world " + i;
Message message = new Message(topic, tag, keys, msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = defaultMQProducer.send(message);
logger.info("第{}条消息:返回状态{}", i, sendResult.getSendStatus());
TimeUnit.MICROSECONDS.sleep(1000);
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
defaultMQProducer.shutdown();
}
}
@Test
public void testPushConsumerMachineRoomA() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
String consumerGroup = "ConsumerGroupNameMachineRoom";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
try {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setInstanceName("ConsumberMachineRoomA");
//修改ClientIP(模拟A机房的consumer),便于区分
consumer.setClientIP("MachineRoomA-" + RemotingUtil.getLocalAddress());
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeMessageBatchMaxSize(20);
consumer.setVipChannelEnabled(false);
String topic = "MachineTopic";
String tag = "TagA";
consumer.subscribe(topic, tag);
// 机房就近算法
AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely();
// 多机房解析器
AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new
AllocateMachineRoomNearby.MachineRoomResolver() {
@Override
public String brokerDeployIn(MessageQueue messageQueue) {
// brokerName约定是rocketmq-master1-MachineRoomA格式,所以取第三个
return messageQueue.getBrokerName().split("-")[2];
}
@Override
public String consumerDeployIn(String clientID) {
return clientID.split("-")[0];
}
};
consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(averagely, machineRoomResolver));
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt messageExt : msgs) {
try {
String topic1 = messageExt.getTopic();
String msg = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
String tags = messageExt.getTags();
logger.info("threadName:{},topic:{},tag:{},msg:{}", Thread.currentThread().getName(), topic1, tags, msg);
} catch (Exception e) {
logger.error(e.getMessage(), e);
// 重试消费,重发到Broker的RETRY TOPIC。 10s后Broker默认重新投递
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
//表示此批消息消费完成
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//Consumer对象在使用之前必须要调用start初始化,初始化一次即可
consumer.start();
LockSupport.park();
} catch (MQClientException e) {
logger.error(e.getMessage(), e);
} finally {
consumer.shutdown();
}
}
@Test
public void testPushConsumerMachineRoomB() {
String namesrvAddr = "rocketmq-nameserver1:9876;rocketmq-nameserver2:9876";
String consumerGroup = "ConsumerGroupNameMachineRoom";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
try {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setInstanceName("ConsumberMachineRoomB");
//修改ClientIP(模拟A机房的consumer),便于区分
consumer.setClientIP("MachineRoomB-" + RemotingUtil.getLocalAddress());
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.setConsumeMessageBatchMaxSize(20);
consumer.setVipChannelEnabled(false);
String topic = "MachineTopic";
String tag = "TagA";
consumer.subscribe(topic, tag);
// 机房就近算法
AllocateMessageQueueAveragely averagely = new AllocateMessageQueueAveragely();
// 多机房解析器
AllocateMachineRoomNearby.MachineRoomResolver machineRoomResolver = new
AllocateMachineRoomNearby.MachineRoomResolver() {
@Override
public String brokerDeployIn(MessageQueue messageQueue) {
// brokerName约定是rocketmq-master1-MachineRoomA格式,所以取第三个
return messageQueue.getBrokerName().split("-")[2];
}
@Override
public String consumerDeployIn(String clientID) {
return clientID.split("-")[0];
}
};
consumer.setAllocateMessageQueueStrategy(new AllocateMachineRoomNearby(averagely, machineRoomResolver));
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt messageExt : msgs) {
try {
String topic1 = messageExt.getTopic();
String msg = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
String tags = messageExt.getTags();
logger.info("threadName:{},topic:{},tag:{},msg:{}", Thread.currentThread().getName(), topic1, tags, msg);
} catch (Exception e) {
logger.error(e.getMessage(), e);
// 重试消费,重发到Broker的RETRY TOPIC。 10s后Broker默认重新投递
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
//表示此批消息消费完成
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
//Consumer对象在使用之前必须要调用start初始化,初始化一次即可
consumer.start();
LockSupport.park();
} catch (MQClientException e) {
logger.error(e.getMessage(), e);
} finally {
consumer.shutdown();
}
}
}
查看消费情况
将B机房的消费者停掉,可以看到A机房的消费者可以正常消费