• springboot整合kettle和xxljob


    kettle 9.2.0.0-290

    入门可以了解一下

    本文只弄了ktr文件的执行,其他文件类似

    先说一下分工springboot不用说

    kettle主要是先配置出来ktr,配置完之后建议 先执行通过,确保ktr能正常使用

    然后通过执行ktr把数据读取,处理,和转换到指定库或者文件库都行

    xxljob主要起到定时任务的作用,创建定时任务按照设定规则执行Handler

    代码不多先上pom.xml 在调试过程中遇到各种坑,总之就是缺少jar,  jar文件绑定资源里下载

    1. 9.2.0.0-290
    2. c:\work\a\b\c
    3. pentaho-kettle
    4. kettle-core
    5. ${kettle-version}
    6. system
    7. ${kettle-lib-path}\lib\kettle-core-9.2.0.0-290.jar
    8. pentaho-kettle
    9. kettle-dbdialog
    10. ${kettle-version}
    11. system
    12. ${kettle-lib-path}\lib\kettle-dbdialog-9.2.0.0-290.jar
    13. pentaho-kettle
    14. kettle-engine
    15. ${kettle-version}
    16. system
    17. ${kettle-lib-path}\lib\kettle-engine-9.2.0.0-290.jar
    18. pentaho
    19. metastore
    20. ${kettle-version}
    21. system
    22. ${kettle-lib-path}\lib\metastore-9.2.0.0-290.jar
    23. org.pentaho
    24. pentaho-encryption-support
    25. ${kettle-version}
    26. system
    27. ${kettle-lib-path}\lib\pentaho-encryption-support-9.2.0.0-290.jar
    28. pentaho
    29. pentaho-vfs-browser
    30. ${kettle-version}
    31. system
    32. ${kettle-lib-path}\lib\pentaho-vfs-browser-9.2.0.0-290.jar
    33. org.apache.commons
    34. commons-vfs2
    35. 2.8.0 system
    36. ${kettle-lib-path}\lib\commons-vfs2-2.8.0.jar
    37. commons-lang
    38. commons-lang
    39. 2.6

    java代码创建一个类JobKettleHandler 

    1. package aaa.bbb.ccc.kettle;
    2. import com.xxl.job.core.handler.annotation.XxlJob;
    3. import lombok.extern.slf4j.Slf4j;
    4. import org.pentaho.di.core.KettleEnvironment;
    5. import org.pentaho.di.core.exception.KettleException;
    6. import org.pentaho.di.core.exception.KettleMissingPluginsException;
    7. import org.pentaho.di.core.exception.KettleXMLException;
    8. import org.pentaho.di.trans.Trans;
    9. import org.pentaho.di.trans.TransMeta;
    10. import org.springframework.beans.factory.annotation.Value;
    11. import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    12. import org.springframework.stereotype.Component;
    13. import javax.annotation.Resource;
    14. import java.io.File;
    15. import java.util.Arrays;
    16. @Component
    17. @Slf4j
    18. public class JobKettleHandler {
    19. //ktr源文件的位置
    20. @Value("${kettle.ktr.path}")
    21. private String dirPath;
    22. @Resource(name = "ThreadPoolTaskExecutor")
    23. ThreadPoolTaskExecutor runRunnerTaskExecutor;
    24. @XxlJob("runRunRun")
    25. public void runRunRun() {
    26. File file = new File(dirPath);
    27. File[] files = file.listFiles();
    28. ;
    29. log.info("要执行的文件:");
    30. Arrays.stream(files).forEach(i -> {
    31. if (i.getName().substring(i.getName().length() - 3).equals("ktr")) {
    32. log.info("{}", i.getName());
    33. }
    34. });
    35. Arrays.stream(files).parallel().forEach(f -> {
    36. if (f.getName().substring(f.getName().length() - 3).equals("ktr")) {
    37. runRunnerTaskExecutor.execute(() -> {
    38. TransMeta transMeta = null;
    39. Trans trans = null;
    40. try {
    41. KettleEnvironment.init();
    42. transMeta = new TransMeta(dirPath + "\\" + f.getName());
    43. trans = new Trans(transMeta);
    44. log.info("开始执行[{}]文件", f.getName());
    45. trans.execute(null);
    46. trans.waitUntilFinished();
    47. if (trans.getErrors() > 0) {
    48. System.out.println("转换执行失败");
    49. } else {
    50. System.out.println("转换执行成功");
    51. }
    52. } catch (KettleXMLException e) {
    53. log.error("执行[{}]报错,错误原因1:{}", f.getName(), e.getMessage(), e);
    54. e.printStackTrace();
    55. } catch (KettleMissingPluginsException e) {
    56. log.error("执行[{}]报错,错误原因2:{}", f.getName(), e.getMessage(), e);
    57. e.printStackTrace();
    58. } catch (KettleException e) {
    59. log.error("执行[{}]报错,错误原因3:{}", f.getName(), e.getMessage(), e);
    60. e.printStackTrace();
    61. } finally {
    62. trans.cleanup();
    63. KettleEnvironment.shutdown();
    64. }
    65. });
    66. }
    67. });
    68. }
    69. }
    ThreadPoolTaskExecutor是多线程处理,一个ktr启用一个线程,不用的可以摘除。

    比如说有是个ktr需要执行,我们创建一个job就行。job跑的时候指定runRunRun.会在job里开启多个线程

    只是记录一下,有不同见解的可以评论

  • 相关阅读:
    Java泛型
    gcc编译选项
    息肉分割(Polyp Segmentation)方向常用数据集汇总
    SpringCloud - 服务调用
    部署支持使用Redis哨兵模式,支持纳管ClickHouse数据库,JumpServer堡垒机v2.28.0发布
    Java--集合框架详解
    【货干】IP 配置出现意外。
    web网页设计期末课程大作业:美食餐饮文化主题网站设计——中华美德6页面HTML+CSS+JavaScript
    ZooKeeper+kafka消息队列群集部署
    Frida系列--Stalker原理分析
  • 原文地址:https://blog.csdn.net/panbingcan272007258/article/details/134445615