• Dapr实现.Net Grpc服务之间的发布和订阅,并采用WebApi类似的事件订阅方式


    大家好,我是失业在家,正在找工作的博主Jerry,找工作之余,总结和整理以前的项目经验,动手写了个洋葱架构(整洁架构)示例解决方案 OnionArch。其目的是为了更好的实现基于DDD(领域驱动分析)和命令查询职责分离(CQRS)的洋葱架构。

    OnionArch 是用来实现单个微服务的。它提供了Grpc接口和Dapr Side Car进行交互,通过Dapr来实现微服务之间的接口调用、事件发布订阅等微服务特性。但是,Dapr官方文档上只有Go语言的Grpc的微服务调用示例,没有事件发布和订阅示例,更没有基于Grpc通讯用.Net实现的事件订阅和发布示例。

    一、实现目标

    为了方便大家写代码,本文旨在介绍如何通过Dapr实现.Net Grpc服务之间的发布和订阅,并采用与WebApi类似的事件订阅方式。

    如果是Dapr Side Car通过Web Api和微服务引用交互,在WebApi中实现事件订阅非常简单,只要在Action 上增加“[Topic("pubsub", "TestTopic")]” Attribute即可,可如果Dapr是通过Grpc和Grpc服务交互就不能这样写了。

    为了保持WebApi和Grpc事件订阅代码的一致性,本文就是要在Grpc通讯的情况下实现如下写法来订阅并处理事件。

    复制代码
            [Topic("pubsub", "TestTopic")]
            public override Task TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)
            {
                string message = "TestTopicEvent" + request.EventData.Name;
                Console.WriteLine(message);
                return Task.FromResult(new HelloReply
                {
                    Message = message
                });
            }
    复制代码

    二、实现方案

    Dapr实现.Net Grpc服务之间的发布和订阅,根据官方文档,需要重写AppCallback.AppCallbackBase Grpc类的ListTopicSubscriptions方法和OnTopicEvent方法,ListTopicSubscriptions是给Dapr调用获取该微服务已订阅的事件,OnTopicEvent给Dapr调用以触发事件到达处理逻辑。但是这样就需要在AppCallback.AppCallbackBase实现类中硬编码已订阅的事件和事件处理逻辑。显然不符合我们的实现目标。

    参考Dapr SDK中关于WebApi 订阅查询接口“http://localhost:/dapr/subscribe”的实现代码,可以在AppCallback.AppCallbackBase实现类的ListTopicSubscriptions方法中,采用相同的方式,在Grpc方法中查询Topic Attribute的方式来搜索已订阅的事件。这样就不用在ListTopicSubscriptions中硬编码已订阅的事件了。

    为了避免在OnTopicEvent方法中应编码事件处理逻辑,就需要在接收到事件触发后动态调用Grpc方法。理论上,只要有proto文件就可以动态调用Grpc方法,而proto文件本来就在项目中。但是,我没找到.Net动态调用Grpc方法的相关资料,不知道大家有没有?

    我这里采用了另一种方式,根据我上一篇关于.Net 7.0 RC gRPC JSON 转码为 Swagger/OpenAPI文档。Grpc方法可以增加一个转码为Json的WebApi调用。这样就可以在OnTopicEvent方法中接收到事件触发后,通过HttpClient post到对应的WebApi地址,曲线实现动态调用Grpc方法。是不是有点脱裤子放屁的感觉?

    三、代码实现

    我的解决方案如下,GrpcServiceA发布事件,GrpcServiceB接收事件并处理。

    实现事件发布

    GrpcServiceA发布事件比较简单,和WebApi的方式是一样一样的。

    复制代码
     public async override Task SayHello(HelloRequest request, ServerCallContext context)
            {
                //await _daprClient.SaveStateAsync("statestore", "testKey", request.Name);
                EventData eventData = new EventData() { Id = 6, Name = request.Name, Description = "Looking for a job" };
                await _daprClient.PublishEventAsync("pubsub", "TestTopic", eventData);
                return new HelloReply
                {
                    Message = "Hello" + request.Name
                };
            }
    复制代码
    _daprClient怎么来的?我参考Dapr .Net SDK的代码,给IGrpcServerBuilder 增加了扩展方法:
     public static IGrpcServerBuilder AddDapr(this IGrpcServerBuilder builder, Action configureClient = null)
            {
                if (builder is null)
                {
                    throw new ArgumentNullException(nameof(builder));
                }
    
                // This pattern prevents registering services multiple times in the case AddDapr is called
                // by non-user-code.
                if (builder.Services.Any(s => s.ImplementationType == typeof(DaprMvcMarkerService)))
                {
                    return builder;
                }
    
                builder.Services.AddDaprClient(configureClient);
    
                builder.Services.AddSingleton();
    
                return builder;
            }
    
    
            private class DaprMvcMarkerService
            {
            }
    View Code

    然后就可以这样把DaprClient依赖注入到服务中。

    builder.Services.AddGrpc().AddJsonTranscoding().AddDapr();

    实现事件订阅

    根据上述实现方案,GrpcServiceB接收事件并处理有点复杂,参考我Grpc接口转码Json的的内容,在要接收事件的Grpc方法上增加转码Json WebApi 配置。

    复制代码
     rpc TestTopicEvent (TestTopicEventRequest) returns (HelloReply){
        option (google.api.http) = {
          post: "/v1/greeter/testtopicevent",
          body: "eventData"
        };
      }
    复制代码

    增加google.api.http选项,,可以通过post eventData 数据到地址“/v1/greeter/testtopicevent”调用该Grpc方法。然后实现该Grpc接口。

    复制代码
     [Topic("pubsub", "TestTopic")]
            public override Task TestTopicEvent(TestTopicEventRequest request, ServerCallContext context)
            {
                string message = "TestTopicEvent" + request.EventData.Name;
                Console.WriteLine(message);
                return Task.FromResult(new HelloReply
                {
                    Message = message
                });
            }
    复制代码

    我重用了Dapr .Net SDK 的Topic Attribute来标记该Grpc的实现接口,这样就可以搜索所有带Topic Attribute的Grpc方法来获取已经订阅的事件。

    接下来才是重头戏,重写AppCallback.AppCallbackBase Grpc接口类的ListTopicSubscriptions方法和OnTopicEvent方法

    复制代码
     public async override Task ListTopicSubscriptions(Empty request, ServerCallContext context)
            {
                var result = new ListTopicSubscriptionsResponse();
    
                var subcriptions = _endpointDataSource.GetDaprSubscriptions(_loggerFactory);
                foreach (var subscription in subcriptions)
                {
                    TopicSubscription subscr = new TopicSubscription()
                    {
                        PubsubName = subscription.PubsubName,
                        Topic = subscription.Topic,
                        Routes = new TopicRoutes()
                    };
                    subscr.Routes.Default = subscription.Route;
                    result.Subscriptions.Add(subscr);
                }
                return result;
            }
    复制代码

    该方法返回所有已订阅的事件和对应的WebApi Url,将事件对应的WebApi地址放入subscr.Routes.Default中。

    其中_endpointDataSource.GetDaprSubscriptions 方法参考了Dapr .Net SDK的实现。

    复制代码
    public static List GetDaprSubscriptions(this EndpointDataSource dataSource, ILoggerFactory loggerFactory, SubscribeOptions options = null)
            {
                var logger = loggerFactory.CreateLogger("DaprTopicSubscription");
                var subscriptions = dataSource.Endpoints
                    .OfType()
                    .Where(e => e.Metadata.GetOrderedMetadata().Any(t => t.Name != null)) // only endpoints which have TopicAttribute with not null Name.
                    .SelectMany(e =>
                    {
                        var topicMetadata = e.Metadata.GetOrderedMetadata();
                        var originalTopicMetadata = e.Metadata.GetOrderedMetadata();
    
                        var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();
    
                        for (int i = 0; i < topicMetadata.Count(); i++)
                        {
                            subs.Add((topicMetadata[i].PubsubName,
                                topicMetadata[i].Name,
                                (topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic,
                                (topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload,
                                topicMetadata[i].Match,
                                topicMetadata[i].Priority,
                                originalTopicMetadata.Where(m => (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.OwnedMetadatas?.Any(o => o.Equals(m.Id)) == true || string.IsNullOrEmpty(m.Id))
                                                     .GroupBy(c => c.Name)
                                                     .ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()),
                                (topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator,
                                e.RoutePattern));
                        }
    
                        return subs;
                    })
                    .Distinct()
                    .GroupBy(e => new { e.PubsubName, e.Name })
                    .Select(e => e.OrderBy(e => e.Priority))
                    .Select(e =>
                    {
                        var first = e.First();
                        var rawPayload = e.Any(e => e.EnableRawPayload.GetValueOrDefault());
                        var metadataSeparator = e.FirstOrDefault(e => !string.IsNullOrEmpty(e.MetadataSeparator)).MetadataSeparator ?? ",";
                        var rules = e.Where(e => !string.IsNullOrEmpty(e.Match)).ToList();
                        var defaultRoutes = e.Where(e => string.IsNullOrEmpty(e.Match)).Select(e => RoutePatternToString(e.RoutePattern)).ToList();
                        //var defaultRoute = defaultRoutes.FirstOrDefault();
                        var defaultRoute = defaultRoutes.LastOrDefault();
    
                        //multiple identical names. use comma separation.
                        var metadata = new Metadata(e.SelectMany(c => c.OriginalTopicMetadata).GroupBy(c => c.Key).ToDictionary(c => c.Key, c => string.Join(metadataSeparator, c.SelectMany(c => c.Value).Distinct())));
                        if (rawPayload || options?.EnableRawPayload is true)
                        {
                            metadata.Add(Metadata.RawPayload, "true");
                        }
    
                        if (logger != null)
                        {
                            if (defaultRoutes.Count > 1)
                            {
                                logger.LogError("A default subscription to topic {name} on pubsub {pubsub} already exists.", first.Name, first.PubsubName);
                            }
    
                            var duplicatePriorities = rules.GroupBy(e => e.Priority)
                              .Where(g => g.Count() > 1)
                              .ToDictionary(x => x.Key, y => y.Count());
    
                            foreach (var entry in duplicatePriorities)
                            {
                                logger.LogError("A subscription to topic {name} on pubsub {pubsub} has duplicate priorities for {priority}: found {count} occurrences.", first.Name, first.PubsubName, entry.Key, entry.Value);
                            }
                        }
    
                        var subscription = new Subscription()
                        {
                            Topic = first.Name,
                            PubsubName = first.PubsubName,
                            Metadata = metadata.Count > 0 ? metadata : null,
                        };
    
                        if (first.DeadLetterTopic != null)
                        {
                            subscription.DeadLetterTopic = first.DeadLetterTopic;
                        }
    
                        // Use the V2 routing rules structure
                        if (rules.Count > 0)
                        {
                            subscription.Routes = new Routes
                            {
                                Rules = rules.Select(e => new Rule
                                {
                                    Match = e.Match,
                                    Path = RoutePatternToString(e.RoutePattern),
                                }).ToList(),
                                Default = defaultRoute,
                            };
                        }
                        // Use the V1 structure for backward compatibility.
                        else
                        {
                            subscription.Route = defaultRoute;
                        }
    
                        return subscription;
                    })
                    .OrderBy(e => (e.PubsubName, e.Topic));
    
                return subscriptions.ToList();
            }
    
            private static string RoutePatternToString(RoutePattern routePattern)
            {
                return string.Join("/", routePattern.PathSegments
                                        .Select(segment => string.Concat(segment.Parts.Cast()
                                        .Select(part => part.Content))));
            }
    复制代码

    注意标红的哪一行是我唯一改动的地方,因为Grpc接口增加了Web Api配置后会返回两个Route,一个是原始Grpc的,一个是WebApi的,我们需要后面那个。

    接着重写OnTopicEvent方法

    复制代码
    public async override Task OnTopicEvent(TopicEventRequest request, ServerCallContext context)
            {
                TopicEventResponse topicResponse = new TopicEventResponse();
                string payloadString = request.Data.ToStringUtf8();
                Console.WriteLine("OnTopicEvent Data:" + payloadString);
                
                HttpContent postContent = new StringContent(payloadString, new MediaTypeWithQualityHeaderValue("application/json"));
                var response = await _httpClient4TopicEvent.PostAsync("http://" + context.Host + "/" + request.Path, postContent);
                string responseContent = await response.Content.ReadAsStringAsync();
                Console.WriteLine(responseContent);
                if (response.IsSuccessStatusCode)
                {
                    Console.WriteLine("OnTopicEvent Invoke Success.");
                    topicResponse.Status = TopicEventResponseStatus.Success;
                }
                else
                {
                    Console.WriteLine("OnTopicEvent Invoke Error.");
                    topicResponse.Status = TopicEventResponseStatus.Drop;
                }
                return topicResponse;
            }
    复制代码

    这里简单处理了事件触发的返回参数TopicEventResponse ,未处理重试的情况。request.path是在ListTopicSubscriptions方法中返回给Dapr的事件对应的WebApi调用地址。

    参数_httpClient4TopicEvent是这样注入的:

    复制代码
    builder.Services.AddHttpClient("HttpClient4TopicEvent", httpClient =>
    {
        httpClient.DefaultRequestVersion = HttpVersion.Version20;
        httpClient.DefaultVersionPolicy = HttpVersionPolicy.RequestVersionOrHigher;
        httpClient.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
    }
    );
    复制代码

    因为Grpc是基于Http2.0及以上版本的,所以需要修改HtttpClient默认配置,不然无法访基于Http2.0的WebApi。

    然后将我们的AppCallback.AppCallbackBase实现类DaprAppCallbackService Map到GrpcService即可。

    app.MapGrpcService();

    四、实现效果

    分别通过Dapr运行ServiceA和ServiceB微服务,注意指定--app-protocol协议为Grpc,我这里还使用了.Net 热重载技术

    dapr run --app-protocol grpc --app-id serviceA --app-port 5002 --dapr-grpc-port 50002 -- dotnet watch run --launch-profile https
    
    dapr run --app-protocol grpc --app-id serviceB --app-port 5003 --dapr-grpc-port 50003 -- dotnet watch run --launch-profile https

    在ServiceA中发布事件

     

    在ServiceB中查看已订阅的事件和接收到的事件触发

     

    五、找工作

    ▪ 博主有15年以上的软件技术实施经验(Technical Leader),专注于微服务(Dapr)和云原生(K8s)软件架构设计、专注于 .Net Core\Java开发和Devops构建发布。
    ▪ 博主10年以上的软件交付管理经验(Project Manager & Product Ower),致力于敏捷(Scrum)项目管理、软件产品业务需求分析和原型设计。
    ▪ 博主熟练配置和使用 Microsoft Azure云。
    ▪ 博主为人诚恳,积极乐观,工作认真负责。 

    我家在广州,也可以去深圳工作。做架构师、产品经理、项目经理都可以。有工作机会推荐的朋友可以加我微信 15920128707,微信名字叫Jerry。

    本文源代码在这里:iamxiaozhuang/TestDaprGrpcSubscripber (github.com) 大家可以随便取用。

  • 相关阅读:
    具有 1 个射频链的 OFDM-MIMO 系统的波束训练(Matlab代码实现)
    一种新的数据聚类启发式优化方法——黑洞算法(基于Matlab代码实现)
    flutter瀑布式图文列表
    Oracle数据库:oracle内连接inner join on,多表查询各种自链接、内连接、外连接的练习示例
    java基础之数组
    【个人博客搭建】hexo搭建个人博客
    WebAssembly核心编程[1]:wasm模块实例化的N种方式
    Topaz Video AI for Mac 视频无损放大软件安装教程【保姆级,操作简单轻松上手】
    Mac 重新安装系统
    Windows电脑上的多开软件是否安全?
  • 原文地址:https://www.cnblogs.com/xiaozhuang/p/16832868.html