源码拉取:
RocketMQ的官方Git仓库地址:GitHub - apache/rocketmq: Mirror of Apache RocketMQ 可以用git把项目clone下来或者直接下载代码包。也可以到RocketMQ的官方网站上下载指定版本的码: Downloading the Apache RocketMQ Releases - Apache RocketMQ
RocketMQ源码下载
下载后就可以解压导入到IDEA中进行解读了。我们只要注意下是下载的4.7.1版本就行了。
源码下很多的功能模块,很容易让人迷失方向,我们只关注下几个最为重要的模块:
broker: broker 模块(broke 启动进程)
client :消息客户端,包含消息生产者、消息消费者相关类
example: RocketMQ 例代码
namesrv:NameServer实现相关类(NameServer启动进程)
store:消息存储实现相关类
各个模块的功能大都从名字上就能看懂。我们可以在有需要的时候再进去看源码。但是这些模块有些东西还是要关注的。例如docs文件夹下的文档,以及各个模块下都有非常丰富的junit测试代码,这些都是非常有用的。
将源码导入IDEA后,需要先对源码进行编译。编译指令 clean install -Dmaven.test.skip=true
编译完成后就可以开始调试代码了。调试时需要按照以下步骤:
调试时,先在项目目录下创建一个conf目录,并从distribution拷贝broker.conf和logback_broker.xml和logback_namesrv.xml
源码6
注解版源码中已经复制好了。
展开namesrv模块,运行NamesrvStartup类即可启动NameServer,启动配置启动参数 或者代码设置
配置完成后,再次执行,看到以下日志内容,表示NameServer启动成功
The Name Server boot success. serializeType=JSON
启动Broker之前,我们需要先修改之前复制的broker.conf文件
- # Licensed to the Apache Software Foundation (ASF) under one or more
- # contributor license agreements. See the NOTICE file distributed with
- # this work for additional information regarding copyright ownership.
- # The ASF licenses this file to You under the Apache License, Version 2.0
- # (the "License"); you may not use this file except in compliance with
- # the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
-
- brokerClusterName = DefaultCluster
- brokerName = broker-a
- brokerId = 0
- deleteWhen = 04
- fileReservedTime = 48
- brokerRole = ASYNC_MASTER
- flushDiskType = ASYNC_FLUSH
-
- # 自动创建Topic
- autoCreateTopicEnable=true
- # nameServ地址
- namesrvAddr=localhost:9876
- # 存储路径
- storePathRootDir= storerocketmq
- # commitLog路径
- storePathCommitLog=storerocketmq\commitlog
- # 消息队列存储路径
- storePathConsumeQueue=storerocketmq\consumequeue
- # 消息索引存储路径
- storePathIndex=storerocketmq\index
- # checkpoint文件路径
- storeCheckpoint=storerocketmq\checkpoint
- # abort文件存储路径
- abortFile=storerocketmq\abort
然后Broker的启动类是broker模块下的BrokerStartup。
启动Broker时,同样需要ROCETMQ_HOME环境变量,并且还需要配置一个-c 参数,指向broker.conf配置文件。
The broker[LAPTOP-M42V5TBM, 192.168.3.12:10911] boot success. serializeType=JSON
在源码的example模块下,提供了非常详细的测试代码。例如我们启动example模块下的org.apache.rocketmq.example.quickstart.Producer类即可发送消息。
但是在测试源码中,需要指定NameServer地址。这个NameServer地址有两种指定方式,一种是配置一个NAMESRV_ADDR的环境变量。另一种是在源码中指定。我们可以在源码中加一行代码指定NameServer
producer.setNamesrvAddr("127.0.0.1:9876");
然后就可以发送消息了。
我们可以使用同一模块下的org.apache.rocketmq.example.quickstart.Consumer类来消费消息。运行时同样需要指定NameServer地址
consumer.setNamesrvAddr("127.0.0.1:9876");