• RabiitMQ消息队列系统


    一、MQ

    1、概念:

    MQ 全称为 Message Queue (消息队列),是一种应用程序对应用程序的通信方法。MQ 允许应用程序将消息写入队列,其他应用程序从队列中读取并处理这些消息,不需要它们之间直接相互联系。消息队列可用于实现异步通信、解耦应用程序、提高系统性能,适用于处理一些不需要即时响应的任务。

    ● 产品:Kafka ;RabbitMQ ;RocketMQ

    2、特性:

    ① 解耦性:通过消息传递来实现应用程序之间的通信,应用程序之间不需要直接相互联系;

    ② 冗余性:通过负载均衡和数据同步来存储请求信息,防止数据丢失;

    ③ 扩展性:适应不断增长的负载和需求,以应对更多的应用程序和用户;

    ④ 削峰填谷:在高负载时期,消息队列接收并存储大量的消息,使系统不会被突然的大量请求压垮,这些消息在队列中等待处理 ;在低负载时期,系统逐渐处理在高负载时期积累的消息。

    ⑤ 顺序保证:在消息队列中,消息的处理和传递遵循特定的顺序,确保消息按照它们被发送到队列的顺序进行处理;

    3、模式:

    (1) P2P 模式:

    ① 基本原理: P2P模式包含三个角色:消息队列 (Queue)、发送者 (Sender)、接收者 (Receiver)

    在点对点模式中,消息被发送者发送到队列,然后由一个特定的接收者(消费者)从队列中接收和处理消息。

    ② 特点:

    P2P 模式每条消息只能被一个接收者消费(一对一);

    接收者在成功接收消息之后,需向队列应答成功;

    P2P 模式每个消息都会被成功处理。

    (2) Pub/Sub 模式:

    ① 基本原理:Pub/Sub 模式包含三个角色:主题 (Topic)、发布者 (Publisher)、订阅者 (Subscriber)

    在 Pub/Sub 模式中,消息被发送到一个主题(topic),多个消费者可以订阅这个主题以接收消息。

    ② 特点:

    Pub/Sub 模式每条消息可以被多个订阅者同时接收(一对多);

    Pub/Sub 模式适用于需要将消息广播给多个接收者的场景。

    二、RabbitMQ 介绍

    1、概念:

    RabbitMQ是一个在 AMQP (Advanced Message Queuing Protocol) 基础上实现的可复用的企业消息系统,支持多种协议和客户端,支持高并发和可扩展。

    ● AMQP 协议:是一个提供统一消息服务的应用层标准高级消息队列协议,基于此协议的客户端与消息中间件可传递消息,不受客户端/中间件不同产品,不同的开发语言等条件的限制。

    2、RabbitMQ 架构:

    消息生产者、消息队列内部组件、队列程序、消息消费者

    3、RabbitMQ 通信过程:

    ① Message:消息,由消息头和消息体组成。消息头是消息的元数据,通常包含与消息有关的一些属性和标识 ;消息体是消息中包含的实际数据,这是消息的主要内容。

    ② Publisher:消息生产者,向交换机发送消息的客户端程序。

    ③ Exchange:交换机 (消息队列的组件),用于接收生产者发送的消息,再将这些消息路由发送给服务器中的队列。

    ④ Queue:消息队列,是消息的容器,用来保存消息,将消息发送给消费者。

    ⑤ Bingding:绑定,将交换器和消息队列连接起来的路由规则,用于消息队列和交换器之间的关联。

    ⑥ Virtual Host:虚拟主机,是一种逻辑隔离机制,它允许将消息队列和相关资源隔离到不同的逻辑单元中。每个虚拟主机都是一个相互独立的消息队列环境,拥有自己的队列、交换机、绑定、用户权限等。

    ⑦ Broker:rabbitmq 的服务器实体。

    ⑧ Connection:连接,rabbitmq 与应用程序建立的TCP的连接。

    ⑨ Channel:信道,是TCP里的虚拟连接,一条TCP连接,可以包含很多条的Channel。

    ⑩ Consumer:从消息队列中获取消息的应用程序。

    4、消息:

    (1) 消息的传输和分发:

    rabbitmq 采用信道的方式传输消息 ;消费者通过订阅队列来接收消息,若多个消费者订阅同一个队列,rabbitmq 则使用轮询(round-robin)来决定哪个消费者将接收到队列中的下一条消息。

    (2) 消息路由规则:

    rabbitmq 使用交换机(Exchange)和绑定(Binding)来进行消息路由。消息的路由过程包括以下步骤:

    ① 生产者发布消息: 消息首先由生产者发布到一个特定的交换机,并且指定一个消息的路由键(Routing Key);

    ② 交换机根据绑定进行路由:绑定是交换机和队列之间的规则, 交换机会根据消息的路由键以及与它绑定的队列来决定将消息路由到哪些队列;

    ③ 消息发送到队列: 一旦消息被交换机路由到一个或多个队列,它就会被存储在这些队列中,等待消费者来处理。

    (3) 常见的交换机类型和它们的路由规则:

    ① 直连交换机(Direct Exchange): 这种交换机使用精确的路由键匹配,只将消息发送到与消息的路由键完全匹配的队列。

    ② 主题交换机(Topic Exchange): 主题交换机使用通配符形式的路由键匹配,允许使用通配符符号(*和#)来匹配多个路由键。

    ③ 扇出交换机(Fanout Exchange): 扇出交换机将消息广播到与它绑定的所有队列,忽略路由键。

    (4) 如何保证消息正确地发送至 rabbitmq:

    将信道设置成confirm模式(发送方确认模式),发送方在信道上发布的消息都会被分配一个唯一的消息 ID(delivery tag)。一旦消息成功投递到目标队列或者消息被写入磁盘(持久化的消息),信道会向发送方发送一个包含消息ID确认消息。

    如果 rabbitmq 发生内部错误或其他原因导致消息丢失,它会发送一条 "nack"(not acknowledged,未确认)消息,以通知发送方发生了问题。

    (5) 如何确认接收方消费了消息:

    rabbitmq 可以确保消息在成功被消费者接收和处理后才会被标记为已接收,并从队列中删除。

    (6) 如何避免消息重复传递或重复消费:

    ● 内部消息标识符(inner-msg-id):每个消息生产者发送的消息都附带一个内部消息标识符,用于在消息传递过程中唯一标识每条消息,消息中间件可以使用这个标识符来确保消息不会被重复传递。

    ● 业务标识符(bizId):在消费者端,每个消息体都必须包含一个业务标识符(bizId),消息消费者可以使用业务标识符来检测并拒绝重复消息。

    (7) 如何确保消息不丢失:

    将 rabbitmq 消息设置为持久化,使 rabbitmq 服务器重启时消息不会丢失。

    ● 把队列的 durable 属性设置为true,表示设置的是持久队列;

    ● 在消息发布前,把消息的 “投递模式” 选项设置为2(持久),将消息标记成持久化。

    三、RabbitMQ 单机部署:

    1、环境:

    设置主机名和域名解析

    192.168.198.131(rabbitmq1)

    192.168.198.132(rabbitmq2)

    192.168.198.133(rabbitmq3)

    2、部署 Erlang:

    Rabbitmq 是基于 Erlang 构建的,部署 Rabbitmq 系统环境需要提供 Erlang 环境

    Erlang下载地址:rabbitmq/erlang - Packages · packagecloud

    wget --content-disposition "https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.7-1.el7.x86_64.rpm/download.rpm?distro_version_id=140"

    rpm -Uvh erlang-23.3.4.7-1.el7.x86_64.rpm

    3、部署 rabbitmq:

    RabiitMQ 安装需要先安装 socat 插件:yum install -y socat

    rabbitmq 下载地址:rabbitmq/rabbitmq-server - Packages · packagecloud

    wget --content-disposition "https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.10.0-1.el7.noarch.rpm/download.rpm?distro_version_id=140"

    rpm -Uvh rabbitmq-server-3.10.0-1.el7.noarch.rpm

    4、rabbitmq web 界面:

    rabbitmq-plugins enable rabbitmq_management

    端口为 15672

    ① 添加用户:rabbitmqctl add_user 用户名 密码

    ② 分配角色:rabbitmqctl set_user_tags 用户名 角色

    ● 超级管理员 administrator:可登陆管理控制台,可查看所有的信息,并且可以对用户,策略进行操作。

    ● 监控者 monitoring:可登陆管理控制台,同时可以查看rabbitmq节点的相关信息 (进程数,内存使用情况,磁盘使用情况等)

    ● 策略制定者 policymaker:可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息。

    ● 普通管理者 management:仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理。无法登陆管理控制台,通常就是普通的生产者和消费者。

    ③ 授予权限:rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*" (后面三个”*”代表用户拥有配置、写、读全部权限)

    四、RabbitMQ 集群部署

    1、简介:

    rabbitmq 集群部署是将多个 rabbitmq 节点组合在一起,通过增加更多的节点来提高系统的可用性和可靠性,形成一个能够提供更强大功能的整体。

    2、原理:

    (1) cookie:

    ● rabbitmq 底层是通过 Erlang 架构来实现的,所以 rabbitmqctl 会启动 Erlang 节点,并基于 Erlang节点来 rabbitmq 节点。cookie 是一个密钥令牌,用于 rabbitmq 集群中的节点认证。Erlang的集群中各节点通过这个 cookie 来实现相互认证,从而保证消息的可靠传输。

    ● 在部署 rabbitmq 集群时,需要保证各节点的 cookie 文件使用的是同一个值,所以部署 rabbitmq 分布式集群时要先安装 Erlang,并把其中一个服务的cookie 复制到另外的节点 ;而且文件的权限是400,以保证节点之间能够正确通信。

    (2) 内存节点和磁盘节点:

    ① 内存节点:将数据存储在内存。内存节点主要负责提供高性能的消息传递,加快消息的传递速度;

    ② 磁盘节点:将数据存储在磁盘上。磁盘节点主要负责确保数据的可靠性,即使在节点故障后也能恢复数据。

    (3) 普通模式和镜像模式:

    ① 普通模式:对于两个节点 rabbit01、rabbit02 来说,消息实体只存在于其中一个节点 (例如 rabbit01),当 consumer 从 rabbit02 节点消费时,RabbitMQ 会临时在 rabbit01、rabbit02 间进行消息传输,把1中的消息实体取出并经过2发送给 consumer。

    ② 镜像模式:消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在 consumer 消费数据时临时读取,但集群内部的同步通讯会占用大量的网络带宽。

    3、集群配置:

    (1) 设置 erlang 运行节点:

    将 rabbitmq1 的 .erlang.cookie 文件拷贝到 2、3,使三台 rabbitmq 的 cookie 内容保持一致,文件权限为400 (只读)。

    scp -r /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq/.erlang.cookie

    scp -r /var/lib/rabbitmq/.erlang.cookie rabbitmq3:/var/lib/rabbitmq/.erlang.cookie

    重启服务:systemctl restart rabbitmq-server

    (2) rabbitmq2、3关闭节点:

    关闭 rabbitmq 运行节点:rabbitmqctl stop

    设置为独立节点:rabbitmq-server -detached

    查看节点状态:rabbitmqctl cluster_status

    (3) rabbitmq2、3添加集群:

    停止应用:rabbitmqctl stop_app

    添加集群:rabbitmqctl join_cluster rabbit@rabbitmq1

    开启应用:rabbitmqctl start_app

    查看集群状态:rabbitmqctl cluster_status

  • 相关阅读:
    WebDAV之葫芦儿·派盘+元思笔记
    Java8 lamda函数式编程,常用的Consumer/Function/Operator/Supplier/Predicate
    PAT 1026 Table Tennis
    QT学习笔记1-Hello, QT
    SQL题目记录
    OJ练习第167题——单词接龙
    【AGC】报错ArrayIndexOutOfBoundsException
    JavaScript WebSocket NFC读写器示例源码
    管道流:字节管道流、字符管道流
    数据结构-堆
  • 原文地址:https://blog.csdn.net/weixin_61428407/article/details/134262395