• RocketMQ 入门


    RocketMQ 入门

    有一说一,阿里巴巴在IT届的贡献是真的大,不得不有敬畏之心;

    消息中间件是大公司必备的技能,不得不学!刚好抽时间把三款中间件学习一下,再横向对比一下!

    什么事消息中间件?这个问题很基础,简单解释一下:

    MQ介绍

    1. 什么是MQ,为什么要用MQ?

    主要有以下三点:异步解耦削峰

    • 异步:12306春节买票期间,产生大量的短信发送请求;如果同步发送短信的话;就处理不了高并发了;采用MQ将短信放到消息队列;用专门的消费者来发送短信;

    • 解耦

      1、服务之间进行解耦,才可以减少服务之间的影响。提高系统整体的稳定性以及可扩展性。

      2、另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消

      费,并且消费者的增加或者减少对生产者没有影响。

    • 削峰:双十一期间大批量订单产生,为了防止服务崩溃,使用消息队列存储请求;消费者分批次获取定量请求处理;达到削峰的效果;

    2、 MQ优缺点

    引入一个框架必然要接受系统的复杂性提高;

    以前送快递直接送到家,当引入快递驿站之后;流程就会变得复杂起来;

    • 可用性:快递驿站宕机后就拿不到快递了;
    • 一致性:同样的快递,发给不同的买家,买家签收结果不一致;

    3、MQ产品对比

    对比什么的,上图就对了:

    Rocket现在热度慢慢高起来的,

    总结:

    rabbitMQ 我学了;它的一致性可靠性很高,有多种消息处理行为确保消息的准确到达和接收;在可靠性要求高的业务使用广泛;这也是为什么它的吞吐量比较低的原因;

    rocketMQ, 当下比较优秀的消息队列;适合处理互联网业务;

    kafka;吞吐量极高,也就是说比较适合大数据领域;

    RocketMQ实战

    1、下载RocketMQ 4.7.1版本

    RocketMQ运行版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-bin-release.zip

    RocketMQ源码版本下载地址: https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip

    这两个版本我们都下载下来。

    2、快速安装RocketMQ

    本地解压上传到服务器配置环境变量就行了;前提是linux服务器有java环境;

    vi ~/.bash_profile 加入下面的内容: 如果不配置的话,启动NameSever和Broker都会报错

    export JAVA_HOME=/app/jdk1.8/ #java的环境配置
    export ROCKETMQ_HOME=/app/rocketmq/rocketmq-all-4.7.1-bin-release # rocket
    PATH=$ROCKETMQ_HOME/bin:$JAVA_HOME/bin:$PATH:$HOME/.local/bin:$HOME/bin
    export PATH
    
    • 1
    • 2
    • 3
    • 4

    3、 快速运行RocketMQ

    想要运行mq,首先了解一下,rocketMQ的各个组件;

    与普通mq不同的是,rocketmq引入了一个Nameserver 做一个简单的注册中心;有点类似raabbit中的交换机,但是rocketmq没有交换机概念;

    • NameServer : 提供轻量级的Broker路由服务。
    • Broker:实际处理消息存储、转发等服务的核心组件。
    • Producer:消息生产者集群。通常是业务系统中的一个功能模块。
    • Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。

    所以首先要启动的是nameserver注册中心;

    在$ROCKETMQ_HOME/bin目录下有个mqnamesrv。直接执行这个脚本就可以启动RocketMQ的NameServer服务。

    执行可能会提示权限不足 ----> 进入bin目录:chmod 777 mqnamesrv

    运行可能报报错----->主要原因是mq申请内存大小为4G,而linux多半没那么多内存;需要修改一下配置文件;

    修改的方式是直接修改runserver.sh。 用vi runserver.sh编辑这个脚本,在脚本中找到这一行调整内存大小为512M

    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
    • 1

    然后使用命令 nohup bin/mqnamesrv &后台启动

    然后就是启动broker

    启动Broker的脚本是runbroker.sh。Broker的默认预设内存是8G,启动前,如果内存不够,同样需要调整下JVM内存。vi runbroker.sh找到这一行,进行内存调整

    JAVA_OPT="${JAVA_OPT} -server -Xms512m -Xmx512m -Xmn256m"
    
    • 1

    然后我们需要找到$ROCKETMQ_HOME/conf/broker.conf, vi指令进行编辑,在最下面加入一个配置:

    autoCreateTopicEnable=true #自动创建主题,==rabbit中的通过程序创建交换机和队列
    
    • 1

    然后也以静默启动的方式启动runbroker.sh

    nohup ./mqbroker &
    
    • 1

    启动完成后,同样是检查nohup.out日志,有这一条关键日志就标识启动成功了。 并且jps指令可以看到一个BrokerStartup进程。

    The broker[worker1, 192.168.232.128:10911] boot success. serializeType=JSON
    
    • 1

    在观察runserver.sh和runbroker.sh时,我们还可以查看到其他的JVM执行参数,这些参数都可以进行定制。例如我们观察到一个比较有意思的地方,nameServer使用的是CMS垃圾回收器,而Broker使用的是G1垃圾回收器。 关于垃圾回收器的知识又需要复习了

    RocketMQ 消息模型

    学过rabbit的都知道,rabbitmq有好几种消息模型,什么普通消息,延迟消息,死信队列等等;

    消息不能只有一种模型,所以,在rocket中,也有好几种模型,目的就是满足开发需求;

    顺序消息

    这也算是rocketmq中的一大特色了;

    消息发送者会采取Round Robin轮询方式把消息发送到不同的MessageQueue(分区队列),这样就能保证同一组的的消息在messagequeue中是有序的,发送有序的的话,如何保证消费是有序的呢,

    当有多个消费者的时候,消费者会从多个队列获取消息,默认不超过32条,这样的话就保证不了消息的有序性了,所以为了保证顺序性,rocketmq限制了消费者只能从同一个队列里面获取消息,这样就能完全保证消息的有序性了;

    但是!rocketmq只能保证消息的局部有序,不能保证消息的全局有序;

    具体这篇文章就讲得很好我觉得!

    rocketMQ顺序消息详解

    广播消息

    广播消息并没有特定的消息消费者样例,这是因为这涉及到消费者的集群消费模式。在集群状态

    (MessageModel.CLUSTERING)下,每一条消息只会被同一个消费者组中的一个实例消费到(这跟

    kafka和rabbitMQ的集群模式是一样的)。而广播模式则是把消息发给了所有订阅了对应主题的消

    费者,而不管消费者是不是同一个消费者组。

    延迟消息

    所谓延迟消息,无非就是消息不会立马被消费,而是等待特定的时间后才会消费;这个模式在rabbitmq中也有,但是rabbitmq中实现起来比较鸡肋,使用ttl加死信队列才能实现;

    然而rocket就不用这么麻烦,这也算是它的一个特色的功能;

    不过很可惜;rock开源版本中,你并不能定义具体的延迟时间,它提供了18个延迟级别来供用户选择,基本能满足日常需求,商业版本就可以随意定制时间;

    当然,rock也没有那么吝啬,这18个级别对应的时间也可自定义,但是一般没必要;

    其实说白了这18个级别也就是rock系统主题的18 个队列而已,没什么特别的;

    批量消息

    这个就更好理解了;无非就是将消息累积发送;减少网络IO的开销罢了;当然累积也得有个度,一般规定消息大小是1M,但实际上是4M;

    如果消息大于规定的容量怎么办呢;拆分消息呗那就!

    过滤消息

    roct发送消息有一个特点;那就是消息的组成是由主题topic、tag和消息题组成;

    所以我们就可以依靠tag来做消息的过滤,但是tag只有一个属性,无法满足复杂的业务过滤需求,为此;rock特别推出了sql表达式来进行过滤;

    事务消息

    这是rock中的一个重点,也是一个难点;

    首先:为啥需要事务消息?

    比方说:一个业务的组成需要两步,第一步本地执行;第二步提交到消息队列让下游执行;

    问题来了:如果这两步是需要满足原子性的;那应该如何保证?

    如果实在本地的话;就很简单了,直接加事务就行了,但是现在是分布式系统;就需要涉及到分布式事务的理念了,当然!可以引入分布式事务来满足这个需求,但是;这不就更麻烦了,如果消息对列能解决这个问题不就好了;这就是rockmq事务消息出现的目的;

    那么rock具体是怎么做的呢?看一张图就大致明白了;

    解释一下

    1. half消息并不是消息的一半,实际上就是全部消息;只不过half状态下的消息消费者是不可见的,也就不会消费这条消息;为什么不可见?是不是对消息队列做了手脚?其实也没那么复杂,他只不过把消息放到了内部的RMQ_SYS_TRANS_HALF_TOPIC 这个Topic罢了,所以消费者看不见,当收到提交或回滚通知后就会将消息转存到指定的topic或者直接丢弃罢了;充分体现了:“没有什么问题是加一层解决不了的”;
    2. 根据本地事务执行情况通知消息队列回滚还是提交到消费者;
    3. broker定时回查本地事务执行情况,
    • 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。

    • 返回ROLLBACK_MESSAGE状态的消息会被丢弃。

    • 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。

    为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,超过这个次数的话消息就会丢弃掉!防止占用队列!

    仔细看就会发现,所谓的事务消息只是保证了分布式事务的一半:因为消息在消费者端的消费情况我们也会有所出入,我们只是保证了消息与本地事务的一致性;消费者端的消费出现异常还需要进行重试或者其他补偿措施来保证最终一致性!

    事务消息详细

  • 相关阅读:
    【设计模式】 - 创建者模式 - 原型模式
    C++ —— 命名空间
    mysql实战45讲【2】一条sql更新语句是如何执行的
    外贸客户开发信怎么写?如何撰写营销邮件?
    C++学习 --deque
    Redis入门完整教程:复制配置
    基于STC89C52单片机空气PM2.5系统设计资料
    学习率设置太大或者太小会有哪些影响?
    3D格式转换工具HOOPS Exchange最全技术指南(三):4大功能特征与典型使用场景
    Mybatis中使用${}和使用#{}
  • 原文地址:https://blog.csdn.net/m0_52255061/article/details/125567099