开源地址:gitee | github
详细介绍:MyData 基于 Web API 的数据集成平台
部署文档:用 Docker 部署 MyData
使用手册:MyData 使用手册
试用体验:https://demo.mydata.work
交流Q群:430089673
本篇基于 数据集成之任务流程 介绍任务执行时的数据过滤的使用场景和配置操作。
业务系统与mydata集成时,核心是数据的来和去,在这两个方向上分别实现:数据预清洗
、数据权限控制
;
数据预清洗,从api获取数据后 过滤排除掉“脏”数据,然后再入库用于数据集成;
例如:接口返回的某字段值不能为空、字段值长度在指定范围等;
以下代码是 提供数据
类型的任务执行过程:
// 提供数据
case MdConstant.DATA_PRODUCER:
// 调用api 获取json
String json = ApiUtil.read(taskInfo);
// 将json按字段映射 解析为业务数据
jobDataService.parseData(taskInfo, json);
// 根据条件过滤数据
jobDataFilterService.doFilter(taskInfo);
// 保存业务数据
jobDataService.saveTaskData(taskInfo);
// 更新环境变量
jobVarService.saveVarValue(taskInfo, json);
break;
jobDataFilterService.doFilter
是对数据的预过滤处理,详见 JobDataFilterService.java
public void doFilter(TaskInfo task) {
Assert.notNull(task);
// 获取业务数据
List<Map> dataList = task.getProduceDataList();
// 获取配置的过滤条件
List<BizDataFilter> dataFilters = task.getDataFilters();
if (CollUtil.isEmpty(dataList) || CollUtil.isEmpty(dataFilters)) {
return;
}
// 定义新的数据集合,用于存储 过滤后的数据
List<Map> filterDatas = ListUtil.toList();
// 遍历数据,并进行过滤
dataList.forEach(data -> {
boolean isCorrect = false;
for (BizDataFilter filter : dataFilters) {
String key = filter.getKey();
Object filterValue = filter.getValue();
String op = filter.getOp();
// 当数据中 不包含 过滤的字段名,则执行下一项过滤
if (!data.containsKey(key)) {
continue;
}
// 当数据中 指定字段的值 无效,则过滤该数据
Object dataValue = data.get(key);
if (ObjectUtil.isNull(dataValue)) {
isCorrect = true;
break;
}
// 判断业务数据值 和 过滤数据值 都可对比,否则过滤条件无效
if (!(dataValue instanceof Comparable && filterValue instanceof Comparable)) {
break;
}
String cDataValue = dataValue.toString();
String cFilterValue = filterValue.toString();
// 根据op类型,过滤数据
switch (op) {
case MdConstant.DATA_OP_EQ:
// 等于
isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) == 0);
break;
case MdConstant.DATA_OP_NE:
// 不等于
isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) != 0);
break;
case MdConstant.DATA_OP_GT:
// 大于
isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) > 0);
break;
case MdConstant.DATA_OP_GTE:
// 大于等于
isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) >= 0);
break;
case MdConstant.DATA_OP_LT:
// 小于
isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) < 0);
break;
case MdConstant.DATA_OP_LTE:
// 小于等于
isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) <= 0);
break;
default:
throw new RuntimeException("JobDataFilter: 不支持的过滤操作");
}
}
// 当 未被过滤,则添加到过滤结果
if (isCorrect) {
filterDatas.add(data);
}
});
task.setProduceDataList(filterDatas);
task.appendLog("过滤前的业务数据:{}", dataList);
task.appendLog("过滤条件:{}", dataFilters);
task.appendLog("过滤后的业务数据:{}", filterDatas);
}
注:目前0.7版本暂时实现了关系运算,后续增加函数处理;
数据权限控制,限制应用接收的数据范围,即符合条件的数据才能共享给应用;
以下代码是 消费数据
类型任务的执行过程:
// 消费数据
case MdConstant.DATA_CONSUMER:
List<BizDataFilter> filters = taskInfo.getDataFilters();
if (CollUtil.isNotEmpty(filters)) {
// 解析过滤条件值中的 自定义字符串
parseFilterValue(filters);
// 排除值为null的条件
filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
}
String dataCode = taskInfo.getDataCode();
if (StrUtil.isNotEmpty(dataCode)) {
// 根据过滤条件 查询数据
List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
taskInfo.setConsumeDataList(dataList);
// 根据字段映射转换为api参数
jobDataService.convertData(taskInfo);
}
// 调用api传输数据
ApiUtil.write(taskInfo);
break;
bizDataDAO.list
方法支持按配置条件查询数据,详见 BizDataDAO.java
public List<Map> list(String dbCode, String dataCode, List<BizDataFilter> bizDataFilters) {
MongoTemplate mongoTemplate = mongoFactory.getTemplate(dbCode);
Query query = new Query();
// 遍历数据过滤条件
if (CollUtil.isNotEmpty(bizDataFilters)) {
// mongodb的查询条件集合
List<Criteria> criteriaList = CollUtil.newArrayList();
for (BizDataFilter bizDataFilter : bizDataFilters) {
// 条件key
String key = bizDataFilter.getKey();
// 条件操作
String op = bizDataFilter.getOp();
// 条件值
Object value = bizDataFilter.getValue();
// 根据条件操作类型 调用mongodb对应的查询方法
Criteria criteria = Criteria.where(key);
switch (op) {
case MdConstant.DATA_OP_EQ:
criteria.is(value);
break;
case MdConstant.DATA_OP_NE:
criteria.ne(value);
break;
case MdConstant.DATA_OP_GT:
criteria.gt(value);
break;
case MdConstant.DATA_OP_GTE:
criteria.gte(value);
break;
case MdConstant.DATA_OP_LT:
criteria.lt(value);
break;
case MdConstant.DATA_OP_LTE:
criteria.lte(value);
break;
default:
throw new RuntimeException("BizDataDAO: 不支持的过滤操作");
}
// 存入mongodb的查询条件集合
criteriaList.add(criteria);
}
// mongodb查询条件集合 加入查询中
query.addCriteria(new Criteria().andOperator(criteriaList));
}
// 执行查询
return mongoTemplate.find(query, Map.class, dataCode);
}
创建任务过程请参考 使用手册
在创建任务界面中,添加数据过滤条件
如下图过滤条件是 salary > 600
执行任务后 通过日志详情可以看到数据入库前预清洗;