• RabbitMQ—六种模型与Exchange类型


    RabbitMQ六大模型官网:RabbitMQ Tutorials — RabbitMQ

    其中Publish/Subscribe, RoutingTopics模型用到了Exchange交换器,交换器分别三种类型,这里会一起讲述。但是没有用到交换器的模型不代表它们没有交换器,它们只是使用了默认的交换器类型Direct。没有指定交换机但是一定会存在一个默认的交换机。

    一、简单模型(基本消息模型)

    在上图的模型中,有以下概念:
    P:生产者,也就是要发送消息的程序。
    C:消费者:消息的接收者,会一直等待消息到来。
    queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

    二、Work queues工作队列模型

     Work Queues 与入门程序的 简单模式 相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
    应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。workqueue常用于避免消息堆积问题。

    当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?
    主要有两种模式:
    1、轮询模式的分发:一个消费者一条,按均分配
    2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配

    三、Publish/Subscribe发布与订阅--Fanout(广播)

    在订阅模型中,相比于前面两种模式,多了一个exchange角色,而且过程略有变化:

    • 1) 可以有多个消费者

    • 2) 每个消费者有自己的queue(队列)

    • 3) 每个队列都要绑定到Exchange(交换机)

    • 4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

    • 5) 交换机把消息发送给绑定过的所有队列

    • 6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

           注意:交换机只能转发消息不能存储消息

    Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。


    Exchange有常见以下3种类型:
    Fanout:广播,将消息交给所有绑定到交换机的队列。
    Direct:定向,把消息交给符合指定routing key 的队列。
    Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列。
    !注意:Exchange(交换机只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
     

    四、Routing路由模式--Direct(路由)

    Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。

    P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key

    X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

    C1:消费者,其所在队列指定了需要routing key 为 error 的消息

    C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

    五、Topics通配符模式--Topic(通配符)

    Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型Exchange 可以让队列在绑定 Routing key 的时候使用通配符

    Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
    通配符规则:
    1.# :匹配一个或多个词(包括0)
    2.* :匹配不多不少恰好1个词(词的长度可以为0)
    举例:
    item.# :能够匹配 item.insert.abc 或者 item.insert或者item
    item.* :只能匹配 item.insert
     

    *代表一个目录名的单词长度可以为0~n,而#代表的是多级目录0~n

    代码部分

    一、SpringBoot案例——fanout模式

    1.导入依赖

    1. <dependency>
    2. <groupId>org.springframework.boot</groupId>
    3. <artifactId>spring-boot-starter-amqp</artifactId>
    4. </dependency>
    5. <dependency>
    6. <groupId>org.springframework.boot</groupId>
    7. <artifactId>spring-boot-starter-web</artifactId>
    8. </dependency>

    2.在application.yml进行配置 

    1. # 服务端口
    2. server:
    3. port: 8080
    4. # 配置rabbitmq服务
    5. spring:
    6. rabbitmq:
    7. username: admin
    8. password: admin
    9. virtual-host: /
    10. host: 192.168.10.128
    11. port: 5672

    3.编写配置类,声明队列和交换机,并且绑定

    1. import org.springframework.amqp.core.*;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. @Configuration
    5. public class RabbitmqConfig {
    6. //声明队列
    7. @Bean
    8. public Queue emailQueue() {
    9. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    10. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    11. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    12. // return new Queue("TestDirectQueue",true,true,false);
    13. //一般设置一下队列的持久化就好,其余两个就是默认false
    14. return new Queue("email.fanout.queue", true);
    15. }
    16. @Bean
    17. public Queue smsQueue() {
    18. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    19. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    20. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    21. // return new Queue("TestDirectQueue",true,true,false);
    22. //一般设置一下队列的持久化就好,其余两个就是默认false
    23. return new Queue("sms.fanout.queue", true);
    24. }
    25. @Bean
    26. public Queue weixinQueue() {
    27. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    28. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    29. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    30. // return new Queue("TestDirectQueue",true,true,false);
    31. //一般设置一下队列的持久化就好,其余两个就是默认false
    32. return new Queue("weixin.fanout.queue", true);
    33. }
    34. //声明注册fanout模式的交换机
    35. @Bean
    36. public FanoutExchange fanoutOrderExchange() {
    37. return new FanoutExchange("fanout_order_exchange", true, false);
    38. }
    39. //绑定 将队列和交换机绑定
    40. @Bean
    41. public Binding weixinBinging() {
    42. return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange());
    43. }
    44. @Bean
    45. public Binding smsBinding() {
    46. return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange());
    47. }
    48. @Bean
    49. public Binding emailBinding() {
    50. return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange());
    51. }
    52. }

    4.生产者实现Producer

    1. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    2. import org.springframework.stereotype.Service;
    3. import javax.annotation.Resource;
    4. import java.util.UUID;
    5. @Service
    6. public class OrderService {
    7. @Resource
    8. private RabbitTemplate rabbitTemplate;
    9. // 1: 定义交换机
    10. private String exchangeName = "fanout_order_exchange";
    11. // 2: 路由key
    12. private String routeKey = "";
    13. public void makeOrder(String userId, String productId, int num) {
    14. // 1: 模拟用户下单
    15. String orderNumer = UUID.randomUUID().toString();
    16. // 2: 根据商品id productId 去查询商品的库存
    17. // int numstore = productSerivce.getProductNum(productId);
    18. // 3:判断库存是否充足
    19. // if(num > numstore ){ return "商品库存不足..."; }
    20. // 4: 下单逻辑
    21. // orderService.saveOrder(order);
    22. // 5: 下单成功要扣减库存
    23. // 6: 下单完成以后
    24. System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
    25. // 发送订单信息给RabbitMQ fanout
    26. rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    27. }
    28. }

    5.消费者,创建新的项目,导入依赖和编写yaml ,yaml里面修改端口号,避免端口冲突

    1. # 服务端口
    2. server:
    3. port: 8081
    4. # 配置rabbitmq服务
    5. spring:
    6. rabbitmq:
    7. username: admin
    8. password: admin
    9. virtual-host: /
    10. host: 192.168.10.128
    11. port: 5672
    1. //@RabbitListener 指定⽬标⽅法来作为消费消息的⽅法,通过注解参数指定所监听的队列或者Binding
    2. //@RabbitListener 可以标注在类上面,需配合 @RabbitHandler 注解一起使用
    3. //@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,根据接受的参数类型进入具体的方法中。
    4. @RabbitListener(queues = {"sms.fanout.queue"})
    5. @Service
    6. public class SMSConsumer {
    7. @RabbitHandler
    8. public void receiveMessage(String message){
    9. System.out.println("SMS接收到了————订单信息"+message);
    10. }
    11. }
    1. @RabbitListener(queues = {"email.fanout.queue"})
    2. @Service
    3. public class EmailConsumer {
    4. @RabbitHandler
    5. public void receiveMessage(String message){
    6. System.out.println("Email接收到了————订单信息"+message);
    7. }
    8. }

    1. @RabbitListener(queues = {"weixin.fanout.queue"})
    2. @Service
    3. public class WeixinConsumer {
    4. @RabbitHandler
    5. public void receiveMessage(String message){
    6. System.out.println("Weixin接收到了————订单信息"+message);
    7. }
    8. }

    运行

     

    二、SpringBoot案例——direct模式

    Direct模式是fanout模式上的一种叠加,增加了路由RoutingKey的模式。

    因此最主要的区别还是在交换机配置这里

    1. import org.springframework.amqp.core.*;
    2. import org.springframework.context.annotation.Bean;
    3. import org.springframework.context.annotation.Configuration;
    4. @Configuration
    5. public class RabbitmqConfig {
    6. //声明队列
    7. @Bean
    8. public Queue emailQueue() {
    9. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    10. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    11. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    12. // return new Queue("TestDirectQueue",true,true,false);
    13. //一般设置一下队列的持久化就好,其余两个就是默认false
    14. return new Queue("email.direct.queue", true);
    15. }
    16. @Bean
    17. public Queue smsQueue() {
    18. return new Queue("sms.direct.queue", true);
    19. }
    20. @Bean
    21. public Queue weixinQueue() {
    22. return new Queue("weixin.direct.queue", true);
    23. }
    24. //声明注册direct模式的交换机 这里是区别
    25. @Bean
    26. public DirectExchange directOrderExchange() {
    27. return new DirectExchange("direct_order_exchange", true, false);
    28. }
    29. //绑定 将队列和交换机绑定, 并设置路由key
    30. @Bean
    31. public Binding weixinBinging() {
    32. return BindingBuilder.bind(weixinQueue()).to(directOrderExchange()).with("weixin");
    33. }
    34. @Bean
    35. public Binding smsBinding() {
    36. return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("sms");
    37. }
    38. @Bean
    39. public Binding emailBinding() {
    40. return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("email");
    41. }
    42. }

    有个问题:交换机和队列的声明,绑定关系,在开发中是应该绑定在消费者中还是生产者中呢?

    个人想法:应该绑定在消费者中,因为生产者是发送消息的,生产者如果先启动,如果消费者在生产者没有启动的时候,启动会没有消息,发生错误,而消费者是直接与消息打交道的,消费者可以先启动直接等待接收消息就可以了。但是也有人觉得生产者是需要先启动的,直接在生产者里面配置就可以了。所以两边都有歧义,自行考虑

    三、SpringBoot案例——topic模式

    routing key模糊查询

    1. import org.springframework.amqp.core.Binding;
    2. import org.springframework.amqp.core.BindingBuilder;
    3. import org.springframework.amqp.core.TopicExchange;
    4. import org.springframework.amqp.core.Queue;
    5. import org.springframework.context.annotation.Bean;
    6. import org.springframework.context.annotation.Configuration;
    7. @Configuration
    8. public class TopicEx {
    9. //声明队列
    10. @Bean
    11. public Queue emailQueue() {
    12. // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
    13. // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
    14. // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
    15. // return new Queue("TestDirectQueue",true,true,false);
    16. //一般设置一下队列的持久化就好,其余两个就是默认false
    17. return new Queue("email.topic.queue", true);
    18. }
    19. @Bean
    20. public Queue smsQueue() {
    21. return new Queue("sms.topic.queue", true);
    22. }
    23. @Bean
    24. public Queue weixinQueue() {
    25. return new Queue("weixin.topic.queue", true);
    26. }
    27. //声明注册topic模式的交换机
    28. @Bean
    29. public TopicExchange topicOrderExchange() {
    30. return new TopicExchange("topic_order_exchange", true, false);
    31. }
    32. //绑定 将队列和交换机绑定
    33. @Bean
    34. public Binding weixinBinging() {
    35. return BindingBuilder.bind(weixinQueue()).to(topicOrderExchange()).with("#.weixin.#");
    36. }
    37. @Bean
    38. public Binding smsBinding() {
    39. return BindingBuilder.bind(smsQueue()).to(topicOrderExchange()).with("#.sms.#");
    40. }
    41. @Bean
    42. public Binding emailBinding() {
    43. return BindingBuilder.bind(emailQueue()).to(topicOrderExchange()).with("#.email.#");
    44. }
    45. }

    除了用配置文件可以声明队列,交换机,绑定,还可以用注解

    1. @RabbitListener(bindings =@QueueBinding(
    2. // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
    3. value = @Queue(value = "sms.topic.queue",autoDelete = "false"),
    4. // order.fanout 交换机的名字 必须和生产者保持一致
    5. exchange = @Exchange(value = "topic_order_exchange",
    6. // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
    7. type = ExchangeTypes.TOPIC),
    8. //key模糊匹配
    9. key = "*.sms.#"))

    测试

     

    可以获取三个队列的消息。

     

  • 相关阅读:
    干货 | 师兄手把手教你如何踏上科研道路
    聊一聊redis十种数据类型及底层原理
    .NET ORM框架HiSql实战-第一章-集成HiSql
    学习记录683@类别不平衡问题解决的基本策略之再缩放的数学解释
    云原生游戏第 2 讲:OpenKruiseGame 设计理念详解
    seaborn学习1
    【Unity】填坑,Unity接入Epic Online Service上架Epic游戏商城
    jmeter 简单数据写入器 创建文件失败
    NetCore框架WTM的分表分库实现
    磷化铟量子点 InP/ZnS QDs 近红外二区量子点的吸收发射光谱图
  • 原文地址:https://blog.csdn.net/m0_46845579/article/details/125581863