• Flink: an approach to custom development based on flink-sql-connector-elasticsearch6


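    Flink: an approach to custom development by subclassing connector source classes

    Note: the same approach works for other connectors such as JDBC and Kafka.
    Recommendation: companies that build internal platforms on Flink often have special scenarios and requirements that call for changes to the Flink source code. Patching the source directly, however, makes every version upgrade costly and slow. This approach avoids that by subclassing the connector classes, enhancing them, and packaging the result so that it overrides the original classes.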

    1. Create a module project in IDEA

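    2. POM notes, using elasticsearch6 as the example

    2.1 Follow the settings of the official connector package

    Taking elasticsearch6 as the example, part of the POM is modeled on the official connector package below, with matching version numbers: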

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-sql-connector-elasticsearch6_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    

    2.2 maven-shade-plugin

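    1. Purpose: maven-shade-plugin builds the package.
    2. Copy all of the configuration from flink-sql-connector-elasticsearch6_${scala.binary.version}, then adjust the remaining settings to your own situation.

    2.3 POM configuration for the custom elasticsearch6 connector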

     <properties>
            <maven.compiler.source>${java.version}</maven.compiler.source>
            <maven.compiler.target>${java.version}</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <flink.version>1.13.1</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
            <slf4j.version>1.7.15</slf4j.version>
            <sql.driver.version>8.0.21</sql.driver.version>
            <fastjson.version>1.2.75</fastjson.version>
            <google.guava.version>20.0</google.guava.version>
            <apache.commons.version>3.11</apache.commons.version>
            <cn.hutool.all.version>5.5.2</cn.hutool.all.version>
            <macasaet.version>1.5.0</macasaet.version>
            <!-- compile,provided -->
            <scope>provided</scope>
        </properties>
    
    
        <dependencies>
    
            <!-- flink -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!-- kafka -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>${scope}</scope>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-api</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-csv</artifactId>
                <version>${flink.version}</version>
                <scope>${scope}</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-json</artifactId>
                <version>${flink.version}</version>
                <scope>${scope}</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>${scope}</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>${scope}</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-common</artifactId>
                <version>${flink.version}</version>
                <scope>${scope}</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <scope>${scope}</scope>
            </dependency>
    
            <!-- log -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
                <scope>${scope}</scope>
            </dependency>
    
            <!-- other jar -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
    
            <dependency>
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>${cn.hutool.all.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>${apache.commons.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>${google.guava.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.macasaet.fernet</groupId>
                <artifactId>fernet-java8</artifactId>
                <version>${macasaet.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.pupu.bigdata</groupId>
                <artifactId>pupu-flink-connector-common</artifactId>
                <version>1.0.0.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.10</version>
                <optional>true</optional>
                <scope>${scope}</scope>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.3.0</version>
                    <executions>
                        <execution>
                            <id>shade-flink</id>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <shadeTestJar>false</shadeTestJar>
                                <artifactSet>
                                    <includes>
                                        <include>*:*</include>
                                    </includes>
                                    <excludes>
                                        <exclude>com.carrotsearch:hppc</exclude>
                                        <exclude>com.tdunning:t-digest</exclude>
                                        <exclude>joda-time:joda-time</exclude>
                                        <exclude>net.sf.jopt-simple:jopt-simple</exclude>
                                        <exclude>org.elasticsearch:jna</exclude>
                                        <exclude>org.hdrhistogram:HdrHistogram</exclude>
                                        <exclude>org.yaml:snakeyaml</exclude>
                                    </excludes>
                                </artifactSet>
    
                                <filters>
                                    <filter>
                                        <artifact>cn.hutool:hutool-all</artifact>
                                        <excludes>
                                            <exclude>META-INF/**</exclude>
                                        </excludes>
                                    </filter>
    
                                    <filter>
                                        <artifact>com.alibaba:fastjson</artifact>
                                        <excludes>
                                            <exclude>META-INF/**</exclude>
                                        </excludes>
                                    </filter>
    
                                    <filter>
                                        <artifact>org.elasticsearch:elasticsearch</artifact>
                                        <excludes>
                                            <exclude>config/**</exclude>
                                            <exclude>modules.txt</exclude>
                                            <exclude>plugins.txt</exclude>
                                            <exclude>org/joda/**</exclude>
                                        </excludes>
                                    </filter>
                                    <filter>
                                        <artifact>org.elasticsearch.client:elasticsearch-rest-high-level-client</artifact>
                                        <excludes>
                                            <exclude>forbidden/**</exclude>
                                        </excludes>
                                    </filter>
                                    <filter>
                                        <artifact>org.apache.httpcomponents:httpclient</artifact>
                                        <excludes>
                                            <exclude>mozilla/**</exclude>
                                        </excludes>
                                    </filter>
                                    <filter>
                                        <artifact>org.apache.lucene:lucene-analyzers-common</artifact>
                                        <excludes>
                                            <exclude>org/tartarus/**</exclude>
                                        </excludes>
                                    </filter>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <!-- exclude Java 9 specific classes as otherwise the shade-plugin crashes -->
                                            <exclude>META-INF/versions/**</exclude>
                                            <exclude>META-INF/services/com.fasterxml.**</exclude>
                                            <exclude>META-INF/services/org.apache.lucene.**</exclude>
                                            <exclude>META-INF/services/org.elasticsearch.**</exclude>
                                            <exclude>META-INF/LICENSE.txt</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
    
                                <relocations>
                                    <!-- Force relocation of all Elasticsearch dependencies. -->
                                    <relocation>
                                        <pattern>org.apache.commons</pattern>
                                        <shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.commons</shadedPattern>
                                    </relocation>
                                    <relocation>
                                        <pattern>org.apache.http</pattern>
                                        <shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.http</shadedPattern>
                                    </relocation>
                                    <relocation>
                                        <pattern>org.apache.lucene</pattern>
                                        <shadedPattern>org.apache.flink.elasticsearch6.shaded.org.apache.lucene</shadedPattern>
                                    </relocation>
                                    <relocation>
                                        <pattern>org.elasticsearch</pattern>
                                        <shadedPattern>org.apache.flink.elasticsearch6.shaded.org.elasticsearch</shadedPattern>
                                    </relocation>
                                    <relocation>
                                        <pattern>com.fasterxml.jackson</pattern>
                                        <shadedPattern>org.apache.flink.elasticsearch6.shaded.com.fasterxml.jackson</shadedPattern>
                                    </relocation>
                                </relocations>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    

    3. Add the custom connector my-elasticsearch-6

    3.1 Create a connector class that extends Elasticsearch6DynamicSinkFactory

    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.annotation.Internal;
    import org.apache.flink.configuration.ConfigOption;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory;
    import org.apache.flink.streaming.connectors.elasticsearch.table.ElasticsearchOptions;
    import org.apache.flink.table.api.ValidationException;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    import org.apache.flink.table.factories.FactoryUtil;
    import pupu.flink.stream.connector.common.constants.Constant;
    import pupu.flink.stream.connector.common.https.MateHttpUtils;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Optional;
    import java.util.Set;
    
    @Slf4j
    @Internal
    public class PupuElasticsearch6DynamicSinkFactory extends Elasticsearch6DynamicSinkFactory {
    
        // Maps to the 'connector' option in the SQL DDL
        public static final String IDENTIFIER = "my-elasticsearch-6";
    
        @Override
        public String factoryIdentifier() {
            return IDENTIFIER;
        }
    
        // Add any extra options that must be validated here
        @Override
        public Set<ConfigOption<?>> requiredOptions() {
            // Copy the set so that adding options does not modify the parent's set
            Set<ConfigOption<?>> options = new HashSet<>(super.requiredOptions());
            return options;
        }
    
        // Special handling of parameters can be done in this method
        @Override
        public DynamicTableSink createDynamicTableSink(Context context) {
            final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    
            // Environment parameters. getConfiguration() is declared as ReadableConfig;
            // the cast assumes the runtime object is a Configuration.
            final Configuration envConfig = (Configuration) context.getConfiguration();
    
            // Options from the table's SQL definition
            Map<String, String> configMap = context.getCatalogTable().getOptions();
    
            return super.createDynamicTableSink(context);
        }
    }
    
    

    3.2 Setting tableEnv environment parameters

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    // Get the tableEnv configuration that holds the environment parameters
    Configuration tabConfig = tableEnv.getConfig().getConfiguration();
    // Set a parameter (key and value are placeholders)
    tabConfig.setString(key, value);
    
    • The environment parameters are read while the job is being submitted

    3.3 Reading tableEnv environment parameters

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        ...
        // Environment parameters
        final Configuration envConfig = (Configuration) context.getConfiguration();
        ...
        return super.createDynamicTableSink(context);
    }
    
    

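    4. Register the factory in org.apache.flink.table.factories.Factory

    Reason: Flink loads the org.apache.flink.table.factories.Factory file from the META-INF/services directory and iterates over the factories listed there, comparing each factoryIdentifier() with the connector named in the SQL. The file therefore has to list the fully qualified class name of the custom factory.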
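
To make the lookup concrete, here is a minimal, self-contained sketch of matching the SQL connector value against factoryIdentifier(). It does not use Flink: Factory, named, and discover are hypothetical stand-ins, and the list stands in for whatever the service file provides. Flink's own lookup is more involved; the sketch only assumes that exactly one discovered factory must match the identifier.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FactoryLookupSketch {

    /** Hypothetical stand-in for Flink's Factory interface; only the identifier matters here. */
    public interface Factory {
        String factoryIdentifier();
    }

    /** Helper that builds a stand-in factory with the given identifier. */
    public static Factory named(String identifier) {
        return () -> identifier;
    }

    /** Returns the single factory whose identifier equals the 'connector' option. */
    public static Factory discover(List<Factory> discovered, String connector) {
        List<Factory> matches = discovered.stream()
                .filter(f -> f.factoryIdentifier().equals(connector))
                .collect(Collectors.toList());
        if (matches.isEmpty()) {
            throw new IllegalStateException("No factory for identifier '" + connector + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException("Ambiguous factories for identifier '" + connector + "'");
        }
        return matches.get(0);
    }

    public static void main(String[] args) {
        // Stand-ins for the official factory and the custom subclass.
        List<Factory> discovered = Arrays.asList(
                named("elasticsearch-6"),
                named("my-elasticsearch-6"));
        System.out.println(discover(discovered, "my-elasticsearch-6").factoryIdentifier());
    }
}
```

Because the custom factory returns its own identifier, a table declared with 'connector' = 'my-elasticsearch-6' resolves to the subclass, while 'elasticsearch-6' keeps resolving to the official factory.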

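    5. Write a mainClass test class

    6. How to put the change into effect

    1. Build the package and copy the .jar into the lib directory of the Flink cluster.
    2. Reference this package in the application's POM so that the code takes effect.
    3. Note: check whether the application package also references flink-sql-connector-elasticsearch6_2.11, and remove it if so; check whether flink-sql-connector-elasticsearch6_2.11 is present in the Flink cluster's lib directory, and remove it if so. Otherwise the modified code may not take effect, because the JVM may load the officially provided classes instead.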
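
One way to check for the duplicate-jar situation described in the note is to ask the class loader for every location that provides a given class. The following is a minimal sketch using only the JDK; DuplicateClassCheck and copiesOf are hypothetical names, and the default class name assumes the shaded jar keeps the connector's original package. Which copy the JVM actually uses depends on class loader order, so the sketch only reports the copies it finds.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

public class DuplicateClassCheck {

    /** Lists every location visible to the loader that provides the given class. */
    public static List<URL> copiesOf(ClassLoader loader, String className) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Class to look for; pass another name as the first argument if needed.
        String name = args.length > 0
                ? args[0]
                : "org.apache.flink.streaming.connectors.elasticsearch.table.Elasticsearch6DynamicSinkFactory";
        List<URL> copies = copiesOf(Thread.currentThread().getContextClassLoader(), name);
        copies.forEach(System.out::println);
        if (copies.size() > 1) {
            System.out.println("More than one jar provides " + name + "; remove the extra one.");
        }
    }
}
```

Running it with the same classpath as the job should print one location per jar that contains the class; more than one line suggests that the official connector jar is still present next to the custom one.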
  • Original article: https://blog.csdn.net/qq_27242695/article/details/125482850