• Flink RowData 与 Row 相互转化工具类


    RowData与Row区别

    (0)都代表了一条记录。都可以设置RowKind,和列数量Aritry。
    (1)RowData 属于Table API,而Row属于Stream API
    (2)RowData 属于Table内部接口,对用户不友好。而Row使用简单。
    (3)RowData 要拿到field值必须提供列索引和LogicalType类型。而Row只需要提供列名或列索引即可。

    请自己阅读注释内容。

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.table.types.logical.LogicalType;
    import org.apache.flink.table.types.logical.LogicalTypeRoot;
    import org.apache.flink.table.types.logical.RowType;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Preconditions;
    
    
    import java.util.*;
    import java.util.stream.Collectors;
    
    /**
     * @author: lisai
     * @create: 2023-03-15 16:51
     * @Description:
     */
    public class RowUtils {
    
        public interface TypedMapFunc<IN, OUT> extends MapFunction<IN, OUT>, ResultTypeQueryable<OUT> {
            DataType getProducedDataType();
        }
    
        public static List<RowData.FieldGetter> getRowDataFieldGetters(DataType rowDataType) {
            Preconditions.checkArgument(rowDataType.getLogicalType().getTypeRoot() == LogicalTypeRoot.ROW);
            return getRowDataFieldGetters(rowDataType.getChildren().stream().map(DataType::getLogicalType).collect(Collectors.toList()));
        }
    
        public static List<RowData.FieldGetter> getRowDataFieldGetters(RowType rowType) {
            return getRowDataFieldGetters(rowType.getFields().stream().map(RowType.RowField::getType).collect(Collectors.toList()));
        }
    
        public static List<RowData.FieldGetter> getRowDataFieldGetters(List<LogicalType> logicalTypes) {
            List<RowData.FieldGetter> fieldGetterList = new ArrayList<>();
            for (int i = 0; i < logicalTypes.size(); i++) {
                final RowData.FieldGetter fieldGetter = RowData.createFieldGetter(logicalTypes.get(i), i);
                fieldGetterList.add(fieldGetter);
            }
            return fieldGetterList;
        }
    
        public static void copyRowData(RowData input, GenericRowData output, List<RowData.FieldGetter> fieldGetters) {
            for (int i = 0; i < input.getArity() && i < output.getArity(); i++) {
                if (input instanceof GenericRowData) {
                    output.setField(i, ((GenericRowData) input).getField(i));
                } else {
                    Preconditions.checkArgument(fieldGetters != null);
                    Object value = fieldGetters.get(i).getFieldOrNull(input);
                    output.setField(i, value);
                }
            }
        }
    
        public static TypedMapFunc<RowData, Row> getRowDataToRowMapFunc(DataType rowDataType) {
            LogicalType logicalType = rowDataType.getLogicalType();
            Preconditions.checkArgument(logicalType.getTypeRoot() == LogicalTypeRoot.ROW);
            return new TypedMapFunc<RowData, Row>() {
                private RowData.FieldGetter[] fieldGetters = getRowDataFieldGetters(rowDataType).toArray(new RowData.FieldGetter[0]);
                @Override
                public TypeInformation<Row> getProducedType() {
                    RowType rowType = (RowType) logicalType;
                    List<RowType.RowField> rowFields = rowType.getFields();
                    List<DataType> rowDataTypes = rowDataType.getChildren();
                    TypeInformation<?>[] fieldTypeInfos = rowDataTypes.stream().map(t -> InternalTypeInfo.of(t.getLogicalType())).toArray(TypeInformation[]::new);
                    String[] fieldNames = rowFields.stream().map(RowType.RowField::getName).toArray(String[]::new);
                    return new RowTypeInfo(fieldTypeInfos, fieldNames);
                }
    
                @Override
                public DataType getProducedDataType() {
                    return rowDataType.bridgedTo(Row.class);
                }
    
                @Override
                public Row map(RowData rowData) throws Exception {
                    Row row = new Row(rowData.getRowKind(), rowData.getArity());
                    for (int i = 0; i < rowData.getArity(); i++) {
                        RowData.FieldGetter fieldGetter = fieldGetters[i];
                        row.setField(i, fieldGetter.getFieldOrNull(rowData));
                    }
                    return row;
                }
            };
        }
    
        public static TypedMapFunc<Row, RowData> getRowToRowRowMapFunc(DataType rowDataType) {
            Preconditions.checkArgument(rowDataType.getLogicalType().getTypeRoot() == LogicalTypeRoot.ROW);
            return new TypedMapFunc<Row, RowData>() {
                /**
                 * @Description: 注意input Row中所有的数据类型必须是Flink Table API规定的内部类型。具体参考 {@DataTypeUtils.toInternalDataType()}
                 * @param
                 * @return TypeInformation
                 */
                @Override
                public TypeInformation<RowData> getProducedType() {
                    return InternalTypeInfo.of((RowType)rowDataType.getLogicalType());
                }
    
                @Override
                public DataType getProducedDataType() {
                    return rowDataType.bridgedTo(RowData.class);
                }
    
                @Override
                public RowData map(Row row) throws Exception {
                    GenericRowData rowData = new GenericRowData(row.getKind(), row.getArity());
                    for (int i = 0; i < rowData.getArity(); i++) {
                        rowData.setField(i, row.getField(i));
                    }
                    return rowData;
                }
            };
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
  • 相关阅读:
    021_SSSS_Diffusion Models Already Have a Semantic Latent Space
    【能效管理】安科瑞远程预付费系统在江西某沃尔玛收费管理的应用
    Flask实现简单的首页登录注销逻辑
    Windows 批量部署简易脚本
    windows 下 QT Android 环境搭建(QGC 4.2.x + Qt 5.15.2)
    Lru-k在Rust中的实现及源码解析
    15 | Linux系统上安装python
    (附源码)ssm智慧社区管理系统 毕业设计 101635
    kalibr标定IMU随机变量(高斯分布)的方差
    Leetcode - 112双周赛
  • 原文地址:https://blog.csdn.net/lisacumt/article/details/132891624