kettle 9.2.0.0-290
入门可以了解一下
本文只弄了ktr文件的执行,其他文件类似
先说一下分工:springboot不用多说。
kettle主要是先配置出来ktr,配置完之后建议先执行通过,确保ktr能正常使用。
然后通过执行ktr把数据读取、处理,并转换到指定库或者文件库都行。
xxljob主要起到定时任务的作用,创建定时任务按照设定规则执行Handler
代码不多,先上pom.xml。在调试过程中遇到各种坑,总之就是缺少jar,jar文件可以在绑定资源里下载。
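<properties>
    <kettle-version>9.2.0.0-290</kettle-version>
    <kettle-lib-path>c:\work\a\b\c</kettle-lib-path>
</properties>

<dependencies>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-core</artifactId>
        <version>${kettle-version}</version>
        <scope>system</scope>
        <systemPath>${kettle-lib-path}\lib\kettle-core-9.2.0.0-290.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-dbdialog</artifactId>
        <version>${kettle-version}</version>
        <scope>system</scope>
        <systemPath>${kettle-lib-path}\lib\kettle-dbdialog-9.2.0.0-290.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>pentaho-kettle</groupId>
        <artifactId>kettle-engine</artifactId>
        <version>${kettle-version}</version>
        <scope>system</scope>
        <systemPath>${kettle-lib-path}\lib\kettle-engine-9.2.0.0-290.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>pentaho</groupId>
        <artifactId>metastore</artifactId>
        <version>${kettle-version}</version>
        <scope>system</scope>
        <systemPath>${kettle-lib-path}\lib\metastore-9.2.0.0-290.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.pentaho</groupId>
        <artifactId>pentaho-encryption-support</artifactId>
        <version>${kettle-version}</version>
        <scope>system</scope>
        <systemPath>${kettle-lib-path}\lib\pentaho-encryption-support-9.2.0.0-290.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>pentaho</groupId>
        <artifactId>pentaho-vfs-browser</artifactId>
        <version>${kettle-version}</version>
        <scope>system</scope>
        <systemPath>${kettle-lib-path}\lib\pentaho-vfs-browser-9.2.0.0-290.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-vfs2</artifactId>
        <version>2.8.0</version>
        <scope>system</scope>
        <systemPath>${kettle-lib-path}\lib\commons-vfs2-2.8.0.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
</dependencies>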
java代码创建一个类JobKettleHandler
- package aaa.bbb.ccc.kettle;
-
- import com.xxl.job.core.handler.annotation.XxlJob;
- import lombok.extern.slf4j.Slf4j;
- import org.pentaho.di.core.KettleEnvironment;
- import org.pentaho.di.core.exception.KettleException;
- import org.pentaho.di.core.exception.KettleMissingPluginsException;
- import org.pentaho.di.core.exception.KettleXMLException;
- import org.pentaho.di.trans.Trans;
- import org.pentaho.di.trans.TransMeta;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.io.File;
- import java.util.Arrays;
-
- @Component
- @Slf4j
- public class JobKettleHandler {
-
- //ktr源文件的位置
- @Value("${kettle.ktr.path}")
- private String dirPath;
-
- @Resource(name = "ThreadPoolTaskExecutor")
- ThreadPoolTaskExecutor runRunnerTaskExecutor;
-
- @XxlJob("runRunRun")
- public void runRunRun() {
- File file = new File(dirPath);
- File[] files = file.listFiles();
- ;
- log.info("要执行的文件:");
- Arrays.stream(files).forEach(i -> {
- if (i.getName().substring(i.getName().length() - 3).equals("ktr")) {
- log.info("{}", i.getName());
- }
- });
- Arrays.stream(files).parallel().forEach(f -> {
- if (f.getName().substring(f.getName().length() - 3).equals("ktr")) {
- runRunnerTaskExecutor.execute(() -> {
- TransMeta transMeta = null;
- Trans trans = null;
- try {
- KettleEnvironment.init();
- transMeta = new TransMeta(dirPath + "\\" + f.getName());
- trans = new Trans(transMeta);
- log.info("开始执行[{}]文件", f.getName());
- trans.execute(null);
- trans.waitUntilFinished();
- if (trans.getErrors() > 0) {
- System.out.println("转换执行失败");
- } else {
- System.out.println("转换执行成功");
- }
- } catch (KettleXMLException e) {
- log.error("执行[{}]报错,错误原因1:{}", f.getName(), e.getMessage(), e);
- e.printStackTrace();
- } catch (KettleMissingPluginsException e) {
- log.error("执行[{}]报错,错误原因2:{}", f.getName(), e.getMessage(), e);
- e.printStackTrace();
- } catch (KettleException e) {
- log.error("执行[{}]报错,错误原因3:{}", f.getName(), e.getMessage(), e);
- e.printStackTrace();
- } finally {
- trans.cleanup();
- KettleEnvironment.shutdown();
- }
- });
- }
- });
- }
- }
ThreadPoolTaskExecutor是多线程处理,一个ktr启用一个线程,不用的可以摘除。
比如说有十个ktr需要执行,我们创建一个job就行。job跑的时候指定runRunRun,会在job里开启多个线程。
只是记录一下,有不同见解的可以评论