一,引言
在实际的项目中,Azure Data Factroy 中的 Data Flow 并不能彻底帮我们完成一系列复制逻辑计算, 比如我们需要针对数据集的每一行数据进行判断计算,Data Flow 就显的有些吃力。别怕,Azure Data Factory 提供了调用 Azure Function 的组件,有了代码的加持,那么解决更复杂的都能迎刃而解!!那么就开始今天的表演吧
--------------------我是分割线--------------------↳
1,Azure Data Factory(一)入门简介
2,Azure Data Factory(二)复制数据
3,Azure Data Factory(三)集成 Azure Devops 实现CI/CD
4,Azure Data Factory(四)集成 Logic App 的邮件通知提醒
5,Azure Data Factory(五)Blob Storage 密钥管理问题
6,Azure Data Factory(六)数据集类型为Dataverse的Link测试
7,Azure Data Factory(七)数据集验证之用户托管凭证
8,Azure Data Factory(八)数据集验证之服务主体(Service Principal)
9,Azure Data Factory(九)基础知识回顾
10,Azure Data Factory(十)Data Flow 组件详解
11,Azure Data Factory(十一)Data Flow 的使用解析
12,Azure Data Factory(十二)传参调用 Azure Function
二,正文
1,准备 Azure Function
打开 Azure Portal ,点击 "Create a resource" 快速创建 Azure Function
以下就是刚创建好的 Azure Function,Operating System 选择 "Windows",Runtime 选择:"node js"
添加 名字为 “Http_skip_holiday” 的 Function
Function Code:
1 const intercept = require("azure-function-log-intercept"); 2 3 module.exports = async function (context, req) { 4 context.log('JavaScript HTTP trigger function processed a request.'); 5 intercept(context); 6 let lo_date = (req.query.lo_date || (req.body && req.body.lo_date)); 7 let skipday = (req.query.skipday || (req.body && req.body.skipday)); 8 context.log("req.body:"+req.body); 9 context.log("lo_date:"+req.body.lo_date); 10 context.log("req.body:"+req.body.skipday); 11 //server Info 12 13 // Holiday Handling 14 let holidayArray = ['2023-01-01','2023-01-06','2023-01-07','2023-01-13','2023-01-14','2023-01-21','2023-01-27','2023-01-28']; 15 context.log("holidayArray.length: ", holidayArray.length); 16 17 let due_dateObj= calculate_dueDate(context,lo_date,holidayArray,skipday) 18 context.log("due_dateObj.Step: ", due_dateObj.Step); 19 context.res = { 20 status: 200, /* Defaults to 200 */ 21 body: due_dateObj 22 }; 23 } 24 25 function calculate_dueDate(context,lodate, holidayArray, num) { 26 "use strict"; 27 let DueDateObj={}; 28 let lo_date = new Date(lodate); 29 let Year = lo_date.getFullYear(); 30 let Month = lo_date.getMonth(); 31 let day = lo_date.getDate(); 32 33 let dueDate; 34 let step = num; 35 let isWorkDay = false; 36 do { 37 38 let currentDate = new Date(Year, Month, day + step); 39 40 if (currentDate.toDateString() in holidayArray || (currentDate.getDay() < 1)) { 41 step++; 42 } else { 43 isWorkDay = true; 44 } 45 } while (!isWorkDay); 46 47 48 dueDate = new Date(Year, Month, day + step); 49 DueDateObj.DueDate=dueDate.toString("yyyy-MM-dd"); 50 DueDateObj.Step=step; 51 context.log("dueDate:"+dueDate.toString("yyyy-MM-dd")); 52 return DueDateObj; 53 }
开启 Function 后,,我们使用 Postman 进行测试
注意:1)打开 Function 的 Filesystem Logs
2)如果Function 的访问基本不是 "" 那么就得在调用 Function 的 Url 后面加上验证身份的 Code
Postman 进行结果测试
2,Data Factory 中配置调用 Function
1)使用 LookUp 查询需要更新的数据集
2)利用 Foreach 循环编辑数据集,并根据每一天数据的 "inputdate","skipday" 作为参数调用 Azure Function
Foreach 的数据集合:
1 | @activity( 'Lookup_Data' ).output.value |
Function 的 Body 参数配置
1 | @concat( '{"lo_date":"' ,item().inputdate, '","skipday":' ,item().skipday, '}' ) |
pipeline code
{ "name": "test_pipeline", "properties": { "activities": [ { "name": "Lookup_Data", "type": "Lookup", "dependsOn": [], "policy": { "timeout": "0.12:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "source": { "type": "DelimitedTextSource", "storeSettings": { "type": "AzureBlobStorageReadSettings", "recursive": true, "wildcardFolderPath": "AAA", "wildcardFileName": { "value": "@concat('User_*.csv')", "type": "Expression" }, "enablePartitionDiscovery": false }, "formatSettings": { "type": "DelimitedTextReadSettings" } }, "dataset": { "referenceName": "AZURE_BLOB_CSV", "type": "DatasetReference", "parameters": { "ContainerName": "test", "DirectoryPath": "AAA", "FileName": { "value": "@concat('User_*.csv')", "type": "Expression" } } }, "firstRowOnly": false } }, { "name": "ForEach UPDATE Date", "type": "ForEach", "dependsOn": [ { "activity": "Lookup_Data", "dependencyConditions": [ "Succeeded" ] } ], "userProperties": [], "typeProperties": { "items": { "value": "@activity('Lookup_Data').output.value", "type": "Expression" }, "activities": [ { "name": "Azure_Function_SkipHoliday", "type": "AzureFunctionActivity", "dependsOn": [], "policy": { "timeout": "0.12:00:00", "retry": 0, "retryIntervalInSeconds": 30, "secureOutput": false, "secureInput": false }, "userProperties": [], "typeProperties": { "functionName": "Http_skip_holiday", "method": "POST", "body": { "value": "@concat('{\"lo_date\":\"',item().inputdate,'\",\"skipday\":',item().skipday,'}')", "type": "Expression" } }, "linkedServiceName": { "referenceName": "AzureFunction_LinkService", "type": "LinkedServiceReference" } } ] } } ], "annotations": [] } }
点击 Debug 进行调试
三,结尾
Azure Data Factory(ADF)是Azure中的托管数据集成服务,允许我们迭代地构建、编排和监视您的ETL工作流程。Azure Functions现在已与ADF集成,允许我们在数据工厂管道中运行Azure函数作为步骤。大家多多练习!!!
作者:Allen
版权:转载请在文章明显位置注明作者及出处。如发现错误,欢迎批评指正。