和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解RabbitMQ
中间件来完成集成事件的处理。
Nuget安装RabbitMQ.Client
using RabbitMQ.Client;
using System.Text;
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
factory.DispatchConsumersAsync = true;//兼容消费者异步使用
string exchangeName = "exchange1";//交换机的名字
string eventName = "myEvent";// routingKey的值
using var conn = factory.CreateConnection();//创建一个客户端到RabbitMQ的TCP连接,尽量重复使用
while (true)
{
string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
using (var channel = conn.CreateModel())//创建信道,信道可以关闭,关闭后消息才会发出
{
var properties = channel.CreateBasicProperties;//创建一个空的内容头
properties.DeliveryMode = 2;//1为非持久2为持久
//声明指定名字的交换机,如果已有同名的交换机则不再创建
//type:direct表示交换机会根据消息routingKey的值进行相等性匹配,消息会发布到和它的routingKey绑定的队列
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
byte[] body = Encoding.UTF8.GetBytes(msg);//RabbitMQ的消息只能按照byte[]类型传递
channel.BasicPublish(exchange: exchangeName, routingKey: eventName,
mandatory: true, basicProperties: properties, body: body);//发布消息
}
Console.WriteLine("发布了消息:" + msg);
Thread.Sleep(1000);
}
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.DispatchConsumersAsync = true;
string exchangeName = "exchange1";
string eventName = "myEvent";
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();
string queueName = "queue1";
//声明和发送端相同的交换机,如果发送端已经声明了相同的,则这天语句会被忽略
//但是仍然要写,因为不确定是哪端先启动
channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
//声明一个队列来接收交换机转发来的消息,如果已经指定了同名的队列,则自动忽略
//消息从队列中取走后则队列中就没有这个消息了
//如果A、B两程序都想取同一条消息,则需要声明两个不同名字的队列
channel.QueueDeclare(queue: queueName, durable: true,
exclusive: false, autoDelete: false, arguments: null);
//将队列绑定到交换机上,并设定routingKey参数,这样当交换机收到routingKey的值和设定的值相同时
//会把消息转发到我们指定的队列,一个交换机可绑定多个队列,如果这些队列的routingKey的值相同
//那么交换机收到同一个routingKey的时候,会发送给多个队列
channel.QueueBind(queue: queueName,
exchange: exchangeName, routingKey: eventName);
//AsyncEventingBasicConsumer用于从队列中接收消息,当一天消息被接收时,Received事件就会被触发
var consumer = new AsyncEventingBasicConsumer(channel);
//Received是阻塞执行的,也就是一条回调方法执行完成后才会触发下一条Received事件
consumer.Received += Consumer_Received;//增加处理事件
channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);//执行
Console.ReadLine();
async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{
//RabbitMQ支持消息的失败重发
try
{
var bytes = args.Body.ToArray();
string msg = Encoding.UTF8.GetString(bytes);
Console.WriteLine(DateTime.Now + "收到了消息" + msg);
//如果消息处理成功,则调用BasicAck通知队列
//如果消息没有处理成功,则抵用BasicReject通知队列
channel.BasicAck(args.DeliveryTag, multiple: false);
await Task.Delay(800);
}
catch (Exception ex)
{
channel.BasicReject(args.DeliveryTag, true);
Console.WriteLine("处理收到的消息出错" + ex);
}
}
Zack.EventBus
"EventBus": {
"HostName": "127.0.01",//RabbitMQ服务器地址
"ExchangeName": "EventBusDemo1"//交换机的名字
}
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
//第一个参数用来设定程序绑定的队列的名字,一般一个微服务使用一个名字
//但是同一个微服务项目的每个集群实例都要收到消息则不能使用一个名字
//第二个参数为含有监听继承事件的处理者代码的程序集
builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();
[Route("api/[controller]")]
[ApiController]
public class DemoController : ControllerBase
{
private IEventBus eventBus;
public DemoController(IEventBus eventBus)
{
this.eventBus = eventBus;
}
[HttpPost]
public string Publish()
{
eventBus.Publish("UserAdded", new { UserName = "yzk", Age = 18 });
return "ok";
}
}
IIntegrationEventHandler
接口[EventName("UserAdded")] //设定监听的事件名称,和publish中的名称一致,可以增加多个[EventName]来监听多个事件
public class UserAddesEventHandler : IIntegrationEventHandler
{
private readonly ILogger<UserAddesEventHandler> logger;
public UserAddesEventHandler(ILogger<UserAddesEventHandler> logger)
{
this.logger = logger;
}
//当收到一个事件后,Handle方法就会被调用,第一个参数为事件的名字,第二个是publish设置的数据,事件数据是以JSON格式传入
public Task Handle(string eventName, string eventData)
{
logger.LogInformation("新建了用户:" + eventData);
return Task.CompletedTask;
}
}
JsonIntegrationEventHandler
接口来解析成.net对象public record UserData(string UserName, int Age);
[EventName("UserAdded")]
public class UserAddesEventHandler3 : JsonIntegrationEventHandler<UserData>
{
private readonly ILogger<UserAddesEventHandler3> logger;
public UserAddesEventHandler3(ILogger<UserAddesEventHandler3> logger)
{
this.logger = logger;
}
public override Task HandleJson(string eventName, UserData eventData)
{
logger.LogInformation($"Json:{eventData.UserName}");
return Task.CompletedTask;
}
}
DynamicIntegrationEventHandler
接口来将JSON解析为dynamic类型[EventName("UserAdded")]
public class UserAddesEventHandler2 : DynamicIntegrationEventHandler
{
private readonly ILogger<UserAddesEventHandler2> logger;
public UserAddesEventHandler2(ILogger<UserAddesEventHandler2> logger)
{
this.logger = logger;
}
public override Task HandleDynamic(string eventName, dynamic eventData)
{
logger.LogInformation($"Dynamic:{eventData.UserName}");
return Task.CompletedTask;
}
}