• starrock通过导入实现数据变更


    770dacbf535836e27a8c9df7777ae0cf.jpeg

    数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加 __op 字段,并且确保数据文件中包含一个代表操作类型的列,取值为 0 或 1。其中,取值为 0 时代表 UPSERT 操作,取值为 1 时代表 DELETE 操作。

    数据样例
    1. 准备数据文件。

      a. 在本地文件系统创建一个 CSV 格式的数据文件 example3.csv。文件包含四列,分别代表用户 ID、用户姓名、用户得分和操作类型,如下所示:

      1. 101,Tom,100,1
      2. 102,Sam,70,0
      3. 103,Stan,80,0

      b. 把 example3.csv 文件中的数据上传到 Kafka 集群的 topic3 中。

    2. 准备 StarRocks 表。

      a. 在数据库 test_db 中创建一张名为 table3 的主键模型表。表包含 idname 和 score 三列,分别代表用户 ID、用户名称和用户得分,主键为 id 列,如下所示:

      1. CREATE TABLE `table3`
      2. (
      3. `id` int(11) NOT NULL COMMENT "用户 ID",
      4. `name` varchar(65533) NOT NULL COMMENT "用户姓名",
      5. `score` int(11) NOT NULL COMMENT "用户得分"
      6. )
      7. ENGINE=OLAP
      8. PRIMARY KEY(`id`)
      9. DISTRIBUTED BY HASH(`id`);

      说明

      自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 确定分桶数量。

      b. 向 table3 表中插入数据,如下所示:

      1. INSERT INTO table3 VALUES
      2. (101, 'Tom', 100),
      3. (102, 'Sam', 90);
    导入数据

    通过导入,把 example3.csv 文件中 id 为 101 的数据从 table3 表中删除,把 example3.csv 文件中 id 为 102 的数据更新到 table3 表,并且把 example3.csv 文件中 id 为 103 的数据插入到 table3 表:

    • 通过 Stream Load 导入:

      1. curl --location-trusted -u : \
      2. -H "Expect:100-continue" \
      3. -H "label:label4" \
      4. -H "column_separator:," \
      5. -H "columns: id, name, score, temp, __op = temp" \
      6. -T example3.csv -XPUT \
      7. http://:/api/test_db/table3/_stream_load

      说明

      上述示例中,通过 columns 参数把 example3.csv 文件中代表组别代码的第四列临时命名为 temp,然后定义 __op 字段等于临时命名的 temp 列。这样,StarRocks 可以根据 example3.csv 文件中第四列的取值是 0 还是 1 来确定执行 UPSERT 还是 DELETE 操作。

    代码实现

    数据转成JSON,自动增加一个是否删除标志

    1. private String toJsonString(TapTable tapTable, Map<String, Object> record, boolean delete) throws JsonProcessingException {
    2. if (null == tapTable) throw new IllegalArgumentException("TapTable cannot be null");
    3. if (null == record) throw new IllegalArgumentException("Record cannot be null");
    4. LinkedHashMap<String, Object> linkedRecord = new LinkedHashMap<>();
    5. for (String field : tapTable.getNameFieldMap().keySet()) {
    6. Object value = record.get(field);
    7. if (null == value) {
    8. linkedRecord.put(field, null);
    9. } else {
    10. linkedRecord.put(field, value.toString());
    11. }
    12. }
    13. linkedRecord.put(Constants.STARROCKS_DELETE_SIGN, delete ? 1 : 0);
    14. return objectMapper.writeValueAsString(linkedRecord);
    15. }

    stream load导入

    1. public RespContent put(final TapTable table) throws StreamLoadException, StarRocksRetryableException {
    2. StarRocksConfig config = starRocksContext.getStarRocksConfig();
    3. StarRocksContext.WriteFormat writeFormat = starRocksContext.getWriteFormat();
    4. String loadUrl = null;
    5. try {
    6. String[] httpNodes = config.getStarRocksHttp().split(",");
    7. loadUrl = buildLoadUrl(httpNodes[new Random().nextInt(httpNodes.length)], config.getDatabase(), table.getId());
    8. TapLogger.info("starrocks-load: loadUrl = {}", loadUrl);
    9. final String prefix = buildPrefix(table.getId());
    10. String label = prefix + "-" + UUID.randomUUID();
    11. List<String> columns = new ArrayList<>();
    12. for (Map.Entry<String, TapField> entry : table.getNameFieldMap().entrySet()) {
    13. columns.add(entry.getKey());
    14. }
    15. // add the STARROCKS_DELETE_SIGN at the end of the column
    16. columns.add(Constants.STARROCKS_DELETE_SIGN);
    17. columns.add("__op = "+Constants.STARROCKS_DELETE_SIGN);
    18. HttpPutBuilder putBuilder = new HttpPutBuilder();
    19. InputStreamEntity entity = new InputStreamEntity(recordStream, recordStream.getContentLength());
    20. Collection<String> primaryKeys = table.primaryKeys(true);
    21. if (CollectionUtils.isEmpty(primaryKeys)) {
    22. putBuilder.setUrl(loadUrl)
    23. // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串
    24. .baseAuth(config.getUser(), config.getPassword())
    25. .addCommonHeader()
    26. .addFormat(writeFormat)
    27. .addColumns(columns)
    28. .setLabel(label)
    29. .enableAppend()
    30. .setEntity(entity);
    31. } else {
    32. putBuilder.setUrl(loadUrl)
    33. // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串
    34. .baseAuth(config.getUser(), config.getPassword())
    35. .addCommonHeader()
    36. .addFormat(writeFormat)
    37. .addColumns(columns)
    38. .setLabel(label)
    39. .enableDelete()
    40. .setEntity(entity);
    41. }
    42. HttpPut httpPut = putBuilder.build();
    43. TapLogger.debug(TAG, "Call stream load http api, url: {}, headers: {}", loadUrl, putBuilder.header);
    44. return handlePreCommitResponse(httpClient.execute(httpPut));
    45. } catch (StarRocksRetryableException e) {
    46. metrics.clear();
    47. throw e;
    48. } catch (Exception e) {
    49. throw new StreamLoadException(String.format("Call stream load error: %s", e.getMessage()), e);
    50. }
    51. }
  • 相关阅读:
    官宣 | 效率源文档修复神器正式出道:超高性价比工具,破损文档1秒修复
    (论文阅读40-45)图像描述1
    JAVA计算机毕业设计医疗病历交互系统Mybatis+系统+数据库+调试部署
    Vue-2.9单页应用程序
    小程序中的事件处理
    Vue3中快速简单使用CKEditor 5富文本编辑器
    【内网安全】横向移动-IPC
    Linux安装和使用Android Debug Bridge(ADB)
    澳洲猫罐头如何?我亲自喂养过的优质猫罐头分享
    Nvm任意切换node版本号
  • 原文地址:https://blog.csdn.net/dot_life/article/details/134025272