• RabbitMQ的六种模式


    一、RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布订阅。其中一对一是简单队列模式,一对多是Worker模式,而发布订阅包括发布订阅模式,路由模式和通配符模式,为什么说发布订阅模式包含三种模式呢,其实发布订阅,路由,通配符三种模式都是使用只是交换机(Exchange)类型不一致

    1:一对一

    需要建立两个项目(NuGet引入RabbitMQ.Client),一个Send,一个Receive。代码如下

    Send:

    public static void SendMsg()
            {
                //创建链接工厂对象
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "guest",
                    Password = "guest"
    
                };
                //创建对象
                using (IConnection con = factory.CreateConnection())
                {
                    //创建链接会话对象
                    using (IModel model = con.CreateModel())
                    {
                        string queueName = "myQueue";
                        //声明一个队列‘
                        model.QueueDeclare(
                            queue: queueName //消息队列名称
                            , durable: true//是否缓存
                            , exclusive: false//是否独有
                            , autoDelete: true//自动删除
                            , arguments: null
                            );
                        //发送消息
                        while (true)
                        {
                            Console.WriteLine("请输入发送消息:");
                            string msg = Console.ReadLine();
                            byte[] data = Encoding.UTF8.GetBytes(msg);
                            model.BasicPublish(
                                exchange: ""
                                , routingKey: queueName
                                , basicProperties: null
                                , body: data
                                );
                        }
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    Receive:

    public static void ReceiveMsg()
            {
                //链接工厂
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.UserName = "guest";
                factory.Password = "guest";
                //链接对象
                using (var conn = factory.CreateConnection())
                {
                    //回话对象
                    using (var model = conn.CreateModel())
                    {
                        model.QueueDeclare(queue: "myQueue"
                            , durable: true
                            , exclusive: false
                            , autoDelete: true,
                            arguments: null);
                        //创建消费者对象
                        var customer = new EventingBasicConsumer(model);
                        customer.Received += Customer_Received;
                        //开启监听
                        model.BasicConsume(queue: "myQueue"
                            , autoAck: false
                            , consumer: customer);
                        Console.WriteLine("Press [enter] to exit");
                        Console.ReadLine();
                    }
                }
            }
    
            private static void Customer_Received(object sender, BasicDeliverEventArgs e)
            {
                byte[] body = e.Body.ToArray();
                string msg = Encoding.UTF8.GetString(body);
                Console.WriteLine("收到消息:" + msg);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    最终运行如下

    其中guest是默认的管理账号和密码,如果需要使用这些,可以登录http://localhost:15672/添加新用户

    这样你就可以在你的项目中使用新建的账号密码了。

    其中我遇到了两个坑,大家可能也需要注意一下

    1:端口:如果没有开启5672端口,可以在防火墙中开启端口,一直下一步就行

    2:就是账号和密码,我开始的时候没有使用guest,随便设置了两个,程序启动报错,(ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN.)如果遇到类似的情况,可以按照上面说过的,登录http://localhost:15672/#/users网页新建用户就可以了。

    3:在创建用户之后,发现程序启动不起来。报错(The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=‘NOT_ALLOWED - vhost / not found’, classId=10, methodId=40)原因是没有权限

    点击新建的用户设置virtualhost为/。设置好之后就会发现guest1和上面两个用户一样,Can access virtual hosts 是斜杠了,这样你就可以使用新建的用户发送消息了

    2:worker模式(多个1对1)

    以上的一对一是有问题的。

    可以看到运行两个接收者,然后发送者发送了1-5这五个消息,第一个接收者接收的是奇数,而第二个接收者接收的是偶数,但是现在的worker存在这很大的问题,

    1.丢失数据:一旦其中一个宕机,那么另外接收者的无法接收原本这个接收者所要接收的数据

    2.无法实现能者多劳:如果其中的接收者接收的较慢,那么便会极大的浪费性能,所以需要实现接收快的多接收

    开启多个消费者,然后生产者发送消息会出现下面的情况,每个消费者都会收到一条不同的消息,

    在Rabbit中存在两种消息确认模式,

    自动确认:只要消息从队列获取,无论消费者获取到消息后是否成功消费,都认为是消息成功消费,也就是说上面第二个接收者其实已经消费了它所接收的数据

    手动确认:消费从队列中获取消息后,服务器会将该消息处于不可用状态,等待消费者反馈

    也就是说我们只要将消息确认模式改为手动即可,改为手动确认方式只需改两处,1.开启监听时将autoAck参数改为false,2.消息消费成功后返回确认

    由于我上面在消费者开启监听的时候将autoAck设置成了false,这样就开启了手动确认模式,且在接受消息时加入了Thread.Sleep(1000),这样在其中一个消费者崩溃的时候,会将本应该发送给崩溃的消费者的消息重新发给未崩溃的消费者。这里模拟了一下,在生产者发消息的时候,将第二个消费者关闭,然后发现本该在第二个消费者接收的消息,分别发送到了第一个和第三个,无论是否已经发送过。

    这样就避免了数据丢失。

    你可能注意到了,调度依照我们希望的方式运行。例如在有两个工作者的情况下,当所有的奇数任务都很繁重而所有的偶数任务都很轻松的时候,其中一个工作者会一直处于忙碌之中而另一个几乎无事可做。RabbitMQ并不会对此有任何察觉,仍旧会平均分配消息。

    这种情况发生的原因是由于当有消息进入队列时,RabbitMQ只负责将消息调度的工作,而不会检查某个消费者有多少未经确认的消息。它只是盲目的将第n个消息发送给第n个消费者而已。

    如何实现公平调度呢,其实是在手动确认的基础上实现的,将里面的Thread.Sleep(1000)改成随机数, System.Threading.Thread.Sleep(new Random().Next(1, 5) * 1000);并且在创建消费者之前添加确认机制。

    //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息

    model.BasicQos(0, 1, false);
    
    • 1

    这里可以看到,生产者发了很多消息,但是由于消费者没有确认,所以都没有收到后续的消息,所以,在消费者中还需要添加一个确认代码

    添加消息确认:

    model.BasicAck(e.DeliveryTag, true);
    
    • 1

    之后可以看到,由于不同的消费者接收消息的延迟随机,回复确认收到的时间也是随机的,这样,不同的消费者就会收到不定的消息了,这样就实现了公平调度。

    消费者完整代码:

    //由于消费者确认需要model,所以我将之前的事件写成了  ReceiveMsg里面!
    public static void ReceiveMsg()
            {
                //链接工厂
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.UserName = "fanlin";
                factory.Password = "1234";
                //链接对象
                using (var conn = factory.CreateConnection())
                {
                    //回话对象
                    using (var model = conn.CreateModel())
                    {
                        model.QueueDeclare(queue: "myQueue"
                            , durable: true
                            , exclusive: false
                            , autoDelete: true,
                            arguments: null);
                        //公平调度模式需要如下配置
                        //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
                        model.BasicQos(0, 1, false);
                        //创建消费者对象
                        var customer = new EventingBasicConsumer(model);
                        customer.Received += (sender, e) =>
                        {
                            //worker模式 如果其中一个中途崩溃,会出现收到消息,但是没有打印的情况,而另外一个也不会打印
                            //这里随机休息,表示每个消费者接收的消息的快慢不同,以实现公平调度。
                            System.Threading.Thread.Sleep(new Random().Next(1, 5) * 1000);
                            byte[] body = e.Body.ToArray();
                            string msg = Encoding.UTF8.GetString(body);
                            Console.WriteLine("收到消息:" + msg);
                            //公平调度
                            //返回消息确认
                            model.BasicAck(e.DeliveryTag, true);
                        };
                        //开启监听
                        model.BasicConsume(queue: "myQueue"
                            , autoAck: false
                            , consumer: customer);
                        Console.WriteLine("Press [enter] to exit");
                        Console.ReadLine();
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    补:上面的工作者模式已经将durable设置为true了。表示队列的持久化,这样即使生产者崩溃了,重启之后依旧不会忘记所有的队列,发消息依旧能够给每个消费者发送过去。

    接下来我们还需要设置消息的持久化。在Send中添加如下代码,此属性也需要在BasicPublish中进行标注

    //这里将消息标记为持久化
    //将消息标示为持久化并不能完全保证消息不会丢失。尽管它会告诉RabbitMQ将消息存储到硬盘上,但是在RabbitMQ接收到消息并将其进行存储两个行为之间仍旧会有一个窗口期。同样的,RabbitMQ也不会对每一条消息执行fsync(2),所以消息获取只是存到了缓存之中,而不是硬盘上。虽然持久化的保证不强,但是应对我们简单的任务队列已经足够了。如果你需要更强的保证,可以使用publisher confirms.

    var properties = model.CreateBasicProperties();
                        properties.Persistent = true;
    
    • 1
    • 2

    请注意,这是官方的注释:

    消息持久化的注释
    将消息标示为持久化并不能完全保证消息不会丢失。尽管它会告诉RabbitMQ将消息存储到硬盘上,但是在RabbitMQ接收到消息并将其进行存储两个行为之间仍旧会有一个窗口期。同样的,RabbitMQ也不会对每一条消息执行fsync(2),所以消息获取只是存到了缓存之中,而不是硬盘上。虽然持久化的保证不强,但是应对我们简单的任务队列已经足够了。如果你需要更强的保证,可以使用publisher confirms.

    对于消息的持久化暂时不知道如何验证,我会多去了解一下!

    3:发布订阅模式(1对多)

    发布订阅模式对于生产者唯一的改变就在于需要声明一个交换机,将消息发送到交换机,通过交换机发送出去。

    先介绍下交换机的几种类型:

    有几个可用的交换机类型:直连交换机(direct), 主题交换机(topic), 头交换机(headers) 和扇形交换机(fanout),这里先使用fanout,扇形交换机是负责将消息发送给所有他知道的队列

    3.1发布订阅

    Send

    /// 
            /// 发布订阅 生产者
            /// 发布订阅模式的区别在与不是直接发送消息,而是声明一个交换机,将消息发送到交换机,而交换机将这些消息发送给消费者
            /// 
            public static void SendMsg_Publish()
            {
                Console.WriteLine("发布订阅模式");
                //创建链接工厂对象
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "fanlin",
                    Password = "1234"
    
                };
                //创建对象
                using (IConnection con = factory.CreateConnection())
                {
                    //创建链接会话对象
                    using (IModel model = con.CreateModel())
                    {
                        //声明一个交换机
                        model.ExchangeDeclare(exchange: "myExchange", type: "fanout");
                        //发送消息
                        while (true)
                        {
                            Console.WriteLine("请输入发送消息:");
                            string msg = Console.ReadLine();
                            byte[] data = Encoding.UTF8.GetBytes(msg);
                            model.BasicPublish(
                                exchange: "myExchange"
                                , routingKey:""
                                , basicProperties: null
                                , body: data
                                );
                        }
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    而消费者需要绑定队列和交换机,这样交换机就能给每个消费者发送消息了。

    Receive

    /// 
            /// 发布订阅 消费者
            /// 
            public static void ReceiveMsg_Publish()
            {
                //链接工厂
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.UserName = "fanlin";
                factory.Password = "1234";
                //链接对象
                using (var conn = factory.CreateConnection())
                {
                    //回话对象
                    using (var model = conn.CreateModel())
                    {
                        //声明交换机
                        model.ExchangeDeclare("myExchange", "fanout");
                        string queueName = "myExchange_" + new Random().Next(1, 5);
                        Console.WriteLine("发布订阅模式" + queueName);
                        model.QueueDeclare(queue: queueName
                            , durable: true
                            , exclusive: false
                            , autoDelete: true,
                            arguments: null);
                        //绑定交换机和队列
                        model.QueueBind(queueName, "myExchange", "");
                        //公平调度模式需要如下配置
                        //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
                        model.BasicQos(0, 1, false);
                        //创建消费者对象
                        var customer = new EventingBasicConsumer(model);
                        customer.Received += (sender, e) =>
                        {
                            byte[] body = e.Body.ToArray();
                            string msg = Encoding.UTF8.GetString(body);
                            Console.WriteLine("收到消息:" + msg);
                            //公平调度
                            //返回消息确认
                            model.BasicAck(e.DeliveryTag, true);
                        };
                        //开启监听
                        model.BasicConsume(queue: queueName
                            , autoAck: false
                            , consumer: customer);
                        Console.WriteLine("Press any key to exit");
                        Console.ReadLine();
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    运行如下:

    3.2路由模式

    路由模式是指交换机将消息绑定到指定的消费者,绑定的消费者也只会收到定义路由绑定的消息。

    如图,我先开启了一个info的工作者,开启了一个info的消费者,一个warrning的消费者,这样info 的工作发送的消息只有info消费者才能收到。

    而接下来我又开启了一个warrning的生产者,此时warrning生产者发送的消息就可以让warrning的消费者收到了。如图

    这就是路由模式的效果。

    接下来上完整代码

    Send:

    /// 
            /// 发布订阅 生产者
            /// 发布订阅模式的区别在与不是直接发送消息,而是声明一个交换机,将消息发送到交换机,而交换机将这些消息发送给消费者
            /// 
            public static void SendMsg_Publish(ExchangeType exchangeType = ExchangeType.fanout, LogLevel logLevel = LogLevel.Null)
            {
                Console.WriteLine("发布订阅模式");
                //创建链接工厂对象
                var factory = new ConnectionFactory()
                {
                    HostName = "localhost",
                    Port = 5672,
                    UserName = "fanlin",
                    Password = "1234"
    
                };
                //创建对象
                using (IConnection con = factory.CreateConnection())
                {
                    //创建链接会话对象
                    using (IModel model = con.CreateModel())
                    {
                        //声明一个交换机
                        string exChangeName = "";
                        if(exchangeType!=ExchangeType.fanout) exChangeName = "myExchange" + (int)exchangeType;
                        model.ExchangeDeclare(exchange: exChangeName, type: exchangeType.ToString(),true);
                        //发送消息
                        while (true)
                        {
                            Console.WriteLine("请输入发送消息:");
                            string msg = Console.ReadLine();
                            byte[] data = Encoding.UTF8.GetBytes(msg);
                            model.BasicPublish(
                                exchange: exChangeName
                                , routingKey: logLevel.ToString()
                                , basicProperties: null
                                , body: data
                                );
                        }
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    两个枚举的定义:

    public enum ExchangeType
        {
            fanout = 0,
            direct = 1,
            topic = 2,
            headers = 3
        }
        public enum LogLevel
        {
            Null = -1,
            info = 0,
            warrning = 1,
            debug = 2,
            error = 3
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    Receive:

    /// 
            /// 发布订阅 消费者
            /// 
            public static void ReceiveMsg_Publish(ExchangeType exchangeType = ExchangeType.fanout, LogLevel logLevel = LogLevel.Null)
            {
                //链接工厂
                var factory = new ConnectionFactory();
                factory.HostName = "localhost";
                factory.Port = 5672;
                factory.UserName = "fanlin";
                factory.Password = "1234";
                //链接对象
                using (var conn = factory.CreateConnection())
                {
                    //回话对象
                    using (var model = conn.CreateModel())
                    {
                        //声明一个交换机
                        string exChangeName = "";
                        if (exchangeType != ExchangeType.fanout) exChangeName = "myExchange" + (int)exchangeType;
                        //声明交换机
                        model.ExchangeDeclare(exChangeName, exchangeType.ToString(),true);
                        string queueName = "myExchange_" + new Random().Next(1, 5);
                        Console.WriteLine("发布订阅模式" + queueName);
                        model.QueueDeclare(queue: queueName
                            , durable: true
                            , exclusive: false
                            , autoDelete: true,
                            arguments: null);
    
                        //绑定时可以多个绑定
                        if (logLevel == LogLevel.all)
                        {
                            foreach (string level in Enum.GetValues(typeof(LogLevel)))
                            {
                                if (level != "Null" && level != "all")
                                {
                                    model.QueueBind(queueName, exChangeName, level);
                                }
                            }
                        }
                        else if (logLevel == LogLevel.Null)
                        {
                            //绑定交换机和队列
                            model.QueueBind(queueName, exChangeName, "");
                        }
                        else
                        {
                            //绑定交换机和队列
                            model.QueueBind(queueName, exChangeName, logLevel.ToString());
                        }
                        //多劳多得模式需要如下配置
                        //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
                        model.BasicQos(0, 1, false);
                        //创建消费者对象
                        var customer = new EventingBasicConsumer(model);
                        customer.Received += (sender, e) =>
                        {
                            byte[] body = e.Body.ToArray();
                            string msg = Encoding.UTF8.GetString(body);
                            Console.WriteLine("收到消息:" + msg);
                            //多劳多得
                            //返回消息确认
                            model.BasicAck(e.DeliveryTag, true);
                        };
                        //开启监听
                        model.BasicConsume(queue: queueName
                            , autoAck: false
                            , consumer: customer);
                        Console.WriteLine("Press any key to exit");
                        Console.ReadLine();
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    3.3主题交换机

    主题交换机也可称为通配符,主要是用过#和*来做匹配的。

    • `*` (星号) 能够替代一个单词。

    • `#` (井号) 能够替代零个或多个单词。
      规则大致如下图

      然后我们开启4个生产者,分别有传入不同的参数

    • 然后开启三个消费者,匹配不同的路由

      可以看到,不同匹配的消费者会从不同的生产者收到消息,这样比直连交换机更好的就在于可以根据多个条件进行配置,更加容易扩展,也更加灵活。附上完整代码
      Send:

       /// 
              /// 发布订阅 生产者
              /// 发布订阅模式的区别在与不是直接发送消息,而是声明一个交换机,将消息发送到交换机,而交换机将这些消息发送给消费者
              /// 
              public static void SendMsg_Publish(ExchangeType exchangeType = ExchangeType.fanout, string routingKey = "")
              {
                  Console.WriteLine("发布订阅模式");
                  //创建链接工厂对象
                  var factory = new ConnectionFactory()
                  {
                      HostName = "localhost",
                      Port = 5672,
                      UserName = "fanlin",
                      Password = "1234"
      
                  };
                  //创建对象
                  using (IConnection con = factory.CreateConnection())
                  {
                      //创建链接会话对象
                      using (IModel model = con.CreateModel())
                      {
                          //声明一个交换机
                          string exChangeName = "";
                          if (exchangeType != ExchangeType.fanout) exChangeName = "myExchange" + (int)exchangeType;
                          Console.WriteLine("交换机名称:" + exChangeName + "routingKey:" + routingKey);
                          model.ExchangeDeclare(exchange: exChangeName, type: exchangeType.ToString(), true);
                          //发送消息
                          while (true)
                          {
                              Console.WriteLine("请输入发送消息:");
                              string msg = Console.ReadLine();
                              byte[] data = Encoding.UTF8.GetBytes(msg);
                              model.BasicPublish(
                                  exchange: exChangeName
                                  , routingKey: routingKey
                                  , basicProperties: null
                                  , body: data
                                  );
                          }
                      }
                  }
              }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43

      Receive:

       /// 
              /// 发布订阅 消费者
              /// 
              public static void ReceiveMsg_Publish(ExchangeType exchangeType = ExchangeType.fanout, string routingKey = "")
              {
                  //链接工厂
                  var factory = new ConnectionFactory();
                  factory.HostName = "localhost";
                  factory.Port = 5672;
                  factory.UserName = "fanlin";
                  factory.Password = "1234";
                  //链接对象
                  using (var conn = factory.CreateConnection())
                  {
                      //回话对象
                      using (var model = conn.CreateModel())
                      {
                          //声明一个交换机
                          string exChangeName = "";
                          if (exchangeType != ExchangeType.fanout) exChangeName = "myExchange" + (int)exchangeType;
                          //声明交换机
                          model.ExchangeDeclare(exChangeName, exchangeType.ToString(), true);
                          string queueName = model.QueueDeclare().QueueName;
                          Console.WriteLine("交换机名称:" + exChangeName + "queueName:"+ queueName + "routingKey:" + routingKey);
      
                          //绑定时可以多个绑定
                          var arrTmp = routingKey.Split(',');
                          foreach (string key in arrTmp)
                          {
                              //绑定交换机和队列
                              model.QueueBind(queueName, exChangeName, key);
                          }
                          //多劳多得模式需要如下配置
                          //告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
                          model.BasicQos(0, 1, false);
                          //创建消费者对象
                          var customer = new EventingBasicConsumer(model);
                          customer.Received += (sender, e) =>
                          {
                              byte[] body = e.Body.ToArray();
                              string msg = Encoding.UTF8.GetString(body);
                              Console.WriteLine("收到消息:" + msg);
                              //多劳多得
                              //返回消息确认
                              model.BasicAck(e.DeliveryTag, true);
                          };
                          //开启监听
                          model.BasicConsume(queue: queueName
                              , autoAck: false
                              , consumer: customer);
                          Console.WriteLine("Press any key to exit");
                          Console.ReadLine();
                      }
                  }
              }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55

      参考:RabbitMQ中文文档 · RabbitMQ in Chinese

    先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦

  • 相关阅读:
    指针之野指针系列(2):如何规避野指针
    C# 图解教程 第5版 —— 第3章 C# 编程概述
    【Linux】关于进程
    最优化理论笔记及期末复习(《数值最优化》——高立)
    简单的学生网页作业源码 基于html css javascript jquery bootstarp响应式网页设计——大理我的家乡旅游景点
    聚L-精氨酸/纳米金/石墨烯/聚苯胺复合膜/铝粉/稀土粒子修饰多巴胺的制备
    发展前景好、薪资高,计算机行业成为许多人改变命运的首选!
    Mybatis学习周报总结
    【matplotlib】matplotlib的颜色表
    【大数据入门核心技术-Hadoop】(一)Hadoop简介
  • 原文地址:https://blog.csdn.net/web15085599741/article/details/126113970