RocketMQ主要有四大组成部分:NameServer、Broker、Producer、Consumer
消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟
消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求
NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。
主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块
#Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
#Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
#Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
#HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
#Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
双主模式:多Master模式,无Slave
双主双从+同步模式:多Master多Slave模式,同步双写
双主双从+异步模式:多Master多Slave模式,异步复制
同步发送: 指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。
异步发送: 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包, 异步方式也需要Broker返回确认信息。
单向发送: 指只负责发送消息而不等待服务器回应且没有回调函数触发。
#RocketMQ 三种消息发送模式的使用场景:
同步发送:主要运用在比较重要一点消息传递/通知等业务:
异步发送:通常用于对发送消息响应时间要求更高/更快的场景:
单向发送:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
rocketmq 默认端口:9876(即nameserver端口)
非vip通道端口:10911
vip通道端口:10909
#10909是VIP通道对应的端口,在JAVA中的消费者对象或者是生产者对象中关闭VIP通道即可无需开放10909端口
#本实践部署的是RocketMQ的"双主双从+同步模式"
broker-a 主1
broker-b 主2
broker-a-s 从1
broker-b-s 从2
NameServer1 注册中心1
NameServer2 注册中心2
dashboard 可视化web界面
- #1.创建目录
- mkdir -p /opt/rocketmq
- mkdir -p /opt/rocketmq/namesrv1
- cd /opt/rocketmq/namesrv1
-
- #2.下载rocketmq
- wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip
-
- #3.解压
- unzip rocketmq-all-4.9.3-bin-release.zip
- #1.创建目录
- mkdir -p /opt/rocketmq
- mkdir -p /opt/rocketmq/{master1,namesrv2,slave2,dashboard}
- cd /opt/rocketmq/namesrv2
-
- #2.下载rocketmq
- wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip
-
- #3.解压
- unzip rocketmq-all-4.9.3-bin-release.zip
-
- #4.复制目录
- cp -r rocketmq-4.9.3 /opt/rocketmq/master1/
- cp -r rocketmq-4.9.3 /opt/rocketmq/slave2/
- #1.创建目录
- mkdir -p /opt/rocketmq
- mkdir -p /opt/rocketmq/{master2,slave1}
- cd /opt/rocketmq/master2
-
- #2.下载rocketmq
- wget https://archive.apache.org/dist/rocketmq/4.9.3/rocketmq-all-4.9.3-bin-release.zip
-
- #3.解压
- unzip rocketmq-all-4.9.3-bin-release.zip
-
- #4.复制目录
- cp -r rocketmq-4.9.3 /opt/rocketmq/slave1/
10.0.61.12上操作
- vim /opt/rocketmq/namesrv1/rocketmq-4.9.3/conf/logback_namesrv.xml
- #将配置文件中所有的${user.home}/logs更改为 /opt/rocketmq/namesrv1/logs
vim /opt/rocketmq/namesrv1/rocketmq-4.9.3/bin/runserver.sh
nohup sh /opt/rocketmq/namesrv1/rocketmq-4.9.3/bin/mqnamesrv -n 10.0.61.12:9876 autoCreateTopicEnable=true > /dev/null 2>&1 &
10.0.61.21上操作
- vim /opt/rocketmq/namesrv2/rocketmq-4.9.3/conf/logback_namesrv.xml
- #将配置文件中所有的${user.home}/logs更改为 /opt/rocketmq/namesrv2/logs
vim /opt/rocketmq/namesrv2/rocketmq-4.9.3/bin/runserver.sh
nohup sh /opt/rocketmq/namesrv2/rocketmq-4.9.3/bin/mqnamesrv -n 10.0.61.21:9876 autoCreateTopicEnable=true > /dev/null 2>&1 &
10.0.61.21上操作
- #runbroker.sh
- vim /opt/rocketmq/master1/rocketmq-4.9.3/bin/runbroker.sh
- #tools.sh
- vim /opt/rocketmq/master1/rocketmq-4.9.3/bin/tools.sh
将原来的配置内容全部注释掉(之后的properties配置文件都得这样)
- #broker-a.properties
- vim /opt/rocketmq/master1/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties
-
- brokerClusterName=rocketmq-cluster
- brokerName=broker-a
- brokerId=0
- namesrvAddr=10.0.61.12:9876;10.0.61.21:9876
- brokerIP1=10.0.61.21
- brokerIP2=10.0.61.21
- defaultTopicQueueNums=4
- autoCreateTopicEnable=true
- autoCreateSubscriptionGroup=true
- listenPort=10911
- deleteWhen=04
- fileReservedTime=48
- mapedFileSizeCommitLog=1073741824
- mapedFileSizeConsumeQueue=300000
- diskMaxUsedSpaceRatio=88
- storePathRootDir=/opt/rocketmq/master1/data/store
- storePathCommitLog=/opt/rocketmq/master1/data/store/commitlog
- storePathConsumeQueue=/opt/rocketmq/master1/data/store/consumequeue
- storePathIndex=/opt/rocketmq/master1/data/store/index
- storeCheckpoint=/opt/rocketmq/master1/data/store/checkpoint
- abortFile=/opt/rocketmq/master1/data/store/abort
- maxMessageSize=65536
- brokerRole=SYNC_MASTER
- flushDiskType=SYNC_FLUSH
配置文件解释:
- #所属集群名字
- brokerClusterName=rocketmq-cluster
-
- #broker名字,注意此处不同的配置文件填写的不一样; 在broker-b.properties中此处需要修改为:brokerName=broker-b
- brokerName=broker-a
-
- #0: Master; >0: Slave
- brokerId=0
-
- #nameServer地址,分号分割
- namesrvAddr=10.0.61.12:9876;10.0.61.21:9876
-
- #broker启动地址,rocketmq默认内网启动
- brokerIP1=10.0.61.21
-
- #broker的HAIP地址(供Slave同步消息的地址)
- brokerIP2=10.0.61.21
-
- #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
- defaultTopicQueueNums=4
-
- #是否允许Broker自动创建Topic,建议线下开启,线上关闭
- autoCreateTopicEnable=true
-
- #是否允许Broker自动创建订阅组,建议线下开启,线上关闭
- autoCreateSubscriptionGroup=true
-
- #Broker对外服务的监听端口
- listenPort=10911
-
- #删除文件时间点,默认凌晨4点
- deleteWhen=04
-
- #文件保留时间,默认48h
- fileReservedTime=48
-
- #commitLog每个文件的大小默认1G
- mapedFileSizeCommitLog=1073741824
-
- #ConsumeQueue每个文件默认存30W条,根据业务情况调整
- mapedFileSizeConsumeQueue=300000
-
- #检测物理文件磁盘空间
- diskMaxUsedSpaceRatio=88
-
- #存储路径
- storePathRootDir=/opt/rocketmq/master1/data/store
-
- #commitLog 存储路径
- storePathCommitLog=/opt/rocketmq/master1/data/store/commitlog
-
- #消费队列存储路径存储路径
- storePathConsumeQueue=/opt/rocketmq/master1/data/store/consumequeue
-
- #消息索引存储路径
- storePathIndex=/opt/rocketmq/master1/data/store/index
-
- #checkpoint 文件存储路径
- storeCheckpoint=/opt/rocketmq/master1/data/store/checkpoint
-
- #abort 文件存储路径
- abortFile=/opt/rocketmq/master1/data/store/abort
-
- #限制的消息大小
- maxMessageSize=65536
-
- #Broker角色: ASYNC_MASTER(异步复制Master)、SYNC_MASTER(同步双写Master)、SLAVE(从节点)
- brokerRole=SYNC_MASTER
-
- #刷盘方式: ASYNC_FLUSH(异步刷盘)、SYNC_FLUSH(同步刷盘)
- flushDiskType=SYNC_FLUSH
nohup sh /opt/rocketmq/master1/rocketmq-4.9.3/bin/mqbroker -c /opt/rocketmq/master1/rocketmq-4.9.3/conf/2m-2s-sync/broker-a.properties > /opt/rocketmq/master1/logs/master-broker1.log 2>&1 &
tail -f ~/logs/rocketmqlogs/broker.log
10.0.61.22上操作
- #runbroker.sh
- vim /opt/rocketmq/master2/rocketmq-4.9.3/bin/runbroker.sh
- #tools.sh
- vim /opt/rocketmq/master2/rocketmq-4.9.3/bin/tools.sh
- #broker-b.properties
- vim /opt/rocketmq/master2/rocketmq-4.9.3/conf/2m-2s-sync/broker-b.properties
-
- brokerClusterName=rocketmq-cluster
- brokerName=broker-b
- brokerId=0
- namesrvAddr=10.0.61.12:9876;10.0.61.21:9876
- brokerIP1=10.0.61.22
- brokerIP2=10.0.61.22
- defaultTopicQueueNums=4
- autoCreateTopicEnable=true
- autoCreateSubscriptionGroup=true
- listenPort=10911
- deleteWhen=04
- fileReservedTime=48
- mapedFileSizeCommitLog=1073741824
- mapedFileSizeConsumeQueue=300000
- diskMaxUsedSpaceRatio=88
- storePathRootDir=/opt/rocketmq/master2/data/store
- storePathCommitLog=/opt/rocketmq/master2/data/store/commitlog
- storePathConsumeQueue=/opt/rocketmq/master2/data/store/consumequeue
- storePathIndex=/opt/rocketmq/master2/data/store/index
- storeCheckpoint=/opt/rocketmq/master2/data/store/checkpoint
- abortFile=/opt/rocketmq/master2/data/store/abort
- maxMessageSize=65536
- brokerRole=SYNC_MASTER
- flushDiskType=SYNC_FLUSH
nohup sh /opt/rocketmq/master2/rocketmq-4.9.3/bin/mqbroker -c /opt/rocketmq/master2/rocketmq-4.9.3/conf/2m-2s-sync/broker-b.properties > /opt/rocketmq/master2/logs/master-broker2.log 2>&1 &
tail -f ~/logs/rocketmqlogs/broker.log
10.0.61.22上操作
- #runbroker.sh
- vim /opt/rocketmq/slave1/rocketmq-4.9.3/bin/runbroker.sh
- #tools.sh
- vim /opt/rocketmq/slave1/rocketmq-4.9.3/bin/tools.sh
- #broker-a-s.properties
- vim /opt/rocketmq/slave1/rocketmq-4.9.3/conf/2m-2s-sync/broker-a-s.properties
-
- brokerClusterName=rocketmq-cluster
- brokerName=broker-a
- brokerId=70
- namesrvAddr=10.0.61.12:9876;10.0.61.21:9876
- brokerIP1=10.0.61.22
- brokerIP2=10.0.61.22
- defaultTopicQueueNums=4
- autoCreateTopicEnable=true
- autoCreateSubscriptionGroup=true
- listenPort=11011
- deleteWhen=04
- fileReservedTime=48
- mapedFileSizeCommitLog=1073741824
- mapedFileSizeConsumeQueue=300000
- diskMaxUsedSpaceRatio=88
- storePathRootDir=/opt/rocketmq/slave1/data/store
- storePathCommitLog=/opt/rocketmq/slave1/data/store/commitlog
- storePathConsumeQueue=/opt/rocketmq/slave1/data/store/consumequeue
- storePathIndex=/opt/rocketmq/slave1/data/store/index
- storeCheckpoint=/opt/rocketmq/slave1/data/store/checkpoint
- abortFile=/opt/rocketmq/slave1/data/store/abort
- maxMessageSize=65536
- brokerRole=SLAVE
- flushDiskType=SYNC_FLUSH
nohup sh /opt/rocketmq/slave1/rocketmq-4.9.3/bin/mqbroker -c /opt/rocketmq/slave1/rocketmq-4.9.3/conf/2m-2s-sync/broker-a-s.properties > /opt/rocketmq/slave1/logs/slave1-broker-a-s.log 2>&1 &
tail -f ~/logs/rocketmqlogs/broker.log
10.0.61.21上操作
- #runbroker.sh
- vim /opt/rocketmq/slave2/rocketmq-4.9.3/bin/runbroker.sh
- #tools.sh
- vim /opt/rocketmq/slave2/rocketmq-4.9.3/bin/tools.sh
- #broker-a-s.properties
- vim /opt/rocketmq/slave2/rocketmq-4.9.3/conf/2m-2s-sync/broker-b-s.properties
-
- brokerClusterName=rocketmq-cluster
- brokerName=broker-b
- brokerId=70
- namesrvAddr=10.0.61.12:9876;10.0.61.21:9876
- brokerIP1=10.0.61.21
- brokerIP2=10.0.61.21
- defaultTopicQueueNums=4
- autoCreateTopicEnable=true
- autoCreateSubscriptionGroup=true
- listenPort=11011
- deleteWhen=04
- fileReservedTime=48
- mapedFileSizeCommitLog=1073741824
- mapedFileSizeConsumeQueue=300000
- diskMaxUsedSpaceRatio=88
- storePathRootDir=/opt/rocketmq/slave2/data/store
- storePathCommitLog=/opt/rocketmq/slave2/data/store/commitlog
- storePathConsumeQueue=/opt/rocketmq/slave2/data/store/consumequeue
- storePathIndex=/opt/rocketmq/slave2/data/store/index
- storeCheckpoint=/opt/rocketmq/slave2/data/store/checkpoint
- abortFile=/opt/rocketmq/slave2/data/store/abort
- maxMessageSize=65536
- brokerRole=SLAVE
- flushDiskType=SYNC_FLUSH
nohup sh /opt/rocketmq/slave2/rocketmq-4.9.3/bin/mqbroker -c /opt/rocketmq/slave2/rocketmq-4.9.3/conf/2m-2s-sync/broker-b-s.properties > /opt/rocketmq/slave2/logs/slave2-broker-b-s.log 2>&1 &
tail -f ~/logs/rocketmqlogs/broker.log
10.0.61.21上操作
地址:GitHub - rocketmq/rocketmq-externals: Mirror of Apache RocketMQ (Incubating)
- cd /opt/rocketmq/dashboard
- git clone git@gitcode.net:mirrors/rocketmq/rocketmq-externals.git
- #下载不下来可以下载tar包传到服务器再解压
- #application.properties
- vim /opt/rocketmq/dashboard/rocketmq-externals-master/rocketmq-console/src/main/resources/application.properties
-
- server.contextPath=
- server.port=30916
- #spring.application.index=true
- spring.application.name=rocketmq-console
- spring.http.encoding.charset=UTF-8
- spring.http.encoding.enabled=true
- spring.http.encoding.force=true
- logging.config=classpath:logback.xml
- #if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, you can set it in ops page.default localhost:9876
- rocketmq.config.namesrvAddr=10.0.61.12:9876;10.0.61.21:9876
- #if you use rocketmq version < 3.5.8, rocketmq.config.isVIPChannel should be false.default true
- rocketmq.config.isVIPChannel=
- #rocketmq-console's data path:dashboard/monitor
- rocketmq.config.dataPath=/tmp/rocketmq-console/data
- #set it false if you don't want use dashboard.default true
- rocketmq.config.enableDashBoardCollect=true
- cd /opt/rocketmq/dashboard/rocketmq-externals-master/rocketmq-console
- mvn clean package -Dmaven.test.skip=true
nohup /opt/jdk/jdk1.8.0_333/bin/java -jar /opt/rocketmq/dashboard/rocketmq-externals-master/rocketmq-console/target/rocketmq-console-ng-1.0.0.jar >/opt/rocketmq/dashboard/console.log 2>&1 &
- vim /opt/rocketmq/master1/rocketmq-4.9.3/bin/tools.sh
-
- # 在export JAVA_HOME上面添加如下这段代码
- export NAMESRV_ADDR=10.0.61.12:9876
- cd /opt/rocketmq/master1/rocketmq-4.9.3/bin
-
- ./tools.sh org.apache.rocketmq.example.quickstart.Producer
./tools.sh org.apache.rocketmq.example.quickstart.Consumer
发送成功后我们可以到控制台查看消息和消费情况等信息