• 在Biwen.QuickApi中整合一个极简的发布订阅(事件总线)


    闲来无聊在我的Biwen.QuickApi中实现一下极简的事件总线,其实代码还是蛮简单的,对于初学者可能有些帮助 就贴出来,有什么不足的地方也欢迎板砖交流~

    首先定义一个事件约定的空接口

        public interface IEvent{}
    

    然后定义事件订阅者接口

    public interface IEventSubscriber<T> where T : IEvent
        {
            Task HandleAsync(T @event, CancellationToken ct);
            /// 
            /// 执行排序
            /// 
            int Order { get; }
    
            /// 
            /// 如果发生错误是否抛出异常,将阻塞后续Handler
            /// 
            bool ThrowIfError { get; }
        }
        public abstract class EventSubscriber<T> : IEventSubscriber<T> where T : IEvent
        {
            public abstract Task HandleAsync(T @event, CancellationToken ct);
            public virtual int Order => 0;
            /// 
            /// 默认不抛出异常
            /// 
            public virtual bool ThrowIfError => false;
        }
    

    接着就是发布者

    
    internal class Publisher(IServiceProvider serviceProvider)
    {
    	public async Task PublishAsync<T>(T @event, CancellationToken ct) where T : IEvent
    	{
    		var handlers = serviceProvider.GetServices>();
    		if (handlers is null) return;
    		foreach (var handler in handlers.OrderBy(x => x.Order))
    		{
    			try
    			{
    				await handler.HandleAsync(@event, ct);
    			}
    			catch
    			{
    				if (handler.ThrowIfError)
    				{
    					throw;
    				}
    				//todo:
    			}
    		}
    	}
    }
    

    到此发布订阅的基本代码也就写完了.接下来就是注册发布者和所有的订阅者了

    核心代码如下:

            static readonly Type InterfaceEventSubscriber = typeof(IEventSubscriber<>);
            static readonly object _lock = new();//锁
            static bool IsToGenericInterface(this Type type, Type baseInterface)
            {
                if (type == null) return false;
                if (baseInterface == null) return false;
                return type.GetInterfaces().Any(x => x.IsGenericType && x.GetGenericTypeDefinition() == baseInterface);
            }
            static IEnumerable _eventHanlers = null!;
            static IEnumerable EventHandlers
            {
                get
                {
                    lock (_lock)
                        return _eventHanlers ??= ASS.InAllRequiredAssemblies.Where(x =>
                        !x.IsAbstract && x.IsPublic && x.IsClass && x.IsToGenericInterface(InterfaceEventSubscriber));
                }
            }
    		    //注册EventSubscribers
                foreach (var subscriberType in EventSubscribers)
                {
                    //存在一个订阅者订阅多个事件的情况:
                    var baseTypes = subscriberType.GetInterfaces().Where(x => x.IsGenericType && x.GetGenericTypeDefinition() == InterfaceEventSubscriber).ToArray();
                    foreach (var baseType in baseTypes)
                    {
                        services.AddScoped(baseType, subscriberType);
                    }
                }
                //注册Publisher
                services.AddScoped();
    		
    

    至此发布订阅的代码也就完成了!
    现在我们将发布订阅封装到QuickApi中使用:

    
    internal interface IPublisher
    {
    	/// 
    	/// Event Publish
    	/// 
    	/// 
    	/// Event
    	/// 
    	Task PublishAsync<T>(T @event, CancellationToken cancellationToken) where T : IEvent;
    }
    
    

    然后BaseQuickApi实现IPublisher接口

    
    internal interface IQuickApi<Req, Rsp> : IHandlerBuilder, IQuickApiMiddlewareHandler, IAntiforgeryApi, IPublisher
    {
        ValueTask<Rsp> ExecuteAsync(Req request);
    }
    
    // BaseQuickApi.PublishAsync
    public virtual async Task PublishAsync<T>(T @event, CancellationToken cancellationToken = default) where T : IEvent
    {
        using var scope = ServiceRegistration.ServiceProvider.CreateScope();
        var publisher = scope.ServiceProvider.GetRequiredService();
        await publisher.PublishAsync(@event, cancellationToken);
    }
    

    至此功能完成,接下来我们测试一下:

    
    using Biwen.QuickApi.Events;
    using Microsoft.AspNetCore.Mvc;
    
    namespace Biwen.QuickApi.DemoWeb.Apis
    {
        public class MyEvent : BaseRequest<MyEvent>,IEvent
        {
            [FromQuery]
            public string? Message { get; set; }
        }
    
        public class MyEventHandler : EventSubscriber<MyEvent>
        {
            private readonly ILogger _logger;
            public MyEventHandler(ILogger logger)
            {
                _logger = logger;
            }
    
            public override Task HandleAsync(MyEvent @event, CancellationToken ct)
            {
                _logger.LogInformation($"msg 2 : {@event.Message}");
                return Task.CompletedTask;
            }
        }
    
        /// 
        /// 更早执行的Handler
        /// 
        public class MyEventHandler2 : EventSubscriber<MyEvent>
        {
            private readonly ILogger _logger;
            public MyEventHandler2(ILogger logger)
            {
                _logger = logger;
            }
    
            public override Task HandleAsync(MyEvent @event, CancellationToken ct)
            {
                _logger.LogInformation($"msg 1 : {@event.Message}");
                return Task.CompletedTask;
            }
    
            public override int Order => -1;
    
        }
    
        /// 
        /// 抛出异常的Handler
        /// 
        public class MyEventHandler3 : EventSubscriber<MyEvent>
        {
            private readonly ILogger _logger;
            public MyEventHandler3(ILogger logger)
            {
                _logger = logger;
            }
    
            public override Task HandleAsync(MyEvent @event, CancellationToken ct)
            {
                throw new Exception("error");
            }
    
            public override int Order => -2;
    
            public override bool ThrowIfError => false;
    
        }
    
        [QuickApi("event")]
        public class EventApi : BaseQuickApi<MyEvent>
        {
            public override async ValueTask ExecuteAsync(MyEvent request)
            {
                //publish
                await PublishAsync(request);
                return IResultResponse.Content("send event");
            }
        }
    }
    
    

    最后我们运行项目测试一下功能:

    curl -X 'GET' \
      'http://localhost:5101/quick/event?Message=hello%20world' \
      -H 'accept: */*'
    

    image

    源代码我发布到了GitHub,欢迎star! https://github.com/vipwan/Biwen.QuickApi

  • 相关阅读:
    WordPress(5)在主题中添加文章字数和预计阅读时间
    Matlab 对连续时间信号的运算
    iphone如何使用itunes恢复数据?
    【读博感悟】关于读博期间科研训练的想法
    【视频加水印】Video Watermark Pro视频添加动态水印(附工具下载地址)
    C++设计模式之---单例模式
    AIR32F103(三) Linux环境基于标准外设库的项目模板
    深度时空残差网络在城市人流量预测中的应用
    JVM理解(二)
    OpenCV中world模块介绍
  • 原文地址:https://www.cnblogs.com/vipwan/p/18184088/biwen-quickapi-events