• SeaTunnel扩展Transform插件,自定义转换插件


    代码结构

    在seatunnel-transforms-v2模块中新建一个包,并在该包下新建XXXTransform、XXXTransformConfig、XXXTransformFactory三个类

    自定义转换插件功能说明

    这是个适配KafkaSource的转换插件,接收到的原文格式为:

    {"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}

    需要转换为只保留cont里面的数据

    {"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"1"}

    任务配置文件

    1. env {
    2.   # Engine configuration; job.mode is either STREAMING or BATCH
    3.   execution.parallelism = 1
    4.   job.mode = "STREAMING"
    5.   #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
    6. }
    7. source {
    8.   # This is an example source plugin **only for testing and demonstrating the source plugin feature**
    9.    Kafka {
    10.             bootstrap.servers = "xxxxx:9092"
    11.             topic = "test_in2"
    12.             consumer.group = "167321237613"
    13.             format="text"
    14.             result_table_name="kafka"
    15.         }
    16. }
    17. transform {
    18.     ExtractFromCJ {
    19.     source_table_name="kafka"
    20.     result_table_name="kafka1"
    21.     schema = {
    22.         fields {
    23.                 NAME = "string"
    24.                 TABLE = "string"
    25.                 create_time = "string"
    26.                 ID="string"
    27.             }
    28.         }
    29.     }
    30. }
    31. sink {
    32.   kafka {
    33.       source_table_name="kafka1"
    34.       topic = "test_out2"
    35.       bootstrap.servers = "xxxx:9092"
    36.       kafka.request.timeout.ms = 60000
    37.       semantics = EXACTLY_ONCE
    38.   }
    39. }

    代码说明

    XXXTransformConfig代码,这个类主要用来保存transform的配置项

    1. package org.apache.seatunnel.transform.extract;
    2. import lombok.Getter;
    3. import lombok.Setter;
    4. import org.apache.seatunnel.api.configuration.Option;
    5. import org.apache.seatunnel.api.configuration.Options;
    6. import org.apache.seatunnel.api.configuration.ReadonlyConfig;
    7. import java.io.Serializable;
    8. import java.util.Map;
    9. @Getter
    10. @Setter
    11. public class ExtractFromCJTransformConfig implements Serializable {
    12.     public static final Option<Map<String, String>> SCHEMA =
    13.             Options.key("schema.fields")
    14.                     .mapType()
    15.                     .noDefaultValue()
    16.                     .withDescription(
    17.                             "Specify the field mapping relationship between input and output");
    18.     private Map<String, String> fieldColumns;
    19.     public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {
    20.         ExtractFromCJTransformConfig extractFromCJTransformConfig = new ExtractFromCJTransformConfig();
    21.         Map<String, String> fieldColumns = config.get(SCHEMA);
    22.         extractFromCJTransformConfig.setFieldColumns(fieldColumns);
    23.         return extractFromCJTransformConfig;
    24.     }
    25. }

    XXXTransformFactory说明,工厂类,主要用来初始化具体的转换类

    1. package org.apache.seatunnel.transform.extract;
    2. import com.google.auto.service.AutoService;
    3. import org.apache.seatunnel.api.configuration.ReadonlyConfig;
    4. import org.apache.seatunnel.api.configuration.util.OptionRule;
    5. import org.apache.seatunnel.api.table.catalog.CatalogTable;
    6. import org.apache.seatunnel.api.table.connector.TableTransform;
    7. import org.apache.seatunnel.api.table.factory.Factory;
    8. import org.apache.seatunnel.api.table.factory.TableFactoryContext;
    9. import org.apache.seatunnel.api.table.factory.TableTransformFactory;
    10. @AutoService(Factory.class)
    11. public class ExtractFromCJTransformFactory implements TableTransformFactory {
    12.     @Override
    13.     public String factoryIdentifier() {
    14.         return  "ExtractFromCJ";
    15.     }
    16.     @Override
    17.     public OptionRule optionRule() {
    18.         return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();
    19.     }
    20.     @Override
    21.     public TableTransform createTransform(TableFactoryContext context) {
    22.         CatalogTable catalogTable = context.getCatalogTable();
    23.         ReadonlyConfig options = context.getOptions();
    24.         ExtractFromCJTransformConfig extractFromCJTransformConfig =
    25.                 ExtractFromCJTransformConfig.of(options);
    26.         return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);
    27.     }
    28. }

    XXXTransform,具体的转换类,主要用于对source数据的处理,以及输出数据结构(字段与类型)的定义

    1. package org.apache.seatunnel.transform.extract;
    2. import cn.hutool.core.collection.CollUtil;
    3. import cn.hutool.json.JSONObject;
    4. import cn.hutool.json.JSONUtil;
    5. import com.google.auto.service.AutoService;
    6. import lombok.NoArgsConstructor;
    7. import lombok.NonNull;
    8. import lombok.extern.slf4j.Slf4j;
    9. import org.apache.seatunnel.api.configuration.ReadonlyConfig;
    10. import org.apache.seatunnel.api.configuration.util.ConfigValidator;
    11. import org.apache.seatunnel.api.table.catalog.CatalogTable;
    12. import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
    13. import org.apache.seatunnel.api.table.catalog.Column;
    14. import org.apache.seatunnel.api.table.catalog.ConstraintKey;
    15. import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
    16. import org.apache.seatunnel.api.table.catalog.PrimaryKey;
    17. import org.apache.seatunnel.api.table.catalog.TableIdentifier;
    18. import org.apache.seatunnel.api.table.catalog.TableSchema;
    19. import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
    20. import org.apache.seatunnel.api.table.type.SeaTunnelRow;
    21. import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
    22. import org.apache.seatunnel.api.transform.SeaTunnelTransform;
    23. import org.apache.seatunnel.shade.com.typesafe.config.Config;
    24. import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
    25. import java.util.ArrayList;
    26. import java.util.List;
    27. import java.util.stream.Collectors;
    28. /**
    29.  * Transform that replaces every input row with data taken from the {@code cont} object of the
    30.  * JSON text held in the row's first field.
    31.  *
    32.  * <p>With {@code schema.fields} configured, one output field is emitted per configured column;
    33.  * without it, the whole {@code cont} object is emitted as a single JSON string.
    34.  */
    35. @AutoService(SeaTunnelTransform.class)
    36. @NoArgsConstructor
    37. @Slf4j
    38. public class ExtractFromCJTransform extends AbstractCatalogSupportTransform {
    39.     /** Options of this transform; set by the constructor or by {@link #setConfig(Config)}. */
    40.     private ExtractFromCJTransformConfig config;
    41.     protected SeaTunnelRowType inputRowType;
    42.     /** @return the plugin name, {@code "ExtractFromCJ"} */
    43.     @Override
    44.     public String getPluginName() {
    45.         return "ExtractFromCJ";
    46.     }
    47.     /**
    48.      * Creates the transform from an already parsed config.
    49.      *
    50.      * @param config options of this transform
    51.      * @param catalogTable table description passed to the parent class
    52.      */
    53.     public ExtractFromCJTransform(
    54.             @NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {
    55.         super(catalogTable);
    56.         this.config = config;
    57.     }
    58.     /**
    59.      * Validates the raw plugin config against the factory's option rule and stores the result.
    60.      *
    61.      * @param pluginConfig raw options of the transform block
    62.      */
    63.     @Override
    64.     protected void setConfig(Config pluginConfig) {
    65.         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
    66.         ConfigValidator.of(readonlyConfig)
    67.                 .validate(new ExtractFromCJTransformFactory().optionRule());
    68.         this.config = ExtractFromCJTransformConfig.of(readonlyConfig);
    69.     }
    70.     /**
    71.      * Returns the input row type unchanged.
    72.      *
    73.      * <p>NOTE(review): {@link #transformTableSchema()} builds new columns when
    74.      * {@code schema.fields} is set while this method does not; confirm whether the row type
    75.      * should follow the configured columns too.
    76.      */
    77.     @Override
    78.     protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
    79.         return inputRowType;
    80.     }
    81.     /**
    82.      * Builds the output row from the {@code cont} object of the JSON text in the first field.
    83.      *
    84.      * <p>The output row has zero fields when no non-empty {@code cont} object can be read.
    85.      * Row kind and table id are copied from the input row.
    86.      *
    87.      * @param inputRow row whose first field carries the JSON text
    88.      * @return the extracted row
    89.      */
    90.     @Override
    91.     protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
    92.         Object[] outputDataArray = new Object[0];
    93.         JSONObject cont = extractCont(inputRow);
    94.         // cont is null when the message has no usable "cont" entry; guard before isEmpty().
    95.         if (cont != null && !cont.isEmpty()) {
    96.             java.util.Map<String, String> fieldColumns = this.config.getFieldColumns();
    97.             if (!CollUtil.isEmpty(fieldColumns)) {
    98.                 outputDataArray = new Object[fieldColumns.size()];
    99.                 int index = 0;
    100.                 for (String key : fieldColumns.keySet()) {
    101.                     outputDataArray[index++] = cont.getStr(key);
    102.                 }
    103.             } else {
    104.                 outputDataArray = new Object[] {JSONUtil.toJsonStr(cont)};
    105.             }
    106.         }
    107.         SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);
    108.         outputRow.setRowKind(inputRow.getRowKind());
    109.         outputRow.setTableId(inputRow.getTableId());
    110.         return outputRow;
    111.     }
    112.     /**
    113.      * Reads the {@code cont} object from the first field of the row.
    114.      *
    115.      * @param inputRow row to read from
    116.      * @return the {@code cont} object, or {@code null} when the first field is missing, null or
    117.      *     not a JSON object, or when it has no {@code cont} entry
    118.      */
    119.     private JSONObject extractCont(SeaTunnelRow inputRow) {
    120.         Object[] inputFields = inputRow.getFields();
    121.         if (inputFields == null || inputFields.length == 0 || inputFields[0] == null) {
    122.             return null;
    123.         }
    124.         String data = inputFields[0].toString();
    125.         // isJson also accepts JSON arrays, which parseObj cannot read, so require an object.
    126.         if (!JSONUtil.isJson(data) || !data.trim().startsWith("{")) {
    127.             return null;
    128.         }
    129.         return JSONUtil.parseObj(data).getJSONObject("cont");
    130.     }
    131.     /**
    132.      * Builds the output table schema.
    133.      *
    134.      * <p>Primary key and constraint keys are copied from the input table. The columns are the
    135.      * input columns when {@code schema.fields} is empty; otherwise one column is created per
    136.      * configured entry, typed by parsing the entry's value.
    137.      *
    138.      * @return the output schema
    139.      */
    140.     @Override
    141.     protected TableSchema transformTableSchema() {
    142.         TableSchema inputSchema = inputCatalogTable.getTableSchema();
    143.         List<ConstraintKey> outputConstraintKeys =
    144.                 inputSchema.getConstraintKeys().stream()
    145.                         .map(ConstraintKey::copy)
    146.                         .collect(Collectors.toList());
    147.         PrimaryKey copiedPrimaryKey =
    148.                 inputSchema.getPrimaryKey() == null ? null : inputSchema.getPrimaryKey().copy();
    149.         java.util.Map<String, String> fieldColumns = this.config.getFieldColumns();
    150.         List<Column> outputColumns;
    151.         if (CollUtil.isEmpty(fieldColumns)) {
    152.             outputColumns = inputSchema.getColumns();
    153.         } else {
    154.             outputColumns = new ArrayList<>();
    155.             for (java.util.Map.Entry<String, String> field : fieldColumns.entrySet()) {
    156.                 SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(field.getValue());
    157.                 outputColumns.add(
    158.                         PhysicalColumn.of(field.getKey(), dataType, 0, true, null, null));
    159.             }
    160.         }
    161.         return TableSchema.builder()
    162.                 .primaryKey(copiedPrimaryKey)
    163.                 .columns(outputColumns)
    164.                 .constraintKey(outputConstraintKeys)
    165.                 .build();
    166.     }
    167.     /** @return a copy of the input table's identifier */
    168.     @Override
    169.     protected TableIdentifier transformTableIdentifier() {
    170.         return inputCatalogTable.getTableId().copy();
    171.     }
    172. }

    文中的转换插件继承的是AbstractCatalogSupportTransform类,SeaTunnel还提供了SingleFieldOutputTransform和MultipleFieldOutputTransform,分别对应单字段输出和多字段输出的数据处理,具体扩展时可根据需求继承对应的类

    执行结果

    来源消息

    结果消息

    以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~

  • 相关阅读:
    python绘制3D图表
    Linux虚拟机共享文件夹不显示问题终极解决方法
    SpringCloudAlibaba【二】整合Nacos
    树的遍历及应用(C-数据结构)
    Python学习:自动生成Mysql建表语句
    小县城蔬菜配送小程序制作全攻略
    Git:完美实现远端仓库迁移,包含提交历史
    Autoware.universe部署06:使用DBC文件进行UDP的CAN通信代码编写
    万邦京东获得JD商品详情 API 返回值说明
    【牛客-剑指offer-数据结构篇】【图解】JZ27 二叉树的镜像 Java实现
  • 原文地址:https://blog.csdn.net/u010479989/article/details/132674921