AMQP(Advanced Message Queuing Protocol)高级消息队列协议,一个提供统一消息服务的应用层标准协议,是应用层协议的一个开放标准,为面向消息的中间件设计。AMQP是一个进程间传递异步消息的网络协议。
基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP在消息提供者和客户端的行为进行了强制规定,使得不同卖商之间真正实现了互操作能力。
JMS是早期消息中间件进行标准化的一个尝试,它仅仅是在API级进行了规范,离创建互操作能力还差很远。
与JMS不同,AMQP是一个Wire级的协议,它描述了在网络上传输的数据的格式,以字节为流。因此任何遵守此数据格式的工具,其创建和解释消息,都能与其他兼容工具进行互操作。
生产消息。
生产Connection的的工厂。
连接,应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。
AMQP连接通常是长连接。AMQP是一个使用 TCP提供可靠投递的应用层协议。AMQP 使用认证机制并且提供 TLS(SSL)保护。当一个应用不再需要连接到 AMQP 代理的时候,需要优雅的释放掉 AMQP 连接,而不是直接将 TCP 连接关闭。
网络信道,是建立在Connection连接之上的一种轻量级的连接。几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。
如果把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤。一个Connection上可以创建任意数量的Channel。
我们大部分的业务操作是在Channel这个接口中完成的,包括:
接受客户端的连接,实现AMQP实体服务,如rabbitmq。
虚拟主机,用于进行逻辑隔离,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange。
为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。
交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。
交换机是用来发送消息的 AMQP 实体。交换机拿到一个消息之后将它路由给一个或零个队列。它使用哪种路由算法是由交换机类型和绑定(Bindings)规则所决定的。
交换机类型:
交换机属性:
交换机状态:
持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。
默认交换机(default exchange)实际上是一个由消息代理预先声明好的,没有名字(名字为空字符串)的直连交换机(direct exchange)。
可以认为默认交换机就是一个特殊的直连交换机。
默认交换机名称:空字符串(AMQP default)
默认交换机类型:直连交换机
在创建一个Queue时,只要没指定需要绑定的交换机,都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。
直连型交换机是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由。
在创建一个Queue时,如果绑定的是直连交换机,可以不必指定routing key的名字,因为他会有一个默认的routing key名称,名称同Queue名称。
直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)。
工作流程:
扇型交换机将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。
如果N个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的N个队列。扇型交换机一般用来处理消息的广播路由(broadcast routing)。
应用场景:
主题交换机是根据routing key和Exchange的类型将message发送到一个或者多个Queue中,我们经常拿他来实现各种publish/subscribe,即发布订阅。
直连交换机的路由规则是严格意义上的匹配,换言之Routing Key必须与Binding Key相匹配的时候才将消息传送给Queue。
主题交换机的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。
它约定:
A.*.*
,只会被路由到queue1;A.*.*
和*.B.*
会被路由到queue1、queue2;A.*.*
和*.B.*
和*.*.C
会被路由到queue1、queue2、queue3。应用场景:
头交换机不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。
头交换机可以视为直连交换机的另一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。灵活性更强(但实际上我们很少用到头交换机)。
工作流程:
交换机类型
交换机名称
Binding Key
Routing Key
路由规则
默认交换机
空串,不可修改
默认为Queue名称,不可改
同Binding Key名称
Routing Key==Binding Key,严格匹配
直连交换机
自定义
默认为Queue名称,可修改
同Binding Key名称
Routing Key==Binding Key,严格匹配
扇形交换机
自定义
无Binding Key
无Routing Key
无Binding Key,自动路由到交换机绑定的所有Queue中
主题交换机
自定义
自定义
自定义
Routing Key==Binding Key,模糊匹配
头交换机
自定义
自定义
自定义
根据发送的消息内容中的headers属性进行匹配
Exchange和Queue之间的虚拟连接。
BindingKey是Exchange和Queue绑定的规则描述。Binding Key指定当前Exchange下,什么样的Routing Key会被下派到当前绑定的Queue中。
路由规则,虚拟机可以用它来确定如何路由一个特定消息。
Binding Key与Routing Key的关系
Binding Key是Exchange和Queue绑定的规则描述,这个描述用来解析当Exchange接收到消息时,Exchange接收到的消息会带有Routing Key这个字段,Exchange就是根据这个Routing Key和当前Exchange所有绑定的Binding Key做匹配,如果满足要求,就往Binding Key所绑定的Queue发送消息。
各种交换机中的Binding Key与Routing Key
存储着即将被应用消费掉的消息。
队列属性:
队列创建:
队列在声明后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。
队列持久化:
持久化队列会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。
消费者消费消息。在AMQP中,消费者获取待消费消息的途径有两种:
需要注意:多个消费者监听同一个队列时,队列中的消息只会被其中一个消费者消费(并不会每个消费者都消费一次
)
消息,服务与应用程序之间传送的数据,由Properties(属性)和body(主体)组成。
属性是对消息进行修饰,比如消息的优先级,延迟等高级特性,主体则就是消息体的内容。
消息属性:
消息主体:
AMQP 的消息除属性外,也含有一个有效载荷Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。
消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和MessagePack将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。
消息持久化:
消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。
简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:消息的持久性完全取决与消息本身的持久模式(persistence mode)。
将消息以持久化方式发布时,会对性能造成一定的影响。
发布者(Publisher)发布消息(Message),经由交换机(Exchange)。
交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。
最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。
消费者在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。
这就给我们出了个难题,AMQP代理在什么时候删除消息才是正确的。
AMQP的两种消息确认模式:
如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。
当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。消费者可以向消息代理(消息中间件)表明,本条消息由于 “拒绝消息” 的原因处理失败了(或者未能在此时完成)。
当拒绝某条消息时,消费者可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。
当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。
在AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果你使用的是 RabbitMQ,那么你可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。