• AMQP协议详解


    文章目录


    AMQP协议介绍

    AMQP(Advanced Message Queuing Protocol)高级消息队列协议,一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP是一个进程间传递异步消息的网络协议。

    基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP在消息提供者和客户端的行为进行了强制规定,使得不同卖商之间真正实现了互操作能力。

    AMQP与JMS

    JMS是早期消息中间件进行标准化的一个尝试,它仅仅是在API级进行了规范,离创建互操作能力还差很远。

    与JMS不同,AMQP是一个Wire级的协议,它描述了在网络上传输的数据的格式,以字节为流。因此任何遵守此数据格式的工具,其创建和解释消息,都能与其他兼容工具进行互操作。

    AMQP核心组成

    在这里插入图片描述

    Producer(生产者)

    生产消息。

    ConnectionFactory(连接工厂)

    生产Connection的的工厂。

    Connection(连接)

    连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。

    AMQP连接通常是长连接。AMQP是一个使用 TCP提供可靠投递的应用层协议。AMQP 使用认证机制并且提供 TLS(SSL)保护。当一个应用不再需要连接到 AMQP 代理的时候,需要优雅的释放掉 AMQP 连接,而不是直接将 TCP 连接关闭。

    Channel(信道)

    网络信道,是建立在Connection连接之上的一种轻量级的连接。几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。

    如果把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤。一个Connection上可以创建任意数量的Channel。

    我们大部分的业务操作是在Channel这个接口中完成的,包括:

    • 队列的声明queueDeclare
    • 交换机的声明exchangeDeclare
    • 队列的绑定queueBind
    • 发布消息basicPublish
    • 消费消息basicConsume等。

    Broker(中间件)

    接受客户端的连接,实现AMQP实体服务,如rabbitmq。

    VirtualHost(虚拟主机)

    虚拟主机,用于进行逻辑隔离,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange。

    为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。

    Exchange(交换机)

    交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。

    交换机是用来发送消息的 AMQP 实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。

    交换机类型

    • Direct exchange(直连交换机)
    • Fanout exchange(扇形交换机)
    • Topic exchange(主题交换机)
    • Headers exchange(头交换机)

    交换机属性

    • Name:交换机名称
    • Durability:持久化标志,表明此交换机是否是持久化的
    • Auto-delete:删除标志,表明当所有队列在完成使用此exchange时,是否删除
    • Arguments:依赖代理本身

    交换机状态

    • 持久(durable)
    • 暂存(transient)

    持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。

    默认交换机

    默认交换机(default exchange)实际上是一个由消息代理预先声明好的,没有名字(名字为空字符串)的直连交换机(direct exchange)。

    可以认为默认交换机就是一个特殊的直连交换机。
    默认交换机名称:空字符串(AMQP default)
    默认交换机类型:直连交换机

    在创建一个Queue时,只要没指定需要绑定的交换机,都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

    直连交换机

    直连型交换机是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由。

    在创建一个Queue时,如果绑定的是直连交换机,可以不必指定routing key的名字,因为他会有一个默认的routing key名称,名称同Queue名称。

    直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)。

    工作流程:

    1. 将一个队列绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R;
    2. 当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列。

    在这里插入图片描述

    扇型交换机

    扇型交换机将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。

    如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型交换机一般用来处理消息的广播路由(broadcast routing)。

    在这里插入图片描述

    应用场景:

    • 广播消息;
    • 群聊功能。

    主题交换机

    主题交换机是根据routing key和Exchange的类型将message发送到一个或者多个Queue中,我们经常拿他来实现各种publish/subscribe,即发布订阅。

    直连交换机的路由规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue。
    主题交换机的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。

    它约定

    • binding key中可以存在两种特殊字符 * 与#,用于做模糊匹配。其中*用于匹配一个单词,#用于匹配多个单词(可以是零个)
    • routing key为一个点号分隔的字符串(我们将被点号分隔开的每一段独立的字符串称为一个单词)

    在这里插入图片描述

    • 当生产者发送消息Routing Key=A.A.A的时候,这时候只满足A.*.*,只会被路由到queue1;
    • 当生产者发送消息Routing Key=A.B.A的时候,这时候满足A.*.**.B.*会被路由到queue1、queue2;
    • 当生产者发送消息Routing Key=A.B.C的时候,这时候满足A.*.**.B.**.*.C会被路由到queue1、queue2、queue3。

    应用场景

    • 涉及到分类或者标签的新闻更新;
    • 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务。

    头交换机

    头交换机不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

    头交换机可以视为直连交换机的另一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。灵活性更强(但实际上我们很少用到头交换机)。

    工作流程:

    1. 绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header);
    2. 传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就可以满足条件,而当 “x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功。

    交换机小结

    交换机类型

    交换机名称

    Binding Key

    Routing Key

    路由规则

    默认交换机

    空串,不可修改

    默认为Queue名称,不可改

    同Binding Key名称

    Routing Key==Binding Key,严格匹配

    直连交换机

    自定义

    默认为Queue名称,可修改

    同Binding Key名称

    Routing Key==Binding Key,严格匹配

    扇形交换机

    自定义

    无Binding Key

    无Routing Key

    无Binding Key,自动路由到交换机绑定的所有Queue中

    主题交换机

    自定义

    自定义

    自定义

    Routing Key==Binding Key,模糊匹配

    头交换机

    自定义

    自定义

    自定义

    根据发送的消息内容中的headers属性进行匹配

    Binding(绑定)

    Exchange和Queue之间的虚拟连接。

    BindingKey是Exchange和Queue绑定的规则描述。Binding Key指定当前Exchange下,什么样的Routing Key会被下派到当前绑定的Queue中。

    Routing Key(路由键)

    路由规则,虚拟机可以用它来确定如何路由一个特定消息。

    Binding Key与Routing Key的关系

    • Binding Key是队列和交换机之间的绑定key;
    • Routing Key是生产者发给交换机的一个信息;
    • 当Binding Key和Routing Key能对应上时,将该消息放到相应的队列中。

    Binding Key是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有Routing Key这个字段,Exchange就是根据这个Routing Key和当前Exchange所有绑定的Binding Key做匹配,如果满足要求,就往Binding Key所绑定的Queue发送消息。

    各种交换机中的Binding Key与Routing Key

    • 默认交换机:Binding Key为Queue名称,不可自定义;Routing Key也为Queue名称时才能成功路由到队列
    • 直连交换机:Binding Key为Queue名称,可以自定义;Routing Key于Binding Key相同时才能成功路由到队列
    • 扇形交换机:无Binding Key;当然也无Routing Key。自动路由到交换机绑定的所有Queue中
    • 主题交换机:自定义Binding Key;自定义Routing Key。Routing Key==Binding Key,模糊匹配成功才能成功路由到队列
    • 头交换机:无Binding Key;当然也无Routing Key。根据发送的消息内容中的headers属性进行匹配

    Queue(队列)

    存储着即将被应用消费掉的消息。

    队列属性:

    • Name:队列名称
    • Durable:消息代理重启后,队列依旧存在
    • Exclusive:只被一个连接(connection)使用,而且当连接关闭后队列即被删除
    • Auto-delete:当最后一个消费者退订后即被删除
    • Arguments:一些消息代理用他来完成类似与 TTL 的某些额外功能

    队列创建:
    队列在声明后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。

    队列持久化:
    持久化队列会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。

    持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。

    Consumer(消费者)

    消费者消费消息。在AMQP中,消费者获取待消费消息的途径有两种:

    • 消息中间件将消息投递给消费者(push API)
    • 消费者主动获取消息 (pull API)

    需要注意:多个消费者监听同一个队列时,队列中的消息只会被其中一个消费者消费(并不会每个消费者都消费一次

    Message(消息)

    消息,服务与应用程序之间传送的数据,由Properties(属性)和body(主体)组成。

    属性是对消息进行修饰,比如消息的优先级,延迟等高级特性,主体则就是消息体的内容。

    消息属性:

    • Content type(内容类型)
    • Content encoding(内容编码)
    • Routing key(路由键)
    • Delivery mode (persistent or not)
    • 投递模式(持久化 或 非持久化)
    • Message priority(消息优先权)
    • Message publishing timestamp(消息发布的时间戳)
    • Expiration period(消息有效期)
    • Publisher application id(发布应用的 ID)

    消息主体:
    AMQP 的消息除属性外,也含有一个有效载荷Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。

    消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和MessagePack将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。

    消息持久化:
    消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。

    简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:消息的持久性完全取决与消息本身的持久模式(persistence mode)。

    将消息以持久化方式发布时,会对性能造成一定的影响。

    AMQP工作过程

    • 发布者(Publisher)发布消息(Message),经由交换机(Exchange)。

    • 交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。

    • 最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

    AMQP消息机制

    消息确认

    消费者在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。
    这就给我们出了个难题,AMQP代理在什么时候删除消息才是正确的。

    AMQP的两种消息确认模式

    • 自动确认模式:当消息中间件将消息发送给消费者后立即删除。(使用AMQP方法:basic.deliver 或 basic.get-ok)
    • 显式确认模式:待消费者发送一个确认回执(acknowledgement)后再删除消息。(使用AMQP方法:basic.ack)

    如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。

    拒绝消息

    当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。消费者可以向消息代理(消息中间件)表明,本条消息由于 “拒绝消息” 的原因处理失败了(或者未能在此时完成)。
    当拒绝某条消息时,消费者可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。

    当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。

    在AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果你使用的是 RabbitMQ,那么你可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。

  • 相关阅读:
    Redis系列21:缓存与数据库的数据一致性讨论
    [附源码]计算机毕业设计JAVA音乐交流平台
    Hi3861 业务代码编写框架
    如何防止重复下单?
    山东大学2024深度学习期末考试回忆
    Entertainment in MAC(Round 932)
    【Mars3d】进行水平测量measure.area({的时候,会被模型遮挡的处理方法
    计算机操作系统 第三章:处理机调度与死锁
    # 案例:验证 app 自动化测试环境是否 OK(测试对象:夜神模拟器以及真机)
    从 Java 8 升级到 Java 17 全过程,避坑!
  • 原文地址:https://blog.csdn.net/m0_67391377/article/details/126030946