在seatunnel-transforms-v2模块中新建一个包,并在该包下新建XXXTransform、XXXTransformConfig、XXXTransformFactory三个类
自定义转换插件功能说明
这是个适配KafkaSource的转换插件,接收到的原文格式为:
{"path":"xxx.log.gz","code":"011","cont":"{\"ID\":\"1\",\"NAME\":\"zhangsan\",\"TABLE\":\"USER\",\"create_time\":\"20230904\"}","timestamp":"20230823160246"}
需要转换为只保留cont里面的数据
{"create_time":"20230904","NAME":"zhangsan","TABLE":"USER","ID":"1"}
# SeaTunnel job: read raw text from Kafka, keep only the "cont" payload via the
# custom ExtractFromCJ transform, and write the result back to Kafka.
env {
  # You can set engine configuration here STREAMING BATCH
  execution.parallelism = 1
  job.mode = "STREAMING"

  #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}

source {
  # This is a example source plugin **only for test and demonstrate the feature source plugin**
  Kafka {
    bootstrap.servers = "xxxxx:9092"
    topic = "test_in2"
    # The closing quote was missing in the original listing, which made the file unparsable.
    consumer.group = "167321237613"
    # Each Kafka message is handed to the transform as one plain-text field.
    format = "text"
    result_table_name = "kafka"
  }
}

transform {
  ExtractFromCJ {
    source_table_name = "kafka"
    result_table_name = "kafka1"
    # Read by the transform through the "schema.fields" option:
    # key = field name looked up inside "cont", value = declared column type.
    schema = {
      fields {
        NAME = "string"
        TABLE = "string"
        create_time = "string"
        ID = "string"
      }
    }
  }
}

sink {
  kafka {
    source_table_name = "kafka1"
    topic = "test_out2"
    bootstrap.servers = "xxxx:9092"
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
  }
}
XXXTransformConfig代码,这个类主要用来保存transform的配置项
- package org.apache.seatunnel.transform.extract;
-
-
-
- import lombok.Getter;
-
- import lombok.Setter;
-
- import org.apache.seatunnel.api.configuration.Option;
-
- import org.apache.seatunnel.api.configuration.Options;
-
- import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
-
-
- import java.io.Serializable;
-
- import java.util.Map;
-
-
-
- @Getter
-
- @Setter
-
- public class ExtractFromCJTransformConfig implements Serializable {
-
-
-
- public static final Option<Map<String, String>> SCHEMA =
-
- Options.key("schema.fields")
-
- .mapType()
-
- .noDefaultValue()
-
- .withDescription(
-
- "Specify the field mapping relationship between input and output");
-
-
-
- private Map<String, String> fieldColumns;
-
- public static ExtractFromCJTransformConfig of(ReadonlyConfig config) {
-
- ExtractFromCJTransformConfig extractFromCJTransformConfig = new ExtractFromCJTransformConfig();
-
- Map<String, String> fieldColumns = config.get(SCHEMA);
-
- extractFromCJTransformConfig.setFieldColumns(fieldColumns);
-
- return extractFromCJTransformConfig;
-
- }
-
- }
XXXTransformFactory说明,工厂类,主要用来初始化具体的转换类
- package org.apache.seatunnel.transform.extract;
-
-
-
- import com.google.auto.service.AutoService;
-
- import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
- import org.apache.seatunnel.api.configuration.util.OptionRule;
-
- import org.apache.seatunnel.api.table.catalog.CatalogTable;
-
- import org.apache.seatunnel.api.table.connector.TableTransform;
-
- import org.apache.seatunnel.api.table.factory.Factory;
-
- import org.apache.seatunnel.api.table.factory.TableFactoryContext;
-
- import org.apache.seatunnel.api.table.factory.TableTransformFactory;
-
-
-
- @AutoService(Factory.class)
-
- public class ExtractFromCJTransformFactory implements TableTransformFactory {
-
- @Override
-
- public String factoryIdentifier() {
-
- return "ExtractFromCJ";
-
- }
-
-
-
- @Override
-
- public OptionRule optionRule() {
-
- return OptionRule.builder().optional(ExtractFromCJTransformConfig.SCHEMA).build();
-
- }
-
-
-
- @Override
-
- public TableTransform createTransform(TableFactoryContext context) {
-
- CatalogTable catalogTable = context.getCatalogTable();
-
- ReadonlyConfig options = context.getOptions();
-
- ExtractFromCJTransformConfig extractFromCJTransformConfig =
-
- ExtractFromCJTransformConfig.of(options);
-
- return () -> new ExtractFromCJTransform(extractFromCJTransformConfig, catalogTable);
-
- }
-
- }
XXXTransform,具体的转换类,主要用于对source数据的处理,以及输出数据结构(字段名和字段类型)的定义
- package org.apache.seatunnel.transform.extract;
-
-
-
- import cn.hutool.core.collection.CollUtil;
-
- import cn.hutool.json.JSONObject;
-
- import cn.hutool.json.JSONUtil;
-
- import com.google.auto.service.AutoService;
-
- import lombok.NoArgsConstructor;
-
- import lombok.NonNull;
-
- import lombok.extern.slf4j.Slf4j;
-
- import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-
- import org.apache.seatunnel.api.configuration.util.ConfigValidator;
-
- import org.apache.seatunnel.api.table.catalog.CatalogTable;
-
- import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-
- import org.apache.seatunnel.api.table.catalog.Column;
-
- import org.apache.seatunnel.api.table.catalog.ConstraintKey;
-
- import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
-
- import org.apache.seatunnel.api.table.catalog.PrimaryKey;
-
- import org.apache.seatunnel.api.table.catalog.TableIdentifier;
-
- import org.apache.seatunnel.api.table.catalog.TableSchema;
-
- import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-
- import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-
- import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-
- import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-
- import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
- import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
-
-
-
- import java.util.ArrayList;
-
- import java.util.List;
-
- import java.util.stream.Collectors;
-
-
-
- @AutoService(SeaTunnelTransform.class)
-
- @NoArgsConstructor
-
- @Slf4j
-
- public class ExtractFromCJTransform extends AbstractCatalogSupportTransform {
-
-
-
- private ExtractFromCJTransformConfig config;
-
- protected SeaTunnelRowType inputRowType;
-
- @Override
-
- public String getPluginName() {
-
- return "ExtractFromCJ";
-
- }
-
-
-
- public ExtractFromCJTransform(
-
- @NonNull ExtractFromCJTransformConfig config, @NonNull CatalogTable catalogTable) {
-
- super(catalogTable);
-
- this.config = config;
-
- }
-
- @Override
-
- protected void setConfig(Config pluginConfig) {
-
- ConfigValidator.of(ReadonlyConfig.fromConfig(pluginConfig))
-
- .validate(new ExtractFromCJTransformFactory().optionRule());
-
- this.config = ExtractFromCJTransformConfig.of(ReadonlyConfig.fromConfig(pluginConfig));
-
- }
-
-
-
- @Override
-
- protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) {
-
- return inputRowType;
-
- }
-
-
-
- @Override
-
- protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) {
-
- Object content = inputRow.getFields()[0];
-
- String data = content.toString();
-
- Object[] outputDataArray = new Object[0];
-
- if (JSONUtil.isJson(data)) {
-
- JSONObject cont = JSONUtil.parseObj(data).getJSONObject("cont");
-
- if (!cont.isEmpty()) {
-
- if (!CollUtil.isEmpty(this.config.getFieldColumns())) {
-
- outputDataArray = new Object[this.config.getFieldColumns().size()];
-
- int t = 0;
-
- for (String key : this.config.getFieldColumns().keySet()) {
-
- String value = cont.getStr(key);
-
- outputDataArray[t] = value;
-
- t++;
-
- }
-
- } else {
-
- outputDataArray = new Object[1];
-
- outputDataArray[0] = JSONUtil.toJsonStr(cont);
-
- }
-
- }
-
- }
-
- SeaTunnelRow outputRow = new SeaTunnelRow(outputDataArray);
-
- outputRow.setRowKind(inputRow.getRowKind());
-
- outputRow.setTableId(inputRow.getTableId());
-
- return outputRow;
-
- }
-
-
-
- @Override
-
- protected TableSchema transformTableSchema() {
-
- List<Column> inputColumns = inputCatalogTable.getTableSchema().getColumns();
-
- List<ConstraintKey> outputConstraintKeys =
-
- inputCatalogTable.getTableSchema().getConstraintKeys().stream()
-
- .map(ConstraintKey::copy)
-
- .collect(Collectors.toList());
-
- PrimaryKey copiedPrimaryKey =
-
- inputCatalogTable.getTableSchema().getPrimaryKey() == null
-
- ? null
-
- : inputCatalogTable.getTableSchema().getPrimaryKey().copy();
-
-
-
- if (CollUtil.isEmpty(this.config.getFieldColumns())) {
-
- return TableSchema.builder()
-
- .primaryKey(copiedPrimaryKey)
-
- .columns(inputColumns)
-
- .constraintKey(outputConstraintKeys)
-
- .build();
-
- } else {
-
- List<Column> transformColumns = new ArrayList<>();
-
- for (String key : this.config.getFieldColumns().keySet()) {
-
- SeaTunnelDataType<?> dataType = CatalogTableUtil.parseDataType(this.config.getFieldColumns().get(key));
-
- transformColumns.add(PhysicalColumn.of(key, dataType, 0, true, null, null));
-
- }
-
- return TableSchema.builder()
-
- .primaryKey(copiedPrimaryKey)
-
- .columns(transformColumns)
-
- .constraintKey(outputConstraintKeys)
-
- .build();
-
- }
-
- }
-
-
-
- @Override
-
- protected TableIdentifier transformTableIdentifier() {
-
- return inputCatalogTable.getTableId().copy();
-
- }
-
- }
文中的转换类继承的是AbstractCatalogSupportTransform类,SeaTunnel还提供了SingleFieldOutputTransform和MultipleFieldOutputTransform,分别对应单字段和多字段的数据处理,具体扩展时可根据需求继承对应的类
来源消息
结果消息
以上就是对转换插件的扩展分享,有需求的小伙伴可以参考,也欢迎大家一起评论沟通~