• 基于 IDEA 搭建 RocketMQ-4.6 源码环境


    RocketMQ 架构

    源码搭建前, 需要理解 RocketMQ 的四个重要组件, 以及 RocketMQ 的工作流程:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NsC5WRMG-1668600773110)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/18fd4bb6a5994ffd8e0f47dc49441784~tplv-k3u1fbpfcp-watermark.image?)]

    • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

    • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

    • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

    • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

    结合部署架构图,描述集群工作流程:

    • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
    • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
    • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
    • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
    • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

    准备环境

    1. JDK1.8
    2. IDEA.2022
    3. RocketMQ-4.6
    4. windows11

    构建源码

    1. 拉取源码:

    git clone https://github.com/apache/rocketmq.git

    我的项目路径在 d:\yyr\zgp\rocketmq

    1. 执行 maven 命令

    mvn clean install -Dmaven.test.skip=true

    在这里插入图片描述

    我们主要关注上图中标注的 4 个模块

    • broker
    • namesrv
    • example
    • distribution

    运行程序

    0. 准备工作

    在 rocketmq 工程的根目录下, 新建 conf 目录(暂时不关注目录中的四个文件)

    在这里插入图片描述

    1. 运行 NameServer
    1. 打开 distribution 模块, 将 logback_namesrv.xml 文件放在 rocketmq/conf 目录下

    在这里插入图片描述

    1. 首先需要,通过 IDEA 配置环境变量 ROCKETMQ_HOME, 类似我们装 JDK 一样, 需要配置 JAVA_HOME

    在这里插入图片描述

    1. 运行 namesrv 模块启动类 org/apache/rocketmq/namesrv/NamesrvStartup.java 模块

    当出现如下日志时, 通常可以 断定 是启动成功了。

    在这里插入图片描述

    2. 运行 Broker

    1.打开 distribution 模块, 将 logback_broker.xml 以及 broker.conf 文件放在 rocketmq/conf 目录下

    在这里插入图片描述

    2.通过 IDEA 配置环境变量 ROCKETMQ_HOME, 类似我们装 JDK 一样, 需要配置 JAVA_HOME。并指定程序运行时需要读取的配置文件 broker.conf 的位置

    在这里插入图片描述

    1. 运行 broker 模块

    启动类 org/apache/rocketmq/broker/BrokerStartup.java

    当出现如下日志时, 通常可以 断定 是启动成功了。(很多文章会说控制台打印如下日志就代表 broker 运行成功了, 其实不然。后面会说明原因

    在这里插入图片描述

    3. 运行消息生产者 Producer

    在这里插入图片描述

    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
    
            // TopicTest 可以随意替换
            DefaultMQProducer producer = new DefaultMQProducer("TopicTest");
            // 指定 namesrv 地址, 默认端口是 9876
            producer.setNamesrvAddr("localhost:9876");
          
            producer.start();
    
            // 这里进行了修改, 只发送一条消息
            for (int i = 0; i < 1; i++) {
                try {
                    Message msg = new Message("zhangsan" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
    
                    SendResult sendResult = producer.send(msg);
    
                    System.out.printf("%s%n", sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
         
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    消息发送成功

    在这里插入图片描述

    可能会出现的问题

    找不到名称为 “zhangsan” 的消息主题

    在这里插入图片描述

    问题出现的主要原因是 broker 没有注册到 namesvr, 要么没有指定 broker.conf 文件, 要么就是 broker.conf 配置文件中没有配置 namesvr 的地址。

    在这里插入图片描述

    brokerClusterName = DefaultCluster
    brokerName = broker-a
    brokerId = 0
    deleteWhen = 04
    fileReservedTime = 48
    brokerRole = ASYNC_MASTER
    flushDiskType = ASYNC_FLUSH
    autoCreateTopicEnable = true
    namesrvAddr = localhost:9876 // 注意这里
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    4. 运行 Consumer

    在这里插入图片描述

    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // 与生产者保持一致
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TopicTest
            consumer.setNamesrvAddr("localhost:9876");
    
           
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            /*
             * Subscribe one more more topics to consume.
             */
            consumer.subscribe("TopicTest1", "*");
    
            /*
             *  Register callback to execute on arrival of messages fetched from brokers.
             */
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            /*
             *  Launch the consumer instance.
             */
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    消息消费成功

    在这里插入图片描述

    5. 安装可视化工具 rocketmq-console

    如果源码环境搭建完成后, 消息始终无法消费,或者没有发送出去,但是又无法判断哪个环节出现了问题, 我们就可以搭建可视化工具, 通常情况下, 这样更容易找到哪个模块出现了问题。

    rocketmq-console 的搭建非常简单。

    1. 克隆源码

    https://github.com/apache/rocketmq-externals.git

    1. 切换到对应的分支

    在这里插入图片描述

    1. 修改 application.properties 文件, 添加 namesvr 地址

    在这里插入图片描述

    1. 运行并访问

    http://localhost:8080/

    这是验证 namesvr 和 broker 是否启动成功最简单的办法。
    除此之外, 我们也可以看出来消息是否发送成功, 是否消费成功。

    在这里插入图片描述

  • 相关阅读:
    【tensorflow2.6】图片数据建模流程:猫狗分类,83.6%识别率
    dubbo面试题
    【精讲】本地存储插件、前端svg动态波动图的、项目日常总结
    使用赋值方法画图形
    flutter_学习记录_02底部 Tab 切换保持页面状态的几种方法
    【Java】深拷贝和浅拷贝,Cloneable接口
    python基于django的智能短视频推荐系统 nodejs 前后端分离
    Java基础进阶List-ArrayList集合
    什么是视图
    UDP协议结构及其注意事项
  • 原文地址:https://blog.csdn.net/cnm10050/article/details/127892412