• Doris写入数据异常提示actual column number in csv file is less than schema column number


    版本信息:

    • Flink 1.17.1
    • Doris 1.2.3
    • Flink Doris Connector 1.4.0

    写入方式

    采用 String 数据流,依照社区网站的样例代码,在sink之前将数据转换为DataStream,分隔符采用"\t"。

    运行异常

    通过Stream Load返回结果json中的ErrorUrl可以看到如题的异常

    Reason: actual column number in csv file is less than schema column number. actual number: 10, ..., schema column number: 11; src line: [...]
    

    数据库表明明只有10个字段,提示schema column number却是11个。是自己眼花数错字段了吗?经过反复确认及同事确认,没有错,目标表就是10个字段,我写入的也是10个字段,是Flink Doris Connector 的bug吗?

    分析过程

    既然怀疑是bug,那就去扒代码。
    实际数据写入逻辑封装在org.apache.doris.flink.sink.writer.DorisWriter,该类实现了org.apache.flink.api.connector.sink.SinkWriter接口。查看该类发现,写入Doris的过程实际是使用微批写入的。

        @Override
        public void write(IN in, Context context) throws IOException {
            checkLoadException();
            byte[] serialize = serializer.serialize(in);
            if(Objects.isNull(serialize)){
                //ddl record
                return;
            }
            if(!loading) {
                //Start streamload only when there has data
                dorisStreamLoad.startLoad(currentLabel);
                loading = true;
            }
            dorisStreamLoad.writeRecord(serialize);
        }
        @Override
        public List prepareCommit(boolean flush) throws IOException {
            if(!loading){
                //There is no data during the entire checkpoint period
                return Collections.emptyList();
            }
            // disable exception checker before stop load.
            loading = false;
            Preconditions.checkState(dorisStreamLoad != null);
            RespContent respContent = dorisStreamLoad.stopLoad(currentLabel);
            if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                String errMsg = String.format("stream load error: %s, see more in %s", respContent.getMessage(), respContent.getErrorURL());
                throw new DorisRuntimeException(errMsg);
            }
            if (!executionOptions.enabled2PC()) {
                return Collections.emptyList();
            }
            long txnId = respContent.getTxnId();
            return ImmutableList.of(new DorisCommittable(dorisStreamLoad.getHostPort(), dorisStreamLoad.getDb(), txnId));
        }
    

    每一条记录都会触发write操作,从上述代码可以看到根据boolean变量loading的值,程序将会触发dorisStreamLoad.startLoad(currentLabel);,而loading的状态在preCommit方法中进行修改,而preCommit是在checkpoint时触发,所以数据提交动作是通过checkpoint触发的。查看startLoad源代码

    
        /**
         * start write data for new checkpoint.
         * @param label
         * @throws IOException
         */
        public void startLoad(String label) throws IOException{
            loadBatchFirstRecord = true;
            HttpPutBuilder putBuilder = new HttpPutBuilder();
            recordStream.startInput();
            LOG.info("stream load started for {} on host {}", label, hostPort);
            try {
                InputStreamEntity entity = new InputStreamEntity(recordStream);
                putBuilder.setUrl(loadUrlStr)
                        .baseAuth(user, passwd)
                        .addCommonHeader()
                        .addHiddenColumns(enableDelete)
                        .setLabel(label)
                        .setEntity(entity)
                        .addProperties(streamLoadProp);
                if (enable2PC) {
                   putBuilder.enable2PC();
                }
                pendingLoadFuture = executorService.submit(() -> {
                    LOG.info("start execute load");
                    return httpClient.execute(putBuilder.build());
                });
            } catch (Exception e) {
                String err = "failed to stream load data with label: " + label;
                LOG.warn(err, e);
                throw e;
            }
        }
    

    DorisStreamLoad类负责将数据实际写入Doris,在上面的代码中我看到了一个陌生的词汇HiddenColumns,“隐藏列”,什么是隐藏列?.addHiddenColumns(enableDelete)的参数enableDelete 是一个boolean值,继续扒代码发现,默认值enableDelete = true;,addHiddenColumn(true)?是否意味着我的put操作数据中必须包含隐藏列?继续扒

        public HttpPutBuilder addHiddenColumns(boolean add) {
            if(add){
                header.put("hidden_columns", LoadConstants.DORIS_DELETE_SIGN);
            }
            return this;
        }
    

    在http请求header中添加了一个配置,似乎是指明了"hidden_columns"="DORIS_DELETE_SIGN",看着好像是一个列名称,使用IDEA的跟踪调用功能,查看下哪里用到了这个变量。

    跟踪这些代码更确信,这是一个列名称。我的10列加上这一列就是11列啊,设置enableDelete = false,是否意味着我的put操作不再包含这一隐含列?

    解决方案

    修改构造DorisSink的代码添加.setDeletable(false);

            DorisExecutionOptions.Builder  executionBuilder = DorisExecutionOptions.builder();
            executionBuilder.setLabelPrefix(labelPrefix) //streamload label prefix
                    .setDeletable(false);
    

    重新运行代码,写入成功,问题解决。

    总结

    出现该异常是因为,Flink Doris Connector 在构造Sink时默认用户写入数据中包含了隐藏列__DORIS_DELETE_SIGN__
    尽管问题解决了,但是还是有很多疑问,什么是隐藏列,__DORIS_DELETE_SIGN__这个隐藏列是什么意思,从前面的代码中可以看出其取值为0或1,导入数据时为什么默认需要传递该列,该列在最前面还是在最后面?不传递该列是否会有问题?

  • 相关阅读:
    LeetCode刷题笔记之图论
    高并发挑战?盘点这些架构优化篇技巧,让你的系统焕发新生!
    企业级大数据平台智能运维好帮手——星环科技多模数据平台监控软件Aquila Insight
    栈(Stack) · 队列(Queue) · 循环队列 · 双端队列
    RabbitMQ-交换机(发布订阅模式/fanout/direct/topics)
    如何高效解决 C++内存问题,Apache Doris 实践之路|技术解析
    Python【控制台输出案例】
    大数据ClickHouse(十七):Java 读写ClickHouse API
    225. 用队列实现栈-C语言
    设计模式——装饰器模式
  • 原文地址:https://www.cnblogs.com/aaronking/p/17554705.html