• .Net Core `RabbitMQ`封装


    分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。

    例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。

    然后了解一下RabbitMQ

    RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

    1. RabbitMQ的主要功能包括:
      1. 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。
      2. 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。
      3. 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。
      4. 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。
      5. 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

    本文将讲解使用RabbitMQ实现分布式事件

    实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现

    在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用

    1. <ItemGroup>
    2. <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
    3. <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
    4. <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
    5. <PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
    6. </ItemGroup>

    创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义

    EventsBusOptions.cs

    1. namespace EventsBus.Contract;
    2. public class EventsBusOptions
    3. {
    4. /// <summary>
    5. /// 接收时异常事件
    6. /// </summary>
    7. public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
    8. }

    IEventsBusHandle.cs

    1. namespace EventsBus.Contract;
    2. public interface IEventsBusHandle<in TEto> where TEto : class
    3. {
    4. Task HandleAsync(TEto eventData);
    5. }

    ILoadEventBus.cs

    1. namespace EventsBus.Contract;
    2. public interface ILoadEventBus
    3. {
    4. /// <summary>
    5. /// 发布事件
    6. /// </summary>
    7. /// <param name="eto"></param>
    8. /// <typeparam name="TEto"></typeparam>
    9. /// <returns></returns>
    10. Task PushAsync<TEto>(TEto eto) where TEto : class;
    11. }

    EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道

    1. namespace EventsBus.RabbitMQ;
    2. [AttributeUsage(AttributeTargets.Class)]
    3. public class EventsBusAttribute : Attribute
    4. {
    5. public readonly string Name;
    6. public EventsBusAttribute(string name)
    7. {
    8. Name = name;
    9. }
    10. }

    然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.ContractRabbitMQ实现

    创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

    Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了

    1. using EventsBus.Contract;
    2. using EventsBus.RabbitMQ;
    3. using EventsBus.RabbitMQ.Options;
    4. using Microsoft.Extensions.Configuration;
    5. namespace Microsoft.Extensions.DependencyInjection;
    6. public static class EventsBusRabbitMQExtensions
    7. {
    8. public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
    9. IConfiguration configuration)
    10. {
    11. services.AddSingleton<RabbitMQFactory>();
    12. services.AddSingleton(typeof(RabbitMQEventsManage<>));
    13. services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
    14. services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
    15. return services;
    16. }
    17. }

    Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。

    1. using RabbitMQ.Client;
    2. namespace EventsBus.RabbitMQ.Options;
    3. public class RabbitMQOptions
    4. {
    5. /// <summary>
    6. /// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
    7. /// 指示应使用的协议的缺省值。
    8. /// </summary>
    9. public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;
    10. /// <summary>
    11. /// 地址
    12. /// </summary>
    13. public string HostName { get; set; }
    14. /// <summary>
    15. /// 账号
    16. /// </summary>
    17. public string UserName { get; set; }
    18. /// <summary>
    19. /// 密码
    20. /// </summary>
    21. public string Password { get; set; }
    22. }

    RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序

    1. using System.Reflection;
    2. using System.Text.Json;
    3. using EventsBus.Contract;
    4. using Microsoft.Extensions.DependencyInjection;
    5. using RabbitMQ.Client;
    6. using RabbitMQ.Client.Events;
    7. namespace EventsBus.RabbitMQ;
    8. public class RabbitMQEventsManage<TEto> where TEto : class
    9. {
    10. private readonly IServiceProvider _serviceProvider;
    11. private readonly RabbitMQFactory _rabbitMqFactory;
    12. public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    13. {
    14. _serviceProvider = serviceProvider;
    15. _rabbitMqFactory = rabbitMqFactory;
    16. _ = Task.Run(Start);
    17. }
    18. private void Start()
    19. {
    20. var channel = _rabbitMqFactory.CreateRabbitMQ();
    21. var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
    22. var name = eventBus?.Name ?? typeof(TEto).Name;
    23. channel.QueueDeclare(name, false, false, false, null);
    24. var consumer = new EventingBasicConsumer(channel); //消费者
    25. channel.BasicConsume(name, true, consumer); //消费消息
    26. consumer.Received += async (model, ea) =>
    27. {
    28. var bytes = ea.Body.ToArray();
    29. try
    30. {
    31. // 这样就可以实现多个订阅
    32. var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
    33. foreach (var handle in events)
    34. {
    35. await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
    36. }
    37. }
    38. catch (Exception e)
    39. {
    40. EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
    41. }
    42. };
    43. }
    44. }

    RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂

    1. using EventsBus.RabbitMQ.Options;
    2. using Microsoft.Extensions.Options;
    3. using RabbitMQ.Client;
    4. namespace EventsBus.RabbitMQ;
    5. public class RabbitMQFactory : IDisposable
    6. {
    7. private readonly RabbitMQOptions _options;
    8. private readonly ConnectionFactory _factory;
    9. private IConnection? _connection;
    10. public RabbitMQFactory(IOptions<RabbitMQOptions> options)
    11. {
    12. _options = options?.Value;
    13. //Options中的参数添加到ConnectionFactory
    14. _factory = new ConnectionFactory
    15. {
    16. HostName = _options.HostName,
    17. UserName = _options.UserName,
    18. Password = _options.Password,
    19. Port = _options.Port
    20. };
    21. }
    22. public IModel CreateRabbitMQ()
    23. {
    24. // 当第一次创建RabbitMQ的时候进行链接
    25. _connection ??= _factory.CreateConnection();
    26. return _connection.CreateModel();
    27. }
    28. public void Dispose()
    29. {
    30. _connection?.Dispose();
    31. }
    32. }

    RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现

    1. using System.Reflection;
    2. using System.Text.Json;
    3. using EventsBus.Contract;
    4. using Microsoft.Extensions.DependencyInjection;
    5. namespace EventsBus.RabbitMQ;
    6. public class RabbitMQLoadEventBus : ILoadEventBus
    7. {
    8. private readonly IServiceProvider _serviceProvider;
    9. private readonly RabbitMQFactory _rabbitMqFactory;
    10. public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    11. {
    12. _serviceProvider = serviceProvider;
    13. _rabbitMqFactory = rabbitMqFactory;
    14. }
    15. public async Task PushAsync<TEto>(TEto eto) where TEto : class
    16. {
    17. //创建一个通道
    18. //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
    19. using var channel = _rabbitMqFactory.CreateRabbitMQ();
    20. // 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称
    21. var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
    22. var name = eventBus?.Name ?? typeof(TEto).Name;
    23. // 使用获取的名称创建一个通道
    24. channel.QueueDeclare(name, false, false, false, null);
    25. var properties = channel.CreateBasicProperties();
    26. properties.DeliveryMode = 1;
    27. // 将数据序列号,然后发布
    28. channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息
    29. // 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,
    30. var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
    31. await Task.CompletedTask;
    32. }
    33. }

    在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;

    然后我们需要使用RabbitMQ分布式事件总线工具包

  • 相关阅读:
    计算机硬件的读写速度差异
    水滴邮件营销:让企业营销更简单
    【博主推荐】SpringBoot API接口对数据库增删改查,路由,TOKEN,WebSocket完整版(附源码)
    【Python机器学习】单变量非线性变换
    使用element-ui+sortablejs实现表格的拖拽排序
    电脑文件夹加密怎么做?6步教你设置文件夹密码
    MySQL进阶-存储引擎
    二进制格式mysql安装和密码破解
    C++面向对象Day02:C++ 类 & 对象
    【从零开始一步步学习VSOA开发】同步RPC客户端
  • 原文地址:https://blog.csdn.net/u010918911/article/details/130898197