
当数据文件中同时包含 UPSERT 和 DELETE 操作时,必须添加 __op 字段,并且确保数据文件中包含一个代表操作类型的列,取值为 0 或 1。其中,取值为 0 时代表 UPSERT 操作,取值为 1 时代表 DELETE 操作。
准备数据文件。
a. 在本地文件系统创建一个 CSV 格式的数据文件 example3.csv。文件包含四列,分别代表用户 ID、用户姓名、用户得分和操作类型,如下所示:
- 101,Tom,100,1
- 102,Sam,70,0
- 103,Stan,80,0
b. 把 example3.csv 文件中的数据上传到 Kafka 集群的 topic3 中。
准备 StarRocks 表。
a. 在数据库 test_db 中创建一张名为 table3 的主键模型表。表包含 id、name 和 score 三列,分别代表用户 ID、用户名称和用户得分,主键为 id 列,如下所示:
- CREATE TABLE `table3`
- (
- `id` int(11) NOT NULL COMMENT "用户 ID",
- `name` varchar(65533) NOT NULL COMMENT "用户姓名",
- `score` int(11) NOT NULL COMMENT "用户得分"
- )
- ENGINE=OLAP
- PRIMARY KEY(`id`)
- DISTRIBUTED BY HASH(`id`);
说明
自 2.5.7 版本起,StarRocks 支持在建表和新增分区时自动设置分桶数量 (BUCKETS),您无需手动设置分桶数量。更多信息,请参见 确定分桶数量。
b. 向 table3 表中插入数据,如下所示:
- INSERT INTO table3 VALUES
- (101, 'Tom', 100),
- (102, 'Sam', 90);
通过导入,把 example3.csv 文件中 id 为 101 的数据从 table3 表中删除,把 example3.csv 文件中 id 为 102 的数据更新到 table3 表,并且把 example3.csv 文件中 id 为 103 的数据插入到 table3 表:
通过 Stream Load 导入:
- curl --location-trusted -u
: \ - -H "Expect:100-continue" \
- -H "label:label4" \
- -H "column_separator:," \
- -H "columns: id, name, score, temp, __op = temp" \
- -T example3.csv -XPUT \
- http://
:/api/test_db/table3/_stream_load
说明
上述示例中,通过
columns参数把example3.csv文件中代表组别代码的第四列临时命名为temp,然后定义__op字段等于临时命名的temp列。这样,StarRocks 可以根据example3.csv文件中第四列的取值是0还是1来确定执行 UPSERT 还是 DELETE 操作。
数据转成JSON,自动增加一个是否删除标志
- private String toJsonString(TapTable tapTable, Map<String, Object> record, boolean delete) throws JsonProcessingException {
- if (null == tapTable) throw new IllegalArgumentException("TapTable cannot be null");
- if (null == record) throw new IllegalArgumentException("Record cannot be null");
- LinkedHashMap<String, Object> linkedRecord = new LinkedHashMap<>();
- for (String field : tapTable.getNameFieldMap().keySet()) {
- Object value = record.get(field);
- if (null == value) {
- linkedRecord.put(field, null);
- } else {
- linkedRecord.put(field, value.toString());
- }
- }
- linkedRecord.put(Constants.STARROCKS_DELETE_SIGN, delete ? 1 : 0);
- return objectMapper.writeValueAsString(linkedRecord);
- }
stream load导入
- public RespContent put(final TapTable table) throws StreamLoadException, StarRocksRetryableException {
- StarRocksConfig config = starRocksContext.getStarRocksConfig();
- StarRocksContext.WriteFormat writeFormat = starRocksContext.getWriteFormat();
- String loadUrl = null;
- try {
- String[] httpNodes = config.getStarRocksHttp().split(",");
- loadUrl = buildLoadUrl(httpNodes[new Random().nextInt(httpNodes.length)], config.getDatabase(), table.getId());
- TapLogger.info("starrocks-load: loadUrl = {}", loadUrl);
- final String prefix = buildPrefix(table.getId());
-
-
- String label = prefix + "-" + UUID.randomUUID();
- List<String> columns = new ArrayList<>();
- for (Map.Entry<String, TapField> entry : table.getNameFieldMap().entrySet()) {
- columns.add(entry.getKey());
- }
- // add the STARROCKS_DELETE_SIGN at the end of the column
- columns.add(Constants.STARROCKS_DELETE_SIGN);
- columns.add("__op = "+Constants.STARROCKS_DELETE_SIGN);
- HttpPutBuilder putBuilder = new HttpPutBuilder();
- InputStreamEntity entity = new InputStreamEntity(recordStream, recordStream.getContentLength());
- Collection<String> primaryKeys = table.primaryKeys(true);
- if (CollectionUtils.isEmpty(primaryKeys)) {
- putBuilder.setUrl(loadUrl)
- // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串
- .baseAuth(config.getUser(), config.getPassword())
- .addCommonHeader()
- .addFormat(writeFormat)
- .addColumns(columns)
- .setLabel(label)
- .enableAppend()
- .setEntity(entity);
- } else {
- putBuilder.setUrl(loadUrl)
- // 前端表单传出来的值和tdd json加载的值可能有差别,如前端传的pwd可能是null,tdd的是空字符串
- .baseAuth(config.getUser(), config.getPassword())
- .addCommonHeader()
- .addFormat(writeFormat)
- .addColumns(columns)
- .setLabel(label)
- .enableDelete()
- .setEntity(entity);
- }
- HttpPut httpPut = putBuilder.build();
- TapLogger.debug(TAG, "Call stream load http api, url: {}, headers: {}", loadUrl, putBuilder.header);
- return handlePreCommitResponse(httpClient.execute(httpPut));
- } catch (StarRocksRetryableException e) {
- metrics.clear();
- throw e;
- } catch (Exception e) {
- throw new StreamLoadException(String.format("Call stream load error: %s", e.getMessage()), e);
- }
- }