• Azure Data Factory (12): Calling an Azure Function with Parameters


    I. Introduction

     In real projects, Data Flow in Azure Data Factory cannot handle every piece of copy-and-transform logic on its own. For example, when we need to evaluate and compute something for each individual row of a dataset, Data Flow starts to struggle. Fortunately, Azure Data Factory provides an activity for calling Azure Functions; with code behind it, even the more complex scenarios can be solved. Let's get started with today's walkthrough.

    -------------------- Series Index --------------------

    1. Azure Data Factory (1): Introduction

    2. Azure Data Factory (2): Copying Data

    3. Azure Data Factory (3): Integrating Azure DevOps for CI/CD

    4. Azure Data Factory (4): Email Notifications with Logic Apps

    5. Azure Data Factory (5): Blob Storage Key Management

    6. Azure Data Factory (6): Testing a Link to a Dataverse Dataset

    7. Azure Data Factory (7): Dataset Authentication with a User-Assigned Managed Identity

    8. Azure Data Factory (8): Dataset Authentication with a Service Principal

    9. Azure Data Factory (9): Fundamentals Review

    10. Azure Data Factory (10): Data Flow Components in Detail

    11. Azure Data Factory (11): Using Data Flow

    12. Azure Data Factory (12): Calling an Azure Function with Parameters

    II. Main Content

    1. Preparing the Azure Function

    Open the Azure Portal and click "Create a resource" to quickly create an Azure Function app.

    Below is the newly created Azure Function app, with Operating System set to "Windows" and Runtime set to "Node.js".

    Add a Function named "Http_skip_holiday".

    Function Code:

    const intercept = require("azure-function-log-intercept");

    module.exports = async function (context, req) {
        context.log('JavaScript HTTP trigger function processed a request.');
        intercept(context);
        const lo_date = req.query.lo_date || (req.body && req.body.lo_date);
        const skipday = req.query.skipday || (req.body && req.body.skipday);
        context.log("lo_date: " + lo_date);
        context.log("skipday: " + skipday);

        // Holiday handling: dates (including the surrounding Saturdays) to skip,
        // in yyyy-MM-dd format
        const holidayArray = ['2023-01-01','2023-01-06','2023-01-07','2023-01-13','2023-01-14','2023-01-21','2023-01-27','2023-01-28'];
        context.log("holidayArray.length: ", holidayArray.length);

        const due_dateObj = calculate_dueDate(context, lo_date, holidayArray, Number(skipday));
        context.log("due_dateObj.Step: ", due_dateObj.Step);
        context.res = {
            status: 200, /* Defaults to 200 */
            body: due_dateObj
        };
    }

    // Date.prototype.toString() takes no format argument, so build yyyy-MM-dd by hand
    function formatDate(d) {
        const pad = n => String(n).padStart(2, '0');
        return d.getFullYear() + '-' + pad(d.getMonth() + 1) + '-' + pad(d.getDate());
    }

    function calculate_dueDate(context, lodate, holidayArray, num) {
        "use strict";
        const DueDateObj = {};
        const lo_date = new Date(lodate);
        const Year = lo_date.getFullYear();
        const Month = lo_date.getMonth();
        const day = lo_date.getDate();

        let step = num;
        let isWorkDay = false;
        do {
            const currentDate = new Date(Year, Month, day + step);
            // Membership must use Array.includes — the `in` operator tests array
            // indices, not values. Sundays (getDay() === 0) are also skipped.
            if (holidayArray.includes(formatDate(currentDate)) || currentDate.getDay() < 1) {
                step++;
            } else {
                isWorkDay = true;
            }
        } while (!isWorkDay);

        const dueDate = new Date(Year, Month, day + step);
        DueDateObj.DueDate = formatDate(dueDate);
        DueDateObj.Step = step;
        context.log("dueDate: " + DueDateObj.DueDate);
        return DueDateObj;
    }
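    The holiday-skipping logic can be sanity-checked locally before deploying. The sketch below mirrors the calculation above under a plain Node.js runtime: `calculateDueDate` and `fakeContext` are local stand-ins for the deployed function and the Azure `context` object, not part of the deployed code.

    ```javascript
    // Local harness for the due-date logic; fakeContext stands in for Azure's context.
    const fakeContext = { log: (...args) => console.log(...args) };

    function formatDate(d) {
        const pad = n => String(n).padStart(2, '0');
        return d.getFullYear() + '-' + pad(d.getMonth() + 1) + '-' + pad(d.getDate());
    }

    function calculateDueDate(context, lodate, holidayArray, num) {
        const base = new Date(lodate);
        let step = num;
        for (;;) {
            const current = new Date(base.getFullYear(), base.getMonth(), base.getDate() + step);
            // Skip listed holidays and Sundays, like the deployed function
            if (holidayArray.includes(formatDate(current)) || current.getDay() < 1) {
                step++;
            } else {
                break;
            }
        }
        const due = new Date(base.getFullYear(), base.getMonth(), base.getDate() + step);
        return { DueDate: formatDate(due), Step: step };
    }

    const holidays = ['2023-01-01', '2023-01-06', '2023-01-07'];
    // 'T00:00:00' without a zone is parsed as local time, so getDate() is stable
    // across timezones. 01-06 and 01-07 are holidays, 01-08 is a Sunday.
    console.log(calculateDueDate(fakeContext, '2023-01-05T00:00:00', holidays, 1));
    ```

    Starting from 2023-01-05 with `skipday = 1`, the loop steps past the two holidays and the Sunday and lands on Monday 2023-01-09 with `Step` advanced to 4.
    
    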

     

     

    Once the Function is running, we can test it with Postman.

    Note: 1) Enable the Function's Filesystem Logs.

    2) If the Function's authorization level is not "Anonymous", you must append the authentication code (function key) to the URL when calling the Function.

    Test the result in Postman.
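    Assuming the default HTTP route for the function (the host name and key below are placeholders), the Postman request looks like this:

    ```
    POST https://<your-function-app>.azurewebsites.net/api/Http_skip_holiday?code=<function-key>
    Content-Type: application/json

    {"lo_date": "2023-01-05", "skipday": 1}
    ```

    The function responds with a JSON object containing `DueDate` and `Step`.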

    2. Configuring the Function Call in Data Factory

    1) Use a Lookup activity to query the dataset rows that need updating.

    2) Use a ForEach activity to iterate over the dataset and call the Azure Function once per row, passing that row's "inputdate" and "skipday" fields as parameters.

    The ForEach items collection:

    @activity('Lookup_Data').output.value

    The Function's Body parameter:

    @concat('{"lo_date":"',item().inputdate,'","skipday":',item().skipday,'}')
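    For example, a row whose `inputdate` is `2023-01-05` and whose `skipday` is `2` makes the expression above render the request body as:

    ```json
    {"lo_date":"2023-01-05","skipday":2}
    ```

    Note that `skipday` is emitted without quotes, so it arrives as a JSON number rather than a string.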

     

    Pipeline code:

    {
        "name": "test_pipeline",
        "properties": {
            "activities": [
                {
                    "name": "Lookup_Data",
                    "type": "Lookup",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "storeSettings": {
                                "type": "AzureBlobStorageReadSettings",
                                "recursive": true,
                                "wildcardFolderPath": "AAA",
                                "wildcardFileName": {
                                    "value": "@concat('User_*.csv')",
                                    "type": "Expression"
                                },
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "dataset": {
                            "referenceName": "AZURE_BLOB_CSV",
                            "type": "DatasetReference",
                            "parameters": {
                                "ContainerName": "test",
                                "DirectoryPath": "AAA",
                                "FileName": {
                                    "value": "@concat('User_*.csv')",
                                    "type": "Expression"
                                }
                            }
                        },
                        "firstRowOnly": false
                    }
                },
                {
                    "name": "ForEach UPDATE Date",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Lookup_Data",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Lookup_Data').output.value",
                            "type": "Expression"
                        },
                        "activities": [
                            {
                                "name": "Azure_Function_SkipHoliday",
                                "type": "AzureFunctionActivity",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "functionName": "Http_skip_holiday",
                                    "method": "POST",
                                    "body": {
                                        "value": "@concat('{\"lo_date\":\"',item().inputdate,'\",\"skipday\":',item().skipday,'}')",
                                        "type": "Expression"
                                    }
                                },
                                "linkedServiceName": {
                                    "referenceName": "AzureFunction_LinkService",
                                    "type": "LinkedServiceReference"
                                }
                            }
                        ]
                    }
                }
            ],
            "annotations": []
        }
    }

     

    Click Debug to run the pipeline and verify the results.

    III. Conclusion

      Azure Data Factory (ADF) is Azure's managed data-integration service, letting us iteratively build, orchestrate, and monitor ETL workflows. With Azure Functions now integrated into ADF, an Azure Function can run as a step inside a Data Factory pipeline. Keep practicing!

    Author: Allen

    Copyright: When reposting, please credit the author and source prominently in the article. If you spot any mistakes, corrections are welcome.

  • Original post: https://www.cnblogs.com/AllenMaster/p/17990816