• 浅谈Rocket_MQ笔记


    浅谈Rocket_MQ笔记

    1.安装与启动

    可在官网中选择版本下载zip:

    https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.9.4/rocketmq-all-4.9.4-source-release.zip

    首先配置环境变量,JDK1.8+rocketMq的都需要配置。
    ①vim /etc/profile

    export JAVA_HOME=/usr/java/jdk1.8.0_131
    export JRE_HOME=${JAVA_HOME}/jre
    export JAVA_PATH=JAVAHOME/bin:{JRE_HOME}/bin
    export PATH=PATH:{JAVA_PATHi}
    export PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
    export PATH=$MAVEN_HOME/bin:$PATH
    export ROCKETMQ_HOME=/opt/rocketmq-4.9.2
    export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    ②source /etc/profile
    注:

    其中路径地址要再次校验是否正确,否则启动时会出现:
    runserver.sh: No such file or directory

    nohup ./mqnamesrv &
    ④nohup ./mqbroker -n localhost:9876 & (其目的是能连接到指定的broker)
    ⑤jps验证是否有启动成功。如启动不成功也可查看nohup.out查看日志
    ⑥tools.sh org.apache.rocketmq.example.quickstart.Producer(测试发送消息)
    注:报错:connect to null failed
    解决办法:vim tools.sh 添加:export NAMESRV_ADDR=localhost:9876
    ⑦tools.sh org.apache.rocketmq.example.quickstart.Consumer(测试消费消息)
    ⑧如java客户端连接超时,则调整broker启动命令,在mq目录执行,如下:

    nohup sh bin/mqbroker -n xxx.xxx.xxx.xxx:9876 autoCreateTopicEnable=true -c conf/broker.conf &

    ⑨如果需要使用Selector,需要提前在conf/brocker配置

    brokerIP1 = xxx.xxx.xxx.xxx (定义brokerIP1)
    namesrvAddr=xxx.xxx.xxx.xxx:9876(定义nameaddr的访问地址)
    enablePropertyFilter=true(开启Selector相关)

    2.控制台安装与启动

    ①进入到这个dashboard仓库进行下载:https://github.com/apache/rocketmq-dashboard
    ②解压rocketmq-dashboard-master.zip
    ③进入到rocketmq-dashboard-master/src/main/resources 这个目录下,然后查看application.properties这个文件:

    将“server.port=8080” 这个端口配置修改一下,修改成8888.
    rocketmq.config.namesrvAddr=localhost:9876

    ④mvn clean package -Dmaven.test.skip=true
    编译成功后,在rocketmq-console目录下会生成一个目录:target目录,该目录下有启动rocketmq界面的jar文件。
    ⑤启动:java -jar rocketmq-dashboard-2.0.0.jar 或 nohup java -jar rocketmq-dashboard-2.0.0.jar &
    ⑥http://xxx.xxx.xxx.xxx:8888

    3.RocketMQ角色

    在这里插入图片描述
    部署模型:
    在这里插入图片描述
    broker:

    • broker面向producer和consumer接收和发送消息
    • 面向nameServer提交自己的信息
    • 是消息中间件的消息存储、转发服务器
    • 每个broker节点,在启动时,都会遍历NameServer列表,与每个NameServer建立连接,注册自己的信息之后定时上报
      NameServer:注册中心 == zookeeper
    • 服务注册
    • 服务发现
    • 路由管理

    nameServer是服务发现者,集群中各个角色(producer,broker,consumer)都需要定时向nameServer上报自己的状态,以便互相发现彼此,超时不上报的话,nameServer会把它从列表中删除。

    为什么不用zookeeper? ----rockermq希望为了提高性能,CAP定理,客户端负载均衡

    producer:生产者 -----> master 主: 消息写入

    ​ slave 从: 发消息 ------> customer:消费者

    producer:

    • 消息的生产者
    • 通过集群中的其中一个节点建立长连接,获得topic的路由信息,包括topic下面有哪些queue,这些queue分部在哪些broker上

    consumer:

    • 消息的消费者
    • 通过NameServer集群获得topic的路由信息,连接到对应的broker上消费消息。

    ​ 注意:由于master和slave都可以读取消息,因此consumer会与master和slave都建立连接

    topic和queue的关系:
    在这里插入图片描述

    4.RocketMQ事务消息

    在这里插入图片描述
    在这里插入图片描述
    2pc: 2次提交 第1次提交时,不会立即生效,立即消费

    ​ RocketMQ事务消息用的就是2pc这种方式

    tcc: try(锁住)–comfirm(确认)–cancel(取消)

    producer----开启事务 ----发送消息(half message) ----broker:打标记—HFM发送确认

    分布式系统中的事务可以使用2pc、TCC(try,confirm,cancle)来解决分布式系统中的消息原子性

    RocketMQ4.3+提供分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致

    RockMQ实现方式:

    ​ Half Message:预处理消息,当broker收到此消息后,会存储到RMQ_SYS_RANS_HALF_TOPIC的消息消费队列中

    ​ 检查事务状态:broker会开启一个定时任务,消息RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会想向消息发送者确认执行状态(提交、回滚、未知),如果是未知,等待下一次回调。

    ​ 超时:如果超过回调次数,默认回滚消息
    TransactionListener的两个方法:

    ​ executeLocalTransaction:半消息发送成功触发此方法来执行本地事务

    ​ checkLocalTransaction: brocker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态

    本地事务执行状态:

    ​ 执行成功,确认提交:LocalTransactionState.COMMIT_MESSAGE

    ​ 回滚消息,broker端会删除版消息:LocalTransactionState.ROLLBACK_MESSAGE

    ​ 暂时为未知状态,等待broker回调:LocalTransactionState.UNKNOW

    ​ 问题:producer.send()放入事务里,还是事务外?

    5.重试机制

    producer

    ​ 默认超时时间 private int sendMsgTimeOut = 30000;

    同步发送时,重试次数:默认是2

    ​ producer.setRetryTimesWhenSendFaild(3);

    异步发送时,重试次数: 默认是2

    ​ producer.setRetryTimeWhenSendAsyncFailed();

    Consumer

    ​ 消费超时 单位分钟

    ​ consumer.setConsumerTimeOut();

    ​ 发送ack,消费失败

    ​ RECONSUMER_LATER

    broker投递

    ​ 只有在消息模式为MessageModelCLUSTERING集群模式时,broker才会自动进行重试,广播消息不重试,重投使用messageDelayLevel
    默认值:

    ​ messageDelayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    6.重复消费

    在这里插入图片描述
    消息丢失

    SendResult

    ​ producer在发送同步/异步可靠消息后,会接收到SendResult,表示消息发送成功。

    ​ SendResult其中属性sendStatus表示了broker是否真正完成了消息存储

    ​ 当sendStatus=‘ok’的时候,应该重新发送消息,避免丢失

    消息重复消费
    ​ 影响消息正常发送和消费的重要原因是网络的不确定性。

    引起重复消费的原因:

    ​ ACK

    ​ 正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息正常消费,从queue中剔除,

    ​ 当ack因为网络原因无法发送到broker,broker会认为此条消息没有被消费,此后会被开启消息重投机制把消息再次投递到consumer

    ​ group

    ​ 在SLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次(每个group的一个consumer),但是针对不同group的consumer会推送多次

    ​ 解决方案:

    ​ 数据库表

    ​ 处理消息前,使用消息主键在表中带有约束的字段insert

    ​ Map

    ​ 单机时可以使用map concurrentHashMap -->putIfAbsent

    ​ Redis

    ​ 使用主键或set操作 比如用messageId作为主键

    7.顺序消费

    在这里插入图片描述
    1.同一个topic(一个topic包含多个queue:数组 先进先出)

    2.同一个queue

    3.发消息的时候一个线程去发送消息

    ①消费的时候一个线程 消费一个queue里的消息
    ②多个queue只能保证单个queue里的顺序

  • 相关阅读:
    SPARK基本编程
    mysql docker 安装
    ARM如何利用PMU的Cycle Counter(时钟周期)来计算出CPU的时钟频率
    adb无线模式连接设备(包含多台设备无线连接)
    第二章 使用管理门户(二)
    嵌入式分享合集104
    踹掉后端,前端导出Excel!
    【分享】集简云嵌入方案介绍
    WalkMe的数字用户体验到底是啥
    Docker命令-build
  • 原文地址:https://blog.csdn.net/weixin_44653248/article/details/126814555