• springboot-集成flink最佳实践和打包部署


    引入flink依赖

    //stream api和table api
    
        org.apache.flink
        flink-table-api-java-bridge_2.11
        1.14.2
        
        provided
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    
        org.apache.flink
        flink-clients_2.11
        1.14.2
        provided
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    编写入口

    目录结构

    • com.example.demo
      • auto
        • ChildApplication
      • task
        • Task
        • AbstractTask
        • TaskManager
      • time
        • TimeSource
        • TimeTask
      • Demo2Application

    子容器初始化类

    @EnableAutoConfiguration
    public class ChildApplication {
    }
    
    • 1
    • 2
    • 3

    任务接口

    public interface Task {
        void run(String... args) throws Exception;
    }
    
    • 1
    • 2
    • 3

    抽象任务类

    @Slf4j
    public abstract class AbstractTask implements Task {
    
        @Override
        public void run(String... args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //解析spring参数
            DefaultApplicationArguments arguments = new DefaultApplicationArguments(args);
            //解析flink参数
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            //合并两种参数
            Configuration configuration = new Configuration();
            Map map = parameterTool.toMap();
            for (Map.Entry entry : map.entrySet()) {
                if (Objects.equals(entry.getValue(), "__NO_VALUE_KEY")) {
                    continue;
                }
                configuration.setString(entry.getKey(), entry.getValue());
            }
    
            Set optionNames = arguments.getOptionNames();
            for (String optionName : optionNames) {
                List optionValues = arguments.getOptionValues(optionName);
                if (CollectionUtils.isEmpty(optionValues)) {
                    continue;
                }
                configuration.setString(optionName, String.join(",", optionValues));
            }
            //设置全局参数
            env.getConfig().setGlobalJobParameters(configuration);
            //配置任务
            configTask(env, parameterTool);
            //提交任务
            JobClient jobClient = env.executeAsync(getClass().getName());
            if (jobClient instanceof WebSubmissionJobClient) {
                return;
            }
            jobClient.getJobExecutionResult()
                    .whenComplete(new BiConsumer() {
                        @Override
                        public void accept(JobExecutionResult jobExecutionResult, Throwable throwable) {
                            log.error("time {}", jobExecutionResult.getNetRuntime(TimeUnit.SECONDS));
                        }
                    });
        }
    
        public abstract void configTask(StreamExecutionEnvironment env, ParameterTool tool);
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    任务管理器

    @Slf4j
    @Service
    public class TaskManager implements CommandLineRunner {
    
        @Resource
        List taskList;
    
        @Override
        public void run(String... args) throws Exception {
            ParameterTool parameterTool = ParameterTool.fromArgs(args);
            log.info("程序参数 {}", parameterTool);
            String runTaskName = parameterTool.get("task");
            if (CollectionUtils.isEmpty(taskList) || StringUtils.isBlank(runTaskName)) {
                return;
            }
            for (Task task : taskList) {
                if (Objects.equals(runTaskName, task.getClass().getName())) {
                    task.run(args);
                }
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    一个计时任务数据源

    @Slf4j
    @Service
    public class TimeSource extends RichSourceFunction {
    
        volatile boolean running = true;
    
        private JdbcTemplate jdbcTemplate;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            //创建一个容器,并拿到需要的bean
            List args = new LinkedList<>();
            args.add(String.format("--spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s", cls.getName() + UUID.randomUUID()));
            args.add(String.format("--spring.jmx.default-domain=%s", cls.getName() + UUID.randomUUID()));
            Configuration globalJobParameters = (Configuration) runtimeContext.getExecutionConfig().getGlobalJobParameters();
            String activeKey = "spring.profiles.active";
            String active = globalJobParameters.getString(ConfigOptions.key(activeKey).stringType().noDefaultValue());
            if (StringUtils.isNotEmpty(active)) {
                args.add(String.format("--%s=%s", activeKey, active));
            }
            ConfigurableApplicationContext applicationContext = SpringApplication.run(ChildApplication.class, args.toArray(new String[0]));
            jdbcTemplate = applicationContext.getBean(JdbcTemplate.class);
        }
    
        @Override
        public void run(SourceContext ctx) throws Exception {
            while (running) {
                Date date = DataAccessUtils.uniqueResult(jdbcTemplate.queryForList("select now()", Date.class));
                ctx.collect(date);
                TimeUnit.SECONDS.sleep(1);
            }
        }
    
        @Override
        public void cancel() {
            running = false;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    写这个数据源类花了很长时间,期间报了很多错,一直不符合预期:

    • xxx is not serializable:flink的算子可能会在不同的机器上运行,所以类信息会序列化之后传输。所以算子不能有任何不能序列化的字段(字段为null除外)
    • 有些需要的字段没有实现Serializable,但是又确实要用到,比如JdbcTemplate,如果是mybatis的话,就是各种mapper;像这些字段,只能在open方法里面初始化。有两种方法做这个初始化:一是,通过全局参数把一些连接信息传到算子,然后在open方法中初始化JdbcTemplate;二是,在open方法中重新创建一个容器,然后从容器中拿到JdbcTemplate。第一种方法,比较容易实现,但是要手动装配JdbcTemplate;第二种方法,要重新创建一个容器,装配的任务全都交给容器;想法是很nice,但在一个容器中创建另一个容器,比想象中的要复杂一些。
    • 在一个容器中初始化另一个容器:
      • 需要一个容器初始化类:因为毕竟不需要注入所有对象,所以不能用主程序启动类Demo2Application;但是又要autoconfigure里面的很多对象,所以考虑加@EnableAutoConfiguration注解,同时放入单独的auto包,避免扫到不需要的bean定义;如果需要mybatis的mapper,考虑加@MapperScan注解
      • 定义好容器初始化类之后,启动报错:Error creating bean with name ‘springApplicationAdminRegistrar’ defined in class path resource [org/springframework/boot/autoconfigure/admin/SpringApplicationAdminJmxAutoConfiguration.class]: Invocation of init method failed; nested exception is javax.management.InstanceAlreadyExistsException: org.springframework.boot:type=Admin,name=SpringApplication。看错误信息是实例重复了,这个有两种解决办法:
        • 容器初始化类直接排除掉SpringApplicationAdminJmxAutoConfiguration.class:@EnableAutoConfiguration(exclude = {SpringApplicationAdminJmxAutoConfiguration.class})
        • 子容器启动时修改spring.application.admin.jmx-name:–spring.application.admin.jmx-name=org.springframework.boot:type=Admin,name=%s
      • 再启动,还是报错:Unable to register MBean [HikariDataSource (HikariPool-2)] with key ‘dataSource’; nested exception is javax.management.InstanceAlreadyExistsException: com.zaxxer.hikari:name=dataSource,type=HikariDataSource。又是个实例重复的问题,这个问题百度了下,需要给–spring.jmx.default-domain配置个新的值:–spring.jmx.default-domain=%s
      • 再启动,子容器正常创建,程序运行发现ok
      • 打包上传flink web,提交运行,正常!

    一个计时任务

    @Slf4j
    @Service
    public class TimeTask extends AbstractTask {
    
        @Resource
        private TimeSource timeSource;
    
        @Override
        public void configTask(StreamExecutionEnvironment env, ParameterTool tool) {
            env.getConfig().setAutoWatermarkInterval(0);
            env.addSource(timeSource)
                    .setParallelism(1)
                    .print()
                    .setParallelism(1);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    主程序启动类

    @SpringBootApplication
    public class Demo2Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Demo2Application.class, args);
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    打包程序

    设置parent

    
        org.springframework.boot
        spring-boot-starter-parent
        2.7.5
         
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    直接使用spring-boot-maven-plugin?

    
      org.springframework.boot
      spring-boot-maven-plugin
    
    
    • 1
    • 2
    • 3
    • 4

    因为spring-boot-maven-plugin打包区分了main-class和start-class,打包之后main-class是org.springframework.boot.loader.JarLauncher引导类,上传到flink web执行报错。

    考虑使用maven-shade-plugin

    参考SpringBoot超详细讲解集成Flink的部署与打包方法的方法二写了一版:

    
        org.apache.maven.plugins
        maven-shade-plugin
        3.3.0
        
          
            package
            
              shade
            
            
              false
              
                
                  com.google.code.findbugs:jsr305
                  org.slf4j:*
                  log4j:*
                
              
              
                
                  *:*
                  
                    module-info.class
                    META-INF/*.SF
                    META-INF/*.DSA
                    META-INF/*.RSA
                  
                
              
              
                
                  META-INF/spring.handlers
                  reference.conf
                
                
                  META-INF/spring.factories
                
                
                  META-INF/spring.schemas
                
                
                
                  ${start-class}
                
              
            
          
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    结果报错:

    Cannot find ‘resource’ in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer

    纠结了半天,也没找到原因

    再试试maven-assembly-plugin

      
        org.apache.maven.plugins
        maven-assembly-plugin
        3.3.0
        
          
            
              ${start-class}
            
          
          
          
            jar-with-dependencies
          
        
        
          
            make-assembly
            package
            
              single
            
          
        
      
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    可以正常打包,本地也能运行,但是上传到flink web报错

    LoggerFactory is not a Logback LoggerContext but Logback is on the classpath. Either remove Logback or the competing implementation (class org.apache.logging.slf4j.Log4jLoggerFactory loaded from file:/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar)
    
    • 1

    很明显,日志相关的jar冲突了。那么问题就是怎么配置maven-assembly-plugin,打包的时候移出org.apache.logging.log4j或ch.qos.logback?这个也比较困难,需要自定义assembly.xml文件,相对来说成本比较大。

    重回maven-shade-plugin

    找到很多资料,包括flink官方的maven打包方式也是用maven-shade-plugin,所以决定还是使用maven-shade-plugin。

    那怎么解决Cannot find 'resource' in class org.apache.maven.plugins.shade.resource.ServicesResourceTransformer的问题呢?

    恰好最近在看maven pom文件的相关知识,不小心打开了spring-boot-starter-parentpluginManagement,发现里面定义很多插件,其中就包括maven-shade-plugin

    按照pom依赖的逻辑,只要在build->plugins声明maven-shade-plugin就行:

    
        org.apache.maven.plugins
        maven-shade-plugin
    
    
    • 1
    • 2
    • 3
    • 4
    mvn clean package
    
    • 1

    打包成功了!

    仔细翻看spring-boot-starter-parent声明的maven-shade-plugin,发现executions->execution->configuration->transformers的内容在spring-boot的不同版本是不同的。难怪找不到resource。

    后续打包上传到flink web,也是报日志相关的jar冲突,不过maven-shade-plugin打包排除依赖比maven-assembly-plugin简单多了。由于flink运行时包含/opt/flink/lib/log4j-slf4j-impl-2.16.0.jar,所以果断排除logback,完整plugin配置如下:

    
        org.apache.maven.plugins
        maven-shade-plugin
        
            
                
                    
                    com.google.code.findbugs:jsr305
                    ch.qos.logback:*
                
            
        
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
  • 相关阅读:
    SpringBoot文件上传
    java基于springboot+vue+elementui的校园疫情防控系统 前后端分离
    Java项目:SSM健身房俱乐部管理系统
    互联网控制报文协议ICMP(计算机网络)
    Python编程 字典的常用操作
    rocketmq
    Android热修复学习(一)
    centos journalctl日志查看
    Qt中定时器的3种实现方法
    针对应用程序依赖库漏洞的攻击
  • 原文地址:https://blog.csdn.net/ssehs/article/details/127994357