跨进程事件总线允许发布和订阅跨服务传输的消息, 服务的发布与订阅不在同一个进程中
在Masa Framework中, 跨进程总线事件提供了一个可以被开箱即用的程序
跨进程事件与Dapr
并不是强绑定的, Masa Framework使用了Dapr
提供的pub/sub的能力, 如果你不想使用它, 你也可以更换为其它实现, 但目前Masa Framwork中仅提供了Dapr
的实现
Assignment.IntegrationEventBus
,并安装Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
、Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
、Masa.Contrib.Data.EFCore.Sqlite
、Masa.Contrib.Data.UoW.EFCore
、Masa.Contrib.Development.DaprStarter.AspNetCore
、Microsoft.EntityFrameworkCore.Design
dotnet new web -o Assignment.IntegrationEventBus
cd Assignment.IntegrationEventBus
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.Dapr --version 0.7.0-preview.8 // 使用dapr提供的pubsub能力
dotnet add package Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore --version 0.7.0-preview.8 //本地消息表
dotnet add package Masa.Contrib.Data.EFCore.Sqlite --version 0.7.0-preview.8 //使用EfCore.Sqlite
dotnet add package Masa.Contrib.Data.UoW.EFCore --version 0.7.0-preview.8 //使用工作单元
dotnet add package Masa.Contrib.Development.DaprStarter.AspNetCore --version 0.7.0-preview.8 //开发环境使用DaprStarter协助管理Dapr Sidecar
dotnet add package Microsoft.EntityFrameworkCore.Design --version 6.0.6 //方便后续通过CodeFirst迁移数据库
UserDbContext
,并继承MasaDbContext
public class UserDbContext : MasaDbContext
{
public UserDbContext(MasaDbContextOptions options) : base(options)
{
}
}
DaprStarter
, 协助管理Dapr Sidecar
, 修改Program.cs
if (builder.Environment.IsDevelopment())
{
builder.Services.AddDaprStarter();
}
通过
Dapr
发布集成事件需要运行Dapr
, 线上环境可通过Kubernetes
来运行, 开发环境可借助Dapr Starter运行Dapr
, 因此仅需要在开发环境使用它
Program
builder.Services.AddIntegrationEventBus(option =>
{
option.UseDapr()
.UseEventLog()
.UseUoW(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
});
var app = builder.Build();
#region dapr 订阅集成事件使用
app.UseRouting();
app.UseCloudEvents();
app.UseEndpoints(endpoints =>
{
endpoints.MapSubscribeHandler();
});
#endregion
RegisterUserEvent
public record RegisterUserEvent : IntegrationEvent
{
public override string Topic { get; set; } = nameof(RegisterUserEvent);
public string Account { get; set; }
public string Mobile { get; set; }
}
Assignment.IntegrationEventBus
所在文件夹,打开cmd或Powershell执行dotnet ef migrations add init //创建迁移
dotnet ef database update //更新数据库
Program
app.MapPost("/register", async (IIntegrationEventBus eventBus) =>
{
//todo: 模拟注册用户并发布注册用户事件
await eventBus.PublishAsync(new RegisterUserEvent()
{
Account = "Tom",
Mobile = "19999999999"
});
});
Program
app.MapPost("/IntegrationEvent/RegisterUser", [Topic("pubsub", nameof(RegisterUserEvent))](RegisterUserEvent @event) =>
{
Console.WriteLine($"注册用户成功: {@event.Account}");
});
订阅事件暂时未抽象,目前使用的是
Dapr
原生的订阅方式,后续我们会支持Bind,届时不会由于更换pubsub的实现而导致订阅方式的改变
尽管跨进程事件目前仅支持了Dapr
,但这不代表你与RabbitMq
、Kafka
等无缘,发布/订阅是Dapr
抽象出的能力,实现发布订阅的组件有很多种,RabbitMq
、Kafka
是其中一种实现,如果你想深入了解他们之间的关系,可以参考:
首先我们先要知道的基础知识点:
RowVersion
赋值)提供了集成事件接口的实现类, 并支持了发件箱模式, 其中:
在Masa.Contrib.Dispatcher.IntegrationEvents
中仅提供了发件箱的功能, 但集成事件的发布是由 IPublisher
的实现类来提供, 由Db获取本地消息表的功能是由IIntegrationEventLogService
的实现类来提供, 它们分别属于Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
、Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
的功能, 这也是为什么使用集成事件需要引用包
Masa.Contrib.Dispatcher.IntegrationEvents
Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
那会有小伙伴问了, 我现在没有使用Dapr
, 未来一段时间暂时也还不希望接入Dapr
, 我想自己接入, 以实现集成事件的发布可以吗?
当然是可以的, 如果你希望自行实现集成事件, 那么这个时候你会遇到两种情况
以社区用的较多的库CAP为例, 由于它本身已经完成了发件箱模式, 我们不需要再处理本地消息表, 也无需考虑本地消息记录的管理, 那我们可以这样做
Masa.Contrib.Dispatcher.IntegrationEvents.Cap
, 添加Masa.BuildingBlocks.Dispatcher.IntegrationEvents
的引用, 并安装DotNetCore.CAP
dotnet add package DotNetCore.CAP
IntegrationEventBus
, 并实现IIntegrationEventBus
public class IntegrationEventBus : IIntegrationEventBus
{
private readonly ICapPublisher _publisher;
private readonly ICapTransaction _capTransaction;
private readonly IUnitOfWork? _unitOfWork;
public IntegrationEventBus(ICapPublisher publisher, ICapTransaction capTransaction, IUnitOfWork? unitOfWork = null)
{
_publisher = publisher;
_capTransaction = capTransaction;
_unitOfWork = unitOfWork;
}
public Task PublishAsync(TEvent @event) where TEvent : IEvent
{
// 如果使用事务
// _publisher.Transaction.Value.DbTransaction = unitOfWork.Transaction;
// _publisher.Publish(@event.Topic, @event);
throw new NotImplementedException();
}
public IEnumerable GetAllEventTypes()
{
throw new NotImplementedException();
}
public Task CommitAsync(CancellationToken cancellationToken = default)
{
throw new NotImplementedException();
}
}
CAP已支持本地事务, 使用当前
IUnitOfWork
提供的事务, 确保数据的原子性
ServiceCollectionExtensions
, 将自定义Publisher
注册到服务集合public static class ServiceCollectionExtensions
{
public static DispatcherOptions UseRabbitMq(this IServiceCollection services)
{
//todo: 注册RabbitMq信息
services.TryAddScoped();
return dispatcherOptions;
}
}
已经实现发件箱模式的可以直接使用, 而不需要引用
Masa.Contrib.Dispatcher.IntegrationEvents
Masa.Contrib.Dispatcher.IntegrationEvents.Dapr
Masa.Contrib.Dispatcher.IntegrationEvents.EventLogs.EFCore
以上未经过实际验证, 感兴趣的可以尝试下, 欢迎随时提
pr
我希望直接接入RabbitMq
, 但我自己没有做发件箱模式, 那我可以怎么做呢?
由于Masa.Contrib.Dispatcher.IntegrationEvents
已提供发件箱模式, 如果仅仅希望更换一个发布事件的实现者, 那我们仅需要实现IPublisher
即可
Masa.Contrib.Dispatcher.IntegrationEvents.RabbitMq
, 添加Masa.Contrib.Dispatcher.IntegrationEvents
项目引用, 并安装RabbitMQ.Client
dotnet add package RabbitMQ.Client //使用RabbitMq
Publisher
,并实现IPublisher
public class Publisher : IPublisher
{
public async Task PublishAsync(string topicName, T @event, CancellationToken stoppingToken = default) where T : IIntegrationEvent
{
//todo: 通过 RabbitMQ.Client 发送消息到RabbitMq
throw new NotImplementedException();
}
}
DispatcherOptionsExtensions
, 将自定义Publisher
注册到服务集合public static class DispatcherOptionsExtensions
{
public static DispatcherOptions UseRabbitMq(this Masa.Contrib.Dispatcher.IntegrationEvents.Options.DispatcherOptions options)
{
//todo: 注册RabbitMq信息
dispatcherOptions.Services.TryAddSingleton();
return dispatcherOptions;
}
}
RabbitMq
builder.Services.AddIntegrationEventBus(option =>
{
option.UseRabbitMq();//修改为使用RabbitMq
option.UseUoW(optionBuilder => optionBuilder.UseSqlite($"Data Source=./Db/{Guid.NewGuid():N}.db;"));
option.UseEventLog();
});
Assignment12
https://github.com/zhenlei520/MasaFramework.Practice
MASA.Framework:https://github.com/masastack/MASA.Framework
如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们