引入程序集:RabbitMQ.Client
生产者
/// <summary>
/// Demo producer: publishes text messages to the "ProducerWritesExChange" direct
/// exchange, which this class binds to the "ProducerWrites" queue with the routing
/// key "advanced".
/// </summary>
public class ProducerWrites
{
    private const string QueueName = "ProducerWrites";
    private const string ExchangeName = "ProducerWritesExChange";
    private const string RoutingKey = "advanced";

    // Number of numbered warm-up messages sent before switching to console input.
    private const int WarmUpMessageCount = 20;

    // Pause after every publish, in milliseconds.
    private const int PublishDelayMilliseconds = 500;

    /// <summary>
    /// Connects to RabbitMQ on localhost, recreates the queue, exchange and binding,
    /// publishes 20 numbered messages, and then publishes every line typed on the
    /// console. Returns when console input reaches end-of-stream.
    /// </summary>
    public static void Send()
    {
        // Use the last non-empty segment of the application base directory as a
        // human-readable tag for this process.
        string path = AppDomain.CurrentDomain.BaseDirectory;
        string tag = path.Split('/', '\\').Last(s => !string.IsNullOrEmpty(s));
        Console.WriteLine($"这里是 {tag} 启动了。。");

        // Connections are created through a connection factory.
        ConnectionFactory factory = new ConnectionFactory
        {
            HostName = "localhost", // RabbitMQ service running locally
            UserName = "guest",
            Password = "guest"
        };

        using (IConnection connection = factory.CreateConnection())
        using (IModel channel = connection.CreateModel())
        {
            // Delete and recreate the queue and exchange so every run starts clean.
            // NOTE(review): deleting on each start presumably discards anything still
            // queued and affects a consumer that is already attached - confirm this
            // reset is intended outside of a demo.
            channel.QueueDelete(QueueName);
            channel.ExchangeDelete(ExchangeName);

            channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
            channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
            channel.QueueBind(queue: QueueName, exchange: ExchangeName, routingKey: RoutingKey, arguments: null);

            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine($"生产者{tag}已准备就绪~~~");

            // Phase 1: a fixed batch of numbered messages.
            for (int i = 0; i < WarmUpMessageCount; i++)
            {
                Publish(channel, $"{tag}:大家伙欢迎大家来到.NET高级班的VIP课程_{i + 1}");
            }

            // Phase 2: forward console input, one message per line.
            while (true)
            {
                Console.WriteLine("请输入消息内容:");
                string message = Console.ReadLine();
                if (message == null)
                {
                    // ReadLine returns null once the input stream is exhausted
                    // (redirected or closed stdin). Stop instead of passing null to
                    // Encoding.GetBytes, which would throw ArgumentNullException.
                    break;
                }

                Publish(channel, message);
            }
        }
    }

    /// <summary>
    /// Publishes one UTF-8 encoded message, marked persistent, to the exchange,
    /// reports it on the console and then sleeps for the configured delay.
    /// </summary>
    /// <param name="channel">Open channel used for publishing.</param>
    /// <param name="message">Message text; must not be null.</param>
    private static void Publish(IModel channel, string message)
    {
        IBasicProperties basicProperties = channel.CreateBasicProperties();
        basicProperties.Persistent = true;

        byte[] body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: ExchangeName,
                             routingKey: RoutingKey,
                             basicProperties: basicProperties,
                             body: body);

        Console.WriteLine($"{message} 已发送~");
        Thread.Sleep(PublishDelayMilliseconds);
    }
}
消费者
public class ConsumerConsumption
{
    private const string QueueName = "ProducerWrites";
    private const string ExchangeName = "ProducerWritesExChange";
    private const string RoutingKey = "advanced";

    /// <summary>
    /// Consumes the messages written by ProducerWrites: declares the queue, exchange
    /// and binding, subscribes with automatic acknowledgement, prints every received
    /// message, and blocks until a line is read from the console.
    /// </summary>
    public static void Consumption()
    {
        // The last non-empty segment of the base directory identifies this process.
        string baseDirectory = AppDomain.CurrentDomain.BaseDirectory;
        string[] segments = baseDirectory.Split('/', '\\');
        string tag = segments.Last(segment => !string.IsNullOrEmpty(segment));
        Console.WriteLine($"这里是 {tag} 启动了。。");

        ConnectionFactory factory = new ConnectionFactory
        {
            HostName = "localhost", // RabbitMQ service running locally
            UserName = "guest",
            Password = "guest"
        };

        using (IConnection connection = factory.CreateConnection())
        using (IModel channel = connection.CreateModel())
        {
            Console.ForegroundColor = ConsoleColor.Green;
            try
            {
                DeclareTopology(channel);

                // Consumption is event driven: Received is raised for each
                // message delivered to this consumer.
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, delivery) =>
                {
                    byte[] payload = delivery.Body.ToArray();
                    string text = Encoding.UTF8.GetString(payload);
                    Console.WriteLine($"{tag}接受消息: {text}");
                };

                channel.BasicConsume(queue: QueueName, autoAck: true, consumer: consumer);

                // Keep the connection open until the user presses Enter.
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
            }
            catch (Exception ex)
            {
                // Only the message is reported; the method then returns normally.
                Console.WriteLine(ex.Message);
            }
        }
    }

    /// <summary>
    /// Declares the durable queue and direct exchange and binds them together with
    /// the routing key, using the same settings as the producer.
    /// </summary>
    /// <param name="channel">Open channel on which the declarations are issued.</param>
    private static void DeclareTopology(IModel channel)
    {
        channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
        channel.ExchangeDeclare(exchange: ExchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null);
        channel.QueueBind(queue: QueueName, exchange: ExchangeName, routingKey: RoutingKey, arguments: null);
    }
}