• Extending ABP's Webhook feature to push data to third-party endpoints (WeCom groups, DingTalk groups, etc.)


    Preface

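    The previous article, "Building a SaaS supply chain management system on ASP.NET ZERO", mentioned that we extended the Webhook feature. This article walks through how that was done.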

    For instructions on using the Webhook feature, see the linked document: Webhook Data Push

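    Webhook feature release dates:

    • ASP.NET Boilerplate (ABP below) released the Webhook feature in v5.2 (2020-02-18); for details, see the official documentation.
    • ASP.NET ZERO (ZERO below) released the Webhook feature in v8.2.0 (2020-02-20).
    • We finished reworking the Webhook feature in our system in April 2021, covering internal endpoints (where users set the endpoint URL themselves) and third-party endpoints (WeCom internal groups, DingTalk groups, the Jushuitan API, and so on).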

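    1. Webhook definitions

    • To tell internal endpoints apart from third-party endpoints, every third-party webhook name carries a fixed prefix, such as Third.WX.XXX or Third.DD.XXX.
    • Each definition is added with a matching feature (featureDependency), so that the feature system shows or hides the definition for each tenant.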
        public class AppWebhookDefinitionProvider : WebhookDefinitionProvider
        {
            public override void SetWebhooks(IWebhookDefinitionContext context)
            {
                //Material master - visible to all tenants
                context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Created));
                context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Updated));
                context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T11071001_Deleted));
    
                //Production orders - visible only when production management is enabled
                var featureC = new SimpleFeatureDependency("SCM.C");
                context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Created, featureDependency: featureC));
                context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Updated, featureDependency: featureC));
                context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_Deleted, featureDependency: featureC));
                context.Manager.Add(new WebhookDefinition(name: AppWebHookNames.T13041001_MRP_Data, featureDependency: featureC));
    
                //...
            }
        }
    
    • Register the Webhook definitions in the CoreModule and set the configuration options:
        public class SCMCoreModule : AbpModule
        {
            public override void PreInitialize()
            {
                Configuration.Webhooks.Providers.Add<AppWebhookDefinitionProvider>();
                Configuration.Webhooks.TimeoutDuration = TimeSpan.FromMinutes(1);
                Configuration.Webhooks.IsAutomaticSubscriptionDeactivationEnabled = true;
                Configuration.Webhooks.MaxSendAttemptCount = 3;
                Configuration.Webhooks.MaxConsecutiveFailCountBeforeDeactivateSubscription = 10;
    
                //...
            }
    
            //...
        }
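
The prefix convention from this section can be checked with a small helper. The sketch below is only an illustration: the class `ThirdPartyWebhookNames` and its methods are hypothetical and not part of the author's code or of ABP. It assumes names follow the `Third.<channel>.<name>` pattern described above.

```csharp
using System;

// Hypothetical helper, not from the article or from ABP.
public static class ThirdPartyWebhookNames
{
    // Assumed common prefix for all third-party webhook names.
    public const string Prefix = "Third.";

    // True when the name starts with the third-party prefix.
    public static bool IsThirdParty(string webhookName) =>
        !string.IsNullOrEmpty(webhookName) &&
        webhookName.StartsWith(Prefix, StringComparison.Ordinal);

    // Returns the channel segment ("WX", "DD", ...) or null for internal names.
    public static string GetChannel(string webhookName)
    {
        if (!IsThirdParty(webhookName)) { return null; }

        var parts = webhookName.Split('.');
        return parts.Length >= 3 ? parts[1] : null;
    }
}
```

With this, `ThirdPartyWebhookNames.GetChannel("Third.WX.XXX")` yields the channel segment, while an internal name yields null, so calling code can branch on the channel without comparing full names.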
    

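    2. Webhook subscriptions

    • A user creates a Webhook subscription record in the front end (WebhookUri, Webhooks, Headers, and so on), which is then sent to the back-end API.
    • The back-end API saves the WebhookSubscription through WebhookSubscriptionManager: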
        [AbpAuthorize(AppPermissions.Pages_Administration_WebhookSubscription)]
        public class WebhookSubscriptionAppService : SCMAppServiceBase, IWebhookSubscriptionAppService
        {
            //...
    
            [AbpAuthorize(AppPermissions.Pages_Administration_WebhookSubscription_Create)]
            public async Task AddSubscription(WebhookSubscription subscription)
            {
                subscription.TenantId = AbpSession.TenantId;
    
                await _webHookSubscriptionManager.AddOrUpdateSubscriptionAsync(subscription);
            }
    
            //...
        }
    

    3. Webhook publishing (data push)

    Listen for entity events (CreatedEvent, UpdatedEvent, DeletedEvent) and push the data according to the Webhook subscriptions created by the tenant's users:

        public class T11071001Syncronizer : 
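            IEventHandler<EntityCreatedEventData<T11071001>>,
            IEventHandler<EntityUpdatedEventData<T11071001>>,
            IEventHandler<EntityDeletedEventData<T11071001>>,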
            ITransientDependency
        {
            private readonly IAppWebhookPublisher _appWebhookPublisher;
    
            public T11071001Syncronizer(IAppWebhookPublisher appWebhookPublisher) 
            {
                _appWebhookPublisher = appWebhookPublisher;
            }
            public void HandleEvent(EntityCreatedEventData<T11071001> eventData)
            {
                DoWebhook("N", eventData.Entity);
            }
    
            public void HandleEvent(EntityUpdatedEventData<T11071001> eventData)
            {
                DoWebhook("U", eventData.Entity);
            }
    
            public void HandleEvent(EntityDeletedEventData<T11071001> eventData)
            {
                int? tenantId = eventData.Entity.TenantId; 
                string whName = AppWebHookNames.T11071001_Deleted;
                var subscriptions = _appWebhookPublisher.GetSubscriptions(tenantId, whName); 
                if (subscriptions == null) { return; }
    
                _appWebhookPublisher.PublishWebhookUOW(whName, eventData.Entity, tenantId, subscriptions);
            }
    
        }
    
    • DoWebhook() method: pushes the appropriate content for each kind of subscription (internal endpoint, third-party endpoint, and so on):
            private void DoWebhook(string nu, T11071001 entity)
            {
                int? tenantId = entity.TenantId;
                var whCache = _appWebhookPublisher.GetWebhookCache(tenantId); if (whCache.Count == 0) { return; }
    
                string whName = nu == "N" ? AppWebHookNames.T11071001_Created : AppWebHookNames.T11071001_Updated;
                string whNameWX = AppWebHookNames.WX_T11071001_Created;
                string whNameDD = AppWebHookNames.DD_T11071001_Created;
    
                bool isWH = whCache.Names.ContainsKey(whName);
                bool isWX = whCache.Names.ContainsKey(whNameWX);
                bool isDD = whCache.Names.ContainsKey(whNameDD);
    
                if (!(isWH || isWX || isDD)) { return; }
    
                var data = ObjectMapper.Map(entity);
    
                //Internal endpoint
                if (isWH)
                {
                    _appWebhookPublisher.PublishWebhookUOW(whName, data, tenantId, whCache.Names[whName], false);
                }
    
                //WeCom internal group
                if (isWX)
                {
                    var wxData = new WxTCardWebhookDto { template_card = GetWxTCard(data, tenantId, nu) };
                    _appWebhookPublisher.PublishWebhookUOW(whNameWX, wxData, tenantId, whCache.Names[whNameWX], true);
                }
    
                //DingTalk internal group
                if (isDD)
                {
                    var title = GetNUTitle(nu, L(T));
                    var mdText = GetNewMarkdown(data, title);
                    var ddData = new DdMarkdownWebhookDto { markdown = new DdMarkdownContentDto { title = title, text = mdText } };
                    _appWebhookPublisher.PublishWebhookUOW(whNameDD, ddData, tenantId, whCache.Names[whNameDD], true);
                }
            }
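
For the DingTalk branch, the group robot expects a JSON body of the form `{"msgtype":"markdown","markdown":{"title":...,"text":...}}`. The DTO classes used above are not shown in the article, so the sketch below defines hypothetical stand-ins with the same names and serializes them with System.Text.Json; the `msgtype` default and the `DdPayloadSketch` class are assumptions made for illustration.

```csharp
using System.Text.Json;

// Hypothetical stand-ins for the DTOs named in DoWebhook(); the author's
// real classes are not shown and may differ.
public class DdMarkdownContentDto
{
    public string title { get; set; }
    public string text { get; set; }
}

public class DdMarkdownWebhookDto
{
    // Assumed default so the robot treats the message as markdown.
    public string msgtype { get; set; } = "markdown";
    public DdMarkdownContentDto markdown { get; set; }
}

public static class DdPayloadSketch
{
    // Builds the JSON body that would be posted to the robot's webhook URL.
    public static string Build(string title, string text)
    {
        var dto = new DdMarkdownWebhookDto
        {
            markdown = new DdMarkdownContentDto { title = title, text = text }
        };
        return JsonSerializer.Serialize(dto);
    }
}
```

This is presumably why the third-party branches pass `true` as the last argument (sendExactSameData): the robot needs exactly this body rather than one wrapped in ABP's own payload envelope.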
    
    • GetWebhookCache() method: caches the Webhook subscription data per tenant:
            public SCMWebhookCacheItem GetWebhookCache(int? tenantId)
            {
               return SetAndGetCache(tenantId);
            }
    
            private SCMWebhookCacheItem SetAndGetCache(int? tenantId, string keyName = "SubscriptionCount")
            {
               int tid = tenantId ?? 0; var cacheKey = $"{keyName}-{tid}";
    
               return _cacheManager.GetSCMWebhookCache().Get(cacheKey, () =>
               {
                    int count = 0;
                    var names = new Dictionary<string, List<WebhookSubscription>>();
    
                    UnitOfWorkManager.WithUnitOfWork(() =>
                    {
                        using (UnitOfWorkManager.Current.SetTenantId(tenantId))
                        {
                            if (_featureChecker.IsEnabled(tid, "SCM.H")) //Feature check
                            {
                                var items = _webhookSubscriptionRepository.GetAllList(e => e.TenantId == tenantId && e.IsActive == true);
                                count = items.Count;
    
                                foreach (var item in items)
                                {
                                    if (string.IsNullOrWhiteSpace(item.Webhooks)) { continue; }
                                    var whNames = JsonHelper.DeserializeObject<string[]>(item.Webhooks); if (whNames == null) { continue; }
                                    foreach (string whName in whNames)
                                    {
                                        if (names.ContainsKey(whName))
                                        {
                                            names[whName].Add(item.ToWebhookSubscription());
                                        }
                                        else
                                        {
                                            names.Add(whName, new List<WebhookSubscription> { item.ToWebhookSubscription() });
                                        }
                                    }
                                }
                            }
                        }
                    });
    
                    return new SCMWebhookCacheItem(count, names);
                });
            }
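
The core of the cache factory is an index from webhook name to the subscriptions that list it. The grouping step can be tried in isolation with the sketch below, which has no ABP dependencies: `SubscriptionRow` and `SubscriptionIndex` are hypothetical types, and System.Text.Json stands in for the article's JsonHelper.

```csharp
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical row type: Id plus the JSON array of webhook names,
// mirroring the Webhooks column read in the cache factory above.
public record SubscriptionRow(string Id, string WebhooksJson);

public static class SubscriptionIndex
{
    // Groups subscriptions by each webhook name they subscribe to.
    public static Dictionary<string, List<SubscriptionRow>> GroupByWebhookName(
        IEnumerable<SubscriptionRow> rows)
    {
        var names = new Dictionary<string, List<SubscriptionRow>>();

        foreach (var row in rows)
        {
            if (string.IsNullOrWhiteSpace(row.WebhooksJson)) { continue; }

            var whNames = JsonSerializer.Deserialize<string[]>(row.WebhooksJson);
            if (whNames == null) { continue; }

            foreach (var whName in whNames)
            {
                // TryGetValue avoids the double lookup of ContainsKey + indexer.
                if (!names.TryGetValue(whName, out var list))
                {
                    list = new List<SubscriptionRow>();
                    names[whName] = list;
                }
                list.Add(row);
            }
        }

        return names;
    }
}
```

Once built, the index makes the `whCache.Names.ContainsKey(...)` checks in DoWebhook() dictionary lookups, so an entity event with no matching subscription costs no database query.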
    
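    • PublishWebhookUOW() method: replaces the default implementation of WebHookPublisher in ABP; it takes the subscriptions passed in and pushes the data directly through WebhookSenderJob:
            public void PublishWebhookUOW(string webHookName, object data, int? tenantId, List<WebhookSubscription> webhookSubscriptions = null, bool sendExactSameData = false)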
            {
                UnitOfWorkManager.WithUnitOfWork(() =>
                {
                    using (UnitOfWorkManager.Current.SetTenantId(tenantId))   
                    {
                        Publish(webHookName, data, tenantId, webhookSubscriptions, sendExactSameData);
                    }
                });
            }
    
            private void Publish(string webhookName, object data, int? tenantId, List<WebhookSubscription> webhookSubscriptions, bool sendExactSameData = false)
            {
                if (string.IsNullOrWhiteSpace(webhookName)) { return; }
    
                //If no subscriptions were passed in, look them up by webhookName
                webhookSubscriptions ??= _webhookSubscriptionRepository.GetAllList(subscriptionInfo =>
                        subscriptionInfo.TenantId == tenantId &&
                        subscriptionInfo.IsActive &&
                        subscriptionInfo.Webhooks.Contains("\"" + webhookName + "\"")
                    ).Select(subscriptionInfo => subscriptionInfo.ToWebhookSubscription()).ToList();
    
                if (webhookSubscriptions.IsNullOrEmpty()) { return; }
    
                var webhookInfo = SaveAndGetWebhookEvent(tenantId, webhookName, data);
    
                foreach (var webhookSubscription in webhookSubscriptions)
                {
                    var jobArgs = new WebhookSenderArgs
                    {
                        TenantId = webhookSubscription.TenantId,
                        WebhookEventId = webhookInfo.Id,
                        Data = webhookInfo.Data,
                        WebhookName = webhookInfo.WebhookName,
                        WebhookSubscriptionId = webhookSubscription.Id,
                        Headers = webhookSubscription.Headers,
                        Secret = webhookSubscription.Secret,
                        WebhookUri = webhookSubscription.WebhookUri,
                        SendExactSameData = sendExactSameData
                    };
    
                    //Enqueue the job on a named queue so that the server that raised the event executes it
                    IBackgroundJobClient hangFireClient = new BackgroundJobClient();
                    hangFireClient.Create<WebhookSenderJob>(x => x.ExecuteAsync(jobArgs), new EnqueuedState(AppVersionHelper.MachineName));
                }
            }
    
    
    • WebhookSenderJob: override WebhookManager's SignWebhookRequest method so that no signature header is added for third-party endpoints:
            public override void SignWebhookRequest(HttpRequestMessage request, string serializedBody, string secret)
            {
                if (request == null)
                {
                    throw new ArgumentNullException(nameof(request));
                }
    
                //Third-party endpoint: do not add the signature header
                if (IsThirdAPI(request))
                {
                    return;
                }
    
                if (string.IsNullOrWhiteSpace(serializedBody))
                {
                    throw new ArgumentNullException(nameof(serializedBody));
                }
    
                var secretBytes = Encoding.UTF8.GetBytes(secret);
    
                using (var hasher = new HMACSHA256(secretBytes))
                {
                    request.Content = new StringContent(serializedBody, Encoding.UTF8, "application/json");
    
                    var data = Encoding.UTF8.GetBytes(serializedBody);
                    var sha256 = hasher.ComputeHash(data);
    
                    var headerValue = string.Format(CultureInfo.InvariantCulture, SignatureHeaderValueTemplate, BitConverter.ToString(sha256));
    
                    request.Headers.Add(SignatureHeaderName, headerValue);
                }
            }
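
An internal endpoint that receives the signed request can recompute the HMAC and compare it with the header it received. The sketch below shows that receiver side under stated assumptions: the value template `"sha256={0}"` is assumed (the code above only references `SignatureHeaderValueTemplate` by name), and `WebhookSignatureSketch` is a hypothetical class, not part of ABP.

```csharp
using System;
using System.Globalization;
using System.Security.Cryptography;
using System.Text;

// Hypothetical receiver-side check, not part of ABP.
public static class WebhookSignatureSketch
{
    // Assumed template; confirm it against the sender's configuration.
    private const string AssumedTemplate = "sha256={0}";

    // Recomputes the header value the same way the sender above does:
    // HMAC-SHA256 over the UTF-8 body, formatted with BitConverter.ToString.
    public static string ComputeHeaderValue(string body, string secret)
    {
        using var hasher = new HMACSHA256(Encoding.UTF8.GetBytes(secret));
        var hash = hasher.ComputeHash(Encoding.UTF8.GetBytes(body));
        return string.Format(CultureInfo.InvariantCulture, AssumedTemplate, BitConverter.ToString(hash));
    }

    // Compares in constant time to avoid leaking how many characters match.
    public static bool IsValid(string body, string secret, string receivedHeaderValue)
    {
        var expected = Encoding.UTF8.GetBytes(ComputeHeaderValue(body, secret));
        var actual = Encoding.UTF8.GetBytes(receivedHeaderValue ?? string.Empty);
        return CryptographicOperations.FixedTimeEquals(expected, actual);
    }
}
```

The receiver must hash the raw request body exactly as received; re-serializing the parsed JSON first can change whitespace or property order and make a valid signature fail.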
    
    • WebhookSenderJob: override WebhookSender's CreateWebhookRequestMessage method to build special requests for third-party endpoints:
            protected override HttpRequestMessage CreateWebhookRequestMessage(WebhookSenderArgs webhookSenderArgs)
            {
                return webhookSenderArgs.WebhookName switch
                {
                    AppWebHookNames.JST_supplier_upload => JSTHttpRequestMessage(webhookSenderArgs), //Jushuitan - supplier upload
                    //...
                    _ => new HttpRequestMessage(HttpMethod.Post, webhookSenderArgs.WebhookUri)
                };
            }
    
    • WebhookSenderJob: override WebhookSender's AddAdditionalHeaders method so that no extra headers are added for third-party endpoints:
            protected override void AddAdditionalHeaders(HttpRequestMessage request, WebhookSenderArgs webhookSenderArgs)
            {
                //Third-party endpoint: do not add headers
                if (IsThirdAPI(request))
                {
                    return;
                }
    
                foreach (var header in webhookSenderArgs.Headers)
                {
                    if (request.Headers.TryAddWithoutValidation(header.Key, header.Value))
                    {
                        continue;
                    }
    
                    if (request.Content.Headers.TryAddWithoutValidation(header.Key, header.Value))
                    {
                        continue;
                    }
    
                    throw new Exception($"Invalid Header. SubscriptionId:{webhookSenderArgs.WebhookSubscriptionId},Header: {header.Key}:{header.Value}");
                }
            }
    
    • WebhookSenderJob: override WebhookSender's SendHttpRequest method to process the data returned by third-party endpoints:
            protected override async Task<(bool isSucceed, HttpStatusCode statusCode, string content)> SendHttpRequest(HttpRequestMessage request)
            {
                using var client = _httpClientFactory.CreateClient(); //Avoid new HttpClient()
                client.Timeout = _webhooksConfiguration.TimeoutDuration;
    
                var response = await client.SendAsync(request);
    
                var isSucceed = response.IsSuccessStatusCode;
                var statusCode = response.StatusCode;
                var content = await response.Content.ReadAsStringAsync();
    
                //Third-party endpoint: the returned data needs to be processed
                if (IsThirdAPI(request))
                {
                    string method = TryGetHeader(request.Headers, "ThirdAPI1");
                    int tenantId = Convert.ToInt32(TryGetHeader(request.Headers, "ThirdAPI2"));
                    switch (method)
                    {
                        case AppWebHookNames.JST_supplier_upload: await JSTSupplierUploadResponse(method, content, tenantId); break;
                        //...
                        default: break;
                    }
                }
    
                return (isSucceed, statusCode, content);
            }
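
The overrides above call IsThirdAPI() and TryGetHeader(), but the article does not show them. One possible shape is sketched below. It assumes, purely for illustration, that a request counts as third-party when it carries the "ThirdAPI1" marker header read in SendHttpRequest; the author's actual rule may be different.

```csharp
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;

// Hypothetical versions of the two helpers; the real ones are not shown.
public static class ThirdApiHeaderSketch
{
    // Header name taken from SendHttpRequest above; using it as the
    // third-party marker is an assumption.
    public const string MethodHeader = "ThirdAPI1";

    // Returns the first value of the header, or null when it is absent.
    public static string TryGetHeader(HttpRequestHeaders headers, string name) =>
        headers.TryGetValues(name, out var values) ? values.FirstOrDefault() : null;

    // Treats a request as third-party when the marker header is present.
    public static bool IsThirdAPI(HttpRequestMessage request) =>
        request != null && TryGetHeader(request.Headers, MethodHeader) != null;
}
```

Under this assumption the marker headers would be set when the request is built for the third-party endpoint, which lets the later overrides recognise the request without needing the original WebhookSenderArgs.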
    

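    Summary

    With a few extensions on top of the ABP/ZERO Webhook implementation, business data can be pushed according to user subscriptions, including to third-party endpoints (WeCom groups, DingTalk, and so on), which makes the business system considerably more flexible and practical.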

  • Original article: https://www.cnblogs.com/freedyang/p/17684464.html