Calcite 初试：使用 SQL 读取 CSV 文件


    背景

    通过 Calcite，使用 SQL 读取 CSV 数据。
    本文力求用最少的代码实现此功能，借此理解 Calcite 的工作流程。

    主流程代码

    生成AST

            // Step 1: parse the SQL text into an (unvalidated) SqlNode tree, i.e. the AST.
            final String sqlText = "select * from DEPTS where DEPTNO>=20 ";
            final SqlParser.Config parserConfig = SqlParser.Config.DEFAULT;
            SqlParser sqlParser = SqlParser.create(sqlText, parserConfig);
            SqlNode sqlNode = sqlParser.parseStmt();
            System.out.println(sqlNode);
    
    • 1
    • 2
    • 3
    • 4

    结果 SqlNode

    SELECT *
    FROM DEPTS
    WHERE DEPTNO >= 20

    对 SQL 进行校验，检查其是否合法

     // Step 2: validate the parsed tree against the CSV schema. With identifier expansion
     // enabled, the validator rewrites "*" into fully qualified column references.
     CsvSchema csvSchema = new CsvSchema();
     CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
     rootSchema.add("test_schema", csvSchema);
     // Reuse this root for execution (DataContext) instead of creating a second root and
     // registering the schema a second time, so validation and execution share one catalog.
     SchemaPlus schemaPlus = rootSchema.plus();
     CalciteConnectionConfigImpl config = new CalciteConnectionConfigImpl(new Properties());
     RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
     Prepare.CatalogReader catalogReader = new CalciteCatalogReader(
             rootSchema, Collections.singletonList("test_schema"), typeFactory, config);
     SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT
             .withLenientOperatorLookup(config.lenientOperatorLookup())
             .withSqlConformance(config.conformance())
             .withDefaultNullCollation(config.defaultNullCollation())
             .withIdentifierExpansion(true);
     SqlValidator validator = SqlValidatorUtil.newValidator(
             SqlStdOperatorTable.instance(), catalogReader, typeFactory, validatorConfig);
     // Run SQL validation.
     SqlNode validateSqlNode = validator.validate(sqlNode);
     // Print the node returned by validate(); that is the tree the later steps consume.
     System.out.println(validateSqlNode);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    输出 SqlNode
    SELECT DEPTS.DEPTNO, DEPTS.NAME
    FROM test_schema.DEPTS AS DEPTS
    WHERE DEPTS.DEPTNO >= 20

    将 SqlNode 转换为 RelRoot（逻辑计划）

     // Step 3: convert the validated SqlNode tree into a logical RelNode tree (wrapped in a RelRoot).
     VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
     planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
     RexBuilder rexBuilder = new RexBuilder(typeFactory);
     RelOptCluster relCluster = RelOptCluster.create(planner, rexBuilder);
     SqlToRelConverter.Config toRelConfig = SqlToRelConverter.config()
             .withTrimUnusedFields(true)
             .withExpand(false);
     // null view expander: this demo's schema defines only a plain table.
     SqlToRelConverter sqlToRel = new SqlToRelConverter(
             null, validator, catalogReader, relCluster, StandardConvertletTable.INSTANCE, toRelConfig);
     // needsValidation = false (already validated above), top = true (this is the whole query).
     RelRoot relNode = sqlToRel.convertQuery(validateSqlNode, false, true);
     System.out.println(relNode);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    输出
    Root {kind: SELECT, rel: rel#3:LogicalProject.NONE(input=LogicalFilter#2,inputs=0..1), rowType: RecordType(INTEGER DEPTNO, VARCHAR NAME), fields: [<0, DEPTNO>, <1, NAME>], collation: []}

    根据规则优化 RelNode

      // Step 4: optimize the logical plan and convert it into an Enumerable (executable) plan.
      RuleSet rules = RuleSets.ofList(
              // Logical rewrites: turn Filter/Project into Calc nodes and merge them.
              CoreRules.FILTER_TO_CALC, CoreRules.PROJECT_TO_CALC,
              CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE,
              CoreRules.FILTER_INTO_JOIN,  // push filter predicates below a Join
              // Physical rules: implement logical nodes with Enumerable operators.
              EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE, EnumerableRules.ENUMERABLE_PROJECT_TO_CALC_RULE,
              EnumerableRules.ENUMERABLE_FILTER_TO_CALC_RULE, EnumerableRules.ENUMERABLE_JOIN_RULE,
              EnumerableRules.ENUMERABLE_SORT_RULE, EnumerableRules.ENUMERABLE_CALC_RULE,
              EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
      // `rules` is already a RuleSet: hand it to the program directly instead of re-wrapping it.
      Program program = Programs.of(rules);
      // Ask for the Enumerable convention on the root; no materializations or lattices are used.
      RelNode optimizerRelTree = program.run(planner, relNode.rel,
              relNode.rel.getTraitSet().plus(EnumerableConvention.INSTANCE),
              Collections.emptyList(), Collections.emptyList());
      System.out.println(optimizerRelTree);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    将优化后的 RelNode（EnumerableRel）编译为可执行代码并执行

     // 生成物理执行计划
            EnumerableRel enumerable = (EnumerableRel) optimizerRelTree;
            Map<String, Object> internalParameters = new LinkedHashMap<>();
            EnumerableRel.Prefer prefer = EnumerableRel.Prefer.ARRAY;
            Bindable bindable = EnumerableInterpretable.toBindable(internalParameters, null, enumerable, prefer);
            final Properties properties = new Properties();
            properties.put("caseSensitive", "true");
            Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
            CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
            DataContext dataContext = DataContexts.of(calciteConnection, schemaPlus);
            Enumerable bind = bindable.bind(dataContext);
            Enumerator enumerator = bind.enumerator();
            while (enumerator.moveNext()) {
                Object current = enumerator.current();
                Object[] values = (Object[]) current;
                StringBuilder sb = new StringBuilder();
                for (Object v : values) {
                    sb.append(v).append(",");
                }
                sb.setLength(sb.length() - 1);
                System.out.println(sb);
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    schema

    /**
     * Schema exposing a single table, {@code DEPTS}, backed by the classpath resource
     * {@code sales/DEPTS.csv}.
     */
    public class CsvSchema extends AbstractSchema {
        /** Table map, built on first access and reused afterwards. */
        private Map<String, Table> tableMap;

        @Override
        protected Map<String, Table> getTableMap() {
            // The framework may call this repeatedly; build the tables once instead of
            // creating fresh table objects on every call.
            if (tableMap == null) {
                tableMap = createTableMap();
            }
            return tableMap;
        }

        /**
         * Creates the table map.
         *
         * @throws IllegalStateException if {@code sales/DEPTS.csv} is not on the classpath
         */
        private Map<String, Table> createTableMap() {
            java.net.URL url = getClass().getClassLoader().getResource("sales/DEPTS.csv");
            if (url == null) {
                throw new IllegalStateException("Classpath resource not found: sales/DEPTS.csv");
            }
            // Build the Source from the URL itself: URL.getPath() keeps percent-encoding
            // (e.g. %20 for a space), so new File(url.getPath()) breaks for such directories.
            Source sourceDepts = Sources.of(url);
            Map<String, Table> hashMap = new HashMap<>();
            hashMap.put("DEPTS", new CsvScannableTable(sourceDepts, null));
            return hashMap;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    CsvScannableTable

    /**
     * {@link ScannableTable} over a CSV {@link Source}. {@link #scan} returns each data row as an
     * {@code Object[]} containing every column, converted according to the table's field types.
     */
    public class CsvScannableTable extends CsvTable
            implements ScannableTable {

        CsvScannableTable(Source source, RelProtoDataType protoRowType) {
            super(source, protoRowType);
        }

        @Override public String toString() {
            return "CsvScannableTable";
        }

        @Override public Enumerable<@Nullable Object[]> scan(DataContext root) {
            JavaTypeFactory typeFactory = root.getTypeFactory();
            final List<RelDataType> fieldTypes = getFieldTypes(typeFactory);
            // Emit every column, in declaration order.
            final List<Integer> fields = ImmutableIntList.identity(fieldTypes.size());
            return new AbstractEnumerable<@Nullable Object[]>() {
                @Override
                public Enumerator<@Nullable Object[]> enumerator() {
                    try {
                        return new DemoCsvEnumerator<>(source, fieldTypes, fields);
                    } catch (IOException e) {
                        // UncheckedIOException (a RuntimeException) keeps the I/O nature of the failure.
                        throw new java.io.UncheckedIOException("Failed to open CSV source " + source, e);
                    }
                }
            };
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    DemoCsvEnumerator

    
    /**
     * Minimal CSV {@link Enumerator}. It reads rows with opencsv's {@link CSVReader}, skips the
     * first (header) row, and converts each remaining row into an {@code E} using a
     * {@link RowConverter}.
     *
     * @param <E> row type produced by the converter ({@code Object[]} for {@link ArrayRowConverter})
     */
    public class DemoCsvEnumerator<E> implements Enumerator<E> {

        private final CSVReader reader;

        private final RowConverter<E> rowConverter;

        /** Filter values given to the constructor; stored, but not applied anywhere in this class. */
        private final @Nullable List<@Nullable String> filterValues;

        /** Row produced by the last successful {@link #moveNext()}; null before the first row and after the end. */
        private @Nullable E current;

        /** True once the input is exhausted or {@link #close()} was called; the reader is closed by then. */
        private boolean done;

        /**
         * Creates an enumerator that returns each row as an {@code Object[]} of the selected columns.
         *
         * @param source     CSV input; its first row is skipped as a header
         * @param fieldTypes type of every column, used to convert the raw strings
         * @param fields     indices of the columns to emit, in output order
         * @throws IOException if the source cannot be opened or its first row cannot be read
         */
        @SuppressWarnings("unchecked")
        public DemoCsvEnumerator(Source source,
                                 List<RelDataType> fieldTypes, List<Integer> fields) throws IOException {
            this(source, null,
                    (RowConverter<E>) converter(fieldTypes, fields));
        }

        /** Creates the default converter: rows become {@code Object[]} of the selected columns. */
        private static RowConverter<?> converter(List<RelDataType> fieldTypes,
                                                 List<Integer> fields) {
            return new ArrayRowConverter(fieldTypes, fields);
        }

        /**
         * Creates an enumerator with an explicit row converter.
         *
         * @param source       CSV input; its first row is skipped as a header
         * @param filterValues optional values, copied and stored (not applied by this class)
         * @param rowConverter converts each raw row into an {@code E}
         * @throws IOException if the source cannot be opened or its first row cannot be read
         */
        public DemoCsvEnumerator(Source source,
                                 @Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) throws IOException {
            this.rowConverter = rowConverter;
            this.filterValues = filterValues == null ? null
                    : ImmutableNullableList.copyOf(filterValues);
            final CSVReader csvReader = new CSVReader(source.reader());
            try {
                csvReader.readNext(); // skip the header row
            } catch (IOException e) {
                // Don't leak the underlying reader when construction fails.
                try {
                    csvReader.close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
                throw e;
            }
            this.reader = csvReader;
        }

        /** Converts one raw CSV row (array of column strings) into a row object. */
        abstract static class RowConverter<E> {
            abstract E convertRow(@Nullable String[] rows);

            /**
             * Converts one raw column value according to {@code fieldType}.
             * If either argument is null, the string is returned unchanged. For INTEGER, an empty
             * string becomes null and anything else goes through {@link Integer#parseInt}.
             * Every other type returns the string as-is.
             */
            protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
                if (fieldType == null || string == null) {
                    return string;
                }
                switch (fieldType.getSqlTypeName()) {
                    case INTEGER:
                        if (string.length() == 0) {
                            return null;
                        }
                        return Integer.parseInt(string);
                    case VARCHAR:
                    default:
                        return string;
                }
            }
        }

        /**
         * Returns the row type of the demo table: nullable {@code DEPTNO INTEGER} and
         * {@code NAME VARCHAR}. The layout is hard-coded, so {@code source} and {@code stream}
         * are not consulted.
         *
         * @param fieldTypes if non-null, the column types are appended to this list as well
         */
        public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
                                                Source source, @Nullable List<RelDataType> fieldTypes, Boolean stream) {
            final List<RelDataType> types = new ArrayList<>();
            final List<String> names = new ArrayList<>();
            types.add(toNullableRelDataType(typeFactory, SqlTypeName.INTEGER));
            names.add("DEPTNO");
            types.add(toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR));
            names.add("NAME");
            if (fieldTypes != null) {
                // Report exactly the same column types to the caller's list (single source of truth).
                fieldTypes.addAll(types);
            }
            return typeFactory.createStructType(Pair.zip(names, types));
        }

        /** Creates a nullable type for {@code sqlTypeName}. */
        private static RelDataType toNullableRelDataType(JavaTypeFactory typeFactory,
                                                         SqlTypeName sqlTypeName) {
            return typeFactory.createTypeWithNullability(typeFactory.createSqlType(sqlTypeName), true);
        }

        /** Returns the row produced by the last successful {@link #moveNext()}. */
        @Override
        public E current() {
            return castNonNull(current);
        }

        /**
         * Advances to the next data row.
         *
         * @return false once the input is exhausted (the reader is closed at that point); further
         *     calls keep returning false instead of reading from the closed reader
         */
        @Override
        public boolean moveNext() {
            if (done) {
                return false;
            }
            try {
                final String[] strings = reader.readNext();
                if (strings == null) {
                    done = true;
                    current = null;
                    reader.close();
                    return false;
                }
                current = rowConverter.convertRow(strings);
                return true;
            } catch (IOException e) {
                throw new java.io.UncheckedIOException(e);
            }
        }

        /** Converts a raw row into an {@code Object[]} holding the selected columns in order. */
        static class ArrayRowConverter extends RowConverter<@Nullable Object[]> {

            /**
             * Field types. List must not be null, but any element may be null.
             */
            private final List<RelDataType> fieldTypes;
            private final ImmutableIntList fields;

            ArrayRowConverter(List<RelDataType> fieldTypes, List<Integer> fields) {
                this.fieldTypes = ImmutableNullableList.copyOf(fieldTypes);
                this.fields = ImmutableIntList.copyOf(fields);
            }

            @Override
            public @Nullable Object[] convertRow(@Nullable String[] strings) {
                return convertNormalRow(strings);
            }

            public @Nullable Object[] convertNormalRow(@Nullable String[] strings) {
                final @Nullable Object[] objects = new Object[fields.size()];
                for (int i = 0; i < fields.size(); i++) {
                    int field = fields.get(i);
                    // A short row (e.g. a blank line) yields null for the missing columns
                    // instead of an ArrayIndexOutOfBoundsException.
                    final @Nullable String raw = field < strings.length ? strings[field] : null;
                    objects[i] = convert(fieldTypes.get(field), raw);
                }
                return objects;
            }
        }

        /** Rewinding is not supported. */
        @Override
        public void reset() {
            throw new UnsupportedOperationException();
        }

        /** Closes the underlying reader; subsequent {@link #moveNext()} calls return false. */
        @Override
        public void close() {
            done = true;
            try {
                reader.close();
            } catch (IOException e) {
                throw new java.io.UncheckedIOException("Error closing CSV reader", e);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145

    Pom.xml

    <dependencies>
            <!-- Calcite core: SqlParser, SqlValidator, SqlToRelConverter, VolcanoPlanner and the
                 Enumerable runtime used by the main flow. -->
            <dependency>
                <groupId>org.apache.calcite</groupId>
                <artifactId>calcite-core</artifactId>
                <version>1.30.0</version>
            </dependency>
    
            <!-- Calcite file adapter; the CsvTable base class used here presumably comes from (or is
                 adapted from) this module - TODO confirm. Keep its version in sync with calcite-core. -->
            <dependency>
                <groupId>org.apache.calcite</groupId>
                <artifactId>calcite-file</artifactId>
                <version>1.30.0</version>
            </dependency>
    
            <!-- opencsv: provides the CSVReader used directly by DemoCsvEnumerator. -->
            <dependency>
                <groupId>net.sf.opencsv</groupId>
                <artifactId>opencsv</artifactId>
                <version>2.3</version>
            </dependency>
        </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    测试

    数据格式

    DEPTNO:int,NAME:string
    10,"Sales"
    20,"Marketing"
    30,"Accounts"

    结果
    （原文此处为运行结果截图）

  • 相关阅读:
    史上最方便的Linux教程
    每日一题 2511. 最多可以摧毁的敌人城堡数目
    我们该如何运营Facebook账号呢?
    谐波减速机轻量组合型在工业机器人中的应用
    鸿蒙项目实战-月木学途:1.编写首页,包括搜索栏、轮播图、宫格
    MATLAB中Stem3函数用法
    不会代码的时候,如何使用Jmeter完成接口测试?
    【Java 进阶篇】JDBC 数据库连接池详解
    151. 关于 SAP UI5 XML 视图里控件事件处理函数名称中的 . (点号) 问题的讨论
    分享5款同类软件中的翘楚,属于是WIN10必备良品
  • 原文地址:https://blog.csdn.net/qq_22222499/article/details/126843383