分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。
例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。
RabbitMQ
RabbitMQ
是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ
通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。
本文将讲解使用RabbitMQ实现分布式事件
实现我们创建一个EventsBus.Contract
的类库项目,用于提供基本接口,以支持其他实现
在项目中添加以下依赖引用,并且记得添加EventsBus.Contract
项目引用
- <ItemGroup>
- <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
- <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
- <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
- <PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
- </ItemGroup>
创建项目完成以后分别创建EventsBusOptions.cs
,IEventsBusHandle.cs
,RabbitMQEventsManage.cs
,ILoadEventBus.cs
,提供我们的分布式事件基本接口定义
EventsBusOptions.cs
:
- namespace EventsBus.Contract;
-
- public class EventsBusOptions
- {
- /// <summary>
- /// 接收时异常事件
- /// </summary>
- public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
- }
IEventsBusHandle.cs
:
- namespace EventsBus.Contract;
-
- public interface IEventsBusHandle<in TEto> where TEto : class
- {
- Task HandleAsync(TEto eventData);
- }
ILoadEventBus.cs
:
- namespace EventsBus.Contract;
-
- public interface ILoadEventBus
- {
- /// <summary>
- /// 发布事件
- /// </summary>
- /// <param name="eto"></param>
- /// <typeparam name="TEto"></typeparam>
- /// <returns></returns>
- Task PushAsync<TEto>(TEto eto) where TEto : class;
- }
EventsBusAttribute.cs
:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ
的通道
- namespace EventsBus.RabbitMQ;
-
- [AttributeUsage(AttributeTargets.Class)]
- public class EventsBusAttribute : Attribute
- {
- public readonly string Name;
-
- public EventsBusAttribute(string name)
- {
- Name = name;
- }
- }
然后可以创建我们的RabbitMQ
实现了,创建EventsBus.RabbitMQ
类库项目,用于编写EventsBus.Contract
的RabbitMQ
实现
创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs
,Options\RabbitMQOptions.cs
,EventsBusAttribute.cs
,,RabbitMQFactory.cs
,RabbitMQLoadEventBus.cs
Extensions\EventsBusRabbitMQExtensions.cs
:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection
,这样就在注入的时候减少过度使用命名空间了
- using EventsBus.Contract;
- using EventsBus.RabbitMQ;
- using EventsBus.RabbitMQ.Options;
- using Microsoft.Extensions.Configuration;
-
- namespace Microsoft.Extensions.DependencyInjection;
-
- public static class EventsBusRabbitMQExtensions
- {
- public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
- IConfiguration configuration)
- {
- services.AddSingleton<RabbitMQFactory>();
- services.AddSingleton(typeof(RabbitMQEventsManage<>));
- services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
- services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
-
- return services;
- }
- }
Options\RabbitMQOptions.cs
:提供基本的Options
读取配置文件中并且注入,services.Configure
的方法是读取IConfiguration
的名称为RabbitMQOptions
的配置东西,映射到Options中,具体使用往下看。
- using RabbitMQ.Client;
-
- namespace EventsBus.RabbitMQ.Options;
-
- public class RabbitMQOptions
- {
- /// <summary>
- /// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
- /// 指示应使用的协议的缺省值。
- /// </summary>
- public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;
-
- /// <summary>
- /// 地址
- /// </summary>
- public string HostName { get; set; }
-
- /// <summary>
- /// 账号
- /// </summary>
- public string UserName { get; set; }
-
- /// <summary>
- /// 密码
- /// </summary>
- public string Password { get; set; }
- }
RabbitMQEventsManage.cs
:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序
- using System.Reflection;
- using System.Text.Json;
- using EventsBus.Contract;
- using Microsoft.Extensions.DependencyInjection;
- using RabbitMQ.Client;
- using RabbitMQ.Client.Events;
-
- namespace EventsBus.RabbitMQ;
-
- public class RabbitMQEventsManage<TEto> where TEto : class
- {
- private readonly IServiceProvider _serviceProvider;
- private readonly RabbitMQFactory _rabbitMqFactory;
-
- public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
- {
- _serviceProvider = serviceProvider;
- _rabbitMqFactory = rabbitMqFactory;
- _ = Task.Run(Start);
- }
-
- private void Start()
- {
- var channel = _rabbitMqFactory.CreateRabbitMQ();
- var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
- var name = eventBus?.Name ?? typeof(TEto).Name;
- channel.QueueDeclare(name, false, false, false, null);
- var consumer = new EventingBasicConsumer(channel); //消费者
- channel.BasicConsume(name, true, consumer); //消费消息
- consumer.Received += async (model, ea) =>
- {
- var bytes = ea.Body.ToArray();
- try
- {
- // 这样就可以实现多个订阅
- var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
- foreach (var handle in events)
- {
- await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
- }
- }
- catch (Exception e)
- {
- EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
- }
- };
- }
- }
RabbitMQFactory.cs
:提供RabbitMQ
链接工厂,在这里你可以自己去定义和管理RabbitMQ
工厂
- using EventsBus.RabbitMQ.Options;
- using Microsoft.Extensions.Options;
- using RabbitMQ.Client;
-
- namespace EventsBus.RabbitMQ;
-
- public class RabbitMQFactory : IDisposable
- {
- private readonly RabbitMQOptions _options;
- private readonly ConnectionFactory _factory;
- private IConnection? _connection;
-
- public RabbitMQFactory(IOptions<RabbitMQOptions> options)
- {
- _options = options?.Value;
- // 将Options中的参数添加到ConnectionFactory
- _factory = new ConnectionFactory
- {
- HostName = _options.HostName,
- UserName = _options.UserName,
- Password = _options.Password,
- Port = _options.Port
- };
- }
-
- public IModel CreateRabbitMQ()
- {
- // 当第一次创建RabbitMQ的时候进行链接
- _connection ??= _factory.CreateConnection();
-
- return _connection.CreateModel();
- }
-
- public void Dispose()
- {
- _connection?.Dispose();
- }
- }
RabbitMQLoadEventBus.cs
:用于实现ILoadEventBus.cs
通过ILoadEventBus
发布事件RabbitMQLoadEventBus.cs
是RabbitMQ的实现
- using System.Reflection;
- using System.Text.Json;
- using EventsBus.Contract;
- using Microsoft.Extensions.DependencyInjection;
-
- namespace EventsBus.RabbitMQ;
-
- public class RabbitMQLoadEventBus : ILoadEventBus
- {
- private readonly IServiceProvider _serviceProvider;
- private readonly RabbitMQFactory _rabbitMqFactory;
-
- public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
- {
- _serviceProvider = serviceProvider;
- _rabbitMqFactory = rabbitMqFactory;
- }
-
- public async Task PushAsync<TEto>(TEto eto) where TEto : class
- {
-
- //创建一个通道
- //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
- using var channel = _rabbitMqFactory.CreateRabbitMQ();
-
- // 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称
- var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
- var name = eventBus?.Name ?? typeof(TEto).Name;
-
- // 使用获取的名称创建一个通道
- channel.QueueDeclare(name, false, false, false, null);
- var properties = channel.CreateBasicProperties();
- properties.DeliveryMode = 1;
- // 将数据序列号,然后发布
- channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息
- // 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,
- var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
-
- await Task.CompletedTask;
- }
- }
在这里我们的RabbitMQ
分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;
然后我们需要使用RabbitMQ分布式事件总线工具包