• RocketMQ安装部署


    RocketMQ安装部署

    一、单机部署

    环境说明

    对于 RocketMQ 4.3.0版本,官方要求环境如下,其中 Git 用于从 GitHub 获取源码,没有安装也没关系,可以直接下载
    官方推荐的流程是:Linux 系统上安装 Git 工具 、Maven、Java JDK
    Git 工具用于直接从 GitHub 获取 RocketMQ 项目源码下载到 Linux 系统上
    然后 Maven 将 RocketMQ 源码进行编译成二进制文件
    安装了 Java JDK 就可以运行 RocketMQ 了

    1.1、下载

    ## 切换/opt/home目录下
    cd /opt/home
    
    ## 下载
    wget https://archive.apache.org/dist/rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip
    
    ## 项目解压
    unzip rocketmq-all-4.8.0-bin-release.zip
    
    ## 重命名
    mv rocketmq-all-4.8.0-bin-release rocketmq4.8.0
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    1.2、修改RocketMQ启动配置

    修改目录 /opt/home/rocketmq4.8.0/bin 下的 3 个配置文件: runserver.sh、runbroker.sh 、tools.sh不然会报insufficient memory

    cd /opt/home/rocketmq4.8.0/bin
    
    • 1

    1.2.1、runserver.sh

    vi runserver.sh	
    
    # JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
    • 1
    • 2
    • 3
    • 4

    1.2.2、runbroker.sh

    vi runbroker.sh
    
    # JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    
    • 1
    • 2
    • 3
    • 4

    1.2.3、tools.sh

    vi tools.sh
    
    # JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m"
    
    • 1
    • 2
    • 3
    • 4

    1.2.4、自动动创建Topic功能

    conf/broker.conf⽂件中加⼊如下配置,开启自动动创建Topic功能,不建议开启。

    autoCreateTopicEnable=true
    
    • 1

    1.3、RocketMQ 启动

    1.3.1、启动 NameServer

    进入 RocketMQ 安装目录下的/opt/home/rocketmq4.8.0/bin目录进行操作

    cd /opt/home/rocketmq4.8.0/bin
    
    • 1

    执行命令启动NameServer

    ## 创建日志目录 
    mkdir logs
    
    # nohup ./mqnamesrv &:属于后台以静默⽅式启动
    # ./mqnamesrv:属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
    
    nohup ./mqnamesrv > logs/mqnamesrv.out 2>1 &
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    查看启动状态,在当前目录下会有一个nohup.out的日志文件,可以打开查看。

    ## 查看日志
    tail -f logs/mqnamesrv.out
    
    ## 看到以下表示启动成功
    The Name Server boot success. serializeType=JSON
    
    • 1
    • 2
    • 3
    • 4
    • 5

    解决报错

    ## 报错
    ERROR: Please set the JAVA_HOME variable in your environment, We need java(x64)! !!
    
    ## 解决 配置jdk环境变量
    # 需要 export 环境变量
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1.3.2、启动 Broker

    同样进入 RocketMQ 安装目录下的 /opt/home/rocketmq4.8.0/bin目录进行操作

    执行启动命令,并且常驻内存,注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问。

    # 启动命令,并且常驻内存:注意ip地址要配置成为服务的ip地址,保证地址以及端口能够访问
    # nohup ./mqbroker -n 192.168.0.101:9876 & :属于后台以静默⽅式启动
    # sh ./mqbroker -n 92.168.0.101:9876 :属于终端启动,直接输出日志信息,按 ctrl+c 可直接关闭退出
    
    nohup ./mqbroker -n 192.168.0.101:9876 > logs/mqbroker.out 2>1 &
    
    • 1
    • 2
    • 3
    • 4
    • 5

    查看启动状态,启动之后同样提示将日志信息追加到了当前目录下的nohup.out文件中。

    ## 查看日志
    tail -f logs/mqbroker.out
    
    ## 看到以下表示启动成功
    The broker[linux1, 192.168.0.101:10911] boot success. serializeType=JSON and name server is 192.168.0.101:9876
    
    • 1
    • 2
    • 3
    • 4
    • 5

    1.3.3、发送/接收消息测试

    发送/接收消息之前,需要告诉客户端(Producer、Consumer)名称服务器的位置,RocketMQ 提供了多种方法来实现这一点

    • 编程方式,如:producer.setNamesrvAddr("ip:port")
    • Java 选项,如:rocketmq.namesrv.addr
    • 环境变量,如:NAMESRV_ADDR
    • HTTP 端点

    发送消息

    export NAMESRV_ADDR=192.168.0.101:9876
    ./tools.sh org.apache.rocketmq.example.quickstart.Producer
    
    • 1
    • 2

    示例:

    [root@linux1 rocketmq]# export NAMESRV_ADDR=localhost:9876
    [root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k).  A new max generation size of 261632k will be used.
    16:19:05.806 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
    RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
    RocketMQLog:WARN Please initialize the logger system properly.
    SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F379FA0000, offsetMsgId=AC11000100002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=3], queueOffset=0]
    ......
    SendResult [sendStatus=SEND_OK, msgId=AC11000176396FF3C5B512F382B603E7, offsetMsgId=AC11000100002A9F00000000000317BF, messageQueue=MessageQueue [topic=TopicTest, brokerName=linux1, queueId=2], queueOffset=249]
    16:19:08.609 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[172.17.0.1:10911] result: true
    16:19:08.631 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    接收消息

    ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
    
    • 1

    示例

    [root@linux1 rocketmq]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    OpenJDK 64-Bit Server VM warning: MaxNewSize (262144k) is equal to or greater than the entire heap (262144k).  A new max generation size of 261632k will be used.
    16:21:15.395 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
    Consumer Started.
    ConsumeMessageThread_3 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=201, queueOffset=1, sysFlag=0, bornTimestamp=1659601146477, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146478, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F000000000000057F, commitLogOffset=1407, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275866, UNIQ_KEY=AC11000176396FF3C5B512F37A6D0007, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55], transactionId='null'}]] 
    ConsumeMessageThread_4 Receive New Messages: [MessageExt [brokerName=linux1, queueId=2, storeSize=202, queueOffset=2, sysFlag=0, bornTimestamp=1659601146500, bornHost=/192.168.0.101:48216, storeTimestamp=1659601146501, storeHost=/172.17.0.1:10911, msgId=AC11000100002A9F00000000000008A4, commitLogOffset=2212, bodyCRC=2088767104, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1659601275867, UNIQ_KEY=AC11000176396FF3C5B512F37A84000B, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49, 49], transactionId='null'}]] 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    消息发送完毕之后就会退出,在同一窗口中可以使用消费者类来进行接收消息,消费是多线程的。

    1.3.4、关闭服务

    与启动顺序相反进行关闭,先关闭 broker、在关闭 nameserv

    ./mqshutdown broker
    ./mqshutdown namesrv
    
    • 1
    • 2

    1.4、控制台的安装与启动

    RocketMQ有一个可视化的dashboard,通过该控制台可以直观的查看到很多数据。

    1.4.1、下载

    下载地址:https://github.com/apache/rocketmq-externals/tags

    下载rocketmq-console-1.0.0:https://github.com/apache/rocketmq-externals/archive/refs/tags/rocketmq-console-1.0.0.zip

    1.4.2、修改配置

    使用IDEA打开项目,修改其src/main/resources中的application.properties配置文件。

    • 原来的端口号为8080,如果8080端口被占用,修改为一个不常用的端口。
    • 指定RocketMQ的name server地址,集群环境为集群1:9876;集群2:9876;集群3:9876
    server.contextPath=
    server.port=7000
    #spring.application.index=true
    spring.application.name=rocketmq-console
    spring.http.encoding.charset=UTF-8
    spring.http.encoding.enabled=true
    spring.http.encoding.force=true
    logging.config=classpath:logback.xml
    #if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
    rocketmq.config.namesrvAddr=localhost:9876
    #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
    rocketmq.config.isVIPChannel=
    #rocketmq-console's data path:dashboard/monitor
    rocketmq.config.dataPath=/tmp/rocketmq-console/data
    #set it false if you don't want use dashboard.default true
    rocketmq.config.enableDashBoardCollect=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    1.4.3、添加依赖

    在解压目录rocketmq-consolepom.xml中添加如下JAXB依赖。

    JAXB,Java Architechture for Xml Binding,用于XML绑定的Java技术,是一个业界标准,是一 项可以根据XML Schema生成Java类的技术。

    <dependency>
        <groupId>javax.xml.bindgroupId>
        <artifactId>jaxb-apiartifactId>
        <version>2.3.0version>
    dependency>
    <dependency>
        <groupId>com.sun.xml.bindgroupId>
        <artifactId>jaxb-implartifactId>
        <version>2.3.0version>
    dependency>
    <dependency>
        <groupId>com.sun.xml.bindgroupId>
        <artifactId>jaxb-coreartifactId>
        <version>2.3.0version>
    dependency>
    <dependency>
        <groupId>javax.activationgroupId>
        <artifactId>activationartifactId>
        <version>1.1.1version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    1.4.4、打包

    在rocketmq-console目录下,打开cmd窗口运行maven的打包命令。

    mvn clean package -Dmaven.test.skip=true
    
    • 1

    在这里插入图片描述

    1.4.5、启动

    这里为了方便,我将jar包上传到服务器,然后启动。

    nohup java -jar rocketmq-console-ng-1.0.0.jar > rocketmq-console.out 2>1 &
    
    • 1

    在这里插入图片描述

    1.4.6、访问

    访问地址:http://192.168.0.101:8080/#/
    在这里插入图片描述
    更换语言为中文

    二、集群搭建理论

    在这里插入图片描述

    2.1、数据复制与刷盘策略

    在这里插入图片描述

    2.1.1、复制策略

    复制策略是Broker的Master与Slave间的数据同步方式。分为同步复制与异步复制:

    • 同步复制:消息写入master后,master会等待slave同步数据成功后才向producer返回成功ACK。
    • 异步复制:消息写入master后,master立即向producer返回成功ACK,无需等待slave同步数据成功。

    异步复制策略会降低系统的写入延迟,RT变小,提高了系统的吞吐量。

    2.1.2、刷盘策略

    刷盘策略指的是broker中消息的落盘方式,即消息发送到broker内存后消息持久化到磁盘的方式。分为 同步刷盘与异步刷盘:

    • 同步刷盘:当消息持久化到broker的磁盘后才算是消息写入成功。
    • 异步刷盘:当消息写入到broker的内存后即表示消息写入成功,无需等待消息持久化到磁盘。

    1.异步刷盘策略会降低系统的写入延迟,RT变小,提高了系统的吞吐。
    2.消息写入到Broker的内存,一般是写入到了PageCache。
    3.对于异步 刷盘策略,消息会写入到PageCache后立即返回成功ACK。但并不会立即做落盘操 作,而是当PageCache到达一定量时会自动进行落盘。

    2.2、Broker集群模式

    为了追求更好的性能,RocketMQ的最佳实践方式都是在集群模式下完成。RocketMQ官方提供了三种集群搭建方式。

    • 2主2从异步通信方式
      使用异步方式进行主从之间的数据复制,吞吐量大,但可能会丢消息。
      使用conf/2m-2s-async文件夹内的配置⽂件做集群配置。
    • 2主2从同步通信方式
      使用同步方式进行主从之间的数据复制,保证消息安全投递,不会丢失,但影响吞吐量
      使用conf/2m-2s-sync文件夹内的配置文件做集群配置。
    • 2主无从方式
      会存在单点故障,且读的性能没有前两种方式好。
      使用conf/2m-noslave文件夹内的配置文件做集群配置。
    • Dledger高可用集群
      上述三种官方提供的集群没办法实现高可用,即在master节点挂掉后,slave节点没办法自动被选举为新的master,而需要人工实现。

    RocketMQ在4.5版本之后引⼊了第三方的Dleger⾼可用集群。

    三、搭建主从异步集群

    3.1、集群架构

    这里要搭建一个双主双从异步复制的Broker集群。为了方便,这里使用了两台主机来完成集群的搭建。 这两台主机的功能与broker角色分配如下表。

    服务器IPNameServerBROKER角色
    服务器1192.168.0.101192.168.0.101:9876broker-a(master),broker-b-s(slave)
    服务器2192.168.0.102192.168.0.102:9876broker-b(master),broker-a-s(slave)
    服务器3192.168.0.103192.168.0.103:9876

    三台服务器都需要安装JDK和RocketMQ,安装步骤参考如上。

    3.2、启动三台nameserver

    nameserver是⼀个轻量级的注册中心,broker把自己的信息注册到nameserver上。而且,nameserver是无状态的,直接启动即可。三台nameserver之间不需要通信,而是被请求方来关联三台nameserver的地址。

    在每台服务器的bin目录下执行如下命令:

    ## 切换到bin目录
    cd /opt/home/rocketmq4.8.0/bin
    
    ## 创建日志目录
    mkdir logs
    
    • 1
    • 2
    • 3
    • 4
    • 5

    服务器1

    ## 后台静默启动
    nohup ./mqnamesrv -n 192.168.0.101:9876 > logs/mqnamesrv.out 2>1 &
    
    • 1
    • 2

    服务器2

    ## 后台静默启动
    nohup ./mqnamesrv -n 192.168.0.102:9876 > logs/mqnamesrv.out 2>1 &
    
    • 1
    • 2

    服务器3

    ## 后台静默启动
    nohup ./mqnamesrv -n 192.168.0.103:9876 > logs/mqnamesrv.out 2>1 &
    
    • 1
    • 2

    3.3、配置broker

    broker-abroker-b-s这两台broker是配置在服务器1上,broker-bbroker-a-s这两台broker是配置在服务器2上。这两对主从节点在不同的服务器上,服务器3上没有部署broker。

    需要修改每台broker的配置文件。注意,同⼀台服务器上的两个broker保存路径不能⼀样。

    3.3.1、broker-a的master节点

    在服务器1上,进⼊到conf/2m-2s-async⽂件夹内,修改broker-a.properties⽂件。

    cd /opt/home/rocketmq4.8.0/conf/2m-2s-async
    
    vi broker-a.properties
    
    # 指定整个broker集群的名称,或者说是RocketMQ集群的名称
    brokerClusterName=DefaultCluster
    # 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
    brokerName=broker-a
    # broker所在服务器的ip
    brokerIP1=192.168.0.101
    # broker的id,0表示master,>0表示slave
    brokerId=0
    # 指定删除消息存储过期文件的时间为凌晨4点
    deleteWhen=04
    # 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
    fileReservedTime=48
    # 指定当前broker为异步复制master
    brokerRole=ASYNC_MASTER
    # 指定刷盘策略为异步刷盘
    flushDiskType=ASYNC_FLUSH
    # 指定Name Server的地址
    namesrvAddr=192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876
    # 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
    defaultTopicQueueNums=4
    # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # broker对外服务的监听端口
    listenPort=10911
    # abort文件存储路径
    abortFile=/data/rocketmq/store/abort
    # 消息存储路径
    storePathRootDir=/data/rocketmq/store
    # commitLog存储路径
    storePathCommitLog=/data/rocketmq/store/commitlog
    # 消费队列存储路径
    storePathConsumeQueue=/data/rocketmq/store/consumequeue
    # 消息索引存储路径
    storePathIndex=/data/rocketmq/store/index
    # checkpoint文件存储路径
    storeCheckpoint=/data/rocketmq/store/checkpoint
    # 限制的消息大小
    maxMessageSize=65536
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    3.3.2、broker-a的slave节点

    在服务器2上,进⼊到conf/2m-2s-async文件夹内,修改broker-a-s.properties文件。

    cd /opt/home/rocketmq4.8.0/conf/2m-2s-async
    
    vi broker-a-s.properties
    
    # 指定整个broker集群的名称,或者说是RocketMQ集群的名称
    brokerClusterName=DefaultCluster
    # 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
    brokerName=broker-a
    # broker所在服务器的ip
    brokerIP1=192.168.0.102
    # broker的id,0表示master,>0表示slave
    brokerId=1
    # 指定删除消息存储过期文件的时间为凌晨4点
    deleteWhen=04
    # 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
    fileReservedTime=48
    # 指定当前broker为异步复制master
    brokerRole=SLAVE
    # 指定刷盘策略为异步刷盘
    flushDiskType=ASYNC_FLUSH
    # 指定Name Server的地址
    namesrvAddr=192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876
    # 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
    defaultTopicQueueNums=4
    # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # broker对外服务的监听端口
    listenPort==11011
    # abort文件存储路径
    abortFile=/data/rocketmq/store-slave/abort
    # 消息存储路径
    storePathRootDir=/data/rocketmq/store-slave
    # commitLog存储路径
    storePathCommitLog=/data/rocketmq/store-slave/commitlog
    # 消费队列存储路径
    storePathConsumeQueue=/data/rocketmq/store-slave/consumequeue
    # 消息索引存储路径
    storePathIndex=/data/rocketmq/store-slave/index
    # checkpoint文件存储路径
    storeCheckpoint=/data/rocketmq/store-slave/checkpoint
    # 限制的消息大小
    maxMessageSize=65536
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    3.3.3、broker-b的master节点

    在服务器2上,进入到conf/2m-2s-async文件夹内,修改broker-b.properties文件。

    cd /opt/home/rocketmq4.8.0/conf/2m-2s-async
    
    vi broker-b.properties
    
    # 指定整个broker集群的名称,或者说是RocketMQ集群的名称
    brokerClusterName=DefaultCluster
    # 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
    brokerName=broker-b
    # broker所在服务器的ip
    brokerIP1=192.168.0.102
    # broker的id,0表示master,>0表示slave
    brokerId=0
    # 指定删除消息存储过期文件的时间为凌晨4点
    deleteWhen=04
    # 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
    fileReservedTime=48
    # 指定当前broker为异步复制master
    brokerRole=ASYNC_MASTER
    # 指定刷盘策略为异步刷盘
    flushDiskType=ASYNC_FLUSH
    # 指定Name Server的地址
    namesrvAddr=192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876
    # 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
    defaultTopicQueueNums=4
    # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # broker对外服务的监听端口
    listenPort=10911
    # abort文件存储路径
    abortFile=/data/rocketmq/store/abort
    # 消息存储路径
    storePathRootDir=/data/rocketmq/store
    # commitLog存储路径
    storePathCommitLog=/data/rocketmq/store/commitlog
    # 消费队列存储路径
    storePathConsumeQueue=/data/rocketmq/store/consumequeue
    # 消息索引存储路径
    storePathIndex=/data/rocketmq/store/index
    # checkpoint文件存储路径
    storeCheckpoint=/data/rocketmq/store/checkpoint
    # 限制的消息大小
    maxMessageSize=65536
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    3.3.4、broker-b的slave节点

    在服务器1上,进入到conf/2m-2s-async文件夹内,修改broker-b-s.properties文件。

    cd /opt/home/rocketmq4.8.0/conf/2m-2s-async
    
    vi broker-b-s.properties
    
    # 指定整个broker集群的名称,或者说是RocketMQ集群的名称
    brokerClusterName=DefaultCluster
    # 指定master-slave集群的名称。一个RocketMQ集群可以包含多个master-slave集群
    brokerName=broker-b
    # broker所在服务器的ip
    brokerIP1=192.168.0.101
    # broker的id,0表示master,>0表示slave
    brokerId=1
    # 指定删除消息存储过期文件的时间为凌晨4点
    deleteWhen=04
    # 指定未发生更新的消息存储文件的保留时长为48小时,48小时后过期,将会被删除
    fileReservedTime=48
    # 指定当前broker为异步复制master
    brokerRole=SLAVE
    # 指定刷盘策略为异步刷盘
    flushDiskType=ASYNC_FLUSH
    # 指定Name Server的地址
    namesrvAddr=192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876
    # 在发送消息⾃动创建不存在的topic时,默认创建的队列数为4个
    defaultTopicQueueNums=4
    # 是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    # broker对外服务的监听端口
    listenPort=11011
    # abort文件存储路径
    abortFile=/data/rocketmq/store-slave/abort
    # 消息存储路径
    storePathRootDir=/data/rocketmq/store-slave
    # commitLog存储路径
    storePathCommitLog=/data/rocketmq/store-slave/commitlog
    # 消费队列存储路径
    storePathConsumeQueue=/data/rocketmq/store-slave/consumequeue
    # 消息索引存储路径
    storePathIndex=/data/rocketmq/store-slave/index
    # checkpoint文件存储路径
    storeCheckpoint=/data/rocketmq/store-slave/checkpoint
    # 限制的消息大小
    maxMessageSize=65536
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    3.3.5、启动broker

    在服务器1中启动broker-a(master)broker-b-s(slave)

    cd /opt/home/rocketmq4.8.0/bin
    nohup ./mqbroker -c ../conf/2m-2s-async/broker-a.properties > logs/broker-a.out 2>1 &
    nohup ./mqbroker -c ../conf/2m-2s-async/broker-b-s.properties > logs/broker-b-s.out 2>1 &
    
    • 1
    • 2
    • 3

    在服务器2中启动broker-b(master)broker-a-s(slave)

    cd /opt/home/rocketmq4.8.0/bin
    nohup ./mqbroker -c ../conf/2m-2s-async/broker-b.properties > logs/broker-b.out 2>1 &
    nohup ./mqbroker -c ../conf/2m-2s-async/broker-a-s.properties > logs/broker-a-s.out 2>1 &
    
    • 1
    • 2
    • 3

    3.4、验证集群

    使用RocketMQ提供的tools工具验证集群是否正常工作。

    在服务器1上配置环境变量,用于被tools中的生产者和消费者程序读取该变量。

    ## 配置环境变量
    export NAMESRV_ADDR='192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876'
    
    ## 环境变量生效
    source /etc/profile
    
    • 1
    • 2
    • 3
    • 4
    • 5

    启动生产者

    ./tools.sh org.apache.rocketmq.example.quickstart.Producer
    
    • 1

    启动消费者

    ./tools.sh org.apache.rocketmq.example.quickstart.Consumer
    
    • 1

    四、mqadmin管理工具

    RocketMQ提供了命令工具用于管理topic、broker、集群、消息等。比如可以使用mqadmin创建topic:

    该命令在官网中有详细的用法解释:https://github.com/apache/rocketmq/tree/master/docs/cn

    ./mqadmin updateTopic -n 192.168.0.101:9876 -c DefaultCluster -t myTopic1
    
    • 1

    下面提供了mqadmin工具的各种命令。

    4.1、创建topic:updateTopic

    参数是否必填说明
    -b如果-c为空,则必填broker 地址,表示topic 建在该broker
    -c如果-b为空,则必填cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…
    -p指定新topic 的权限限制( W|R|WR )
    -r可读队列数(默认为8)
    -w可写队列数(默认为8)
    -ttopic 名称(名称只能使用字符 1+$ )

    4.2、删除Topic:deleteTopic

    参数是否必填说明
    -ccluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…
    -ttopic 名称(名称只能使用字符 2+$ )

    4.3、创建(修订)订阅组:updateSubGroup

    参数是否必填说明
    -b如果-c为空,则必填broker 地址,表示topic 建在该broker
    -c如果-b为空,则必填cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)
    -d是否容许广播方式消费
    -g订阅组名
    -i从哪个broker 开始消费
    -m是否容许从队列的最小位置开始消费,默认会设置为false
    -q消费失败的消息放到⼀个重试队列,每个订阅组配置几个重试队列
    -r重试消费最大次数,超过则投递到死信队列,不再投递,并报警
    -s消费功能是否开启
    -w发现消息堆积后后,将Consumer 的消费请求重定向到另外⼀台Slave 机器
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.4、删除订阅组配置:deleteSubGroup

    参数是否必填说明
    -b如果-c为空,则必填broker 地址,表示topic 建在该broker
    -c如果-b为空,则必填cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)
    -g订阅组名
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.5、更新Broker 配置⽂件:updateBrokerConfig

    参数是否必填说明
    -b如果-c为空,则必填broker 地址,表示topic 建在该broker
    -c如果-b为空,则必填cluster 名称,表示topic 建在该集群(集群可通过clusterList 查询)
    -kkey 值
    -vvalue 值
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.6、查看Topic 列表信息:topicList

    参数是否必填说明
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.7、查看Topic 路由信息:topicRoute

    参数是否必填说明
    -ttopic 名称
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.8、查看Topic 统计信息:topicStats

    参数是否必填说明
    -ttopic 名称
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.9、查看Broker 统计信息:brokerStats

    参数是否必填说明
    -bbroker 地址
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.10、根据消息ID 查询消息:queryMsgById

    参数是否必填说明
    -i消息id
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.11、根据消息Key 查询消息:queryMsgByKey

    参数是否必填说明
    -f被查询消息的止时间
    -kmsgKey
    -ttopic 名称
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.12、根据Offset 查询消息:queryMsgByOffset

    参数是否必填说明
    -bBroker 名称,表示订阅组建在该broker(这里需要注意填写的是 broker 的名称,不是broker 的地址,broker名称可以在clusterList查到)
    -iquery 队列id
    -ooffset 值
    -ttopic 名称
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.13、查询Producer 的网络连接:producerConnection

    参数是否必填说明
    -g生产者所属组名
    -ttopic 名称
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.14、查询Consumer 的⽹络连接:consumerConnection

    参数是否必填说明
    -g生产者所属组名
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.15、查看订阅组消费状态:consumerProgress

    参数是否必填说明
    -g生产者所属组名
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.16、查看集群消息:clusterList

    参数是否必填说明
    -m打印更多信息
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.18、添加(更新)KV 配置信息:updateKvConfig

    参数是否必填说明
    -kkey 值
    -vvalue 值
    -sNamespace 值
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.19、删除KV 配置信息:deleteKvConfig

    参数是否必填说明
    -kkey 值
    -sNamespace 值
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.20、添加(更新)Project group 配置信息:updateProjectGroup

    参数是否必填说明
    -pproject group 名
    -i服务器ip
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.21、删除Project group 配置信息:deleteProjectGroup

    参数是否必填说明
    -pproject group 名
    -i服务器ip
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.22、取得Project group 配置信息:getProjectGroup

    参数是否必填说明
    -pproject group 名
    -i服务器ip
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.23、设置消费进度:resetOffsetByTime

    根据时间来设置消费进度,设置之前要关闭这个订阅组的所有consumer,设置完再启动,方可生效。

    参数是否必填说明
    -f通过时间戳强制回滚(true|false),默认为true
    -s时间戳
    -g消费者所属组名
    -ttopic 名称
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.24、清除特定Broker权限:wipeWritePerm

    参数是否必填说明
    -bbroker 地址
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    4.25、获取Consumer消费进度:getConsumerStatus

    该命令只打印当前与cluster 连接的consumer 的消费进度

    参数是否必填说明
    -g消费者所属组名
    -t查询主题
    -iConsumer 客户端ip
    -h打印帮助
    -nnameserve 服务地址列表,格式ip:port;ip:port;…

    五、安装可视化管理控制平台

    按照1.4进行操作,修改其src/main/resources中的application.properties配置文件。

    server.contextPath=
    server.port=8080
    #spring.application.index=true
    spring.application.name=rocketmq-console
    spring.http.encoding.charset=UTF-8
    spring.http.encoding.enabled=true
    spring.http.encoding.force=true
    logging.config=classpath:logback.xml
    #if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
    rocketmq.config.namesrvAddr=192.168.0.101:9876;192.168.0.102:9876;192.168.0.103:9876
    #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
    rocketmq.config.isVIPChannel=
    #rocketmq-console's data path:dashboard/monitor
    rocketmq.config.dataPath=/tmp/rocketmq-console/data
    #set it false if you don't want use dashboard.default true
    rocketmq.config.enableDashBoardCollect=true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在rocketmq-console目录下,打开cmd窗口运行maven的打包命令。

    mvn clean package -Dmaven.test.skip=true
    
    • 1

    这里为了方便,我将jar包上传到服务器,然后启动。

    nohup java -jar rocketmq-console-ng-1.0.0.jar > rocketmq-console.out 2>1 &
    
    • 1

    访问地址:http://192.168.0.101:8080/#/
    在这里插入图片描述


    1. a-zA-Z0-9_- ↩︎

    2. a-zA-Z0-9_- ↩︎

  • 相关阅读:
    中英文说明书丨艾美捷无内毒素卵清蛋白参数和应用
    WPS ppt怎么设置自动播放?wps ppt如何设置自动放映?
    【笔记】Ningx(9)HTTPS
    Java Math.toRadians()具有什么功能呢?
    【电商】电商后台系统整体介绍
    Java,设计,功能权限和数据权限,用户、角色、权限和用户组
    【JavaScript复习十三】数组方法三
    lvs负载均衡之配置lvs-tun模式的httpd负载集群
    DataEase 介绍、使用技巧
    DASCTF 2022十月挑战赛 web
  • 原文地址:https://blog.csdn.net/qq_37242720/article/details/126567047