[Building from Source] Tutorial: Adapting Apache SeaTunnel-Web to the Latest SeaTunnel 2.3.4 Release


    A new version of Apache SeaTunnel has been released. If you are interested, see the earlier article announcing the release.


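    This article explains how to upgrade the SeaTunnel dependency version used by Apache SeaTunnel-Web so that the new features of SeaTunnel 2.3.4 can be used. Because some SeaTunnel 2.3.4 APIs are incompatible with earlier versions, the SeaTunnel-Web source code has to be modified to adapt to them.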

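    Modifying and compiling the source code

    Clone the SeaTunnel-Web source code locally

      git clone https://github.com/apache/seatunnel-web.git

      Open the project in IntelliJ IDEA

      In the pom, upgrade the SeaTunnel version to 2.3.4 and re-import the dependencies

        2.3.3
        changed to
        2.3.4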

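      Most users built their SeaTunnel Web adaptation on SeaTunnel 2.3.3, and some APIs changed in the newly released SeaTunnel 2.3.4, so upgrading directly causes API incompatibilities. That brings us to the focus of this article: modifying the parts of the SeaTunnel Web source code that call SeaTunnel APIs. Once these changes are made, SeaTunnel Web is fully adapted to the latest 2.3.4 release.

      The community has launched a dedicated group for 2.3.x and Web-series users; if you are interested, add the community assistant to join.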

      org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType

      // Before (SeaTunnel 2.3.3 API)
      public static class SeaTunnelDataTypeConvertor
              implements DataTypeConvertor<SeaTunnelDataType<?>> {

          @Override
          public SeaTunnelDataType<?> toSeaTunnelType(String engineDataType) {
              return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
          }

          @Override
          public SeaTunnelDataType<?> toSeaTunnelType(
                  SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
                  throws DataTypeConvertException {
              return seaTunnelDataType;
          }

          @Override
          public SeaTunnelDataType<?> toConnectorType(
                  SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map)
                  throws DataTypeConvertException {
              return seaTunnelDataType;
          }

          @Override
          public String getIdentity() {
              return "EngineDataTypeConvertor";
          }
      }
      // Changed to (SeaTunnel 2.3.4 API: each conversion method now takes the field name first)
      public static class SeaTunnelDataTypeConvertor
              implements DataTypeConvertor<SeaTunnelDataType<?>> {

          @Override
          public SeaTunnelDataType<?> toSeaTunnelType(String field, String engineDataType) {
              return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();
          }

          @Override
          public SeaTunnelDataType<?> toSeaTunnelType(
                  String field, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
              return seaTunnelDataType;
          }

          @Override
          public SeaTunnelDataType<?> toConnectorType(
                  String field, SeaTunnelDataType<?> seaTunnelDataType, Map<String, Object> map) {
              return seaTunnelDataType;
          }

          @Override
          public String getIdentity() {
              return "EngineDataTypeConvertor";
          }
      }

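      To make the signature change above concrete, here is a minimal, self-contained sketch that does not depend on SeaTunnel. FieldAwareTypeConvertor, DemoEngineType and DemoTypeConvertor are hypothetical names, not SeaTunnel classes; they only mimic the 2.3.4 shape, in which the field name is passed as the first argument. The sketch also shows one possible use of that extra argument: reporting which field has an unsupported type, instead of the NullPointerException that DATA_TYPE_MAP.get(...).getRawType() would throw when the map has no entry for a type name.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for the 2.3.4-style convertor: the field name comes first.
interface FieldAwareTypeConvertor<T> {
    T toSeaTunnelType(String field, String connectorDataType);
}

// Hypothetical engine types for illustration; not the real EngineDataType enum.
enum DemoEngineType {
    STRING("string"), INT("int"), BIGINT("bigint"), BOOLEAN("boolean");

    final String typeName;

    DemoEngineType(String typeName) {
        this.typeName = typeName;
    }
}

class DemoTypeConvertor implements FieldAwareTypeConvertor<DemoEngineType> {
    private static final Map<String, DemoEngineType> DATA_TYPE_MAP = new HashMap<>();

    static {
        for (DemoEngineType t : DemoEngineType.values()) {
            DATA_TYPE_MAP.put(t.typeName, t);
        }
    }

    @Override
    public DemoEngineType toSeaTunnelType(String field, String connectorDataType) {
        // Same normalization as the article's code: lower-case with Locale.ROOT.
        DemoEngineType type = DATA_TYPE_MAP.get(connectorDataType.toLowerCase(Locale.ROOT));
        if (type == null) {
            // The field name lets the error say which column failed.
            throw new IllegalArgumentException(
                    "Unsupported type '" + connectorDataType + "' for field '" + field + "'");
        }
        return type;
    }
}

public class ConvertorDemo {
    public static void main(String[] args) {
        DemoTypeConvertor convertor = new DemoTypeConvertor();
        System.out.println(convertor.toSeaTunnelType("id", "BIGINT")); // BIGINT
        try {
            convertor.toSeaTunnelType("tags", "array<string>");
        } catch (IllegalArgumentException e) {
            // Unsupported type 'array<string>' for field 'tags'
            System.out.println(e.getMessage());
        }
    }
}
```

      Lower-casing with Locale.ROOT, as the original code does, keeps the lookup stable under locales such as Turkish, where "INT".toLowerCase() would not produce "int".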
      org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl

      public TableSchemaServiceImpl() throws IOException {
          Common.setStarter(true);
          Set<PluginIdentifier> pluginIdentifiers =
                  SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();
          ArrayList<PluginIdentifier> pluginIdentifiersList = new ArrayList<>();
          pluginIdentifiersList.addAll(pluginIdentifiers);
          List<URL> pluginJarPaths =
                  new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);
          //        Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
          if (!pluginJarPaths.isEmpty()) {
              //            List<URL> files = FileUtils.searchJarFiles(path);
              pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));
              factory =
                      new DataTypeConvertorFactory(
                              new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
          } else {
              factory = new DataTypeConvertorFactory();
          }
      }
      
      // Before
      SeaTunnelDataType<?> dataType = convertor.toSeaTunnelType(field.getType());
      // Changed to (2.3.4 also expects the field name)
      SeaTunnelDataType<?> dataType =
              convertor.toSeaTunnelType(field.getName(), field.getType());

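      The constructor above collects plugin jar URLs and, when any are found, builds a URLClassLoader over them. The sketch below shows that jar-scanning and class-loader step using only the JDK. The searchJarFiles method here is a hypothetical stand-in, not SeaTunnel's FileUtils.searchJarFiles, and the fallback to the thread context class loader is an assumption (the original falls back to the no-argument DataTypeConvertorFactory instead).

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PluginClassLoaderSketch {

    // Hypothetical helper: recursively collects *.jar files under dir as URLs.
    // This is NOT SeaTunnel's FileUtils.searchJarFiles, only a JDK-only stand-in.
    static List<URL> searchJarFiles(Path dir) throws IOException {
        List<URL> urls = new ArrayList<>();
        if (!Files.isDirectory(dir)) {
            return urls; // a missing plugin directory simply yields no jars
        }
        List<Path> jars;
        try (Stream<Path> paths = Files.walk(dir)) {
            jars = paths.filter(Files::isRegularFile)
                    .filter(p -> p.getFileName().toString().endsWith(".jar"))
                    .sorted()
                    .collect(Collectors.toList());
        }
        for (Path jar : jars) {
            urls.add(jar.toUri().toURL());
        }
        return urls;
    }

    // A dedicated URLClassLoader when plugin jars were found, otherwise the
    // current thread's context class loader (assumed fallback).
    static ClassLoader pluginClassLoader(List<URL> pluginJars) {
        if (!pluginJars.isEmpty()) {
            return new URLClassLoader(pluginJars.toArray(new URL[0]));
        }
        return Thread.currentThread().getContextClassLoader();
    }

    public static void main(String[] args) throws IOException {
        Path root = Paths.get(args.length > 0 ? args[0] : ".");
        List<URL> jars = searchJarFiles(root);
        System.out.println("Found " + jars.size() + " jar(s) under " + root.toAbsolutePath());
        System.out.println("Class loader: " + pluginClassLoader(jars).getClass().getName());
    }
}
```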
      org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel()

      public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
          Common.setDeployMode(DeployMode.CLIENT);
          JobConfig jobConfig = new JobConfig();
          jobConfig.setName(jobInstanceId + "_job");
          try {
              // The SeaTunnelConfig built here is passed to createExecutionContext
              SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
              SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
              ClientJobExecutionEnvironment jobExecutionEnv =
                      seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
              final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
              // Record the engine-side job id on the job instance
              JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
              jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
              jobInstanceDao.update(jobInstance);

              // Wait for the job to finish in the background; this method returns right away
              CompletableFuture.runAsync(
                      () -> {
                          waitJobFinish(
                                  clientJobProxy,
                                  userId,
                                  jobInstanceId,
                                  Long.toString(clientJobProxy.getJobId()),
                                  seaTunnelClient);
                      });

          } catch (ExecutionException | InterruptedException e) {
              ExceptionUtils.getMessage(e);
              throw new RuntimeException(e);
          }
          return jobInstanceId;
      }

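      The method above records the engine job id synchronously and then waits for the job to finish in a background task. The following sketch reproduces that pattern with plain JDK types; JobHandle (standing in for ClientJobProxy) and the two in-memory maps (standing in for jobInstanceDao) are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AsyncJobWaitSketch {

    // Hypothetical handle for a submitted job (plays the role of ClientJobProxy).
    interface JobHandle {
        long jobId();

        // In a real client this would block until the job ends; returns a final status.
        String waitForJobComplete();
    }

    // Hypothetical in-memory stores standing in for jobInstanceDao.
    static final Map<Long, String> engineIdByInstance = new ConcurrentHashMap<>();
    static final Map<Long, String> statusByInstance = new ConcurrentHashMap<>();

    static CompletableFuture<Void> submit(long jobInstanceId, JobHandle handle, ExecutorService pool) {
        // 1. Store the engine-side job id synchronously, like setJobEngineId(...).
        engineIdByInstance.put(jobInstanceId, Long.toString(handle.jobId()));
        // 2. Wait for completion off the caller's thread, on a dedicated executor.
        return CompletableFuture.runAsync(
                () -> statusByInstance.put(jobInstanceId, handle.waitForJobComplete()), pool);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        JobHandle fake = new JobHandle() {
            public long jobId() { return 42L; }
            public String waitForJobComplete() { return "FINISHED"; }
        };
        submit(7L, fake, pool).get(5, TimeUnit.SECONDS);
        System.out.println(engineIdByInstance.get(7L) + " " + statusByInstance.get(7L)); // 42 FINISHED
        pool.shutdown();
    }
}
```

      One design point: CompletableFuture.runAsync(Runnable) without an executor normally runs on ForkJoinPool.commonPool(), so long blocking waits can tie up common-pool threads. The sketch passes a dedicated executor for that reason; this is a suggestion, not what the code above does.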
      org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl

      // Before
      else if (statusList.contains("CANCELLING")) {
          jobStatus = JobStatus.CANCELLING.name();
      }
      // Changed to (2.3.4 spells this state CANCELING)
      else if (statusList.contains("CANCELING")) {
          jobStatus = JobStatus.CANCELING.name();
      }

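      Because JobStatus.CANCELLING no longer exists in 2.3.4 (which is why the code above had to change), a status string with the old spelling will not match the new state. If such strings can still reach this code, for example from records written by a 2.3.3-based deployment, a small normalization step can map the old spelling to the new one. The enum and class below are hypothetical stand-ins, not SeaTunnel's JobStatus.

```java
import java.util.Locale;

public class JobStatusCompat {

    // Hypothetical enum for illustration; not SeaTunnel's JobStatus.
    enum DemoJobStatus { RUNNING, CANCELING, CANCELED, FINISHED, FAILED }

    // Parses a status string, accepting the pre-2.3.4 spelling "CANCELLING" as CANCELING.
    static DemoJobStatus parse(String raw) {
        String normalized = raw.trim().toUpperCase(Locale.ROOT);
        if ("CANCELLING".equals(normalized)) {
            normalized = "CANCELING";
        }
        return DemoJobStatus.valueOf(normalized);
    }

    public static void main(String[] args) {
        System.out.println(parse("CANCELLING")); // CANCELING
        System.out.println(parse("running"));    // RUNNING
    }
}
```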
      org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl

      // Before
      TableFactoryContext context =
              new TableFactoryContext(
                      Collections.singletonList(table),
                      ReadonlyConfig.fromMap(config),
                      Thread.currentThread().getContextClassLoader());
      // Changed to
      TableTransformFactoryContext context =
              new TableTransformFactoryContext(
                      Collections.singletonList(table),
                      ReadonlyConfig.fromMap(config),
                      Thread.currentThread().getContextClassLoader());

      org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy

      // Before
      public void restoreJob(
              @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
          SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
          JobConfig jobConfig = new JobConfig();
          jobConfig.setName(jobInstanceId + "_job");
          try {
              seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();
          } catch (ExecutionException e) {
              throw new RuntimeException(e);
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
      }
      // Changed to (restoreExecutionContext now also receives a SeaTunnelConfig)
      public void restoreJob(
              @NonNull String filePath, @NonNull Long jobInstanceId, @NonNull Long jobEngineId) {
          SeaTunnelClient seaTunnelClient = new SeaTunnelClient(clientConfig);
          JobConfig jobConfig = new JobConfig();
          jobConfig.setName(jobInstanceId + "_job");
          SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
          try {
              seaTunnelClient
                      .restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId)
                      .execute();
          } catch (ExecutionException e) {
              throw new RuntimeException(e);
          } catch (InterruptedException e) {
              throw new RuntimeException(e);
          }
      }

      org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil

      // Before
      public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
              PluginType pluginType) throws IOException {
          Common.setStarter(true);
          if (!pluginType.equals(PluginType.SOURCE)) {
              throw new UnsupportedOperationException("ONLY support plugin type source");
          }
          Path path = new SeaTunnelSinkPluginDiscovery().getPluginDir();
          List<Factory> factories;
          if (path.toFile().exists()) {
              List<URL> files = FileUtils.searchJarFiles(path);
              factories =
                      FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));
          } else {
              factories =
                      FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
          }
          Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
          factories.forEach(
                  plugin -> {
                      if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                          TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                          PluginIdentifier info =
                                  PluginIdentifier.of(
                                          "seatunnel",
                                          PluginType.SOURCE.getType(),
                                          plugin.factoryIdentifier());
                          featureMap.put(
                                  info,
                                  new ConnectorFeature(
                                          SupportColumnProjection.class.isAssignableFrom(
                                                  tableSourceFactory.getSourceClass())));
                      }
                  });
          return featureMap;
      }
      // Changed to (jar paths now come from getPluginJarPaths for all supported source plugins)
      public static Map<PluginIdentifier, ConnectorFeature> getConnectorFeatures(
              PluginType pluginType) {
          Common.setStarter(true);
          if (!pluginType.equals(PluginType.SOURCE)) {
              throw new UnsupportedOperationException("ONLY support plugin type source");
          }

          ArrayList<PluginIdentifier> pluginIdentifiers = new ArrayList<>();
          pluginIdentifiers.addAll(
                  SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());
          List<URL> pluginJarPaths =
                  new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);

          List<Factory> factories;
          if (!pluginJarPaths.isEmpty()) {
              factories =
                      FactoryUtil.discoverFactories(
                              new URLClassLoader(pluginJarPaths.toArray(new URL[0])));
          } else {
              factories =
                      FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());
          }
          Map<PluginIdentifier, ConnectorFeature> featureMap = new ConcurrentHashMap<>();
          factories.forEach(
                  plugin -> {
                      if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {
                          TableSourceFactory tableSourceFactory = (TableSourceFactory) plugin;
                          PluginIdentifier info =
                                  PluginIdentifier.of(
                                          "seatunnel",
                                          PluginType.SOURCE.getType(),
                                          plugin.factoryIdentifier());
                          featureMap.put(
                                  info,
                                  new ConnectorFeature(
                                          SupportColumnProjection.class.isAssignableFrom(
                                                  tableSourceFactory.getSourceClass())));
                      }
                  });
          return featureMap;
      }

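      In both versions, getConnectorFeatures keeps only the discovered factories that are source factories and records, for each one, whether its source class implements SupportColumnProjection. The sketch below isolates that filter-and-map step with hypothetical stand-in types (Factory, SourceFactory, SupportsProjection) rather than the SeaTunnel classes, and uses a plain string key in place of PluginIdentifier. Factory discovery itself (FactoryUtil.discoverFactories over a URLClassLoader) is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ConnectorFeatureSketch {

    // Hypothetical stand-ins for the SeaTunnel factory types.
    interface Factory { String factoryIdentifier(); }
    interface SourceFactory extends Factory { Class<?> getSourceClass(); }
    interface SupportsProjection {} // marker, plays the role of SupportColumnProjection

    static class ProjectingSource implements SupportsProjection {}
    static class PlainSource {}

    // Keeps only source factories and records whether their source class supports projection.
    static Map<String, Boolean> connectorFeatures(List<Factory> factories) {
        Map<String, Boolean> featureMap = new ConcurrentHashMap<>();
        for (Factory plugin : factories) {
            if (plugin instanceof SourceFactory) {
                SourceFactory source = (SourceFactory) plugin;
                featureMap.put(
                        "seatunnel/source/" + plugin.factoryIdentifier(), // simplified key
                        SupportsProjection.class.isAssignableFrom(source.getSourceClass()));
            }
        }
        return featureMap;
    }

    static SourceFactory source(String id, Class<?> cls) {
        return new SourceFactory() {
            public String factoryIdentifier() { return id; }
            public Class<?> getSourceClass() { return cls; }
        };
    }

    public static void main(String[] args) {
        Factory sinkOnly = () -> "SinkDemo"; // not a SourceFactory, so it is skipped
        List<Factory> factories = Arrays.asList(
                source("ProjectingDemo", ProjectingSource.class),
                source("PlainDemo", PlainSource.class),
                sinkOnly);
        System.out.println(connectorFeatures(factories));
    }
}
```

      The sketch uses instanceof where the original uses TableSourceFactory.class.isAssignableFrom(plugin.getClass()); for a non-null plugin the two checks are equivalent.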
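      Format the code

      mvn spotless:apply

      Compile and package

      mvn clean package -DskipTests

      At this point, SeaTunnel Web has been adapted to SeaTunnel 2.3.4, and the corresponding installation package is generated in the seatunnel-web-dist/target directory.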

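      Deployment and testing on Linux

      For details, please refer to the article "Apache SeaTunnel Web Deployment Guide" published earlier by another member of the community.

      Important configuration items

      1. seatunnel-web database settings (application.yml)
      Used for data persistence in the web service.

      2. SEATUNNEL_HOME (environment variable)
      seatunnel-web calls SeaTunnel's plugin-discovery API, which scans the available connectors.

      3. ST_WEB_HOME (environment variable)
      seatunnel-web loads the plugin packages under seatunnel-web/datasource; these determine which data sources seatunnel-web supports when defining tasks.

      4. Important configuration files:
      connector-datasource-mapper.yaml
      Configures the supported data source types and the synchronization modes each one supports (for example, whether multi-table synchronization or CDC is supported).
      hazelcast-client.yaml
      The seatunnel-web service interacts with the SeaTunnel cluster through the seatunnel-api; this file configures the cluster nodes and related information.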

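      Misconfigured environment variables are easy to miss at deployment time. As a rough pre-flight check, a small program like the following could verify that SEATUNNEL_HOME and ST_WEB_HOME point to existing directories and that a datasource directory is present. The class name DeployPrecheck is hypothetical, and placing the plugin directory at $ST_WEB_HOME/datasource is an assumption based on item 3 above; adjust it to your actual layout.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class DeployPrecheck {

    // Returns human-readable problems; an empty list means the checked paths look fine.
    // Assumption: the datasource plugin directory sits directly under ST_WEB_HOME.
    static List<String> check(Map<String, String> env) {
        List<String> problems = new ArrayList<>();
        String seatunnelHome = env.get("SEATUNNEL_HOME");
        if (seatunnelHome == null || !new File(seatunnelHome).isDirectory()) {
            problems.add("SEATUNNEL_HOME is unset or not a directory");
        }
        String webHome = env.get("ST_WEB_HOME");
        if (webHome == null || !new File(webHome).isDirectory()) {
            problems.add("ST_WEB_HOME is unset or not a directory");
        } else if (!new File(webHome, "datasource").isDirectory()) {
            problems.add("no datasource directory under ST_WEB_HOME");
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = check(System.getenv());
        problems.forEach(System.out::println);
        System.exit(problems.isEmpty() ? 0 : 1);
    }
}
```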
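      Thanks for reading, and I hope this helps. If you have any questions, feel free to find me in the community!

      Publishing support for this article was provided by 白鲸开源科技 (WhaleOps).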

      Original article: https://blog.csdn.net/weixin_54625990/article/details/136647358