• 在ASP.NET Core微服务架构下使用RabbitMQ如何实现CQRS模式


    前言

    在现代软件开发中,微服务架构和CQRS模式都是备受关注的技术趋势。微服务架构通过将应用程序拆分为一系列小型、自治的服务,提供了更好的可伸缩性和灵活性。而CQRS模式则通过将读操作和写操作分离,优化了系统的性能和可维护性。本文小编将为大家介绍如何在ASP.NET Core微服务架构下使用RabbitMQ来实现CQRS模式。

    微服务架构的简要概览

    微服务架构是一种软件架构模式,它将一个大型的单体应用程序拆分为一组小型、自治的服务,每个服务都可以独立部署、扩展和管理。每个服务都专注于一个特定的业务功能,并通过轻量级的通信机制相互协作,形成一个完整的分布式系统。

    RabbitMQ在微服务中的作用

    消息代理,以RabbitMQ作为示例,是微服务架构的枢纽,为服务间异步通信提供了一个健壮的机制。它们使得分离组件间的通信变得解耦合、可靠和可扩展。在下面的这段代码里面,RabbitMQ被用于给特定队列发送消息,确保服务间通信可靠。

    // Example of using RabbitMQ with RabbitMQ.Client in C#
    using RabbitMQ.Client;
    class RabbitMQService {
        public void SendMessageToQueue(string queueName, string message) {
            var factory = new ConnectionFactory(){HostName="localhost"};
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel;
            channel.QueueDeclare(queue:queueName,durable:false,exclusive:false,autoDelete:false,arguments:null);
            var body=Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange:"",routingKey:queueName,basicProperties:null,body:body);
            Console.WriteLines($"Message sent to {queueName}:{message}");
        }
    }
    

    RabbitMQ提供了很多功能,使得针对微服务架构高度适合:

    • 可靠性:它确保消息可靠传输,支持消息识别机制。
    • 灵活性:支持多种消息模式(发布订阅,点对点)和协议(AMQP,MQTT)。
    • 可扩展:允许通过发布横跨不同节点或集群的消息来横向伸缩。

    下面这段代码演示了RabbitMQ如何实现一个发布和订阅的功能。

    // Example of using RabbitMQ for Publish-Subscribe
    public class Publisher
    {
        public void Publish(string exchangeName, string message)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Fanout);
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: null, body: body);
            Console.WriteLine($"Message published to {exchangeName}: {message}");
        }
    }
    

    CQRS 模式

    CQRS从根本上来说是把处理命令(改变系统状态)的职责从查询(不更改状态下获取数据)中分离出来。这种分离允许对每种类型操作进行优化和裁剪。如下方的代码所示,Command Handler(命令程序)处理写操作,负责执行更新、创建或删除等改变系统状态的操作。Query Handler(查询程序)处理读操作,负责提供数据查询和展示的功能。

    // Example of Command and Query models in C#
    public class Command {
        public string Id {get;set;}
        public object Payload{get;set}
    }
    
    public class Query {
        public string Id(get;set;)
    }
    // Command Handler
    public class CommandHandler {
        public void HandleCommand(Command command) {
            // Logic to process and update the system state based on the command
        }
    }
    // Query Handler
    public class QueryHandler {
        public object HandleQuery(Query query) {
            // Logic to retrieve and return data without altering the system state
            return null;
        }
    }
    

    分离读和写操作的优势

    • 易于优化:不同模型可以为它们特定的任务进行优化。
    • 可扩展:系统可以为读和写独立扩展,优化性能。
    • 灵活性:修改写逻辑不影响读操作,在设计和迭代上提供了更大的灵活性。
    // Command Handler
    public class CommandHandler {
        public void HandleCommand(Command command){
            // Logic to process and update the system state based on the command
        }
    }
    // Query handler
    public class QueryHandler{
        public object HandlerQuery(Query query) {
            // Logic to retrieve and return data without altering the system state
            return null;
        }
    }
    

    RabbitMQ与CQRS集成

    在集成CQRS与RabbitMQ时,需要考虑以下因素:

    • 消息结构:以一种清晰一致的格式为命令和事件设计消息。
    • 错误处理:在消息处理中实现针对错误处理和重试的策略。
    • 消息持久性:配置队列来确保消息持久,避免数据丢失。
    • 可伸缩性:通过考虑RabbitMQ集群和负载均衡,为可伸缩提前谋划。

    现在,小编以在线订单系统为场景,介绍如何集成RabbitMQ和CQRS来实现订单的异步处理。

    场景:

    在一个在线订单系统中,放置了新订单后,它就需要被异步处理。小编将会使用RabbitMQ来处理命令(放置订单)和事件(订单处理)。这个系统将会用队列来分离命令和事件,同时遵循CQRS原则。

    设计注意事项:

    • OrderCommand:表示下订单的命令。
    • OrderEvent:表示已处理的订单。
    • Error Handling:对失败订单实施重试机制。

    命令处理:

    public class OrderCommandHandler
    {
        private readonly string commandQueueName = "order_commands";
    
        public void SendOrderCommand(OrderCommand command)
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            channel.QueueDeclare(queue: commandQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(command));
            channel.BasicPublish(exchange: "", routingKey: commandQueueName, basicProperties: null, body: body);
            Console.WriteLine($"Order command sent: {JsonConvert.SerializeObject(command)}");
        }
        
        public void ConsumeOrderCommands()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            channel.QueueDeclare(queue: commandQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var commandMessage = Encoding.UTF8.GetString(body);
                var orderCommand = JsonConvert.DeserializeObject(commandMessage);
    
                // 处理订单命令
                Task.Run(() => ProcessOrderCommand(orderCommand));
    
                // 确认消息
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };
            channel.BasicConsume(queue: commandQueueName, autoAck: false, consumer: consumer);
        }
        
        private void ProcessOrderCommand(OrderCommand orderCommand)
        {
            // 异步处理订单命令的逻辑
            Console.WriteLine($"Processing order command: {JsonConvert.SerializeObject(orderCommand)}");
            
            // 下订单,执行验证
            // 如果成功,发布一个订单处理事件
            var orderEvent = new OrderEvent { OrderId = orderCommand.OrderId, Status = "Processed" };
            SendOrderProcessedEvent(orderEvent);
        }
        
        private void SendOrderProcessedEvent(OrderEvent orderEvent)
        {
            var eventQueueName = "order_events";
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            channel.QueueDeclare(queue: eventQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderEvent));
            channel.BasicPublish(exchange: "", routingKey: eventQueueName, basicProperties: null, body: body);
            Console.WriteLine($"Order processed event sent: {JsonConvert.SerializeObject(orderEvent)}");
        }
    }
    

    为命令和事件实现消息队列

    在集成RabbitMQ的基于CQRS系统中,为命令和事件建立的分离的队列能使得组件间异步通信。

    public class OrderEventConsumer
    {
        private readonly string eventQueueName = "order_events";
    
        public void ConsumeOrderEvents()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();
            channel.QueueDeclare(queue: eventQueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var eventMessage = Encoding.UTF8.GetString(body);
                var orderEvent = JsonConvert.DeserializeObject(eventMessage);
                Console.WriteLine($"Received order processed event: {JsonConvert.SerializeObject(orderEvent)}");
                // 处理已处理订单事件的逻辑
            };
            channel.BasicConsume(queue: eventQueueName, autoAck: true, consumer: consumer);
        }
    }
    

    异步通信和事件驱动架构

    事件驱动架构中,RabbitMQ使得异步通信更加便捷,这是因为它允许组件以一种非阻塞方式对事件和消息进行响应。

    public class Program
    {
        public static void Main(string[] args)
        {
            var orderCommandHandler = new OrderCommandHandler();
            var orderEventConsumer = new OrderEventConsumer();
    
            // 举例:发送订单命令
            var orderCommand = new OrderCommand { OrderId = Guid.NewGuid(), Product = "Product A", Quantity = 2 };
            orderCommandHandler.SendOrderCommand(orderCommand);
    
            // 异步使用订单命令和事件
            Task.Run(() => orderCommandHandler.ConsumeOrderCommands());
            Task.Run(() => orderEventConsumer.ConsumeOrderEvents());
            Console.ReadLine(); // 保持应用程序运行
        }
    }
    

    在微服务中集成CQRS和RabbitMQ

    创建服务

    现在小编创建两个服务,一个用于订单消息处理(OrderComandService),一个用于订单查询处理(OrderQueryService)。

    OrderComandService(订单命令服务)

    // 处理命令(下订单)
    public class OrderCommandService
    {
        private readonly string commandQueueName = "order_commands";
        public void SendOrderCommand(OrderCommand command)
        {
            // 向RabbitMQ队列发送order命令的代码(具体可以参考前面SendOrderCommand的代码)
        }
        public void ConsumeOrderCommands()
        {
            // 从RabbitMQ队列中消费订单命令的代码(具体可以参考前面的ConsumeOrderCommands代码)
            // 异步处理接收到的命令并相应地触发事件
        }
    }
    

    OrderQueryService(订单查询服务)

    // 处理查询(获取订单)
    public class OrderQueryService
    {
        private readonly string queryQueueName = "order_queries";
        public void SendOrderQuery(Query query)
        {
            // 向RabbitMQ队列发送order命令的代码(具体可以参考前面SendOrderCommand的代码)
        }
        public void ConsumeOrderQueries()
        {
            // 从RabbitMQ队列中接受消费订单命令的代码(具体可以参考前面的ConsumeOrderCommands代码)
            // 异步处理接收到的查询并检索订单数据
        }
    }
    

    在微服务中定义命令和查询模型

    命令和查询模型

    // 命令模型
    public class OrderCommand
    {
        public string OrderId { get; set; }
        // 其他与订单相关的字段(省略)
    }
    // 查询模型
    public class OrderQuery
    {
        public string QueryId { get; set; }
        // 其他与订单相关的字段(省略)
    }
    

    使用RabbitMQ编写订单命令和订单查询:

    OrderCommandService(订单命令服务)

    // 发送订单命令
    OrderCommandService orderCommandService = new OrderCommandService();
    OrderCommand orderCommand = new OrderCommand { OrderId = "123", /* 其他订单属性 */ };
    orderCommandService.SendOrderCommand(orderCommand);
    // 消费订单命令
    orderCommandService.ConsumeOrderCommands();
    

    OrderQueryService(订单查询服务)

    // 发送订单查询
    OrderQueryService orderQueryService = new OrderQueryService();
    OrderQuery orderQuery = new OrderQuery { QueryId = "456", /* 其他订单属性 */ };
    orderQueryService.SendOrderQuery(orderQuery);
    // 消费订单查询
    orderQueryService.ConsumeOrderQueries();
    

    总结

    在ASP.NET Core微服务架构中,使用RabbitMQ作为消息队列服务,通过实现CQRS模式(Command Query Responsibility Segregation),将写操作和读操作分离,以提高系统的性能和可伸缩性。这种组合能够实现异步通信和事件驱动架构,通过将命令发送到命令处理器执行写操作,同时使用订阅模式将事件发布给查询服务,实现实时的数据查询和更新。这样的架构使系统更具弹性和扩展性,并为开发者提供更好的工具和方法来构建复杂的分布式系统,以满足不同业务需求。

    扩展链接:

    Redis从入门到实践

    一节课带你搞懂数据库事务!

    Chrome开发者工具使用教程

    如何在Web应用中添加一个JavaScript Excel查看器

    高性能渲染——详解HTML Canvas的优势与性能

  • 相关阅读:
    python基于用户兴趣的java影视推荐系统django
    桂林理工大学计算机考研资料汇总
    mysql分区表的增删改查操作
    嵌入式面试/笔试C相关总结
    Timer,时间堆
    最简单的SpringCloudStream集成Kafka教程
    从数据仓库到数据湖、湖仓一体:概念溯源分析底层逻辑
    【考研英语语法】名词性从句
    Git:基本命令
    2023年软件测试常见面试题
  • 原文地址:https://www.cnblogs.com/powertoolsteam/p/17951775