• Flink 基础 -- 应用开发(项目配置)


    1、概述

    本节中的指南将向您展示如何通过流行的构建工具(Maven, Gradle)配置项目,添加必要的依赖项(即连接器和格式测试),并涵盖一些高级配置主题。

    每个Flink应用程序都依赖于一组Flink库。至少,应用程序依赖于Flink api,此外,还依赖于某些连接器库(如Kafka, Cassandra)和第三方依赖,用户需要开发自定义函数来处理数据。

    1.1 开始进行

    要开始使用Flink应用程序,请使用以下命令、脚本和模板来创建Flink项目。

    Maven

    您可以使用下面的Maven命令基于 Archetype创建一个项目,或者使用提供的快速入门bash脚本。

    所有Flink Scala api都已弃用,并将在未来的Flink版本中删除。您仍然可以在Scala中构建应用程序,但是应该使用Java版本的DataStream和/或Table API。
    See FLIP-265 Deprecate and remove Scala API support

    Maven command

    $ mvn archetype:generate                \
      -DarchetypeGroupId=org.apache.flink   \
      -DarchetypeArtifactId=flink-quickstart-java \
      -DarchetypeVersion=1.18.0
    
    • 1
    • 2
    • 3
    • 4

    这允许您命名新创建的项目,并将交互地询问您的groupId、artifactId和包名。

    Quickstart script

    $ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.18.0
    
    • 1

    1.2 您需要哪些依赖项?

    要开始处理Flink作业,通常需要以下依赖项:

    除此之外,您可能还需要添加开发自定义功能所需的第三方依赖项。

    1.3 Flink APIs

    Flink提供了两个主要的API:数据流API表API & SQL。它们可以单独使用,也可以混合使用,这取决于你的用例:

    想要使用的APIs需要添加的依赖项
    DataStreamflink-streaming-java
    DataStream with Scalaflink-streaming-scala_2.12
    Table APIflink-table-api-java
    Table API with Scalaflink-table-api-scala_2.12
    Table API + DataStreamflink-table-api-java-bridge
    Table API + DataStream with Scalaflink-table-api-scala-bridge_2.12

    1.3 运行和打包

    如果您希望通过简单地执行主类来运行作业,则需要在类路径中使用flink-clients。对于Table API程序,您还需要flink-table-runtimeflink-table-planner-loader

    根据经验,我们建议将应用程序代码及其所需的所有依赖项打包到一个fat/uber JAR中。这包括作业的打包连接器、格式和第三方依赖项。此规则不适用于 Java APIs、DataStream Scala api和前面提到的运行时模块,这些模块已经由Flink自己提供,不应该包含在job uber JAR中。这个作业JAR可以提交到已经运行的Flink集群,或者添加到Flink应用程序容器映像中,而无需修改发行版。

    1.4 What’s next?

    • 要开始开发您的工作,请查看数据流API和表API & SQL
    • 有关如何根据构建工具打包作业的详细信息,请查看以下特定指南
      Maven
      Gradle
    • 有关项目配置的更高级主题,请查看高级主题部分。

    2、如何使用Maven来配置您的项目

    本指南将向您展示如何使用Maven配置Flink作业项目(Flink job project),Maven是由Apache Software Foundation开发的开源构建自动化工具,使您能够构建、发布和部署项目。您可以使用它来管理软件项目的整个生命周期。

    2.1 要求

    • Maven 3.8.6 (recommended or higher)
    • Java 8 (deprecated) or Java 11

    2.2 将项目导入到IDE中

    一旦创建了项目文件夹和文件,我们建议您将该项目导入到IDE中进行开发和测试。

    IntelliJ IDEA支持开箱即用的Maven项目。Eclipse提供了m2e插件导入Maven项目

    注意:对于Flink来说,Java的默认JVM堆大小可能太小,您必须手动增加它。在Eclipse中,选择“Run Configurations -> Arguments”,在“VM Arguments”中输入“-Xmx800m”。在IntelliJ IDEA中,建议从Help |编辑自定义虚拟机选项菜单中更改JVM选项。有关详细信息,请参阅本文

    关于IntelliJ的注意事项:要使应用程序在IntelliJ IDEA中运行,必须在运行配置中勾选Include dependencies with "Provided" scope框。如果这个选项不可用(可能是由于使用较旧的IntelliJ IDEA版本),那么一个变通方法是创建一个调用应用程序的main()方法的测试。

    2.3 构建项目

    如果你想构建/打包你的项目,导航到你的项目目录并运行mvn clean package’ 命令。您将在这里找到一个JAR文件,其中包含您的应用程序(以及您可能作为依赖项添加到应用程序中的连接器和库):target/-.jar

    注意:如果您使用与DataStreamJob不同的类作为应用程序的主类/入口点,我们建议您相应地更改pom.xml文件中的mainClass设置,以便Flink可以从JAR文件运行应用程序,而无需额外指定主类。

    2.4 向项目添加依赖项

    在项目目录中打开pom.xml文件,并在dependencies 选项卡之间添加依赖项

    例如,你可以像这样添加Kafka连接器作为依赖项:

    <dependencies>
        
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-connector-kafkaartifactId>
            <version>1.18.0version>
        dependency>
        
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    然后在命令行上执行mvn install
    Java Project Template Scala Project TemplateGradle创建的项目被配置为在运行mvn clean package时自动将应用程序依赖项包含到应用程序JAR中。对于没有从这些模板中设置的项目,我们建议添加Maven Shade Plugin来构建包含所有必需依赖项的应用程序jar。

    重要:请注意,所有这些核心API依赖项都应该将它们的作用域设置为 provided。这意味着需要对它们进行编译,但是不应该将它们打包到项目的最终应用程序JAR文件中。如果没有设置为provided,最好的情况是最终的JAR变得过大,因为它还包含所有Flink核心依赖项。最坏的情况是,添加到应用程序JAR文件中的Flink核心依赖项与您自己的一些依赖项版本发生冲突(通常可以通过反向类加载来避免这种情况)。

    要正确地将依赖项打包到应用程序JAR中,必须将Flink API依赖项设置为compile 范围。

    2.5 打包应用程序

    根据您的用例,在将Flink应用程序部署到Flink环境之前,可能需要以不同的方式对其进行打包。

    如果你想为Flink Job创建一个JAR,并且只使用Flink依赖关系而不使用任何第三方依赖关系(即使用JSON格式的文件系统连接器),你不需要创建一个uber/fat JAR或遮挡任何依赖关系。

    如果您想为Flink Job创建一个JAR,并使用Flink发行版中没有内置的外部依赖项,您可以将它们添加到发行版的类路径中,或者将它们隐藏到您的uber/fat应用程序JAR中。

    有了生成的uber/fat JAR,你可以通过以下命令将其提交到本地或远程集群:

    bin/flink run -c org.example.MyJob myFatJar.jar
    
    • 1

    要了解有关如何部署Flink作业的详细信息,请查看部署指南

    2.6 用于创建带有依赖项的uber/fat JAR的模板

    要构建一个包含声明的连接器和库所需的所有依赖项的应用程序JAR,您可以使用以下插件定义:

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.pluginsgroupId>
                <artifactId>maven-shade-pluginartifactId>
                <version>3.1.1version>
                <executions>
                    <execution>
                        <phase>packagephase>
                        <goals>
                            <goal>shadegoal>
                        goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305exclude>
                                excludes>
                            artifactSet>
                            <filters>
                                <filter>
                                    
                                    <artifact>*:*artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SFexclude>
                                        <exclude>META-INF/*.DSAexclude>
                                        <exclude>META-INF/*.RSAexclude>
                                    excludes>
                                filter>
                            filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    
                                    <mainClass>my.programs.main.clazzmainClass>
                                transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            transformers>
                        configuration>
                    execution>
                executions>
            plugin>
        plugins>
    build>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    默认情况下,Maven阴影插件将包括runtimecompile范围内的所有依赖项。

    3、连接器和格式

    Flink应用程序可以通过连接器读取和写入各种外部系统。它支持多种格式,以便编码和解码数据以匹配Flink的数据结构。

    数据流Table API/SQL的可用连接器和格式概述。

    3.1 Available artifacts

    为了使用连接器和格式,您需要确保Flink能够访问实现它们的构件。对于Flink社区支持的每个连接器,我们在Maven Central上发布两个工件:

    • flink-connector- 它是一个瘦JAR,只包括连接器代码,但不包括最终的第三方依赖
    • flink-sql-connector- 它是一个超级JAR,可以与所有连接器第三方依赖项一起使用

    这同样适用于格式(formats)。注意,有些连接器可能没有相应的flink-sql-connector-构件,因为它们不需要第三方依赖项。

    uber/fat jar主要用于与SQL客户端一起使用,但您也可以在任何数据流/表应用程序中使用它们。

    3.2 Using artifacts

    为了使用连接器/格式模块,你可以:

    • 在您的作业JAR中为瘦JAR及其传递依赖项 Shade
    • 在你的工作JAR中添加超级JAR
    • 将uber JAR直接拷贝到Flink发行版的/lib文件夹中

    对于shading 依赖,请查看特定的MavenGradle指南。有关Flink分布的参考,请查看Flink分布的解剖

    决定是shade uber JAR、瘦JAR还是仅仅在发行版中包含依赖取决于您和您的用例。如果您为依赖项添加阴影,您将对作业JAR中的依赖项版本有更多的控制。在对瘦JAR进行shade 的情况下,您将对传递依赖项有更多的控制,因为您可以在不更改连接器版本的情况下更改版本(允许二进制兼容性)。如果在Flink分发/lib文件夹中直接嵌入连接器uber JAR,您将能够在一个地方控制所有作业的连接器版本。

    4、测试依赖

    Flink提供了用于测试作业的实用程序,您可以将其添加为依赖项。

    4.1 DataStream API Testing

    如果要为使用DataStream API构建的作业开发测试,则需要添加以下依赖项:

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-test-utilsartifactId>
        <version>1.18.0version>
        <scope>testscope>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在各种测试实用程序中,该模块提供了MiniCluster,这是一个轻量级的可配置Flink集群,可在JUnit测试中运行,可以直接执行作业。

    有关如何使用这些实用程序的更多信息,请参阅DataStream API测试一节

    4.2 Table API Testing

    如果你想在IDE中本地测试Table API & SQL程序,除了前面提到的flink-test-utils之外,你可以添加以下依赖项:

    <dependency>
        <groupId>org.apache.flinkgroupId>
        <artifactId>flink-table-test-utilsartifactId>
        <version>1.18.0version>
        <scope>testscope>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这将自动引入查询规划器和运行时,它们分别用于规划和执行查询。

    Flink -table-test-utils模块已在Flink 1.15中引入,被认为是实验性的。

    5、高级配置主题

    5.1 Flink分布的解剖

    Flink本身由一组类和依赖项组成,这些类和依赖项构成了Flink运行时的核心,并且在启动Flink应用程序时必须出现。运行系统所需的类和依赖关系处理诸如协调、网络、检查点、故障转移、api、操作符(如窗口)、资源管理等领域。

    这些核心类和依赖项被打包在flink-dist.jar中,它可以在下载的发行版的/lib文件夹中获得,并且是基本Flink容器映像的一部分。您可以将这些依赖关系看作类似于Java的核心库,其中包含StringList之类的类。

    为了保持核心依赖尽可能小并避免依赖冲突,Flink核心依赖不包含任何连接器或库(即CEP, SQL, ML),以避免在类路径中有过多的默认类和依赖。

    Flink发行版的/lib目录还包含各种jar,其中包括常用模块,例如执行Table作业所需的所有模块以及一组连接器和格式。这些是默认加载的,可以通过从/lib文件夹中删除它们来从类路径中删除。

    Flink还在/opt文件夹下提供了额外的可选依赖项,这可以通过移动/lib文件夹中的jar来启用。

    有关类加载的更多信息,请参阅Flink中的类加载一节。

    5.2 Scala版本

    不同的Scala版本之间不是二进制兼容的。所有(传递地)依赖于Scala的Flink依赖都以其构建的Scala版本为后缀(例如:flink-streaming-scala_2.12)。

    如果你只使用Flink的Java api,你可以使用任何Scala版本。如果您正在使用Flink的Scala api,则需要选择与应用程序的Scala版本匹配的Scala版本。

    有关如何为特定的Scala版本构建Flink的详细信息,请参阅构建指南

    2.12.8之后的Scala版本与之前的2.12.x 版本不兼容。这将阻止Flink项目升级其2.12.X版本在2.12.8之后。您可以按照构建指南在本地为以后的Scala版本构建Flink。为此,您需要添加-Djapicmp.skip在构建时跳过二进制兼容性检查。

    5.3 表依赖剖析

    Flink发行版默认包含执行Flink SQL作业所需的jar(在/lib文件夹中),特别是:

    • flink-table-api-java-uber-1.18.0.jar → contains all the Java APIs
    • flink-table-runtime-1.18.0.jar → contains the table runtime
    • flink-table-planner-loader-1.18.0.jar → contains the query planner

    以前,这些jar都打包到flink-table.jar中。从Flink 1.15开始,为了允许用户将flink-table-planner-loader-1.18.0.jar flink-table-planner_2.12-1.18.0.jar交换,这个文件现在被分成三个jar。

    虽然 Table Java API构件内置于发行版中,但默认情况下不包括表Scala API构件。当使用Flink Scala API的格式和连接器时,您需要手动下载并将这些JAR包含在distribution /lib文件夹中(推荐),或者将它们打包为Flink SQL作业的uber/fat JAR中的依赖项。

    有关更多详细信息,请查看如何连接到外部系统

    Table Planner and Table Planner Loader

    从Flink 1.15开始,该分布包含两个规划器:

    • flink-table-planner_2.12-1.18.0.jar, in /opt, contains the query planner
    • flink-table-planner-loader-1.18.0.jar, loaded by default in /lib, contains the query planner hidden behind an isolated classpath (you won’t be able to address any io.apache.flink.table.planner directly)

    这两个规划器jar包含相同的代码,但是它们的打包不同。在第一种情况下,必须使用相同的Scala版本的JAR。在第二种情况下,您不需要考虑Scala,因为它隐藏在JAR中。

    默认情况下,发行版使用flink-table-planner-loader。如果需要访问和使用查询规划器的内部,可以交换jar(在distribution /lib文件夹中复制并粘贴flink-table-planner_2.12.jar)。请注意,您将被限制使用您正在使用的Flink发行版的Scala版本。

    这两个计划器不能同时存在于类路径中。如果将它们都加载到/lib中,则表作业将失败。

    在即将到来的Flink版本中,我们将停止在Flink发行版中发布flink-table-planner_2.12工件。我们强烈建议迁移作业和自定义连接器/格式以使用API模块,而不依赖于规划器内部。如果您需要来自计划器的一些功能,这些功能目前没有通过API模块公开,请打开一个票证以便与社区讨论。

    5.4 Hadoop的依赖性

    一般规则:没有必要将Hadoop依赖项直接添加到应用程序中。如果你想在Hadoop中使用Flink,你需要有一个包含Hadoop依赖项的Flink设置,而不是将Hadoop作为一个应用程序依赖项添加。换句话说,Hadoop必须依赖于Flink系统本身,而不是包含应用程序的用户代码。Flink将使用由HADOOP_CLASSPATH环境变量指定的Hadoop依赖项,可以这样设置:

    export HADOOP_CLASSPATH=`hadoop classpath`
    
    • 1

    这种设计有两个主要原因:

    • 一些Hadoop交互发生在Flink的核心,可能在用户应用程序启动之前。其中包括为检查点设置HDFS,通过Hadoop的Kerberos令牌进行身份验证,或者部署在YARN上。
    • Flink的反向类加载方法从核心依赖项中隐藏了许多传递依赖项。这不仅适用于Flink自己的核心依赖,也适用于安装过程中出现的Hadoop依赖。这样,应用程序就可以使用相同依赖项的不同版本,而不会遇到依赖冲突。当依赖树变得非常大时,这非常有用。

    如果你在IDE中开发或测试时需要Hadoop依赖项(例如,HDFS访问),你应该将这些依赖项配置为类似于依赖项的范围(例如,test provided)。

  • 相关阅读:
    “罪魁祸首”已找到,微软回应修改 MIT 开源项目作者版权声明
    亚马逊主图视频可以上传几个?有什么要求?
    asp.net水资源检测系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio
    如何安装Jmeter监控服务器资源插件(JMeterPlugins + ServerAgent 方法一)?
    Go语言Gin框架中使用MySQL数据库的三种方式
    go语言的channel笔记
    java计算机毕业设计学生成绩管理系统源程序+mysql+系统+lw文档+远程调试
    探花交友前置-dubbo
    技术Leader对下管理的法宝-SMART
    不知道图片加文字水印怎么弄?这3个方法自媒体达人必学
  • 原文地址:https://blog.csdn.net/chinusyan/article/details/134281751