RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。
RabbitMQ有六种模式
- 简单模式
- work模式
- Publish/Subscribe发布与订阅模式
- Routing路由模式
- Topics主题模式
- RPC远程调用模式(远程调用,不太算MQ;暂不作介绍)
AMQP 一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。
RabbitMQ是AMQP协议的Erlang的实现。
概念 | 说明 |
连接Connection | 一个网络连接,比如TCP/IP套接字连接。 |
信道Channel | 多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。 |
客户端Client | AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。 |
服务节点Broker | 消息中间件的服务节点;一般情况下可以将一个RabbitMQ Broker看作一台RabbitMQ 服务器。 |
端点 | AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。 |
消费者Consumer | 一个从消息队列里请求消息的客户端程序。 |
生产者Producer | 一个向交换机发布消息的客户端应用程序。 |
导入相关依赖
-
-
org.springframework.boot -
spring-boot-starter-amqp -
在你的application中添加以下代码(根据自己的实际情况修改)
- spring:
- rabbitmq:
- host: 192.168.67.129
- port: 5672
- username: admin
- password: 123
创建配置类
- @Bean
- public Jackson2JsonMessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
-
- @Bean
- public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- rabbitTemplate.setConnectionFactory(connectionFactory);
- rabbitTemplate.setMessageConverter(messageConverter());
- return rabbitTemplate;
- }
创建枚举类RabblimqConstants 创建常量
- public interface RabblimqConstants {
-
- String SIMPLE_QUEUE = "hello_queue_new";//点对点常量
-
- String WORK_QUEUE = "my_work_queue"; //工作模式
-
- //路由模式
- String PUBLISHER_A = "publisher_quque_a";
- String PUBLISHER_B = "publisher_quque_b";
- String PUBLISHER_EXCHANGE = "my.exchange";
-
-
- //创建两个队列
- String FANOUT_QUEUE_A = "fanout_queue_a";
- String FANOUT_QUEUE_B = "fanout_queue_b";
-
-
- //创建交换机 广播模式
- String FANOUT_EXCHANGE = "my.fanout_exchange";
-
-
- //rounting交换机
- String ROUTING_QUEUE_ORANGE = "my_routing_queue_orange";
-
- String ROUTING_QUEUE_BLACK = "my_routing_queue_black";
- String ROUTING_QUEUE_GRENN = "my_routing_queue_green";
-
- String ROUTING_EXCHANGE = "my.routing_exchange";
-
- //定义rounting key
- String ROUNTING_KEY_ORANGE = "orange";
- String ROUNTING_KEY_BLACK = "black";
- String ROUNTING_KEY_GREEN = "green";
-
-
- //topic模式
-
- String TOPIC_QUEUE_RED = "my_topic_queue_red";
-
- String TOPIC_QUEUE_WHITE = "my_topic_queue_white";
-
- String TOPIC_EXCHANGE = "my.topic.exchange";
-
- String TOPIC_KEY_RED = "red.#";
-
- String TOPIC_KEY_WHITE = "*.white.#";
-
-
- String NORMAL_QUEUE = "normal_queue";//正常队列
-
- String NORMAL_EXCHANGE = "normal_exchange";//正常交换机
-
- String NORMAL_ROUNTING_KEY = "normal_routingkey"; //正常的key
-
- String DL_QUEUE = "dl_queue";//死信队列
-
- String DL_EXCHANGE = "dl_exchange";//死信交换机
-
- String DL_ROUNTING_KEY = "dl_routingkey"; //死信key
-
- //插件完成死信队列
- String SDL_QUEUE = "sdl_queue";
-
- String SDL_EXCHANGE = "sdl_exchange";
-
- String SDL_ROUNTING_KEY = "sdl_routingkey";
-
- String SDL_EXCHANGE_TYPE = "x-delayed-message";
-
-
-
- }
模式说明
在上图的模型中,有以下概念:
- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
工作队列模式实际上是一种竞争关系的模式,多个消费者之间是竞争关系,即一条消息如果被某个消费者消费了,那么其他的消费者就获取不到了。
发布订阅模式:
- 1、每个消费者监听自己的队列。
- 2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息
模式说明
路由模式特点:队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey。
Exchange 不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key 完全一致,才会接收到消息
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
模式说明
Topic主题模式也叫通配符模式。Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
#:匹配零个或多个词
*:匹配不多不少恰好1个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert