大家都知道阿里巴巴每年的天猫双十一,那一晚的流量是十分巨大的,那么我们有没有想过,阿里是如何承载住着么巨大流量冲击,还能够稳定的运行的么?
仰仗着阿里开发出的RocketMQ这款优秀的消息中间件
【官方网址】:https://rocketmq.apache.org/
MQ:Message Queue,是一种
提供消息队列服务
的中间件
,是一套提供了消息生产,存储,消息全过程API的软件系统。
这里所指的
消息就是数据
,一般消息的体量不会太大
我们可以将消息中间件的功能大致分为3点
MQ可以将系统的超量流量暂时存在MQ中,一百年系统后期可以慢慢的进行处理,从而避免了请求的丢失或者系统被大量请求瞬间冲垮。
上游系统对下游系统的调用若为同步调用,则会大大降低对系统的而吞吐量与并发性,且系耦合度较高,而异步调用则会解决这些问题,所以两层之间若要实现有同步到异步的转化,一般性的做法就是,在这两层之间添加一个MQ层。
分布式系统会在海量的数据流,如:业务日志,监控数据用户行为,针对这些数据流进行实时或批量采集汇总,然后对这些数据进行大量的数据分析,这是当前当前互联网平台的必备技术,通过MQ完成此类数据收集是最好的选择。
ActiveMQ是一款使用Java开发的MQ产品,但是现在的生产环境中,已经很少有人用了,可以说被淘汰了。
RabbitMQ是一款使用Erlang开发的,吞吐量比Kafka和RocketMQ较低
是一款使用Scale/Java开发的,最大的特点是高吞吐率,常用于大数据的实时计算,日志采集等场景。其没有遵循任何的MQ协议,使用自研开发。他的数据是存储在磁盘上的。在SpringCloud一代的时候,内部支持的消息中间件仅支持Kafka和RabbitMQ
RocketMQ时使用Java语言开发的,是历经数年的阿里双十一的考验,没有遵循任何的MQ协议,使用自研的协议。对于SpringCloud Alibaba 其支持Kafka RabbitMQ,
但推荐使用RocketMQ
消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
Topic表示一类消息的集合,每个主题都包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本消息。
一个生产者可以同时发送多种Topic的消息,而一个消费者只能对某种特定的Topic感兴趣,即只可以订阅和消费一种Topic的消息。
存储消息的物理实体,一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息,一个Topic的Queue也被称为一个Topic中的消息的分区(Partition)
RocketMQ中每个消息拥有唯一的MessageId,且可以携带具有业务标识的Key,以方便对消息的查询。不过需要注意的是,MessageId有两个:生产者send()消息的时候会自动生成一个MessageId(msgId)当消息到达Borker后,Broker会自动生成一个MessageId(MsgId),MsgId,offsetId与Key都称为消息的标识。
producerId+进程Pid+MessageClientIDSetter类的classLoader的hashCode+当前的时间,+AutomicInteger自增计数器值
brokerIp+物理分区的offset
RocketMQ架构上主要分为四个部分:
消息生产者,负责生产消息,Producer通过MQ的负载均衡模块选择相应的Borker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
RocketMQ中的消息的生产者都是以生产者组(Producer Group)形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同类型的Topic的消息。
消息消费者,负责消费消息,一个消息消费者会从Broker服务器中获取到消息,并对消息业务进行处理。
RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的,消费者组是同一类消费者的集合,这类Consumer消费者是同一类Topic的消息。消费者组使得在消息消费方面,实现负载均衡和容错降级的目标变得非常容易。
具体流程
1)启动NameServer, NameServer启动后开始监听端口,等待Broker、Producer、Consumer连接。
2)启动Broker时,Broker会 与所有的NameServer建立并保持长连接,然后每30秒向NameServer定时发送心跳包。
3)发送消息前,可以先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,当然,在创建Topic时也会将Topic与Broker的关系写入NameServer中,不过,这步是可选的,也可以在发送消息时自动创建Topic,
4) Producer发送消息,启动时先跟NameServer集群中的其中- 台建立长连接,并从NameServer中获取路由信息,即当前发送的Topic消息的Queue与Broker的地址 (IP+Port) 的映射关系。然后根据算法策略从队选择一个Queue, 与队列所在的Broker建立长连接从而向Broker发消息。当然,在获取到路由信息
后,Producer会首先将路由信息缓存到本地,再每30秒从NameServer更新一 -次路由信息。
5) Consumer跟Producer类似, 跟其中-台NameServer建立长连接, 获取其所订阅Topic的路由信息,然后根据算法策略从路由信息中获取到其所要消费的Queue,然后直接跟Broker建立长连接,开始消费其中
的消息。Consumer在获取到路由信息后,同样也会每30秒从NameServer更新一次路由信息。不过不同于Producer的是,Consumer还会向Broker发送心跳, 以确保Broker的存活状态。
系统要求的是64位,JDK要求是1.8以上版本
使用xftp将下载好的RocketMQ的压缩文件,上传至Linux上(opt/tools)
这里因为是zip的压缩文件,所以我们使用unzip命令来完成
unzip 压缩文件名 [-d 压缩的路径]
解压完成后的目录结构如下所示:
【修改runserver.sh】
使用vim命令打开bin/runserver.sh文件,现将这些值修改为如下:
修改/bin目录下的runbroker.sh文件的启动配置
参照官网给出的教程来操做
【1️⃣第一步:启动nameService】
nohup sh bin/mqnamesrv &
是否启动成功,我们可以使用如下命令进行查看日志
tail -f ~/logs/rocketmqlogs/namesrv.log
【2️⃣启动borker】
nohup sh bin/mqbroker -n localhost:9876 &
查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
RocketMQ的端口号为:9376
进行测试RocketMQ发送消息
> export NAMESRV_ADDR=localhost:9876
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
声明
,使用官方给出的启动borker的方式,只能在本地访问,所以我还需要做出如下的配置:
修改/conf/broker.conf文件,具体的修改如下图所示:
这里将添加两个属性namesrvAddr=本机公开地址:端口号;brokerIpl=本机公开地址
然后保存退出后,我们还需要开放端口号6379
紧接着使用指定的配置文件启动broker,具体命令如下:
nohup sh bin/mqbroker -n xxxx:9876 autoCreateTopicEnable=true -c /opt/apps/rocketmq-all-4.9.0-bin-release/conf/broker.conf & // 这里是自己的broker.conf地址
xxxx:是自己修改的本机的开放地址;
-c 参数后面跟的是修改的broker.conf的文件地址
shutdown
关闭的时候,先关闭broker,再关闭nameServer
> sh bin/mqshutdown broker
The mqbroker(36695) is running...
Send shutdown request to mqbroker(36695) OK
> sh bin/mqshutdown namesrv
The mqnamesrv(36664) is running...
Send shutdown request to mqnamesrv(36664) OK
RocketMQ有一个可视化的dashboard,通过该控制台可以直观的查看到很多的信息。
下载地址:https://mirrors.aliyun.com/apache/rocketmq/rocketmq-dashboard/1.0.0/
这里使用的是阿里的镜像源进行下载
修改其src/main/resources中的application.properties配置文件。
原来的端口号为 8080 ,修改为一个不常用的
指定RocketMQ的name server地址
在解压目录rocketmq-console的pom.xml中添加如下JAXB依赖。
JAXB,Java Architechture for Xml Binding,用于XML绑定的Java技术,是一个业界标准,是一项可以根据XML Schema生产Java类的技术。
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>javax.activation</groupId>
<artifactId>activation</artifactId>
<version>1.1.1</version>
</dependency>
至此,我们的单机版RocketMQ环境就算是搭建完毕了