• [自研开源] MyData 数据集成之数据过滤 v0.7.2


    开源地址:gitee | github
    详细介绍:MyData 基于 Web API 的数据集成平台
    部署文档:用 Docker 部署 MyData
    使用手册:MyData 使用手册
    试用体验:https://demo.mydata.work
    交流Q群:430089673

    概述

    本篇基于 数据集成之任务流程 介绍任务执行时的数据过滤的使用场景和配置操作。

    使用场景

    业务系统与mydata集成时,核心是数据的来和去,在这两个方向上分别实现:数据预清洗数据权限控制

    1. 数据预清洗,从api获取数据后 过滤排除掉“脏”数据,然后再入库用于数据集成;

      在这里插入图片描述

      例如:接口返回的某字段值不能为空、字段值长度在指定范围等;

      以下代码是 提供数据 类型的任务执行过程:

      // 提供数据
      case MdConstant.DATA_PRODUCER:
          // 调用api 获取json
          String json = ApiUtil.read(taskInfo);
          // 将json按字段映射 解析为业务数据
          jobDataService.parseData(taskInfo, json);
          // 根据条件过滤数据
          jobDataFilterService.doFilter(taskInfo);
          // 保存业务数据
          jobDataService.saveTaskData(taskInfo);
          // 更新环境变量
          jobVarService.saveVarValue(taskInfo, json);
      	break;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13

      jobDataFilterService.doFilter 是对数据的预过滤处理,详见 JobDataFilterService.java

      public void doFilter(TaskInfo task) {
          Assert.notNull(task);
      	// 获取业务数据
          List<Map> dataList = task.getProduceDataList();
          // 获取配置的过滤条件
          List<BizDataFilter> dataFilters = task.getDataFilters();
      
          if (CollUtil.isEmpty(dataList) || CollUtil.isEmpty(dataFilters)) {
              return;
          }
      
          // 定义新的数据集合,用于存储 过滤后的数据
          List<Map> filterDatas = ListUtil.toList();
          // 遍历数据,并进行过滤
          dataList.forEach(data -> {
      
              boolean isCorrect = false;
      
              for (BizDataFilter filter : dataFilters) {
                  String key = filter.getKey();
                  Object filterValue = filter.getValue();
                  String op = filter.getOp();
      
                  // 当数据中 不包含 过滤的字段名,则执行下一项过滤
                  if (!data.containsKey(key)) {
                      continue;
                  }
      
                  // 当数据中 指定字段的值 无效,则过滤该数据
                  Object dataValue = data.get(key);
                  if (ObjectUtil.isNull(dataValue)) {
                      isCorrect = true;
                      break;
                  }
      
                  // 判断业务数据值 和 过滤数据值 都可对比,否则过滤条件无效
                  if (!(dataValue instanceof Comparable && filterValue instanceof Comparable)) {
                      break;
                  }
      
                  String cDataValue = dataValue.toString();
                  String cFilterValue = filterValue.toString();
                  // 根据op类型,过滤数据
                  switch (op) {
                      case MdConstant.DATA_OP_EQ:
                          // 等于
                          isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) == 0);
                          break;
                      case MdConstant.DATA_OP_NE:
                          // 不等于
                          isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) != 0);
                          break;
                      case MdConstant.DATA_OP_GT:
                          // 大于
                          isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) > 0);
                          break;
                      case MdConstant.DATA_OP_GTE:
                          // 大于等于
                          isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) >= 0);
                          break;
                      case MdConstant.DATA_OP_LT:
                          // 小于
                          isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) < 0);
                          break;
                      case MdConstant.DATA_OP_LTE:
                          // 小于等于
                          isCorrect = (ObjectUtil.compare(cDataValue, cFilterValue) <= 0);
                          break;
      
                      default:
                          throw new RuntimeException("JobDataFilter: 不支持的过滤操作");
                  }
              }
      
              // 当 未被过滤,则添加到过滤结果
              if (isCorrect) {
                  filterDatas.add(data);
              }
          });
      
          task.setProduceDataList(filterDatas);
      
          task.appendLog("过滤前的业务数据:{}", dataList);
          task.appendLog("过滤条件:{}", dataFilters);
          task.appendLog("过滤后的业务数据:{}", filterDatas);
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68
      • 69
      • 70
      • 71
      • 72
      • 73
      • 74
      • 75
      • 76
      • 77
      • 78
      • 79
      • 80
      • 81
      • 82
      • 83
      • 84
      • 85
      • 86

      注:目前0.7版本暂时实现了关系运算,后续增加函数处理;

    2. 数据权限控制,限制应用接收的数据范围,即符合条件的数据才能共享给应用;

      在这里插入图片描述

      以下代码是 消费数据 类型任务的执行过程:

      // 消费数据
      case MdConstant.DATA_CONSUMER:
          List<BizDataFilter> filters = taskInfo.getDataFilters();
          if (CollUtil.isNotEmpty(filters)) {
              // 解析过滤条件值中的 自定义字符串
              parseFilterValue(filters);
              // 排除值为null的条件
              filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
          }
          String dataCode = taskInfo.getDataCode();
          if (StrUtil.isNotEmpty(dataCode)) {
              // 根据过滤条件 查询数据
              List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
              taskInfo.setConsumeDataList(dataList);
              // 根据字段映射转换为api参数
              jobDataService.convertData(taskInfo);
          }
          // 调用api传输数据
          ApiUtil.write(taskInfo);
          break;
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20

      bizDataDAO.list 方法支持按配置条件查询数据,详见 BizDataDAO.java

      public List<Map> list(String dbCode, String dataCode, List<BizDataFilter> bizDataFilters) {
              MongoTemplate mongoTemplate = mongoFactory.getTemplate(dbCode);
              Query query = new Query();
              // 遍历数据过滤条件
              if (CollUtil.isNotEmpty(bizDataFilters)) {
                  // mongodb的查询条件集合
                  List<Criteria> criteriaList = CollUtil.newArrayList();
                  for (BizDataFilter bizDataFilter : bizDataFilters) {
                      // 条件key
                      String key = bizDataFilter.getKey();
                      // 条件操作
                      String op = bizDataFilter.getOp();
                      // 条件值
                      Object value = bizDataFilter.getValue();
      
                      // 根据条件操作类型 调用mongodb对应的查询方法
                      Criteria criteria = Criteria.where(key);
                      switch (op) {
                          case MdConstant.DATA_OP_EQ:
                              criteria.is(value);
                              break;
                          case MdConstant.DATA_OP_NE:
                              criteria.ne(value);
                              break;
                          case MdConstant.DATA_OP_GT:
                              criteria.gt(value);
                              break;
                          case MdConstant.DATA_OP_GTE:
                              criteria.gte(value);
                              break;
                          case MdConstant.DATA_OP_LT:
                              criteria.lt(value);
                              break;
                          case MdConstant.DATA_OP_LTE:
                              criteria.lte(value);
                              break;
      
                          default:
                              throw new RuntimeException("BizDataDAO: 不支持的过滤操作");
                      }
                      // 存入mongodb的查询条件集合
                      criteriaList.add(criteria);
                  }
      
                  // mongodb查询条件集合 加入查询中
                  query.addCriteria(new Criteria().andOperator(criteriaList));
              }
      
              // 执行查询
              return mongoTemplate.find(query, Map.class, dataCode);
          }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51

    配置操作

    1. 创建任务过程请参考 使用手册

    2. 在创建任务界面中,添加数据过滤条件

      如下图过滤条件是 salary > 600
      在这里插入图片描述

    3. 执行任务后 通过日志详情可以看到数据入库前预清洗;
      在这里插入图片描述

  • 相关阅读:
    常见的 NoSQL 数据库有哪些?
    【深度学习】第二章:全连接神经网络
    做测试5年,熬到阿里P6,月薪25k,我总结了这些技能点
    数据结构与算法之堆: Leetcode 347. 前K个高频元素 (Typescript版)
    计算机视觉岗实习面经
    LRU的原理与实现(java)
    2
    Unity3D Entity_CacheService实现详解
    初始化一个Android项目时,Android Studio会自动生成一些文件和目录结构,以帮助你快速上手开发
    Linux中线程的介绍
  • 原文地址:https://blog.csdn.net/tle3212000/article/details/136723426