• 简单写一个eventbus


    前言

    闲暇之余,简单写一个eventbus。

    正文

    什么是eventbus?

    eventbus 是一个开源的发布订阅模式的框架,用于简化程序间不同组件的通信。
    它允许不同组件间松耦合通信,组件之间不通过直接引用的方式,而是事件的方式进行消息传递。

    下面进行代码演示:

    首先是发布订阅,那么就应该有发布方法和订阅方法,因为是消息传递,那么就应该还有启动消费消息的方法。

    public interface IEventBus : IDisposable
    {
        Task Publish(T @event) where T : IntegrationEvent;
    
        Task Subscribe(IIntegrationEventHandler handler)
            where T : IntegrationEvent;
    
        Task StartConsume();
    }
    

    大体我们要实现上面的功能。

    然后我们可以定义事件的基础信息:

    public class IntegrationEvent
    {
        public Guid Id { get; set; }
    
        public DateTime OccurredOn { get; set; }
    
        public IntegrationEvent()
        {
            Id = Guid.NewGuid();
            OccurredOn = DateTime.Now;
        }
    }
    

    比如说要有唯一的id,同时要有事件发生的时间。

    订阅的话,那么需要指定处理的对象。

    public interface IIntegrationEventHandler
    {
    }
    
    public interface IIntegrationEventHandler :
        IIntegrationEventHandler where TIntegrationEvent : IntegrationEvent
    {
        Task Handler(TIntegrationEvent @event);
    }
    

    处理对象设计也很简单,就是需要创建一个有能够处理IntegrationEvent的对象即可。

    这里很多人会疑惑,为什么很多框架的泛型接口类,往往会创建一个非泛型的接口。

    这个其实是为了进一步抽象,方便做集合处理,下面将会介绍到。

    然后就可以写一个内存型的eventbus。

    public class InMemoryEventBus : IDisposable
    {
        private Dictionary>
            _dictionary = new Dictionary>();
    
        public async Task Publish(T @event) where T : IntegrationEvent
        {
            var fullName = @event.GetType().FullName;
            if (fullName == null)
            {
                return;
            }
    
            var handlers = _dictionary[fullName];
    
            foreach (var integrationEventHandler in handlers)
            {
                if (integrationEventHandler is IIntegrationEventHandler handler)
                {
                    await handler.Handler(@event);
                }
            }
        }
    
        public async Task Subscribe(IIntegrationEventHandler handler)
            where T : IntegrationEvent
        {
            var fullname = typeof(T).FullName;
            if (fullname == null)
            {
                return;
            }
    
            if (_dictionary.ContainsKey(fullname))
            {
                var handlers = _dictionary[fullname];
                handlers.Add(handler);
            }
            else
            {
                _dictionary.Add(fullname, new List()
                {
                    handler
                });
            }
        }
    
        public void Dispose()
        {
            // 移除相关连接等
        }
    }
    

    里面实现了eventbus的基本功能。可以看到上面的_dictionary,里面就是IIntegrationEventHandler,
    所以泛型接口会继承一个非泛型的接口,是为了进一步抽象声明,对一些集合处理是很方便的。

    然后这里为什么没有直接继承Ieventbus呢? 而是实现eventbus的功能。

    因为Ieventbus 其实是面向用户的,继承ieventbus只是一个门面,相当于适配器。

    而InMemoryEventBus 是为了实现功能。

    可以理解为InMemoryEventBus 是我们电脑主板、cpu等,然后我们只需要一个实现其接口的组件,从而和外部连接。

    而不是整个内核系统和外部直连,那么我们可以使用InMemoryEventBusClient 作为这个组件。

    public class InMemoryEventBusClient : IEventBus
    {
        private readonly InMemoryEventBus _eventBus;
        
        public InMemoryEventBusClient()
        {
            _eventBus = new InMemoryEventBus();
        }
    
        public void Dispose()
        {
            _eventBus.Dispose();
        }
    
        public async Task Publish(T @event) where T : IntegrationEvent
        {
            await _eventBus.Publish(@event);
        }
    
        public async Task Subscribe(IIntegrationEventHandler handler) where T : IntegrationEvent
        {
            await _eventBus.Subscribe(@handler);
        }
    
        public Task StartConsume()
        {
            // 运行相关的消费
            return Task.CompletedTask;
        }
    }
    

    InMemoryEventBusClient 负责实现外部接口,InMemoryEventBus 负责实现功能。

    从而达到解耦的目的。

    同样的例子还有polly,这个框架应该很出名了,其中他里面就有很多衍生的组件,都是调用内核来适配其他框架定义的接口。

    上面可以看到StartConsume什么都没有做,其功能被Publish给融合了。

    只要publish就消费了。

    如果我们扩展kafka的话,那么consume其实就是拉取数据然后消费,publish其实就是推向kafka,中间就是序列号和反序列话的过程。

    eventbus 完善篇后续再补。

  • 相关阅读:
    【NOI模拟赛】毒药(交互,构造(?))
    财经资讯网站--某联社参数破解
    STL总结
    Linux ipc通信(消息对列)
    Ubuntu上Qt安装和配置的完整步骤
    C++类和对象2
    基于Spring Boot + Vue的信息化在线教学平台
    解决Edge游览器龟速下载问题
    4.6版本Wordpress漏洞复现
    Nexus私服仓库Linux、Windows部署教程
  • 原文地址:https://www.cnblogs.com/aoximin/p/18068563