• Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式


    找工作之余,总结和整理以前的项目经验,动手写了个洋葱架构(整洁架构)示例解决方案 OnionArch。其目的是为了更好的实现基于DDD(领域驱动分析)和命令查询职责分离(CQRS)的洋葱架构。

    OnionArch 是用来实现单个微服务的。它提供了Grpc接口和Dapr Side Car进行交互,通过Dapr来实现微服务之间的接口调用、事件发布订阅等微服务特性。但是,Dapr官方文档上只有Go语言的Grpc的微服务调用示例,没有事件发布和订阅示例,更没有基于Grpc通讯用.Net实现的事件订阅和发布示例。

    一、实现目标

    为了方便大家写代码,本文旨在介绍如何通过Dapr实现.Net Grpc服务之间的发布和订阅,并采用与WebApi类似的事件订阅方式。

    如果是Dapr Side Car通过Web Api和微服务引用交互,在WebApi中实现事件订阅非常简单,只要在Action 上增加“[Topic("pubsub", "TestTopic")]” Attribute即可,可如果Dapr是通过Grpc和Grpc服务交互就不能这样写了。

    为了保持WebApi和Grpc事件订阅代码的一致性,本文就是要在Grpc通讯的情况下实现如下写法来订阅并处理事件。

            [Topic("pubsub", "TestTopic")]
            public override Task TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)
            {
                string message = "TestTopicEvent" + request.EventData.Name;
                Console.WriteLine(message);
                return Task.FromResult(new HelloReply
                {
                    Message = message
                });
            }

    二、实现方案

    Dapr实现.Net Grpc服务之间的发布和订阅,根据官方文档,需要重写AppCallback.AppCallbackBase Grpc类的ListTopicSubscriptions方法和OnTopicEvent方法,ListTopicSubscriptions是给Dapr调用获取该微服务已订阅的事件,OnTopicEvent给Dapr调用以触发事件到达处理逻辑。但是这样就需要在AppCallback.AppCallbackBase实现类中硬编码已订阅的事件和事件处理逻辑。显然不符合我们的实现目标。

    参考Dapr SDK中关于WebApi 订阅查询接口“http://localhost:/dapr/subscribe”的实现代码,可以在AppCallback.AppCallbackBase实现类的ListTopicSubscriptions方法中,采用相同的方式,在Grpc方法中查询Topic Attribute的方式来搜索已订阅的事件。这样就不用在ListTopicSubscriptions中硬编码已订阅的事件了。

    为了避免在OnTopicEvent方法中应编码事件处理逻辑,就需要在接收到事件触发后动态调用Grpc方法。理论上,只要有proto文件就可以动态调用Grpc方法,而proto文件本来就在项目中。但是,我没找到.Net动态调用Grpc方法的相关资料,不知道大家有没有?

    我这里采用了另一种方式,根据我上一篇关于.Net 7.0 RC gRPC JSON 转码为 Swagger/OpenAPI文档。Grpc方法可以增加一个转码为Json的WebApi调用。这样就可以在OnTopicEvent方法中接收到事件触发后,通过HttpClient post到对应的WebApi地址,曲线实现动态调用Grpc方法。是不是有点脱裤子放屁的感觉?

    三、代码实现

    我的解决方案如下,GrpcServiceA发布事件,GrpcServiceB接收事件并处理。

    实现事件发布

    GrpcServiceA发布事件比较简单,和WebApi的方式是一样一样的。

     public async override Task SayHello(HelloRequest request, ServerCallContext context)
            {
                //await _daprClient.SaveStateAsync("statestore", "testKey", request.Name);
                EventData eventData = new EventData() {
  • 相关阅读:
    走进SpringBoot源码吃透Spring扩展点「扩展点实战系列」- 第450篇
    Go中的工作池:并发任务的优雅管理
    计算机毕业设计ssm儿童福利院管理系统5d7wb系统+程序+源码+lw+远程部署
    Java中如何检测一个元素是否存在于HashSet对象中呢?
    北理工嵩天Python语言程序设计笔记(7 组合数据类型)
    DDR CTRL介绍
    Chromebook文件夹应用新功能
    java内存泄漏和内存溢出oom排查思路
    【细读经典】delay model and timing analysis
    如何报考PMP项目管理认证考试?
  • 原文地址:https://blog.csdn.net/Q54665642ljf/article/details/127587415