源码搭建前, 需要理解 RocketMQ
的四个重要组件, 以及 RocketMQ
的工作流程:
NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。
Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。
结合部署架构图,描述集群工作流程:
git clone https://github.com/apache/rocketmq.git
我的项目路径在 d:\yyr\zgp\rocketmq
mvn clean install -Dmaven.test.skip=true
我们主要关注上图中标注的 4 个模块
在 rocketmq 工程的根目录下, 新建 conf 目录(暂时不关注目录中的四个文件)
distribution
模块, 将 logback_namesrv.xml
文件放在 rocketmq/conf 目录下ROCKETMQ_HOME
, 类似我们装 JDK 一样, 需要配置 JAVA_HOME
。org/apache/rocketmq/namesrv/NamesrvStartup.java
模块当出现如下日志时, 通常可以 断定 是启动成功了。
1.打开 distribution
模块, 将 logback_broker.xml
以及 broker.conf
文件放在 rocketmq/conf 目录下
2.通过 IDEA 配置环境变量 ROCKETMQ_HOME
, 类似我们装 JDK 一样, 需要配置 JAVA_HOME
。并指定程序运行时需要读取的配置文件 broker.conf 的位置
启动类 org/apache/rocketmq/broker/BrokerStartup.java
当出现如下日志时, 通常可以 断定 是启动成功了。(很多文章会说控制台打印如下日志就代表 broker 运行成功了, 其实不然。后面会说明原因)
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// TopicTest 可以随意替换
DefaultMQProducer producer = new DefaultMQProducer("TopicTest");
// 指定 namesrv 地址, 默认端口是 9876
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 这里进行了修改, 只发送一条消息
for (int i = 0; i < 1; i++) {
try {
Message msg = new Message("zhangsan" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
消息发送成功
找不到名称为 “zhangsan” 的消息主题
问题出现的主要原因是 broker 没有注册到 namesvr, 要么没有指定 broker.conf 文件, 要么就是 broker.conf 配置文件中没有配置 namesvr 的地址。
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
autoCreateTopicEnable = true
namesrvAddr = localhost:9876 // 注意这里
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 与生产者保持一致
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TopicTest
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume.
*/
consumer.subscribe("TopicTest1", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
消息消费成功
如果源码环境搭建完成后, 消息始终无法消费,或者没有发送出去,但是又无法判断哪个环节出现了问题, 我们就可以搭建可视化工具, 通常情况下, 这样更容易找到哪个模块出现了问题。
rocketmq-console 的搭建非常简单。
https://github.com/apache/rocketmq-externals.git
http://localhost:8080/
这是验证 namesvr 和 broker 是否启动成功最简单的办法。
除此之外, 我们也可以看出来消息是否发送成功, 是否消费成功。