一、RabbitMQ从信息接收者角度可以看做三种模式,一对一,一对多(此一对多并不是发布订阅,而是每条信息只有一个接收者)和发布订阅。其中一对一是简单队列模式,一对多是Worker模式,而发布订阅包括发布订阅模式,路由模式和通配符模式,为什么说发布订阅模式包含三种模式呢,其实发布订阅,路由,通配符三种模式都是使用只是交换机(Exchange)类型不一致
1:一对一
需要建立两个项目(NuGet引入RabbitMQ.Client),一个Send,一个Receive。代码如下
Send:
public static void SendMsg()
{
//创建链接工厂对象
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
UserName = "guest",
Password = "guest"
};
//创建对象
using (IConnection con = factory.CreateConnection())
{
//创建链接会话对象
using (IModel model = con.CreateModel())
{
string queueName = "myQueue";
//声明一个队列‘
model.QueueDeclare(
queue: queueName //消息队列名称
, durable: true//是否缓存
, exclusive: false//是否独有
, autoDelete: true//自动删除
, arguments: null
);
//发送消息
while (true)
{
Console.WriteLine("请输入发送消息:");
string msg = Console.ReadLine();
byte[] data = Encoding.UTF8.GetBytes(msg);
model.BasicPublish(
exchange: ""
, routingKey: queueName
, basicProperties: null
, body: data
);
}
}
}
}
Receive:
public static void ReceiveMsg()
{
//链接工厂
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.Port = 5672;
factory.UserName = "guest";
factory.Password = "guest";
//链接对象
using (var conn = factory.CreateConnection())
{
//回话对象
using (var model = conn.CreateModel())
{
model.QueueDeclare(queue: "myQueue"
, durable: true
, exclusive: false
, autoDelete: true,
arguments: null);
//创建消费者对象
var customer = new EventingBasicConsumer(model);
customer.Received += Customer_Received;
//开启监听
model.BasicConsume(queue: "myQueue"
, autoAck: false
, consumer: customer);
Console.WriteLine("Press [enter] to exit");
Console.ReadLine();
}
}
}
private static void Customer_Received(object sender, BasicDeliverEventArgs e)
{
byte[] body = e.Body.ToArray();
string msg = Encoding.UTF8.GetString(body);
Console.WriteLine("收到消息:" + msg);
}
最终运行如下
其中guest是默认的管理账号和密码,如果需要使用这些,可以登录http://localhost:15672/添加新用户
这样你就可以在你的项目中使用新建的账号密码了。
其中我遇到了两个坑,大家可能也需要注意一下
1:端口:如果没有开启5672端口,可以在防火墙中开启端口,一直下一步就行
2:就是账号和密码,我开始的时候没有使用guest,随便设置了两个,程序启动报错,(ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN.)如果遇到类似的情况,可以按照上面说过的,登录http://localhost:15672/#/users网页新建用户就可以了。
3:在创建用户之后,发现程序启动不起来。报错(The AMQP operation was interrupted: AMQP close-reason, initiated by Peer, code=530, text=‘NOT_ALLOWED - vhost / not found’, classId=10, methodId=40)原因是没有权限
点击新建的用户设置virtualhost为/。设置好之后就会发现guest1和上面两个用户一样,Can access virtual hosts 是斜杠了,这样你就可以使用新建的用户发送消息了
2:worker模式(多个1对1)
以上的一对一是有问题的。
可以看到运行两个接收者,然后发送者发送了1-5这五个消息,第一个接收者接收的是奇数,而第二个接收者接收的是偶数,但是现在的worker存在这很大的问题,
1.丢失数据:一旦其中一个宕机,那么另外接收者的无法接收原本这个接收者所要接收的数据
2.无法实现能者多劳:如果其中的接收者接收的较慢,那么便会极大的浪费性能,所以需要实现接收快的多接收
开启多个消费者,然后生产者发送消息会出现下面的情况,每个消费者都会收到一条不同的消息,
在Rabbit中存在两种消息确认模式,
自动确认:只要消息从队列获取,无论消费者获取到消息后是否成功消费,都认为是消息成功消费,也就是说上面第二个接收者其实已经消费了它所接收的数据
手动确认:消费从队列中获取消息后,服务器会将该消息处于不可用状态,等待消费者反馈
也就是说我们只要将消息确认模式改为手动即可,改为手动确认方式只需改两处,1.开启监听时将autoAck参数改为false,2.消息消费成功后返回确认
由于我上面在消费者开启监听的时候将autoAck设置成了false,这样就开启了手动确认模式,且在接受消息时加入了Thread.Sleep(1000),这样在其中一个消费者崩溃的时候,会将本应该发送给崩溃的消费者的消息重新发给未崩溃的消费者。这里模拟了一下,在生产者发消息的时候,将第二个消费者关闭,然后发现本该在第二个消费者接收的消息,分别发送到了第一个和第三个,无论是否已经发送过。
这样就避免了数据丢失。
你可能注意到了,调度依照我们希望的方式运行。例如在有两个工作者的情况下,当所有的奇数任务都很繁重而所有的偶数任务都很轻松的时候,其中一个工作者会一直处于忙碌之中而另一个几乎无事可做。RabbitMQ并不会对此有任何察觉,仍旧会平均分配消息。
这种情况发生的原因是由于当有消息进入队列时,RabbitMQ只负责将消息调度的工作,而不会检查某个消费者有多少未经确认的消息。它只是盲目的将第n个消息发送给第n个消费者而已。
如何实现公平调度呢,其实是在手动确认的基础上实现的,将里面的Thread.Sleep(1000)改成随机数, System.Threading.Thread.Sleep(new Random().Next(1, 5) * 1000);并且在创建消费者之前添加确认机制。
//告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
model.BasicQos(0, 1, false);
这里可以看到,生产者发了很多消息,但是由于消费者没有确认,所以都没有收到后续的消息,所以,在消费者中还需要添加一个确认代码
添加消息确认:
model.BasicAck(e.DeliveryTag, true);
之后可以看到,由于不同的消费者接收消息的延迟随机,回复确认收到的时间也是随机的,这样,不同的消费者就会收到不定的消息了,这样就实现了公平调度。
消费者完整代码:
//由于消费者确认需要model,所以我将之前的事件写成了 ReceiveMsg里面!
public static void ReceiveMsg()
{
//链接工厂
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.Port = 5672;
factory.UserName = "fanlin";
factory.Password = "1234";
//链接对象
using (var conn = factory.CreateConnection())
{
//回话对象
using (var model = conn.CreateModel())
{
model.QueueDeclare(queue: "myQueue"
, durable: true
, exclusive: false
, autoDelete: true,
arguments: null);
//公平调度模式需要如下配置
//告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
model.BasicQos(0, 1, false);
//创建消费者对象
var customer = new EventingBasicConsumer(model);
customer.Received += (sender, e) =>
{
//worker模式 如果其中一个中途崩溃,会出现收到消息,但是没有打印的情况,而另外一个也不会打印
//这里随机休息,表示每个消费者接收的消息的快慢不同,以实现公平调度。
System.Threading.Thread.Sleep(new Random().Next(1, 5) * 1000);
byte[] body = e.Body.ToArray();
string msg = Encoding.UTF8.GetString(body);
Console.WriteLine("收到消息:" + msg);
//公平调度
//返回消息确认
model.BasicAck(e.DeliveryTag, true);
};
//开启监听
model.BasicConsume(queue: "myQueue"
, autoAck: false
, consumer: customer);
Console.WriteLine("Press [enter] to exit");
Console.ReadLine();
}
}
}
补:上面的工作者模式已经将durable设置为true了。表示队列的持久化,这样即使生产者崩溃了,重启之后依旧不会忘记所有的队列,发消息依旧能够给每个消费者发送过去。
接下来我们还需要设置消息的持久化。在Send中添加如下代码,此属性也需要在BasicPublish中进行标注
//这里将消息标记为持久化
//将消息标示为持久化并不能完全保证消息不会丢失。尽管它会告诉RabbitMQ将消息存储到硬盘上,但是在RabbitMQ接收到消息并将其进行存储两个行为之间仍旧会有一个窗口期。同样的,RabbitMQ也不会对每一条消息执行fsync(2),所以消息获取只是存到了缓存之中,而不是硬盘上。虽然持久化的保证不强,但是应对我们简单的任务队列已经足够了。如果你需要更强的保证,可以使用publisher confirms.
var properties = model.CreateBasicProperties();
properties.Persistent = true;
请注意,这是官方的注释:
消息持久化的注释
将消息标示为持久化并不能完全保证消息不会丢失。尽管它会告诉RabbitMQ将消息存储到硬盘上,但是在RabbitMQ接收到消息并将其进行存储两个行为之间仍旧会有一个窗口期。同样的,RabbitMQ也不会对每一条消息执行fsync(2),所以消息获取只是存到了缓存之中,而不是硬盘上。虽然持久化的保证不强,但是应对我们简单的任务队列已经足够了。如果你需要更强的保证,可以使用publisher confirms.
对于消息的持久化暂时不知道如何验证,我会多去了解一下!
3:发布订阅模式(1对多)
发布订阅模式对于生产者唯一的改变就在于需要声明一个交换机,将消息发送到交换机,通过交换机发送出去。
先介绍下交换机的几种类型:
有几个可用的交换机类型:直连交换机(direct
), 主题交换机(topic
), 头交换机(headers
) 和扇形交换机(fanout
),这里先使用fanout,扇形交换机是负责将消息发送给所有他知道的队列
3.1发布订阅
Send
///
/// 发布订阅 生产者
/// 发布订阅模式的区别在与不是直接发送消息,而是声明一个交换机,将消息发送到交换机,而交换机将这些消息发送给消费者
///
public static void SendMsg_Publish()
{
Console.WriteLine("发布订阅模式");
//创建链接工厂对象
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
UserName = "fanlin",
Password = "1234"
};
//创建对象
using (IConnection con = factory.CreateConnection())
{
//创建链接会话对象
using (IModel model = con.CreateModel())
{
//声明一个交换机
model.ExchangeDeclare(exchange: "myExchange", type: "fanout");
//发送消息
while (true)
{
Console.WriteLine("请输入发送消息:");
string msg = Console.ReadLine();
byte[] data = Encoding.UTF8.GetBytes(msg);
model.BasicPublish(
exchange: "myExchange"
, routingKey:""
, basicProperties: null
, body: data
);
}
}
}
}
而消费者需要绑定队列和交换机,这样交换机就能给每个消费者发送消息了。
Receive
///
/// 发布订阅 消费者
///
public static void ReceiveMsg_Publish()
{
//链接工厂
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.Port = 5672;
factory.UserName = "fanlin";
factory.Password = "1234";
//链接对象
using (var conn = factory.CreateConnection())
{
//回话对象
using (var model = conn.CreateModel())
{
//声明交换机
model.ExchangeDeclare("myExchange", "fanout");
string queueName = "myExchange_" + new Random().Next(1, 5);
Console.WriteLine("发布订阅模式" + queueName);
model.QueueDeclare(queue: queueName
, durable: true
, exclusive: false
, autoDelete: true,
arguments: null);
//绑定交换机和队列
model.QueueBind(queueName, "myExchange", "");
//公平调度模式需要如下配置
//告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
model.BasicQos(0, 1, false);
//创建消费者对象
var customer = new EventingBasicConsumer(model);
customer.Received += (sender, e) =>
{
byte[] body = e.Body.ToArray();
string msg = Encoding.UTF8.GetString(body);
Console.WriteLine("收到消息:" + msg);
//公平调度
//返回消息确认
model.BasicAck(e.DeliveryTag, true);
};
//开启监听
model.BasicConsume(queue: queueName
, autoAck: false
, consumer: customer);
Console.WriteLine("Press any key to exit");
Console.ReadLine();
}
}
}
运行如下:
3.2路由模式
路由模式是指交换机将消息绑定到指定的消费者,绑定的消费者也只会收到定义路由绑定的消息。
如图,我先开启了一个info的工作者,开启了一个info的消费者,一个warrning的消费者,这样info 的工作发送的消息只有info消费者才能收到。
而接下来我又开启了一个warrning的生产者,此时warrning生产者发送的消息就可以让warrning的消费者收到了。如图
这就是路由模式的效果。
接下来上完整代码
Send:
///
/// 发布订阅 生产者
/// 发布订阅模式的区别在与不是直接发送消息,而是声明一个交换机,将消息发送到交换机,而交换机将这些消息发送给消费者
///
public static void SendMsg_Publish(ExchangeType exchangeType = ExchangeType.fanout, LogLevel logLevel = LogLevel.Null)
{
Console.WriteLine("发布订阅模式");
//创建链接工厂对象
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
UserName = "fanlin",
Password = "1234"
};
//创建对象
using (IConnection con = factory.CreateConnection())
{
//创建链接会话对象
using (IModel model = con.CreateModel())
{
//声明一个交换机
string exChangeName = "";
if(exchangeType!=ExchangeType.fanout) exChangeName = "myExchange" + (int)exchangeType;
model.ExchangeDeclare(exchange: exChangeName, type: exchangeType.ToString(),true);
//发送消息
while (true)
{
Console.WriteLine("请输入发送消息:");
string msg = Console.ReadLine();
byte[] data = Encoding.UTF8.GetBytes(msg);
model.BasicPublish(
exchange: exChangeName
, routingKey: logLevel.ToString()
, basicProperties: null
, body: data
);
}
}
}
}
两个枚举的定义:
public enum ExchangeType
{
fanout = 0,
direct = 1,
topic = 2,
headers = 3
}
public enum LogLevel
{
Null = -1,
info = 0,
warrning = 1,
debug = 2,
error = 3
}
Receive:
///
/// 发布订阅 消费者
///
public static void ReceiveMsg_Publish(ExchangeType exchangeType = ExchangeType.fanout, LogLevel logLevel = LogLevel.Null)
{
//链接工厂
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.Port = 5672;
factory.UserName = "fanlin";
factory.Password = "1234";
//链接对象
using (var conn = factory.CreateConnection())
{
//回话对象
using (var model = conn.CreateModel())
{
//声明一个交换机
string exChangeName = "";
if (exchangeType != ExchangeType.fanout) exChangeName = "myExchange" + (int)exchangeType;
//声明交换机
model.ExchangeDeclare(exChangeName, exchangeType.ToString(),true);
string queueName = "myExchange_" + new Random().Next(1, 5);
Console.WriteLine("发布订阅模式" + queueName);
model.QueueDeclare(queue: queueName
, durable: true
, exclusive: false
, autoDelete: true,
arguments: null);
//绑定时可以多个绑定
if (logLevel == LogLevel.all)
{
foreach (string level in Enum.GetValues(typeof(LogLevel)))
{
if (level != "Null" && level != "all")
{
model.QueueBind(queueName, exChangeName, level);
}
}
}
else if (logLevel == LogLevel.Null)
{
//绑定交换机和队列
model.QueueBind(queueName, exChangeName, "");
}
else
{
//绑定交换机和队列
model.QueueBind(queueName, exChangeName, logLevel.ToString());
}
//多劳多得模式需要如下配置
//告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
model.BasicQos(0, 1, false);
//创建消费者对象
var customer = new EventingBasicConsumer(model);
customer.Received += (sender, e) =>
{
byte[] body = e.Body.ToArray();
string msg = Encoding.UTF8.GetString(body);
Console.WriteLine("收到消息:" + msg);
//多劳多得
//返回消息确认
model.BasicAck(e.DeliveryTag, true);
};
//开启监听
model.BasicConsume(queue: queueName
, autoAck: false
, consumer: customer);
Console.WriteLine("Press any key to exit");
Console.ReadLine();
}
}
}
3.3主题交换机
主题交换机也可称为通配符,主要是用过#和*来做匹配的。
`*` (星号) 能够替代一个单词。
`#` (井号) 能够替代零个或多个单词。
规则大致如下图
然后我们开启4个生产者,分别有传入不同的参数
然后开启三个消费者,匹配不同的路由
可以看到,不同匹配的消费者会从不同的生产者收到消息,这样比直连交换机更好的就在于可以根据多个条件进行配置,更加容易扩展,也更加灵活。附上完整代码
Send:
///
/// 发布订阅 生产者
/// 发布订阅模式的区别在与不是直接发送消息,而是声明一个交换机,将消息发送到交换机,而交换机将这些消息发送给消费者
///
public static void SendMsg_Publish(ExchangeType exchangeType = ExchangeType.fanout, string routingKey = "")
{
Console.WriteLine("发布订阅模式");
//创建链接工厂对象
var factory = new ConnectionFactory()
{
HostName = "localhost",
Port = 5672,
UserName = "fanlin",
Password = "1234"
};
//创建对象
using (IConnection con = factory.CreateConnection())
{
//创建链接会话对象
using (IModel model = con.CreateModel())
{
//声明一个交换机
string exChangeName = "";
if (exchangeType != ExchangeType.fanout) exChangeName = "myExchange" + (int)exchangeType;
Console.WriteLine("交换机名称:" + exChangeName + "routingKey:" + routingKey);
model.ExchangeDeclare(exchange: exChangeName, type: exchangeType.ToString(), true);
//发送消息
while (true)
{
Console.WriteLine("请输入发送消息:");
string msg = Console.ReadLine();
byte[] data = Encoding.UTF8.GetBytes(msg);
model.BasicPublish(
exchange: exChangeName
, routingKey: routingKey
, basicProperties: null
, body: data
);
}
}
}
}
Receive:
///
/// 发布订阅 消费者
///
public static void ReceiveMsg_Publish(ExchangeType exchangeType = ExchangeType.fanout, string routingKey = "")
{
//链接工厂
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.Port = 5672;
factory.UserName = "fanlin";
factory.Password = "1234";
//链接对象
using (var conn = factory.CreateConnection())
{
//回话对象
using (var model = conn.CreateModel())
{
//声明一个交换机
string exChangeName = "";
if (exchangeType != ExchangeType.fanout) exChangeName = "myExchange" + (int)exchangeType;
//声明交换机
model.ExchangeDeclare(exChangeName, exchangeType.ToString(), true);
string queueName = model.QueueDeclare().QueueName;
Console.WriteLine("交换机名称:" + exChangeName + "queueName:"+ queueName + "routingKey:" + routingKey);
//绑定时可以多个绑定
var arrTmp = routingKey.Split(',');
foreach (string key in arrTmp)
{
//绑定交换机和队列
model.QueueBind(queueName, exChangeName, key);
}
//多劳多得模式需要如下配置
//告诉Rabbit每次只能向消费者发送一条信息,再消费者未确认之前,不再向他发送信息
model.BasicQos(0, 1, false);
//创建消费者对象
var customer = new EventingBasicConsumer(model);
customer.Received += (sender, e) =>
{
byte[] body = e.Body.ToArray();
string msg = Encoding.UTF8.GetString(body);
Console.WriteLine("收到消息:" + msg);
//多劳多得
//返回消息确认
model.BasicAck(e.DeliveryTag, true);
};
//开启监听
model.BasicConsume(queue: queueName
, autoAck: false
, consumer: customer);
Console.WriteLine("Press any key to exit");
Console.ReadLine();
}
}
}
先自我介绍一下,小编13年上师交大毕业,曾经在小公司待过,去过华为OPPO等大厂,18年进入阿里,直到现在。深知大多数初中级java工程师,想要升技能,往往是需要自己摸索成长或是报班学习,但对于培训机构动则近万元的学费,着实压力不小。自己不成体系的自学效率很低又漫长,而且容易碰到天花板技术停止不前。因此我收集了一份《java开发全套学习资料》送给大家,初衷也很简单,就是希望帮助到想自学又不知道该从何学起的朋友,同时减轻大家的负担。添加下方名片,即可获取全套学习资料哦