aspnetcore微服务种服务之间的通信一般都有用到消息中间件,如何确保该服务的持久层保存创建的数据同时又把消息成功投递到了关联服务,关联服务做对应的处理。
下面就以一个简单的例子来演示实现方式之一,即发件箱模式。
下面解决方案有两个服务,做演示用的比较简单,一个是订单服务,一个是账单服务。完成订单的同时把订单信息通过本例的rabbitmq发送到billapi服务中去。
首先trading服务有一个领域内事件接收器
public abstract class IEntity { private int id; public virtual int Id { get { return id; } protected set { id = value; } } private List_domainEvents; public IReadOnlyCollection DomainEvents => _domainEvents?.AsReadOnly(); public void AddDomainEvent(IEvent eventItem) { _domainEvents = _domainEvents ?? new List (); _domainEvents.Add(eventItem); } public void RemoveDomainEvent(IEvent eventItem) { _domainEvents?.Remove(eventItem); } public void ClearDomainEvents() { _domainEvents?.Clear(); } }
public class CreateOrderEvent:IEvent { public Guid EventId { get; set; } public int CustomerId { get; set; } public CreateOrderEvent(Guid EventId,int customerId) { this.EventId = EventId; CustomerId = customerId; } }
我把事件简化到实体类里面,也可以不需要这个IEntity,那每次都需要自己创建order的同时创建一个事件,当然事件集合需要自己定义存起来。
发件箱顾名思义就是所有邮件定时定期的投递到邮箱中,定时定期的取出来往需要的地方去投递。
这里的邮件就是事件了,而投递就是事件发布。
这个实例的事件放到实体类种有领域的味道,因为在一个领域order内可以把关联的事件都放一起。下面代码就是借助efcore的拦截器来统一在savechange的地方来把事件写到数据库中去。
我新建一个order,同时把发布的order事件存到数据,这就是发件箱模式。好多数据库和中间件操作的最终一致性大体都是这个模式,借助数据库的分布式事务。
public sealed class OutBoxMessageInterceptor:SaveChangesInterceptor { public override ValueTaskint>> SavingChangesAsync(DbContextEventData eventData, InterceptionResult<int> result, CancellationToken cancellationToken = default) { DbContext? dbContxt = eventData.Context; if (dbContxt is null) { return base.SavingChangesAsync(eventData, result, cancellationToken); } var events = dbContxt.ChangeTracker.Entries ().Select(x => x.Entity).SelectMany(x => { List entities = new List (); foreach (var item in x.DomainEvents) { if(!(item is null)) entities.Add(item); } x.ClearDomainEvents(); return entities; }).Select(x => new OutBoxMessage { Id = Guid.NewGuid(), OccurredOnUtc = DateTime.UtcNow, Type = x.GetType().Name, Content = System.Text.Json.JsonSerializer.Serialize((CreateOrderEvent)x) }).ToList(); if(events!=null && events.Any()) dbContxt.Set ().AddRangeAsync(events); return base.SavingChangesAsync(eventData, result, cancellationToken); } }
数据库拦截器注入的代码少不了,写是写进去了,下面就是怎么去往另外的服务的发布呢?
builder.Services.AddDbContext((sp,ops) => { ops.UseSqlServer("Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=Traing;Integrated Security=True;Connect Timeout=30;Encrypt=False;Trust Server Certificate=False;Application Intent=ReadWrite;Multi Subnet Failover=False"); var interceptor = sp.GetService (); ops.AddInterceptors(interceptor); }, ServiceLifetime.Scoped);
这里就是后台任务去取数据做处理了
public class OutBoxMessageBackgroundService : BackgroundService { private readonly IServiceProvider _serviceProvider; private readonly IRabbitMQEventBus _publisher; public OutBoxMessageBackgroundService(IServiceProvider serviceProvider, IRabbitMQEventBus publisher) { _serviceProvider = serviceProvider; _publisher = publisher; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { using var scope = _serviceProvider.CreateScope(); var _orderingContext = scope.ServiceProvider.GetService(); var messages = await _orderingContext.Set ().Where(m => m.ProceddedOnUtc == null) .Take(10).ToListAsync(stoppingToken); foreach (var message in messages) { if (string.IsNullOrEmpty(message.Content)) continue; var retries = 3; var retry = Policy.Handle () .WaitAndRetry( retries, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (exception, timeSpan, retry, ctx) => { Console.WriteLine($"发布时间失败:{message}"); }); retry.Execute(() => _publisher.Publish(new { Content=message.Content,Id = message.Id }, exchange: "RabbitMQ.EventBus.Simple", routingKey: "rabbitmq.eventbus.test")); message.ProceddedOnUtc = DateTime.UtcNow; } await _orderingContext.SaveChangesAsync(stoppingToken); } }
就是这么简单,tradinfgapi的任务就这么愉快地完成了,这里保证了数据库写数据和发布事件出去最终是同步的,即使服务出问题重启也一样能完成任务。
下面就是接受事件的billapi的服务了,因为上面代码用来重试机制,而且其他情况也比面不了事件重复发送,下面就简单的处理下订阅事件的幂等性。
public class IDomainEvent : IEvent { public Guid Id { get; set; } public string Content { get; set; } } public class IdempotentDomainEventHandler : IEventResponseHandlerint>,IDisposable { private readonly IServiceProvider _serviceProvider; public IdempotentDomainEventHandler(IServiceProvider serviceProvider) { _serviceProvider = serviceProvider; } public void Dispose() { Console.WriteLine("MessageBodyHandle Disposable."); } public async Task<int> HandleAsync(HandlerEventArgs args) { using var scope = _serviceProvider.CreateScope(); BillingDbContext _context = scope.ServiceProvider.GetService (); string consumer = args.GetType().Name; if (await _context.Set ().AnyAsync(o => o.Guid == args.EventObject.Id && o.Name==consumer)) { return default; } Console.WriteLine($"等待处理的消息{args.EventObject.Content}"); CreateOrderEvent createOrderEvent = System.Text.Json.JsonSerializer.Deserialize (args.EventObject.Content); await _context.BillingRecords.AddAsync(new BillingRecord { CreateTime=DateTime.UtcNow, OrderEventId=createOrderEvent.EventId}); Console.WriteLine($"处理的消息完毕"); _context.Set ().Add(new OutboxMessageConsumer { Guid = args.EventObject.Id, Name = consumer }); return await _context.SaveChangesAsync(); } } public class OutboxMessageConsumer { public int Id { get; set; } public Guid Guid { get; set; } public string Name { get; set; } }
////// 来自tradingapi的数据 /// public class CreateOrderEvent { public Guid EventId { get; set; } public int CustomerId { get; set; } }
同样是把事件处理后写入到数据库,每次进来去数据库看看有没有,就这么简单的完成了事件订阅的重复处理。
下面运行一下程序看看效果,创建order前billingrecord是没有记录的。
这里出现了一个喜闻乐见的事情,trading服务已经发布了事件,billing服务没收到,可能是rabbitmq卡住了,不过没关系,因为有这个发件箱模式可以重启下服务,这个时间丢不了。
重启了下服务就消费掉了这条数据。
至于重复消费的测试就省了,有需要自己下载源码去测试