• 9.3DDD之集成事件


    9.3DDD之集成事件

    和领域事件不同,集成事件主要用于在微服务之间进行事件传递,即可能在多个服务器之间进行通信。本文讲解RabbitMQ中间件来完成集成事件的处理。

    1. RabbitMQ的基本概念:
    • 信道(channel),信道是消息的生产者,消费者和服务器之间进行通信的虚拟连接。
    • 队列,队列是用来进行消息收发的地方,生产者将消息放到队列中,消费者从队列中获取消息。
    • 交换机,交换机用于把消息路由到队列中。
    1. RabbitMQ的routing模式:
    • 生产者把消息发布到交换机中,消息会携带routingKey属性,交互机根据routingKey的值吧消息发送到一个或者多个队列,然后消费者从队列中获取消息。这种模式的优点是交换机和队列都位于RabbitMQ服务器的内部,即使消费者不在线,相关消息也会保存在队列中,等消费者上线后就可以获取到消息了。

    使用步骤

    Nuget安装RabbitMQ.Client

    • 发送消息端
    using RabbitMQ.Client;
    using System.Text;
    
    var factory = new ConnectionFactory();
    factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
    factory.DispatchConsumersAsync = true;//兼容消费者异步使用
    string exchangeName = "exchange1";//交换机的名字
    string eventName = "myEvent";// routingKey的值
    using var conn = factory.CreateConnection();//创建一个客户端到RabbitMQ的TCP连接,尽量重复使用
    while (true)
    {
        string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息
        using (var channel = conn.CreateModel())//创建信道,信道可以关闭,关闭后消息才会发出
        {
            var properties = channel.CreateBasicProperties;//创建一个空的内容头
            properties.DeliveryMode = 2;//1为非持久2为持久
            //声明指定名字的交换机,如果已有同名的交换机则不再创建
            //type:direct表示交换机会根据消息routingKey的值进行相等性匹配,消息会发布到和它的routingKey绑定的队列
            channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
            byte[] body = Encoding.UTF8.GetBytes(msg);//RabbitMQ的消息只能按照byte[]类型传递
            channel.BasicPublish(exchange: exchangeName, routingKey: eventName,
                mandatory: true, basicProperties: properties, body: body);//发布消息        
        }
        Console.WriteLine("发布了消息:" + msg);
        Thread.Sleep(1000);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 接收消息端:
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.Text;
    
    var factory = new ConnectionFactory();
    factory.HostName = "127.0.0.1";
    factory.DispatchConsumersAsync = true;
    string exchangeName = "exchange1";
    string eventName = "myEvent";
    using var conn = factory.CreateConnection();
    using var channel = conn.CreateModel();
    string queueName = "queue1";
    //声明和发送端相同的交换机,如果发送端已经声明了相同的,则这天语句会被忽略
    //但是仍然要写,因为不确定是哪端先启动
    channel.ExchangeDeclare(exchange: exchangeName, type: "direct");
    //声明一个队列来接收交换机转发来的消息,如果已经指定了同名的队列,则自动忽略
    //消息从队列中取走后则队列中就没有这个消息了
    //如果A、B两程序都想取同一条消息,则需要声明两个不同名字的队列
    channel.QueueDeclare(queue: queueName, durable: true,
            exclusive: false, autoDelete: false, arguments: null);
    //将队列绑定到交换机上,并设定routingKey参数,这样当交换机收到routingKey的值和设定的值相同时
    //会把消息转发到我们指定的队列,一个交换机可绑定多个队列,如果这些队列的routingKey的值相同
    //那么交换机收到同一个routingKey的时候,会发送给多个队列
    channel.QueueBind(queue: queueName,
        exchange: exchangeName, routingKey: eventName);
    //AsyncEventingBasicConsumer用于从队列中接收消息,当一天消息被接收时,Received事件就会被触发
    var consumer = new AsyncEventingBasicConsumer(channel);
    //Received是阻塞执行的,也就是一条回调方法执行完成后才会触发下一条Received事件
    consumer.Received += Consumer_Received;//增加处理事件
    
    channel.BasicConsume(queue: queueName,autoAck: false, consumer: consumer);//执行
    Console.ReadLine();
    
    
    async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
    {
        //RabbitMQ支持消息的失败重发
        try
        {
            var bytes = args.Body.ToArray();
            string msg = Encoding.UTF8.GetString(bytes);
            Console.WriteLine(DateTime.Now + "收到了消息" + msg);
            //如果消息处理成功,则调用BasicAck通知队列
            //如果消息没有处理成功,则抵用BasicReject通知队列
            channel.BasicAck(args.DeliveryTag, multiple: false);
            await Task.Delay(800);
        }
        catch (Exception ex)
        {
            channel.BasicReject(args.DeliveryTag, true);
            Console.WriteLine("处理收到的消息出错" + ex);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    简化框架

    1. Nuget安装Zack.EventBus
    2. 在配置系统下创建EventBus节点
    "EventBus": {
        "HostName": "127.0.01",//RabbitMQ服务器地址
        "ExchangeName": "EventBusDemo1"//交换机的名字
      }
    
    • 1
    • 2
    • 3
    • 4
    1. 在program.cs中进行配置
    • 发送端
    var eventBusSec = builder.Configuration.GetSection("EventBus");
    builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
    //第一个参数用来设定程序绑定的队列的名字,一般一个微服务使用一个名字
    //但是同一个微服务项目的每个集群实例都要收到消息则不能使用一个名字
    //第二个参数为含有监听继承事件的处理者代码的程序集
    builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());
    var app = builder.Build();
    app.UseEventBus();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 接收端
    var eventBusSec = builder.Configuration.GetSection("EventBus");
    builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
    builder.Services.AddEventBus("EventBusDemo1_Q2", Assembly.GetExecutingAssembly());
    var app = builder.Build();
    app.UseEventBus();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    1. 在需要发布事件的类中注入IEventBus服务,调用Publish方法
    [Route("api/[controller]")]
    [ApiController]
    public class DemoController : ControllerBase
    {
        private IEventBus eventBus;
    
        public DemoController(IEventBus eventBus)
        {
            this.eventBus = eventBus;
        }
    
        [HttpPost]
        public string Publish()
        {
            eventBus.Publish("UserAdded", new { UserName = "yzk", Age = 18 });
            return "ok";
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    1. 编写事件处理者
    • 实现IIntegrationEventHandler接口
    [EventName("UserAdded")] //设定监听的事件名称,和publish中的名称一致,可以增加多个[EventName]来监听多个事件
    public class UserAddesEventHandler : IIntegrationEventHandler
    {
    	private readonly ILogger<UserAddesEventHandler> logger;
    	public UserAddesEventHandler(ILogger<UserAddesEventHandler> logger)
    	{
    		this.logger = logger;
    	}
        //当收到一个事件后,Handle方法就会被调用,第一个参数为事件的名字,第二个是publish设置的数据,事件数据是以JSON格式传入
    	public Task Handle(string eventName, string eventData)
    	{
    		logger.LogInformation("新建了用户:" + eventData);
    		return Task.CompletedTask;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 事件数据是以JSON格式传入,可以使用JsonIntegrationEventHandler接口来解析成.net对象
    public record UserData(string UserName, int Age);
    
    [EventName("UserAdded")]
    public class UserAddesEventHandler3 : JsonIntegrationEventHandler<UserData>
    {
    	private readonly ILogger<UserAddesEventHandler3> logger;
    	public UserAddesEventHandler3(ILogger<UserAddesEventHandler3> logger)
    	{
    		this.logger = logger;
    	}
    	public override Task HandleJson(string eventName, UserData eventData)
    	{
    		logger.LogInformation($"Json:{eventData.UserName}");
    		return Task.CompletedTask;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 进行微服务开发时,为了降低耦合,一般不会新建一个UserData类供多个微服务使用。则可以使用DynamicIntegrationEventHandler接口来将JSON解析为dynamic类型
    [EventName("UserAdded")]
    public class UserAddesEventHandler2 : DynamicIntegrationEventHandler
    {
        private readonly ILogger<UserAddesEventHandler2> logger;
        public UserAddesEventHandler2(ILogger<UserAddesEventHandler2> logger)
        {
            this.logger = logger;
        }
        public override Task HandleDynamic(string eventName, dynamic eventData)
        {
            logger.LogInformation($"Dynamic:{eventData.UserName}");
            return Task.CompletedTask;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    代码随想录刷题Day53 | 1143. 最长公共子序列 | 1035. 不相交的线 | 53. 最大子数组和
    关于企业微信的第三方应用开发vue3开发
    u盘文件突然不见了怎么样才能恢复呢?
    java计算机毕业设计家庭理财管理系统源码+数据库+lw文档+系统
    python 另一种将内容写入记事本的方式
    2023 年 Bitget Wallet 测评
    eclipse写xsd报错:s4s-elt-schema-ns: The namespace of element...且没有自动补全类提示 问题的解决方法
    MySQL 8.0 新特性之不可见主键
    全网最全超详细.htaccess语法讲解
    Support for password authentication was removed on August 13, 2021 解决方案
  • 原文地址:https://blog.csdn.net/weixin_44064908/article/details/126680315