RabbitMQ六大模型官网:RabbitMQ Tutorials — RabbitMQ
其中Publish/Subscribe, Routing,Topics模型用到了Exchange交换器,交换器分别三种类型,这里会一起讲述。但是没有用到交换器的模型不代表它们没有交换器,它们只是使用了默认的交换器类型Direct。没有指定交换机但是一定会存在一个默认的交换机。

在上图的模型中,有以下概念:
P:生产者,也就是要发送消息的程序。
C:消费者:消息的接收者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

Work Queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。workqueue常用于避免消息堆积问题。
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

在订阅模型中,相比于前面两种模式,多了一个exchange角色,而且过程略有变化:
1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
注意:交换机只能转发消息不能存储消息
Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。
Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列。
Direct:定向,把消息交给符合指定routing key 的队列。
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列。
!注意:Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
通配符规则:
1.# :匹配一个或多个词(包括0)
2.* :匹配不多不少恰好1个词(词的长度可以为0)
举例:
item.# :能够匹配 item.insert.abc 或者 item.insert或者item
item.* :只能匹配 item.insert
*代表一个目录名的单词长度可以为0~n,而#代表的是多级目录0~n

1.导入依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
2.在application.yml进行配置
- # 服务端口
- server:
- port: 8080
- # 配置rabbitmq服务
- spring:
- rabbitmq:
- username: admin
- password: admin
- virtual-host: /
- host: 192.168.10.128
- port: 5672
3.编写配置类,声明队列和交换机,并且绑定
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitmqConfig {
- //声明队列
- @Bean
- public Queue emailQueue() {
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- // return new Queue("TestDirectQueue",true,true,false);
- //一般设置一下队列的持久化就好,其余两个就是默认false
- return new Queue("email.fanout.queue", true);
- }
- @Bean
- public Queue smsQueue() {
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- // return new Queue("TestDirectQueue",true,true,false);
- //一般设置一下队列的持久化就好,其余两个就是默认false
- return new Queue("sms.fanout.queue", true);
- }
- @Bean
- public Queue weixinQueue() {
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- // return new Queue("TestDirectQueue",true,true,false);
- //一般设置一下队列的持久化就好,其余两个就是默认false
- return new Queue("weixin.fanout.queue", true);
- }
- //声明注册fanout模式的交换机
- @Bean
- public FanoutExchange fanoutOrderExchange() {
- return new FanoutExchange("fanout_order_exchange", true, false);
- }
-
-
- //绑定 将队列和交换机绑定
- @Bean
- public Binding weixinBinging() {
- return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange());
- }
- @Bean
- public Binding smsBinding() {
- return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
- }
- @Bean
- public Binding emailBinding() {
- return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
- }
- }
4.生产者实现Producer
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.util.UUID;
-
- @Service
- public class OrderService {
- @Resource
- private RabbitTemplate rabbitTemplate;
- // 1: 定义交换机
- private String exchangeName = "fanout_order_exchange";
- // 2: 路由key
- private String routeKey = "";
- public void makeOrder(String userId, String productId, int num) {
- // 1: 模拟用户下单
- String orderNumer = UUID.randomUUID().toString();
- // 2: 根据商品id productId 去查询商品的库存
- // int numstore = productSerivce.getProductNum(productId);
- // 3:判断库存是否充足
- // if(num > numstore ){ return "商品库存不足..."; }
- // 4: 下单逻辑
- // orderService.saveOrder(order);
- // 5: 下单成功要扣减库存
- // 6: 下单完成以后
- System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
- // 发送订单信息给RabbitMQ fanout
- rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
- }
- }
5.消费者,创建新的项目,导入依赖和编写yaml ,yaml里面修改端口号,避免端口冲突
- # 服务端口
- server:
- port: 8081
- # 配置rabbitmq服务
- spring:
- rabbitmq:
- username: admin
- password: admin
- virtual-host: /
- host: 192.168.10.128
- port: 5672
- //@RabbitListener 指定⽬标⽅法来作为消费消息的⽅法,通过注解参数指定所监听的队列或者Binding
- //@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
- //@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
- @RabbitListener(queues = {"sms.fanout.queue"})
- @Service
- public class SMSConsumer {
- @RabbitHandler
- public void receiveMessage(String message){
- System.out.println("SMS接收到了————订单信息"+message);
- }
- }
- @RabbitListener(queues = {"email.fanout.queue"})
- @Service
- public class EmailConsumer {
- @RabbitHandler
- public void receiveMessage(String message){
- System.out.println("Email接收到了————订单信息"+message);
- }
- }
- @RabbitListener(queues = {"weixin.fanout.queue"})
- @Service
- public class WeixinConsumer {
- @RabbitHandler
- public void receiveMessage(String message){
- System.out.println("Weixin接收到了————订单信息"+message);
- }
- }
运行

Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。
因此最主要的区别还是在交换机配置这里
- import org.springframework.amqp.core.*;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitmqConfig {
- //声明队列
- @Bean
- public Queue emailQueue() {
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- // return new Queue("TestDirectQueue",true,true,false);
- //一般设置一下队列的持久化就好,其余两个就是默认false
- return new Queue("email.direct.queue", true);
- }
- @Bean
- public Queue smsQueue() {
-
- return new Queue("sms.direct.queue", true);
- }
- @Bean
- public Queue weixinQueue() {
-
- return new Queue("weixin.direct.queue", true);
- }
- //声明注册direct模式的交换机 这里是区别
- @Bean
- public DirectExchange directOrderExchange() {
- return new DirectExchange("direct_order_exchange", true, false);
- }
-
-
- //绑定 将队列和交换机绑定, 并设置路由key
- @Bean
- public Binding weixinBinging() {
- return BindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with("weixin");
- }
- @Bean
- public Binding smsBinding() {
- return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("sms");
- }
- @Bean
- public Binding emailBinding() {
- return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("email");
- }
- }
有个问题:交换机和队列的声明,绑定关系,在开发中是应该绑定在消费者中还是生产者中呢?
个人想法:应该绑定在消费者中,因为生产者是发送消息的,生产者如果先启动,如果消费者在生产者没有启动的时候,启动会没有消息,发生错误,而消费者是直接与消息打交道的,消费者可以先启动直接等待接收消息就可以了。但是也有人觉得生产者是需要先启动的,直接在生产者里面配置就可以了。所以两边都有歧义,自行考虑
routing key模糊查询
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.TopicExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class TopicEx {
- //声明队列
- @Bean
- public Queue emailQueue() {
- // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
- // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
- // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
- // return new Queue("TestDirectQueue",true,true,false);
- //一般设置一下队列的持久化就好,其余两个就是默认false
- return new Queue("email.topic.queue", true);
- }
- @Bean
- public Queue smsQueue() {
-
- return new Queue("sms.topic.queue", true);
- }
- @Bean
- public Queue weixinQueue() {
-
- return new Queue("weixin.topic.queue", true);
- }
- //声明注册topic模式的交换机
- @Bean
- public TopicExchange topicOrderExchange() {
- return new TopicExchange("topic_order_exchange", true, false);
- }
-
-
- //绑定 将队列和交换机绑定
- @Bean
- public Binding weixinBinging() {
- return BindingBuilder.bind(weixinQueue()).to(topicOrderExchange()).with("#.weixin.#");
- }
- @Bean
- public Binding smsBinding() {
- return BindingBuilder.bind(smsQueue()).to(topicOrderExchange()).with("#.sms.#");
- }
- @Bean
- public Binding emailBinding() {
- return BindingBuilder.bind(emailQueue()).to(topicOrderExchange()).with("#.email.#");
- }
- }
除了用配置文件可以声明队列,交换机,绑定,还可以用注解
- @RabbitListener(bindings =@QueueBinding(
- // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
- value = @Queue(value = "sms.topic.queue",autoDelete = "false"),
- // order.fanout 交换机的名字 必须和生产者保持一致
- exchange = @Exchange(value = "topic_order_exchange",
- // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
- type = ExchangeTypes.TOPIC),
- //key模糊匹配
- key = "*.sms.#"))
测试

可以获取三个队列的消息。