• RabbitMQ的六种工作模式


    一、普通队列模式

    1.  一个消费者,一个队列,一个消费者。
    2.  消息产生消息放入队列,消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

    22.cnblogs.com/blog/1913282/202207/1913282-20220730231124618-1368574550.png)

    • 获取RabbitMQ连接帮助类

      后面代码,这部分创建连接共用

      public class RabbitMQHelper
          {
              /// 
              /// 获取RabbitMQ连接
              /// 
              /// 
              public static IConnection GetConnection()
              {
                  //实例化连接工厂
                  var factory = new ConnectionFactory
                  {
                      HostName = "127.0.0.1", //ip
                      Port = 5672, // 端口
                      UserName = "Admin", // 账户
                      Password = "Admin", // 密码
                      VirtualHost = "/"   // 虚拟主机
                  };
      
                  return factory.CreateConnection();
              }
          }
    • 生产者

      public class Send
      {
      
          public static void SendMessage()
          {
              string queueName = "normal";
      
              //1.创建链接
              using (var connection = RabbitMQHelper.GetConnection())
              {
                  // 2.创建信道
                  using(var channel = connection.CreateModel())
                  {
                      // 3.声明队列
                      channel.QueueDeclare(queueName, false, false, false, null);
                      // 没有绑定交换机,怎么找到路由队列的呢?
                      for (int i = 1; i <= 30; i++)
                      {
                          //4.构建Byte消息数据包
                          string message =$"第{i}条消息";
                          var body = Encoding.UTF8.GetBytes(message);//消息以二进制形式传输
      
                          // 发送消息到rabbitmq,使用rabbitmq中默认提供交换机路由,默认的路由Key和队列名称完全一致
                          //5.发送数据包
                          channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
                          Thread.Sleep(1000);//添加延迟
                          Console.WriteLine("生产:" + message);
                      }
                  }
              }
      
          } 
      }
    • 消费者

      public class Receive
      {
          public static void ReceiveMessage()
          {
              // 消费者消费是队列中消息
              string queueName = "normal";
              //1.建立链接链接
              var connection = RabbitMQHelper.GetConnection();
              {
                  //2.建立信道
                  var channel = connection.CreateModel();
                  {
                      //3.声明队列:如果你先启动是消费端就会异常
                      channel.QueueDeclare(queueName, false, false, false, null);
                      //4.创建一个消费者实例
                      var consumer = new EventingBasicConsumer(channel);
                      //5.绑定消息接收后的事件委托
                      consumer.Received +=(model, ea) => {
                          var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                          Thread.Sleep(1000);
                          Console.WriteLine(" Normal Received => {0}", message);
                      }; 
                      //6.启动消费者
                      channel.BasicConsume( queue: queueName, autoAck:true, consumer);//开始消费
                  }
      
              }
      
          } 
      }

    二、工作队列模式

    1. 一个消费者,一个队列,多个消费者。但多个消费者中只会有一个会成功地消费消息

    2. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用。

    3. 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

    • 生产者

      public class WorkerSend
          {
      
              public static void SendMessage()
              {
                  string queueName = "Worker_Queue";
      
                  using (var connection = RabbitMQHelper.GetConnection())
                  {
                      using(var channel = connection.CreateModel())
                      {
                          channel.QueueDeclare(queueName, false, false, false, null);
                          for (int i = 0; i < 30; i++)
                          {
                              string message = $"RabbitMQ Worker {i + 1} Message";
                              var body = Encoding.UTF8.GetBytes(message);
                              channel.BasicPublish("", queueName, null, body);
                              Console.WriteLine("send Task {0} message",i + 1);
                          }
                         
                      }
                  }
                  
              } 
          }
    • 消费者

      public class WorkerReceive
          {
              public static void ReceiveMessage()
              {
                  string queueName = "Worker_Queue";
                  var connection = RabbitMQHelper.GetConnection();
                  {
                      var channel = connection.CreateModel();
                      {
                          channel.QueueDeclare(queueName, false, false, false, null);
                          var consumer = new EventingBasicConsumer(channel);
                          //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。
                          channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                          consumer.Received +=(model, ea) => {
                              var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                              Console.WriteLine(" Worker Queue Received => {0}", message);
                          }; 
                          channel.BasicConsume(queueName,true, consumer);
                      }
                     
                  }
                
              } 
          }

    三、扇形队列模式(发布/订阅模式)

    1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。每个消费队列中消息一致,且每个消息消费者都从自己的消息队列的第一个消息开始消费,直到最后。

    2. 交换机为rabbitMQ中内部组件。消息生产者将消息发送给rabbitMQ后,rabbitMQ会根据订阅的消费者个数,生成对应数目的消息队列,这样每个消费者都能获取生产者发送的全部消息。

    3. 一旦消费者断开与rabbitMQ的连接,队列就会消失。如果消费者数目很多,对于rabbitMQ而言,也是个重大负担,订阅模式是个长连接,占用并发数,且每个消费者一个队列会占用大量空间

    4. 相关应用场景:邮件群发,群聊,广播

    • 生产者
    public static void SendMessage()
            {
                //1.创建连接
                using (var connection = RabbitMQHelper.GetConnection())
                {
                    //2.创建信道
                    using(var channel = connection.CreateModel())
                    {
                        // 3.声明交换机对象
                        channel.ExchangeDeclare("fanout_exchange", "fanout");
                       
                        // 4.创建队列
                        string queueName1 = "fanout_queue1";
                        channel.QueueDeclare(queueName1, false, false, false, null);
                        string queueName2 = "fanout_queue2";
                        channel.QueueDeclare(queueName2, false, false, false, null);
                        string queueName3 = "fanout_queue3";
                        channel.QueueDeclare(queueName3, false, false, false, null);
                        
                        // 5.绑定到交互机
                        // fanout_exchange 绑定了 3个队列 
                        channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");//指定交换机
                        channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                        channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
    
                        for (int i = 0; i < 10; i++)
                        {
                            //6.构建消息byte数组
                            string message = $"RabbitMQ Fanout {i + 1} Message";
                            var body = Encoding.UTF8.GetBytes(message);
                            //7.发送消息
                            channel.BasicPublish("fanout_exchange", "", null, body);//同时把消息发送到订阅的三个队列
                            Console.WriteLine("Send Fanout {0} message",i + 1);
                        }
                    }
                }
                
            } 
        }
    • 消费者
    public class FanoutConsumer
        {
            public static void ConsumerMessage()
            {
                //1.创建连接
                var connection = RabbitMQHelper.GetConnection();
                {
                    //2,。创建信道
                    var channel = connection.CreateModel();
                    {
                        //3.申明exchange
                        channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
                        
                        // 4.创建队列
                        string queueName1 = "fanout_queue1";
                        channel.QueueDeclare(queueName1, false, false, false, null);
                        string queueName2 = "fanout_queue2";
                        channel.QueueDeclare(queueName2, false, false, false, null);
                        string queueName3 = "fanout_queue3";
                        channel.QueueDeclare(queueName3, false, false, false, null);
                        
                        // 5.绑定到交互机
                        channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
                        channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                        channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");
    
                        Console.WriteLine("[*] Waitting for fanout logs.");
    
                        //6.申明consumer
                        var consumer = new EventingBasicConsumer(channel);
                        //绑定消息接收后的事件委托
                        consumer.Received += (model, ea) => {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body.ToArray());
                            Console.WriteLine("[x] {0}", message);
    
                        };
                        //7.启动消费者
                        channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);//只会消费队列queueName1中的消息,其他队列中订阅的消息仍然存在
                        Console.WriteLine(" Press [enter] to exit.");
                        Console.ReadLine();
                    }
                }
            }
        }

    四、直接队列模式(Routing路由模式)

    1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。

    2. 消息生产者将消息发送给交换机,交换机按照路由判断,将路由到的RouteKey的消息,推送与之绑定的队列,交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

    • 生产者:
    public static void SendMessage()
            {
                //1.创建连接
                using (var connection = RabbitMQHelper.GetConnection())
                {
                    //2.创建信道
                    using(var channel = connection.CreateModel())
                    {
                        // 3.声明Direct交换机
                        channel.ExchangeDeclare("direct_exchange", "direct");
    
                        // 4.创建队列
                        string queueName1 = "direct_queue1";
                        channel.QueueDeclare(queueName1, false, false, false, null);
                        string queueName2 = "direct_queue2";
                        channel.QueueDeclare(queueName2, false, false, false, null);
                        string queueName3 = "direct_queue3";
                        channel.QueueDeclare(queueName3, false, false, false, null);
    
                        // 5.绑定到交互机 指定routingKey
                        channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
                        channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
                        channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");
    
                        for (int i = 0; i < 10; i++)
                        {
                            string message = $"RabbitMQ Direct {i + 1} Message =>green";
                            var body = Encoding.UTF8.GetBytes(message);
                            // 发送消息的时候需要指定routingKey发送
                            channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);//只发布到RouteKey:green的队列
                            Console.WriteLine("Send Direct {0} message",i + 1);
                        }
                    }
                }
                
            } 
        }
    • 消费者
    public class DirectConsumer
        {
            public static void ConsumerMessage()
            {
                //1.创建连接
                var connection = RabbitMQHelper.GetConnection();
                //2.创建通信
                var channel = connection.CreateModel();
                //3.声明交换机
                channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
                //4.绑定交换机
                var queueName = "direct_queue2";//队列direct_queue3绑定有red,yellow,green共3个RouteKey
                channel.QueueDeclare(queueName, false, false, false, null);
                //此处消费通信没有必要绑定所有的RouteKey,根据前生产者通信的路由规则,每个队列中只会路由到一种消息
                channel.QueueBind(queue: queueName,
                                          exchange: "direct_exchange",
                                          routingKey: "red");
                channel.QueueBind(queue: queueName,
                                          exchange: "direct_exchange",
                                          routingKey: "yellow");
                channel.QueueBind(queue: queueName,
                                          exchange: "direct_exchange",
                                          routingKey: "green");
    
                Console.WriteLine(" [*] Waiting for messages.");
    
                //5.实例化消费者
                var consumer = new EventingBasicConsumer(channel);
                //6.为消费者绑定消费委托事件
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
    
                    // 消费完成后需要手动签收消息,如果不写该代码就容易导致重复消费问题
                    //7.手动确认签收消息
                    channel.BasicAck(ea.DeliveryTag, true); // 可以降低每次签收性能损耗
                };
    
                // 消息签收模式
                // 手动签收 保证正确消费,不会丢消息(基于客户端而已)
                // 自动签收 容易丢消息 
                // 签收:意味着消息从队列中删除
                channel.BasicConsume(queue: queueName,
                                     autoAck: false,
                                     consumer: consumer);//设置为不自动签收,进行手动签收
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
        }

    五、模糊匹配队列模式(Topic 主题模式)

    1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。

    2. 此时的自己唯一的Routekey,不是一个确定值,像我们熟悉的正则表达式对应的匹配规则。

    3. 生产者产生消息,把消息交给交换机,交换机根据RouteKey的模糊匹配到对应的队列,由队列监听消费者消费消息。

    4. 规则:

      和* 都是通配符,命名规则是多个单词用顿号(.)分隔开

      代表代表一个单词

      *代表多个单词

    • 生产者:
    public static void SendMessage()
            {
                //1.创建连接
                using (var connection = RabbitMQHelper.GetConnection())
                {
                    //2.创建信道
                    using (var channel = connection.CreateModel())
                    {
                        //3.声明交换机
                        channel.ExchangeDeclare("topic_exchange", "topic");
                        //4.声明队列
                        string queueName1 = "topic_queue1";
                        channel.QueueDeclare(queueName1, false, false, false, null);
                        string queueName2 = "topic_queue2";
                        channel.QueueDeclare(queueName2, false, false, false, null);
                        string queueName3 = "topic_queue3";
                        channel.QueueDeclare(queueName3, false, false, false, null);
                        //5.绑定到交互机
                        channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.*");
                        channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete");
                        channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update");
                    
                        for (int i = 0; i < 10; i++)
                        {
                            //6.准备发送字节数组
                            string message = $"RabbitMQ Topic {i + 1} Delete Message";
                            var body = Encoding.UTF8.GetBytes(message);
                            //7.根据RouteKey发布消息
                            channel.BasicPublish("topic_exchange", "user.data.delete", null, body);//会发布到queueName1,queueName2
                            Console.WriteLine("Send Topic {0} message", i + 1);
                        }
                    }
                }
    
            }
    • 消费者:
    public static void ConsumerMessage()
            {
                //1.创建连接
                var connection = RabbitMQHelper.GetConnection();
                //2.创建通信
                var channel = connection.CreateModel();
                //3.声明交换机
                channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
                //4.声明队列
                var queueName = "topic_queue3";
                channel.QueueDeclare(queueName, false, false, false, null);
                //5.绑定交换机
                channel.QueueBind(queue: queueName,
                                          exchange: "topic_exchange",
                                          routingKey: "user.data.*");
    
                Console.WriteLine(" [*] Waiting for messages.");
                //6.创建消费者
                var consumer = new EventingBasicConsumer(channel);
                //7.绑定消费委托事件
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body;
                    var message = Encoding.UTF8.GetString(body.ToArray());
                    var routingKey = ea.RoutingKey;
                    Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
                };
    
                //8.启动消费
                channel.BasicConsume(queue: queueName,
                                     autoAck: true,
                                     consumer: consumer);
    
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }

    六、RPC 模式(了解)

    RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

    1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

    2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

    3、服务端将RPC方法 的结果发送到RPC响应队列。

    4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

  • 相关阅读:
    Pycharm run 输出界面控制一行能够输出的元素个数
    双亲委派机制
    31-k8s集群svc的代理模式-iptables修改为ipvs
    【「收藏」Oracle 数据库安装】
    [附源码]java毕业设计网上书店系统
    在Vue3中使用Element-Plus分页(Pagination )组件
    DBOW概要理解与记录
    微服务的好处(优点)有哪些?
    【C语言】深入浅出的概述运算符相关知识(详细讲解+源码展示)
    win10修改截图快捷键
  • 原文地址:https://blog.csdn.net/m0_73257876/article/details/126437206