可在官网中选择版本下载zip:
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.4/rocketmq-all-4.9.4-source-release.zip
首先配置环境变量,JDK1.8+rocketMq的都需要配置。
①vim /etc/profile
export JAVA_HOME=/usr/java/jdk1.8.0_131
export JRE_HOME=${JAVA_HOME}/jre
export JAVA_PATH=JAVAHOME/bin:{JRE_HOME}/bin
export PATH=PATH:{JAVA_PATHi}
export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
export PATH=$MAVEN_HOME/bin:$PATH
export ROCKETMQ_HOME=/opt/rocketmq-4.9.2
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
②source /etc/profile
注:
其中路径地址要再次校验是否正确,否则启动时会出现:
runserver.sh: No such file or directory
③nohup ./mqnamesrv &
④nohup ./mqbroker -n localhost:9876 & (其目的是能连接到指定的broker)
⑤jps验证是否有启动成功。如启动不成功也可查看nohup.out查看日志
⑥tools.sh org.apache.rocketmq.example.quickstart.Producer(测试发送消息)
注:报错:connect to null failed
解决办法:vim tools.sh 添加:export NAMESRV_ADDR=localhost:9876
⑦tools.sh org.apache.rocketmq.example.quickstart.Consumer(测试消费消息)
⑧如java客户端连接超时,则调整broker启动命令,在mq目录执行,如下:
nohup sh bin/mqbroker -n xxx.xxx.xxx.xxx:9876 autoCreateTopicEnable=true -c conf/broker.conf &
⑨如果需要使用Selector,需要提前在conf/brocker配置
brokerIP1 = xxx.xxx.xxx.xxx (定义brokerIP1)
namesrvAddr=xxx.xxx.xxx.xxx:9876(定义nameaddr的访问地址)
enablePropertyFilter=true(开启Selector相关)
①进入到这个dashboard仓库进行下载:https://github.com/apache/rocketmq-dashboard
②解压rocketmq-dashboard-master.zip
③进入到rocketmq-dashboard-master/src/main/resources 这个目录下,然后查看application.properties这个文件:
将“server.port=8080” 这个端口配置修改一下,修改成8888.
rocketmq.config.namesrvAddr=localhost:9876
④mvn clean package -Dmaven.test.skip=true
编译成功后,在rocketmq-console目录下会生成一个目录:target目录,该目录下有启动rocketmq界面的jar文件。
⑤启动:java -jar rocketmq-dashboard-2.0.0.jar 或 nohup java -jar rocketmq-dashboard-2.0.0.jar &
⑥http://xxx.xxx.xxx.xxx:8888
部署模型:
broker:
nameServer是服务发现者,集群中各个角色(producer,broker,consumer)都需要定时向nameServer上报自己的状态,以便互相发现彼此,超时不上报的话,nameServer会把它从列表中删除。
为什么不用zookeeper? ----rockermq希望为了提高性能,CAP定理,客户端负载均衡
producer:生产者 -----> master 主: 消息写入
slave 从: 发消息 ------> customer:消费者
producer:
consumer:
注意:由于master和slave都可以读取消息,因此consumer会与master和slave都建立连接
topic和queue的关系:
2pc: 2次提交 第1次提交时,不会立即生效,立即消费
RocketMQ事务消息用的就是2pc这种方式
tcc: try(锁住)–comfirm(确认)–cancel(取消)
producer----开启事务 ----发送消息(half message) ----broker:打标记—HFM发送确认
分布式系统中的事务可以使用2pc、TCC(try,confirm,cancle)来解决分布式系统中的消息原子性
RocketMQ4.3+提供分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致
RockMQ实现方式:
Half Message:预处理消息,当broker收到此消息后,会存储到RMQ_SYS_RANS_HALF_TOPIC的消息消费队列中
检查事务状态:broker会开启一个定时任务,消息RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会想向消息发送者确认执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
超时:如果超过回调次数,默认回滚消息
TransactionListener的两个方法:
executeLocalTransaction:半消息发送成功触发此方法来执行本地事务
checkLocalTransaction: brocker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
本地事务执行状态:
执行成功,确认提交:LocalTransactionState.COMMIT_MESSAGE
回滚消息,broker端会删除版消息:LocalTransactionState.ROLLBACK_MESSAGE
暂时为未知状态,等待broker回调:LocalTransactionState.UNKNOW
问题:producer.send()放入事务里,还是事务外?
producer
默认超时时间 private int sendMsgTimeOut = 30000;
同步发送时,重试次数:默认是2
producer.setRetryTimesWhenSendFaild(3);
异步发送时,重试次数: 默认是2
producer.setRetryTimeWhenSendAsyncFailed();
Consumer
消费超时 单位分钟
consumer.setConsumerTimeOut();
发送ack,消费失败
RECONSUMER_LATER
broker投递
只有在消息模式为MessageModelCLUSTERING集群模式时,broker才会自动进行重试,广播消息不重试,重投使用messageDelayLevel
默认值:
messageDelayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
消息丢失
SendResult
producer在发送同步/异步可靠消息后,会接收到SendResult,表示消息发送成功。
SendResult其中属性sendStatus表示了broker是否真正完成了消息存储
当sendStatus=‘ok’的时候,应该重新发送消息,避免丢失
消息重复消费
影响消息正常发送和消费的重要原因是网络的不确定性。
引起重复消费的原因:
ACK
正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息正常消费,从queue中剔除,
当ack因为网络原因无法发送到broker,broker会认为此条消息没有被消费,此后会被开启消息重投机制把消息再次投递到consumer
group
在SLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次(每个group的一个consumer),但是针对不同group的consumer会推送多次
解决方案:
数据库表
处理消息前,使用消息主键在表中带有约束的字段insert
Map
单机时可以使用map concurrentHashMap -->putIfAbsent
Redis
使用主键或set操作 比如用messageId作为主键
1.同一个topic(一个topic包含多个queue:数组 先进先出)
2.同一个queue
3.发消息的时候一个线程去发送消息
①消费的时候一个线程 消费一个queue里的消息
②多个queue只能保证单个queue里的顺序