通过 Calcite,使用 SQL 来读取 CSV 数据。
本文争取用最少的代码来实现此功能,帮助理解 Calcite 是如何工作的。
SqlParser parser = SqlParser.create("select * from DEPTS where DEPTNO>=20 ", SqlParser.Config.DEFAULT);
// Parse the SQL string into a SqlNode AST (no schema knowledge yet —
// table and column names are unresolved identifiers at this stage).
SqlNode sqlNode = parser.parseStmt();
System.out.println(sqlNode);
结果 SqlNode
SELECT *
FROM DEPTS
WHERE DEPTNO
>= 20
// Register the CSV-backed schema under the name "test_schema".
CsvSchema csvSchema = new CsvSchema();
CalciteSchema rootSchema = CalciteSchema.createRootSchema(true, false);
rootSchema.add("test_schema", csvSchema);
// NOTE(review): a second, independent root schema is built for the later
// DataContext; both must hold the SAME CsvSchema instance or execution
// would read a different table than validation resolved against.
SchemaPlus schemaPlus = Frameworks.createRootSchema(false);
schemaPlus.add("test_schema", csvSchema);
CalciteConnectionConfigImpl config = new CalciteConnectionConfigImpl(new Properties());
RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl();
// The catalog reader resolves table names (DEPTS) against "test_schema".
Prepare.CatalogReader catalogReader = new CalciteCatalogReader(rootSchema, Collections.singletonList("test_schema"), typeFactory, config);
// withIdentifierExpansion(true) rewrites "*" and bare column names into
// fully-qualified references, as shown in the sample output below.
SqlValidator.Config validatorConfig = SqlValidator.Config.DEFAULT.withLenientOperatorLookup(config.lenientOperatorLookup()).withSqlConformance(config.conformance()).withDefaultNullCollation(config.defaultNullCollation()).withIdentifierExpansion(true);
SqlValidator validator = SqlValidatorUtil.newValidator(SqlStdOperatorTable.instance(), catalogReader, typeFactory, validatorConfig);
// Run semantic validation (name resolution, type checking).
SqlNode validateSqlNode = validator.validate(sqlNode);
// Fix: print the VALIDATED node. The original printed the pre-validation
// sqlNode; the expanded form shown in the article is the validated tree.
System.out.println(validateSqlNode);
输出 SqlNode
SELECT DEPTS
.DEPTNO
, DEPTS
.NAME
FROM test_schema
.DEPTS
AS DEPTS
WHERE DEPTS
.DEPTNO
>= 20
// Cost-based planner; ConventionTraitDef lets it convert between calling
// conventions (logical NONE -> ENUMERABLE) during optimization.
VolcanoPlanner planner = new VolcanoPlanner(RelOptCostImpl.FACTORY, Contexts.of(config));
planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
// Create the SqlToRelConverter; the cluster ties planner and RexBuilder together.
RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
SqlToRelConverter.Config converterConfig = SqlToRelConverter.config().withTrimUnusedFields(true).withExpand(false);
// First arg (ViewExpander) is null — this demo does not expand views.
SqlToRelConverter converter = new SqlToRelConverter(null, validator, catalogReader, cluster, StandardConvertletTable.INSTANCE, converterConfig);
// Convert the validated SqlNode tree into a relational-algebra (RelNode) tree.
RelRoot relNode = converter.convertQuery(validateSqlNode, false, true);
System.out.println(relNode);
输出
Root {kind: SELECT, rel: rel#3:LogicalProject.NONE(input=LogicalFilter#2,inputs=0…1), rowType: RecordType(INTEGER DEPTNO, VARCHAR NAME), fields: [<0, DEPTNO>, <1, NAME>], collation: []}
// Optimization rules: logical rewrites plus Enumerable-convention conversions.
RuleSet rules = RuleSets.ofList(CoreRules.FILTER_TO_CALC, CoreRules.PROJECT_TO_CALC, CoreRules.FILTER_CALC_MERGE, CoreRules.PROJECT_CALC_MERGE, CoreRules.FILTER_INTO_JOIN, // push filter predicates below the Join
EnumerableRules.ENUMERABLE_TABLE_SCAN_RULE, EnumerableRules.ENUMERABLE_PROJECT_TO_CALC_RULE, EnumerableRules.ENUMERABLE_FILTER_TO_CALC_RULE, EnumerableRules.ENUMERABLE_JOIN_RULE, EnumerableRules.ENUMERABLE_SORT_RULE, EnumerableRules.ENUMERABLE_CALC_RULE, EnumerableRules.ENUMERABLE_AGGREGATE_RULE);
Program program = Programs.of(RuleSets.ofList(rules));
// Run the planner, requesting an EnumerableConvention output plan.
RelNode optimizerRelTree = program.run(planner, relNode.rel, relNode.rel.getTraitSet().plus(EnumerableConvention.INSTANCE), Collections.emptyList(), Collections.emptyList());
System.out.println(optimizerRelTree);
// Compile the optimized Enumerable plan into executable (Bindable) code.
EnumerableRel enumerable = (EnumerableRel) optimizerRelTree;
Map<String, Object> internalParameters = new LinkedHashMap<>();
// Prefer.ARRAY: each row is materialized as an Object[].
EnumerableRel.Prefer prefer = EnumerableRel.Prefer.ARRAY;
Bindable bindable = EnumerableInterpretable.toBindable(internalParameters, null, enumerable, prefer);
final Properties properties = new Properties();
properties.put("caseSensitive", "true");
// A Calcite JDBC connection supplies the DataContext plumbing at runtime.
Connection connection = DriverManager.getConnection("jdbc:calcite:", properties);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
DataContext dataContext = DataContexts.of(calciteConnection, schemaPlus);
// Use parameterized types instead of the raw Enumerable/Enumerator.
Enumerable<Object[]> bind = bindable.bind(dataContext);
Enumerator<Object[]> enumerator = bind.enumerator();
try {
    while (enumerator.moveNext()) {
        Object[] values = enumerator.current();
        StringBuilder sb = new StringBuilder();
        for (Object v : values) {
            sb.append(v).append(",");
        }
        if (sb.length() > 0) {
            sb.setLength(sb.length() - 1); // drop trailing comma; guard empty rows
        }
        System.out.println(sb);
    }
} finally {
    // Fix: release the cursor and the JDBC connection (both leaked before).
    enumerator.close();
    connection.close();
}
/**
 * A minimal Calcite schema exposing a single CSV-backed table named DEPTS.
 *
 * <p>AbstractSchema asks for the table map via {@link #getTableMap()} when
 * the validator resolves a table name against this schema.
 */
public class CsvSchema extends AbstractSchema {
    @Override
    protected Map<String, Table> getTableMap() {
        return createTableMap();
    }

    /** Builds the name-to-table map; currently only "DEPTS". */
    private Map<String, Table> createTableMap() {
        Map<String, Table> tables = new HashMap<>();
        // Fix: use the classpath URL directly instead of new File(url.getPath()).
        // getPath() returns a URL-encoded string, which breaks when the
        // resource path contains spaces or non-ASCII characters; it also
        // NPE'd with no message when the resource was missing.
        java.net.URL url = getClass().getClassLoader().getResource("sales/DEPTS.csv");
        if (url == null) {
            throw new IllegalStateException("Classpath resource sales/DEPTS.csv not found");
        }
        Source sourceDepts = Sources.of(url);
        // Row type is deduced by the table itself, hence the null proto type.
        tables.put("DEPTS", new CsvScannableTable(sourceDepts, null));
        return tables;
    }
}
/**
 * CSV table exposed through the {@link ScannableTable} interface — the
 * simplest access path, where Calcite asks the table to enumerate every row
 * and applies filtering/projection itself.
 */
public class CsvScannableTable extends CsvTable
    implements ScannableTable {
  CsvScannableTable(Source source, RelProtoDataType protoRowType) {
    super(source, protoRowType);
  }

  @Override public String toString() {
    return "CsvScannableTable";
  }

  @Override public Enumerable< Object[]> scan(DataContext root) {
    final JavaTypeFactory factory = root.getTypeFactory();
    final List<RelDataType> columnTypes = getFieldTypes(factory);
    // Identity projection: emit every column in declared order.
    final List<Integer> projectedColumns = ImmutableIntList.identity(columnTypes.size());
    return new AbstractEnumerable<@Nullable Object[]>() {
      @Override
      public Enumerator<@Nullable Object[]> enumerator() {
        try {
          return new DemoCsvEnumerator(source, columnTypes, projectedColumns);
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    };
  }
}
/**
 * Enumerator that streams rows out of a CSV file via opencsv's CSVReader,
 * converting each raw String[] record into the row type {@code E} with a
 * {@link RowConverter}. The first CSV line is treated as a header and skipped.
 *
 * <p>Not thread-safe; intended to be consumed by a single caller and closed
 * when exhausted (moveNext() also closes the reader at end-of-input).
 */
public class DemoCsvEnumerator<E> implements Enumerator<E> {
private final CSVReader reader;
private final DemoCsvEnumerator.RowConverter<E> rowConverter;
// NOTE(review): filterValues is stored but never consulted in this class —
// presumably a hook for pushed-down equality filters; confirm before relying on it.
private final @Nullable List<@Nullable String> filterValues;
private @Nullable E current;
/**
 * Creates an enumerator over all columns of {@code source}, with no filter.
 *
 * @throws IOException if the CSV file cannot be opened or the header read
 */
public DemoCsvEnumerator(Source source,
List<RelDataType> fieldTypes, List<Integer> fields) throws IOException {
//noinspection unchecked
this(source, null,
(DemoCsvEnumerator.RowConverter<E>) converter(fieldTypes, fields));
}
// Builds the converter mapping raw CSV strings to Object[] rows.
private static DemoCsvEnumerator.RowConverter<?> converter(List<RelDataType> fieldTypes,
List<Integer> fields) {
return new DemoCsvEnumerator.ArrayRowConverter(fieldTypes, fields);
}
/**
 * Full constructor.
 *
 * @param source       the CSV file to read
 * @param filterValues optional per-column literal filters (nullable; unused here)
 * @param rowConverter converts a raw CSV record into a row of type {@code E}
 * @throws IOException if opening the reader or reading the header fails
 */
public DemoCsvEnumerator(Source source,
@Nullable String @Nullable [] filterValues, RowConverter<E> rowConverter) throws IOException {
this.rowConverter = rowConverter;
this.filterValues = filterValues == null ? null
: ImmutableNullableList.copyOf(filterValues);
this.reader = new CSVReader(source.reader());
this.reader.readNext(); // skip header row
}
/** Converts one raw CSV record into a typed row. */
abstract static class RowConverter<E> {
abstract E convertRow(@Nullable String[] rows);
@SuppressWarnings("JavaUtilDate")
// Converts a single cell: INTEGER cells are parsed (empty string -> null);
// everything else, including VARCHAR, is passed through as the raw string.
protected @Nullable Object convert(@Nullable RelDataType fieldType, @Nullable String string) {
if (fieldType == null || string == null) {
return string;
}
switch (fieldType.getSqlTypeName()) {
case INTEGER:
if (string.length() == 0) {
return null;
}
return Integer.parseInt(string);
case VARCHAR:
default:
return string;
}
}
}
/**
 * Returns the fixed row type for this demo: (DEPTNO INTEGER, NAME VARCHAR),
 * both nullable. The {@code source} and {@code stream} parameters are
 * ignored — the schema is hard-coded rather than deduced from the file.
 * If {@code fieldTypes} is non-null it is also populated with the same
 * two types (an out-parameter used by callers that need the flat list).
 */
public static RelDataType deduceRowType(JavaTypeFactory typeFactory,
Source source, @Nullable List<RelDataType> fieldTypes, Boolean stream) {
final List<RelDataType> types = new ArrayList<>();
final List<String> names = new ArrayList<>();
if (fieldTypes != null) {
fieldTypes.add(toNullableRelDataType(typeFactory, SqlTypeName.INTEGER));
fieldTypes.add(toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR));
}
types.add(toNullableRelDataType(typeFactory, SqlTypeName.INTEGER));
types.add(toNullableRelDataType(typeFactory, SqlTypeName.VARCHAR));
names.add("DEPTNO");
names.add("NAME");
return typeFactory.createStructType(Pair.zip(names, types));
}
// Wraps a plain SQL type as its nullable variant.
private static RelDataType toNullableRelDataType(JavaTypeFactory typeFactory,
SqlTypeName sqlTypeName) {
return typeFactory.createTypeWithNullability(typeFactory.createSqlType(sqlTypeName), true);
}
@Override
public E current() {
// Enumerator contract: current() is only valid after moveNext() returned true.
return castNonNull(current);
}
@Override
public boolean moveNext() {
try {
final String[] strings = reader.readNext();
if (strings == null) {
// End of file: clear current and eagerly release the reader.
current = null;
reader.close();
return false;
}
current = rowConverter.convertRow(strings);
return true;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/** Converts a CSV record into an Object[] row, one entry per projected field. */
static class ArrayRowConverter extends RowConverter<@Nullable Object[]> {
/**
 * Field types. List must not be null, but any element may be null.
 */
private final List<RelDataType> fieldTypes;
private final ImmutableIntList fields;
ArrayRowConverter(List<RelDataType> fieldTypes, List<Integer> fields) {
this.fieldTypes = ImmutableNullableList.copyOf(fieldTypes);
this.fields = ImmutableIntList.copyOf(fields);
}
@Override
public @Nullable Object[] convertRow(@Nullable String[] strings) {
return convertNormalRow(strings);
}
public @Nullable Object[] convertNormalRow(@Nullable String[] strings) {
final @Nullable Object[] objects = new Object[fields.size()];
for (int i = 0; i < fields.size(); i++) {
// fields maps output position -> source column index.
int field = fields.get(i);
objects[i] = convert(fieldTypes.get(field), strings[field]);
}
return objects;
}
}
@Override
public void reset() {
// Re-reading a forward-only CSV stream is not supported.
throw new UnsupportedOperationException();
}
@Override
public void close() {
try {
reader.close();
} catch (IOException e) {
throw new RuntimeException("Error closing CSV reader", e);
}
}
}
<dependencies>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.30.0</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-file</artifactId>
<version>1.30.0</version>
</dependency>
<dependency>
<groupId>net.sf.opencsv</groupId>
<artifactId>opencsv</artifactId>
<version>2.3</version>
</dependency>
</dependencies>
数据格式
DEPTNO:int,NAME:string
10,"Sales"
20,"Marketing"
30,"Accounts"
结果