Azure Data Factory (12): Calling an Azure Function with Parameters


    I. Introduction

    In real-world projects, the Data Flow in Azure Data Factory cannot handle every piece of complex logic for us. For example, when we need to evaluate and compute against each individual row of a dataset, Data Flow starts to struggle. Fortunately, Azure Data Factory provides an activity for calling Azure Functions, and with the power of code behind us, even the more complex requirements can be solved. Let's get started!

    --------------------

    1. Azure Data Factory (1): Introduction

    2. Azure Data Factory (2): Copying Data

    3. Azure Data Factory (3): CI/CD with Azure DevOps

    4. Azure Data Factory (4): Email Notifications with Logic Apps

    5. Azure Data Factory (5): Blob Storage Key Management

    6. Azure Data Factory (6): Link Testing with a Dataverse Dataset

    7. Azure Data Factory (7): Dataset Authentication with a User-Assigned Managed Identity

    8. Azure Data Factory (8): Dataset Authentication with a Service Principal

    9. Azure Data Factory (9): Fundamentals Review

    10. Azure Data Factory (10): Data Flow Components in Detail

    11. Azure Data Factory (11): Using Data Flow

    12. Azure Data Factory (12): Calling an Azure Function with Parameters

    II. Main Content

    1. Preparing the Azure Function

    Open the Azure Portal and click "Create a resource" to quickly create an Azure Function app.

    Below is the newly created Azure Function app, with Operating System set to "Windows" and Runtime set to "Node.js".

    Add a Function named "Http_skip_holiday".

    Function Code:

    const intercept = require("azure-function-log-intercept");

    module.exports = async function (context, req) {
        context.log('JavaScript HTTP trigger function processed a request.');
        intercept(context);

        // Read the parameters from the query string or the request body
        const lo_date = (req.query.lo_date || (req.body && req.body.lo_date));
        const skipday = (req.query.skipday || (req.body && req.body.skipday));
        context.log("lo_date: " + lo_date);
        context.log("skipday: " + skipday);

        // Holiday handling: dates to skip, in yyyy-MM-dd format (the list covers
        // holidays and Saturdays; Sundays are detected via getDay() below)
        const holidayArray = ['2023-01-01','2023-01-06','2023-01-07','2023-01-13','2023-01-14','2023-01-21','2023-01-27','2023-01-28'];
        context.log("holidayArray.length: ", holidayArray.length);

        const due_dateObj = calculate_dueDate(context, lo_date, holidayArray, skipday);
        context.log("due_dateObj.Step: ", due_dateObj.Step);
        context.res = {
            status: 200, /* Defaults to 200 */
            body: due_dateObj
        };
    }

    // Format a Date as yyyy-MM-dd (Date.prototype.toString() ignores format arguments)
    function formatDate(d) {
        const pad = n => String(n).padStart(2, '0');
        return d.getFullYear() + '-' + pad(d.getMonth() + 1) + '-' + pad(d.getDate());
    }

    function calculate_dueDate(context, lodate, holidayArray, num) {
        "use strict";
        const DueDateObj = {};
        const lo_date = new Date(lodate);
        const Year = lo_date.getFullYear();
        const Month = lo_date.getMonth();
        const day = lo_date.getDate();

        // skipday may arrive as a string; coerce it to a number so that
        // "day + step" below does arithmetic rather than string concatenation
        let step = Number(num);
        let isWorkDay = false;
        do {
            const currentDate = new Date(Year, Month, day + step);
            // Skip listed holidays and Sundays (getDay() === 0)
            if (holidayArray.includes(formatDate(currentDate)) || currentDate.getDay() < 1) {
                step++;
            } else {
                isWorkDay = true;
            }
        } while (!isWorkDay);

        const dueDate = new Date(Year, Month, day + step);
        DueDateObj.DueDate = formatDate(dueDate);
        DueDateObj.Step = step;
        context.log("dueDate: " + DueDateObj.DueDate);
        return DueDateObj;
    }
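    As a quick sanity check of calculate_dueDate: with lo_date = "2023-01-05" (a Thursday) and skipday = 3, the first candidate date 2023-01-08 falls on a Sunday, so the loop advances one more day and the function returns DueDate = "2023-01-09" with Step = 4, since 2023-01-09 is neither a Sunday nor in holidayArray.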


    Once the Function is running, we can test it with Postman.

    Note: 1) Turn on the Function's Filesystem Logs so that you can watch the log output.

    2) If the Function's authorization level is not "Anonymous", you must append the authentication code (the function key) to the Function URL when calling it.

    Test the results with Postman.
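    For reference, here is a minimal sketch of the same test in code (Node.js 18+, using the global fetch); the function app URL and function key are placeholders that you would replace with your own values:

    // Minimal sketch of the Postman test, with a hypothetical function app
    // URL and key -- replace both placeholders with your own values.
    const url = "https://<your-function-app>.azurewebsites.net/api/Http_skip_holiday?code=<function-key>";

    (async () => {
        const res = await fetch(url, {
            method: "POST",
            headers: { "Content-Type": "application/json" },
            body: JSON.stringify({ lo_date: "2023-01-05", skipday: 3 })
        });
        // Expected response body shape: { "DueDate": "2023-01-09", "Step": 4 }
        console.log(await res.json());
    })();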

    2. Configuring the Function Call in Data Factory

    1) Use a Lookup activity to query the dataset that needs to be updated.

    2) Use a ForEach loop to iterate over the dataset, calling the Azure Function with each row's "inputdate" and "skipday" values as parameters, using the expressions shown below.

    The ForEach items collection:

    @activity('Lookup_Data').output.value
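    Since the Lookup activity is configured with "firstRowOnly": false, its output.value is an array containing one object per CSV row, so item() inside the ForEach exposes that row's columns. With illustrative data, one element might look like {"inputdate":"2023-01-05","skipday":"3"} (a delimited-text Lookup returns column values as strings).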

    The Function activity's Body parameter:

    @concat('{"lo_date":"',item().inputdate,'","skipday":',item().skipday,'}')
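    For the illustrative row above, this expression builds the request body {"lo_date":"2023-01-05","skipday":3}. Note that skipday is deliberately left unquoted in the concat so that it arrives as a JSON number; the function additionally coerces it with Number() as a safeguard in case it comes through as a string.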


    Pipeline code:

    {
        "name": "test_pipeline",
        "properties": {
            "activities": [
                {
                    "name": "Lookup_Data",
                    "type": "Lookup",
                    "dependsOn": [],
                    "policy": {
                        "timeout": "0.12:00:00",
                        "retry": 0,
                        "retryIntervalInSeconds": 30,
                        "secureOutput": false,
                        "secureInput": false
                    },
                    "userProperties": [],
                    "typeProperties": {
                        "source": {
                            "type": "DelimitedTextSource",
                            "storeSettings": {
                                "type": "AzureBlobStorageReadSettings",
                                "recursive": true,
                                "wildcardFolderPath": "AAA",
                                "wildcardFileName": {
                                    "value": "@concat('User_*.csv')",
                                    "type": "Expression"
                                },
                                "enablePartitionDiscovery": false
                            },
                            "formatSettings": {
                                "type": "DelimitedTextReadSettings"
                            }
                        },
                        "dataset": {
                            "referenceName": "AZURE_BLOB_CSV",
                            "type": "DatasetReference",
                            "parameters": {
                                "ContainerName": "test",
                                "DirectoryPath": "AAA",
                                "FileName": {
                                    "value": "@concat('User_*.csv')",
                                    "type": "Expression"
                                }
                            }
                        },
                        "firstRowOnly": false
                    }
                },
                {
                    "name": "ForEach UPDATE Date",
                    "type": "ForEach",
                    "dependsOn": [
                        {
                            "activity": "Lookup_Data",
                            "dependencyConditions": [
                                "Succeeded"
                            ]
                        }
                    ],
                    "userProperties": [],
                    "typeProperties": {
                        "items": {
                            "value": "@activity('Lookup_Data').output.value",
                            "type": "Expression"
                        },
                        "activities": [
                            {
                                "name": "Azure_Function_SkipHoliday",
                                "type": "AzureFunctionActivity",
                                "dependsOn": [],
                                "policy": {
                                    "timeout": "0.12:00:00",
                                    "retry": 0,
                                    "retryIntervalInSeconds": 30,
                                    "secureOutput": false,
                                    "secureInput": false
                                },
                                "userProperties": [],
                                "typeProperties": {
                                    "functionName": "Http_skip_holiday",
                                    "method": "POST",
                                    "body": {
                                        "value": "@concat('{\"lo_date\":\"',item().inputdate,'\",\"skipday\":',item().skipday,'}')",
                                        "type": "Expression"
                                    }
                                },
                                "linkedServiceName": {
                                    "referenceName": "AzureFunction_LinkService",
                                    "type": "LinkedServiceReference"
                                }
                            }
                        ]
                    }
                }
            ],
            "annotations": []
        }
    }
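    The Function activity above references a linked service named AzureFunction_LinkService. For completeness, here is a minimal sketch of what such an Azure Function linked service looks like, with placeholder values for the function app URL and key (in production, prefer referencing the key from Azure Key Vault rather than storing it inline):

    {
        "name": "AzureFunction_LinkService",
        "properties": {
            "annotations": [],
            "type": "AzureFunction",
            "typeProperties": {
                "functionAppUrl": "https://<your-function-app>.azurewebsites.net",
                "functionKey": {
                    "type": "SecureString",
                    "value": "<function-key>"
                }
            }
        }
    }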


    Click Debug to run the pipeline and verify the results.

    III. Conclusion

    Azure Data Factory (ADF) is Azure's managed data-integration service, which lets you iteratively build, orchestrate, and monitor your ETL workflows. Azure Functions is now integrated with ADF, allowing you to run an Azure Function as a step inside a Data Factory pipeline. Keep practicing!

    Author: Allen

    Copyright: when reposting, please credit the author and source prominently in the article. If you find any errors, feedback and corrections are welcome.

    Original article: https://www.cnblogs.com/AllenMaster/p/17990816