• 源码解析flink文件连接源TextInputFormat


    背景:

    kafka的文件系统数据源可以支持精准一次的一致性,本文就从源码看下如何TextInputFormat如何支持状态的精准一致性

    TextInputFormat源码解析

    首先flink会把输入的文件进行切分,分成多个数据块的形式,每个数据源算子任务会被分配以读取其中的数据块,但是不是所有的文件都能进行分块,判断文件是否可以进行分块的代码如下:

    protected boolean testForUnsplittable(FileStatus pathFile) {
        if (getInflaterInputStreamFactory(pathFile.getPath()) != null) {
            unsplittable = true;
            return true;
        }
        return false;
    }
    
    private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {
        String fileExtension = extractFileExtension(path.getName());
        if (fileExtension != null) {
            return getInflaterInputStreamFactory(fileExtension);
        } else {
            return null;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    在这里插入图片描述

    后缀名称是.gz,.bzip2等的文件都没法切分,如果可以切分,切分的具体代码如下所示:

    while (samplesTaken < numSamples && fileNum < allFiles.size()) {
        // make a split for the sample and use it to read a record
        FileStatus file = allFiles.get(fileNum);
    // 根据偏移量进行切分
        FileInputSplit split = new FileInputSplit(0, file.getPath(), offset, file.getLen() - offset, null);
        // we open the split, read one line, and take its length
        try {
            open(split);
            if (readLine()) {
                totalNumBytes += this.currLen + this.delimiter.length;
                samplesTaken++;
            }
        } finally {
            // close the file stream, do not release the buffers
            super.close();
        }
    // 偏移量迁移
        offset += stepSize;
    
        // skip to the next file, if necessary
        while (fileNum < allFiles.size()
                && offset >= (file = allFiles.get(fileNum)).getLen()) {
            offset -= file.getLen();
            fileNum++;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    再来看一下TextInputFormat如何支持checkpoint操作,保存文件的偏移量的代码:

    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
    
        checkState(
                checkpointedState != null, "The operator state has not been properly initialized.");
    
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        // 算子列表状态
        checkpointedState.clear();
        // 获取文件的当前读取的偏移
        List<T> readerState = getReaderState();
    
        try {
            for (T split : readerState) {
               //保存到检查点路径中
                checkpointedState.add(split);
            }
        } catch (Exception e) {
            checkpointedState.clear();
    
            throw new Exception(
                    "Could not add timestamped file input splits to to operator "
                            + "state backend of operator "
                            + getOperatorName()
                            + '.',
                    e);
        }
    
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "{} (taskIdx={}) checkpointed {} splits: {}.",
                    getClass().getSimpleName(),
                    subtaskIdx,
                    readerState.size(),
                    readerState);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    从检查点中恢复状态的代码如下:

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
    
        checkState(checkpointedState == null, "The reader state has already been initialized.");
    
        // 初始化算子操作状态
        checkpointedState =
                context.getOperatorStateStore()
                        .getListState(new ListStateDescriptor<>("splits", new JavaSerializer<>()));
    
        int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask();
        
        LOG.info(
                "Restoring state for the {} (taskIdx={}).", getClass().getSimpleName(), subtaskIdx);
    
        splits = splits == null ? new PriorityQueue<>() : splits;
        for (T split : checkpointedState.get()) {//从检查点状态中恢复各个切分的分块
            splits.add(split);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
  • 相关阅读:
    Python遥感开发之批量掩膜和裁剪
    CAN FD canfd适配器USBCANFD的功能简介
    stl算法的使用(函数及谓词)
    请求参类型params,json,data 含义理解
    #FreeRTOS中断管理简介
    Rocky Linux 运维工具 mv
    10.20作业
    C++ 背包问题——多重背包
    mysql巧妙化解递归查询树形数据 | 纯sql
    leetcode:2347. 最好的扑克手牌(python3解法)
  • 原文地址:https://blog.csdn.net/lixia0417mul2/article/details/133897397