• 高流量下的后起之秀-【RocketMQ消息中间件】


    前言

    大家都知道阿里巴巴每年的天猫双十一,那一晚的流量是十分巨大的,那么我们有没有想过,阿里是如何承载住着么巨大流量冲击,还能够稳定的运行的么?

    仰仗着阿里开发出的RocketMQ这款优秀的消息中间件

    【官方网址】:https://rocketmq.apache.org/

    在这里插入图片描述



    一、RocketMQ概述

    1.1 简介

    MQ:Message Queue,是一种提供消息队列服务的中间件,是一套提供了消息生产,存储,消息全过程API的软件系统。

    这里所指的消息就是数据,一般消息的体量不会太大


    1.2 MQ的作用

    我们可以将消息中间件的功能大致分为3点

    1.2.1 限流削峰

    MQ可以将系统的超量流量暂时存在MQ中,一百年系统后期可以慢慢的进行处理,从而避免了请求的丢失或者系统被大量请求瞬间冲垮。
    在这里插入图片描述

    1.2.2 异步解耦

    上游系统对下游系统的调用若为同步调用,则会大大降低对系统的而吞吐量与并发性,且系耦合度较高,而异步调用则会解决这些问题,所以两层之间若要实现有同步到异步的转化,一般性的做法就是,在这两层之间添加一个MQ层。
    在这里插入图片描述

    1.2.3 数据收集

    分布式系统会在海量的数据流,如:业务日志,监控数据用户行为,针对这些数据流进行实时或批量采集汇总,然后对这些数据进行大量的数据分析,这是当前当前互联网平台的必备技术,通过MQ完成此类数据收集是最好的选择。

    1.3 常见的MQ产品

    1.3.1 ActiveMQ

    ActiveMQ是一款使用Java开发的MQ产品,但是现在的生产环境中,已经很少有人用了,可以说被淘汰了。

    1.3.2 RabbitMQ

    RabbitMQ是一款使用Erlang开发的,吞吐量比Kafka和RocketMQ较低

    1.3.3 Kafka

    是一款使用Scale/Java开发的,最大的特点是高吞吐率,常用于大数据的实时计算,日志采集等场景。其没有遵循任何的MQ协议,使用自研开发。他的数据是存储在磁盘上的。在SpringCloud一代的时候,内部支持的消息中间件仅支持Kafka和RabbitMQ

    1.3.4 RocketMQ

    RocketMQ时使用Java语言开发的,是历经数年的阿里双十一的考验,没有遵循任何的MQ协议,使用自研的协议。对于SpringCloud Alibaba 其支持Kafka RabbitMQ,但推荐使用RocketMQ

    二、RocketMQ的安装与启动

    2.1 基本概念

    2.1.1 消息(Message)

    消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。

    2.1.2 (Topic)

    Topic表示一类消息的集合,每个主题都包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本消息。

    一个生产者可以同时发送多种Topic的消息,而一个消费者只能对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。

    2.1.3 队列(Queue)

    存储消息的物理实体,一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息,一个Topic的Queue也被称为一个Topic中的消息的分区(Partition)
    在这里插入图片描述

    2.1.4 消息标识(MessageId / Key)

    RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:生产者send()消息的时候会自动生成一个MessageId(msgId)当消息到达Borker后,Broker会自动生成一个MessageId(MsgId),MsgId,offsetId与Key都称为消息的标识。

    • MsgId:由Producer端生成,其生成规则为:
      producerId+进程Pid+MessageClientIDSetter类的classLoader的hashCode+当前的时间,+AutomicInteger自增计数器值
    • offsetMsgId:由broker端生成,七生成规则为:brokerIp+物理分区的offset
    • key:由用户指定的业务相关的唯一标识

    2.2 系统架构

    在这里插入图片描述
    RocketMQ架构上主要分为四个部分:

    2.2.1 Productor

    消息生产者,负责生产消息,Producer通过MQ的负载均衡模块选择相应的Borker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

    RocketMQ中的消息的生产者都是以生产者组(Producer Group)形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同类型的Topic的消息。

    2.2.2 Consumer

    消息消费者,负责消费消息,一个消息消费者会从Broker服务器中获取到消息,并对消息业务进行处理。

    RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的,消费者组是同一类消费者的集合,这类Consumer消费者是同一类Topic的消息。消费者组使得在消息消费方面,实现负载均衡和容错降级的目标变得非常容易。

    2.3 工作流程

    具体流程
    1)启动NameServer, NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。
    2)启动Broker时,Broker会 与所有的NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳包。
    3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入NameServer中,不过,这步是可选的,也可以在发送消息时自动创建Topic,
    4) Producer发送消息,启动时先跟NameServer集群中的其中- 台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址 (IP+Port) 的映射关系。然后根据算法策略从队选择一个Queue, 与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息
    后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一 -次路由信息。
    5) Consumer跟Producer类似, 跟其中-台NameServer建立长连接, 获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中
    的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳, 以确保Broker的存活状态。

    2.3 单机安装与启动

    2.3.1 准备工作

    系统要求的是64位,JDK要求是1.8以上版本

    在这里插入图片描述
    使用xftp将下载好的RocketMQ的压缩文件,上传至Linux上(opt/tools)
    在这里插入图片描述
    这里因为是zip的压缩文件,所以我们使用unzip命令来完成

    unzip 压缩文件名 [-d 压缩的路径]
    
    • 1

    在这里插入图片描述
    解压完成后的目录结构如下所示:

    在这里插入图片描述

    2.3.2 修改初始内存

    【修改runserver.sh】
    使用vim命令打开bin/runserver.sh文件,现将这些值修改为如下:

    在这里插入图片描述
    修改/bin目录下的runbroker.sh文件的启动配置
    在这里插入图片描述

    2.3.3 启动命令

    参照官网给出的教程来操做

    【1️⃣第一步:启动nameService】

    nohup sh bin/mqnamesrv &
    
    • 1

    在这里插入图片描述
    是否启动成功,我们可以使用如下命令进行查看日志

    tail -f ~/logs/rocketmqlogs/namesrv.log
    
    • 1

    在这里插入图片描述
    【2️⃣启动borker】

    nohup sh bin/mqbroker -n localhost:9876 &
    
    • 1

    查看启动日志

    tail -f ~/logs/rocketmqlogs/broker.log 
    
    • 1

    在这里插入图片描述

    RocketMQ的端口号为:9376

    进行测试RocketMQ发送消息

    > export NAMESRV_ADDR=localhost:9876
    > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    
    > sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    
    • 1
    • 2
    • 3
    • 4

    声明,使用官方给出的启动borker的方式,只能在本地访问,所以我还需要做出如下的配置:

    2.3.4 发放远端访问权限

    修改/conf/broker.conf文件,具体的修改如下图所示:

    在这里插入图片描述

    这里将添加两个属性namesrvAddr=本机公开地址:端口号;brokerIpl=本机公开地址

    然后保存退出后,我们还需要开放端口号6379

    紧接着使用指定的配置文件启动broker,具体命令如下:

    nohup sh bin/mqbroker -n xxxx:9876 autoCreateTopicEnable=true -c /opt/apps/rocketmq-all-4.9.0-bin-release/conf/broker.conf & // 这里是自己的broker.conf地址
    
    • 1

    xxxx:是自己修改的本机的开放地址;
    -c 参数后面跟的是修改的broker.conf的文件地址

    2.3.4 关闭命令

    shutdown
    关闭的时候,先关闭broker,再关闭nameServer

    > sh bin/mqshutdown broker
    The mqbroker(36695) is running...
    Send shutdown request to mqbroker(36695) OK
    
    > sh bin/mqshutdown namesrv
    The mqnamesrv(36664) is running...
    Send shutdown request to mqnamesrv(36664) OK
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    2.4 控制台的安装与启动

    RocketMQ有一个可视化的dashboard,通过该控制台可以直观的查看到很多的信息。

    2.4.1 下载

    下载地址:https://mirrors.aliyun.com/apache/rocketmq/rocketmq-dashboard/1.0.0/

    这里使用的是阿里的镜像源进行下载

    2.4.2 修改配置

    修改其src/main/resources中的application.properties配置文件。

    原来的端口号为 8080 ,修改为一个不常用的
    指定RocketMQ的name server地址
    在这里插入图片描述

    2.4.3 添加依赖

    在解压目录rocketmq-console的pom.xml中添加如下JAXB依赖。

    JAXB,Java Architechture for Xml Binding,用于XML绑定的Java技术,是一个业界标准,是一项可以根据XML Schema生产Java类的技术。

    <dependency>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.sun.xml.bind</groupId>
        <artifactId>jaxb-impl</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.sun.xml.bind</groupId>
        <artifactId>jaxb-core</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>activation</artifactId>
        <version>1.1.1</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在这里插入图片描述

    至此,我们的单机版RocketMQ环境就算是搭建完毕了

  • 相关阅读:
    如何降低复杂度,用数据库做消息队列的存储?
    java毕业生设计中小型连锁超市配送中心配送管理计算机源码+系统+mysql+调试部署+lw
    mssql拿shell
    [N1CTF 2018]eating_cms
    c++ 指针
    前端开发核心知识进阶 —— 宏任务和微任务
    吴恩达深度学习笔记:深度学习的 实践层面 (Practical aspects of Deep Learning)1.4-1.5
    002.Breakfast or lunch?
    虹科分享 | MKA:基于先进车载网络安全解决方案的密钥协议
    springboot2.X整合mybatis使用joda时间格式变量完成插入操作
  • 原文地址:https://blog.csdn.net/mmklo/article/details/125592961