• .Net Core&RabbitMQ消息存储可靠机制


    前言

    上篇讨论过消息投递和消息消费过程中如何确保可靠传输,也提及到消息到达RabbitMQ中到被消费前也需要可靠的留存,可因许多的不确定因素会影响着消息的存在与否。

    消息中转点

    生产者发送消息到RabbitMQ中,如果交换机根据自身类型和RoutingKey能够匹配到队列,则存入相关队列,但当匹配不到队列时,遇到两种情况而使得消息走向不同的方向,消息可能会丢失或是发回给生产者,这取决于生产者对消息的配置。

    0517938d45650d3fc889d9a1c6956054.png

    • 生产者设置了Mandatory且为true,则消息回退给生产者。

    • 当生产者为设置Mandatory或是设置为false时,为了避免消息丢失,可以由交换机路由给备份交换机负责去搞定存储。
      f838a5af64a07aed468a4f47c261ab59.png

    Mandatory

    生产者发送消息时,可以设置一个参数mandatory,来决定消息到达RabbitMQ后,如果出现交换机根据自身类型及RoutingKey找不到合适的队列情况下,消息的一个走向。

    • 当mandatory为true时,消息则返回给生产者。
      bcaacd3333485fcc4e94f60824eaa6c3.png

    • 当mandatory为false时,消息则被丢弃。

    生产者代码

    当在BasicPublish方法参数中设置mandatory为true且队列暂不声明时,仅有一个交换机,消息将会被返回。

    1. var connFactory = new ConnectionFactory
    2. {
    3.     HostName = "xxx.xxx.xxx.xxx",
    4.     Port = 5672,
    5.     UserName = "rabbitmqdemo",
    6.     Password = "rabbitmqdemo@test",
    7.     VirtualHost = "rabbitmqdemo"};
    8. using (var conn = connFactory.CreateConnection())
    9. {
    10.     using (var channel = conn.CreateModel())
    11.     {
    12.         var exchangeName = "mandatory_publishsubscribe_exchange";
    13.         channel.ExchangeDeclare(exchange: exchangeName, type: "fanout");
    14.         while (true)
    15.         {
    16.             Console.WriteLine("消息内容(exit退出):");
    17.             var message = Console.ReadLine();
    18.             if (message.Trim().ToLower() == "exit")
    19.             {
    20.                 break;
    21.             }
    22.             var body = Encoding.UTF8.GetBytes(message);
    23.             channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: true, basicProperties: null, body: body);
    24.             Console.WriteLine("消息内容发送完毕:" + message);
    25.         }
    26.     }
    27. }

    生产者发送消息,交换机收到消息但无对应队列,消息被返回。
    86dbe2b9f84913634db67f68e37eeeb5.png

    为了直观的知道消息返回到了生产者,我们可以增加一个监听器,来监听返回的消息。

    监听回退消息

    当mandatory设置为true,消息回退时可以监听消息

    1. channel.BasicReturn += new EventHandler((sender, e) =>
    2. {
    3.     var message = Encoding.UTF8.GetString(e.Body.ToArray());
    4.     Console.WriteLine($"收到回退消息:{message}");
    5. });

    生产者发送消息,因无匹配队列,消息被返回,可以直观的看到返回的消息。
    b4413ba55ba048df217ed41fab0b5a36.png

    备份交换机

    当mandatory设置为false时,消息被丢失了,这种情况可不太好。可以使用备份交换机来存储原要被丢弃的消息,当需要这些消息的时候,还能拿到这些消息。实际上备份交换机没有什么特殊,和主交换机是一样的只是充当备份的角色。
    298d774ebf27c6b37e9e8a265d06f47a.png

    生产者代码
    1. 在创建主交换机的时候,给定参数argument,设置该主交换机的备份交换机,指定备份交换机名称。

    2. 然后声明备份交换机并绑定一个队列,用于存储被丢弃的消息。

    3. 发送消息时mandatory参数设置为false。

    1. var connFactory = new ConnectionFactory
    2. {
    3.     HostName = "xxx.xxx.xxx.xxx",
    4.     Port = 5672,
    5.     UserName = "rabbitmqdemo",
    6.     Password = "rabbitmqdemo@test",
    7.     VirtualHost = "rabbitmqdemo"};
    8. using (var conn = connFactory.CreateConnection())
    9. {
    10.     using (var channel = conn.CreateModel())
    11.     {
    12.         var exchangeName = "aedemo_publishsubscribe_exchange";
    13.         var alternateExchangeName = "aedemo_ae_publishsubscribe_exchange";
    14.         var arguments = new Dictionary<string, object>
    15.         {
    16.             { "alternate-exchange", alternateExchangeName }
    17.         };
    18.         channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", arguments: arguments);
    19.         channel.ExchangeDeclare(exchange: alternateExchangeName, type: "fanout");
    20.         var alternateExchangeQueueName = alternateExchangeName + "_worker";
    21.         channel.QueueDeclare(queue: alternateExchangeQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
    22.         channel.QueueBind(queue: alternateExchangeQueueName, exchange: alternateExchangeName, routingKey: "");
    23.         while (true)
    24.         {
    25.             Console.WriteLine("消息内容(exit退出):");
    26.             var message = Console.ReadLine();
    27.             if (message.Trim().ToLower() == "exit")
    28.             {
    29.                 break;
    30.             }
    31.             var body = Encoding.UTF8.GetBytes(message);
    32.             channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: false, basicProperties: null, body: body);
    33.             Console.WriteLine("消息内容发送完毕:" + message);
    34.         }
    35.     }
    36. }

    启动程序,可以从Web面板中看到主交换机和备份交换机都创建完毕,并且主交换机打上了有AE的标记。
    2e984ffda13b7c39156c7aefb5ebb533.png

    生产者发送消息,经主交换机匹配但无合适队列后,转发给备份交换机,路由到其队列存储。
    b1eedabd72666eea2d3b638a72fcee16.png

    注:推荐使用Fanout类型的交换机,如果其他比如Direct,当主交换机转发到备份交换机,在进行匹配时候,如果消息给定的RoutingKey没有匹配到相应的队列,消息则会被丢失,这样一来,最初的预想就出现偏差了。

    持久化

    当RabbitMQ在异常情况下,比如系统宕机、重启、关闭等,可能会导致数据丢失,可靠性降低。针对这种情况,RabbitMQ提供了持久化机制,将消息本身和元数据(队列、交换机、绑定信息)都保存到磁盘中。具体分为三类持久化

    • 交换机持久化

    • 队列持久化

    • 消息持久化

    交换机持久化

    当RabbitMQ遇到异常情况(如服务重启)后,如果没有设置交换机持久化,那么交换机相关数据则会被丢失,生产者再发送消息到指定交换机时就失败了。

    服务重启异常

    1、在Web中新建一个交换机,指定非持久化模式。
    55bfd6b65bbeed59d2f2a426ae4f0532.png

    2、新建一个队列,指定非持久化模式。
    f7687baa8a56d322c15567df212691da.png

    3、设置交换机和队列的绑定关系。
    73971a2b35d1a7f0b49a5a02ebff5025.png

    4、生产者前部分正常发送消息,中间经服务重启后,交换机、队列及绑定关系都被清除,生产继续发送消息,出现异常。
    420919a29c7f0fd6362bcece18d502d4.png

    持久化设置

    在声明交换机时可以指定durable参数设置为true(Web面板中也可设置)。

    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, arguments: null);

    RabbitMQ服务重启,生产者继续发送消息给交换机。
    329233e17cc99427500974bce29d70ff.png

    队列持久化

    队列的持久化是队列声明时设置durable参数为true,如果队列不持久化,异常情况(如服务重启)后,队列元数据丢失,存储在内的消息也就丢失了。

    服务重启异常

    1、Web中创建一个交换机并设置为持久化模式。
    6741d7ee297a04d0fc43f4a60a0f8320.png

    2、创建一个队列并设置为非持久化模式
    409063821df4bbf75ed40622cf191f8a.png

    3、设置交换机和队列的绑定关系。
    c26e4bff1b512261f67065dcf07168d6.png

    4、生产者前部分正常发送消息,中间经服务重启后,队列及绑定关系被清除,生产继续发送消息,匹配队列失败,消息被回退给生产者。
    faa9b755883d2c6dd84257a0ef53c5c7.png

    持久化设置

    在声明队列时可以指定durable参数设置为true(Web面板中也可设置)。

    channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);

    RabbitMQ服务重启,生产者继续发送消息给交换机。
    30b788944e92f9fe76f663a40082d92c.png

    消息持久化

    队列的持久化仅能保证其自身的数据不丢失,而其存储的消息却不能保证不会丢失。
    186e51b76363235b65262897dcf833a6.png

    持久化设置

    需要对消息消息设置持久化,以确保消息本身不会因异常情况(如服务重启)而丢失。在发送消息时,可以设置消息的基础属性,来支持消息的持久化。

    1. var basicProperties = channel.CreateBasicProperties();
    2. basicProperties.DeliveryMode = 2;// 1非持久化 2持久化channel.BasicPublish(exchange: exchangeName, routingKey: "", mandatory: true, basicProperties: basicProperties, body: body);

    如此一来,当异常情况(如服务重启后),消息还是存在的。
    46f9032f5834d8c41f5d1ce4c9d8fff6.png

    注:消息持久化会影响性能,仅确保有价值的消息持久化,来权衡可靠与吞吐量。

    2022-08-25,望技术有成后能回来看见自己的脚步

  • 相关阅读:
    小鹏汽车的“未来”故事有点性感
    谈一谈Decoder模块
    深入理解 Jetpack Compose 内核:SlotTable 系统
    SQLite System.Data.SQLite和sqlite-net-pcl之间的区别
    STC15单片机-看门狗介绍
    HSCCTF 3th 2024 Web方向 题解wp
    windows环境变量滥用维权/提权
    docker安装jenkins最新版
    如何发布自己的golang库
    长短期记忆神经网络
  • 原文地址:https://blog.csdn.net/farway000/article/details/126672625