1.31. Flink custom RocketMQ (source/sink) + custom Redis source and sink


    1.31. Flink custom RocketMQ (source/sink) + custom Redis source and sink
    1.31.1. Project structure
    1.31.2. Defining the pom.xml file
    1.31.3. log4j2.properties
    1.31.4. logback.xml
    1.31.5. cache.properties
    1.31.6. project-config.properties
    1.31.7. IssueAcceptSimpleProducer.java
    1.31.8. Consumer.java
    1.31.9. DefaultTopicSelector.java
    1.31.10. SimpleTopicSelector.java
    1.31.11. TopicSelector.java
    1.31.12. KeyValueDeserializationSchema.java
    1.31.13. KeyValueSerializationSchema.java
    1.31.14. SimpleKeyValueDeserializationSchema.java
    1.31.15. SimpleKeyValueSerializationSchema.java
    1.31.16. RocketMQConfig.java
    1.31.17. RocketMQSink.java
    1.31.18. RocketMQSource.java
    1.31.19. RocketMQUtils.java
    1.31.20. RunningChecker.java
    1.31.21. DateUtils.java
    1.31.22. PropertiesUtils.java
    1.31.23. RedisUtil.java
    1.31.24. IssueConstants.java
    1.31.25. IssueAcceptRedisSink.java
    1.31.26. IssueAcceptFlinkHandlerByCustomRedisSink.java
    1.32. Other Flink examples
    1.32.1. Generating data with DataGen
    1.32.2. Storing temporary data with value state

    1.31. Flink custom RocketMQ (source/sink) + custom Redis source and sink

    1.31.1. Project structure

    (Screenshot of the project structure omitted.)

    1.31.2. Defining the pom.xml file

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
        <!-- xxxxxx real-time processing -->
        <modelVersion>4.0.0</modelVersion>
        <groupId>xxx.xxx.xxxx</groupId>
        <artifactId>indicators-real-time-handler</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <!--maven properties -->
            <maven.test.skip>true</maven.test.skip>
            <maven.javadoc.skip>true</maven.javadoc.skip>
            <!-- compiler settings properties -->
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <!--<rocketmq.version>4.7.1</rocketmq.version>-->
            <rocketmq.version>4.5.1</rocketmq.version>
            <flink.version>1.11.1</flink.version>
            <flink-connector-redis.version>1.0</flink-connector-redis.version>
            <commons-lang.version>2.5</commons-lang.version>
            <scala.binary.version>2.12</scala.binary.version>
            <junit.version>4.12</junit.version>
            <redis.version>3.3.0</redis.version>
            <slf4j.version>1.7.25</slf4j.version>
            <fastjson.version>1.2.73</fastjson.version>
            <joda-time.version>2.9.4</joda-time.version>
            <!--<hadoop.version>2.8.3</hadoop.version>-->
            <!-- used to connect to the middleware team's Redis -->
            <tmc-version>0.6.2</tmc-version>
    
    
            <fileName>issue-handler</fileName>
            <mainClass>com.xxx.issue.flink.handler.IssueHandleFlinkHandlerByCustomRedisSink</mainClass>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>commons-cli</groupId>
                <artifactId>commons-cli</artifactId>
                <version>1.4</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
            <!--<dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>-->
    
            <!--<dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>jackson-databind</artifactId>
                        <groupId>com.fasterxml.jackson.core</groupId>
                    </exclusion>
                </exclusions>
            </dependency>-->
    
            <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
            <!--<dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>jackson-databind</artifactId>
                        <groupId>com.fasterxml.jackson.core</groupId>
                    </exclusion>
                </exclusions>
            </dependency>-->
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
        <!--
        1. compile  : the default scope; available at runtime and packaged into the jar.
        2. provided : needed at compile time; supplied by the runtime environment, not packaged.
        3. runtime  : not needed to compile, but required at runtime and packaged. (separates interface from implementation)
        4. test     : used only for tests; not packaged.
        5. system   : a jar taken from a fixed path on the system rather than from a repository. (rarely used)
        -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.3</version>
            </dependency>
    		
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>${slf4j.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j.version}</version>
                <scope>test</scope>
            </dependency>
    
            <!--<dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>${redis.version}</version>
            </dependency>-->
    
            <!--<dependency>
                <groupId>org.apache.bahir</groupId>
                <artifactId>flink-connector-redis_2.11</artifactId>
                <version>${flink-connector-redis.version}</version>
            </dependency>-->
    		
    		<dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-namesrv</artifactId>
                <version>${rocketmq.version}</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-broker</artifactId>
                <version>${rocketmq.version}</version>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>${rocketmq.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-acl</artifactId>
                <version>${rocketmq.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-common</artifactId>
                <version>${rocketmq.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>io.netty</groupId>
                        <artifactId>netty-tcnative</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    
            <dependency>
                <groupId>commons-lang</groupId>
                <artifactId>commons-lang</artifactId>
                <version>${commons-lang.version}</version>
            </dependency>
    
            <!--test -->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
                <version>${junit.version}</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>${fastjson.version}</version>
            </dependency>
    
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <version>${joda-time.version}</version>
            </dependency>
    
        <!-- use the following dependencies when programming in Scala: start -->
            <!--<dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>-->
        <!-- use the following dependencies when programming in Scala: end -->
    
            <!-- kafka connector scala 2.12 -->
            <!--
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.0.0</version>
            </dependency>
            -->
    
            <!--
            <dependency>
                <groupId>org.powermock</groupId>
                <artifactId>powermock-module-junit4</artifactId>
                <version>1.5.5</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.powermock</groupId>
                <artifactId>powermock-api-mockito</artifactId>
                <version>1.5.5</version>
                <scope>test</scope>
            </dependency>
    		-->
    
        </dependencies>
    
        <distributionManagement>
            <repository>
                <id>releases</id>
                <layout>default</layout>
                <url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
            </repository>
    
            <snapshotRepository>
                <id>snapshots</id>
                <name>snapshots</name>
                <url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
            </snapshotRepository>
        </distributionManagement>
    
        <repositories>
            <repository>
                <id>releases</id>
                <layout>default</layout>
                <url>http://xxx.xxx.xxx/nexus/content/repositories/releases/</url>
            </repository>
    
            <repository>
                <id>snapshots</id>
                <name>snapshots</name>
                <url>http://xxx.xxx.xxx/nexus/content/repositories/snapshots/</url>
                <snapshots>
                    <enabled>true</enabled>
                    <updatePolicy>always</updatePolicy>
                    <checksumPolicy>warn</checksumPolicy>
                </snapshots>
            </repository>
    
            <repository>
                <id>xxxxx</id>
                <name>xxxxxx</name>
                <url>http://xxx.xxx.xxx/nexus/content/repositories/xxxx/</url>
            </repository>
    
            <repository>
                <id>public</id>
                <name>public</name>
                <url>http://xxx.xxx.xxx/nexus/content/groups/public/</url>
            </repository>
    
        <!-- newly added -->
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
        <build>
            <finalName>${fileName}</finalName>
            <plugins>
            <!-- compiler plugin -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.6.0</version>
                    <configuration>
                        <source>${maven.compiler.source}</source>
                        <target>${maven.compiler.target}</target>
                        <encoding>${project.build.sourceEncoding}</encoding>
                        <compilerVersion>${maven.compiler.source}</compilerVersion>
                        <showDeprecation>true</showDeprecation>
                        <showWarnings>true</showWarnings>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.12.4</version>
                    <configuration>
                        <skipTests>${maven.test.skip}</skipTests>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.rat</groupId>
                    <artifactId>apache-rat-plugin</artifactId>
                    <version>0.12</version>
                    <configuration>
                        <excludes>
                            <exclude>README.md</exclude>
                        </excludes>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-javadoc-plugin</artifactId>
                    <version>2.10.4</version>
                    <configuration>
                        <aggregate>true</aggregate>
                        <reportOutputDirectory>javadocs</reportOutputDirectory>
                        <locale>en</locale>
                    </configuration>
                </plugin>
            <!-- Scala compiler plugin -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.1.6</version>
                    <configuration>
                    <scalaCompatVersion>${scala.binary.version}</scalaCompatVersion>
                    <scalaVersion>2.12.12</scalaVersion>
                        <encoding>UTF-8</encoding>
                    </configuration>
                    <executions>
                        <execution>
                            <id>compile-scala</id>
                            <phase>compile</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                        <execution>
                            <id>test-compile-scala</id>
                            <phase>test-compile</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            <!-- jar packaging plugin (bundles all dependencies) -->
                <plugin>
                    <!--<groupId>org.apache.maven.plugins</groupId>-->
                    <artifactId>maven-assembly-plugin</artifactId>
                    <!--<version>2.6</version>-->
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <!--<archive>
                            <manifest>
                                <mainClass>${mainClass}</mainClass>
                            </manifest>
                        </archive>-->
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

    1.31.3.log4j2.properties

    rootLogger.level = ERROR
    rootLogger.appenderRef.console.ref = ConsoleAppender
    
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    

    1.31.4.logback.xml

    <configuration>
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
            </encoder>
        </appender>
    
        <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <!-- Daily rollover -->
                <fileNamePattern>log/generator.%d{yyyy-MM-dd}.log</fileNamePattern>
                <!-- Keep 7 days' worth of history -->
                <maxHistory>7</maxHistory>
            </rollingPolicy>
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
    
        <root level="ERROR">
            <appender-ref ref="FILE" />
            <appender-ref ref="STDOUT" />
        </root>
    </configuration>
    

    1.31.5.cache.properties

    # application appKey
    app.key=appKeyTest
    # do not use memoryCache
    # cache monitoring switch: true/false
    monitor.enabled=false
    # test environment
    monitor.bootstrap.servers=xxx.xxx.xxx.xxx:9094
    synchronize.enabled=false
    local.cache.name=none
    #localCache config start
    # test-environment Redis
    remote.cache.servers=redis://xxx.xxx.xxx.xxx:6390,redis://xxx.xxx.xxx.xxx:6391,redis://xxx.xxx.xxx.xxx:6392,redis://xxx.xxx.xxx.xxx:6393,redis://xxx.xxx.xxx.xxx:6394,redis://xxx.xxx.xxx.xxx:6395
    remote.cache.mode=cluster
    #remote.cache.mode=standalone
    remote.cache.password=123456
    
    
    synchronize.strategy=kafka
    synchronize.address=
    monitor.address=
    monitor.strategy=kafka
    
    namespace=doraemon
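
    The `remote.cache.servers` value above packs the cluster nodes into one comma-separated list of `redis://` URIs, so client code has to split it into host/port pairs before building a cluster connection. A minimal, stdlib-only sketch of that parsing step (the class and method names here are illustrative, not part of the project):

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class CacheServerParser {

    /** Splits a comma-separated list of redis:// URIs into "host:port" strings. */
    public static List<String> parseNodes(String servers) {
        List<String> nodes = new ArrayList<>();
        for (String s : servers.split(",")) {
            URI uri = URI.create(s.trim());   // redis:// parses as a generic hierarchical URI
            nodes.add(uri.getHost() + ":" + uri.getPort());
        }
        return nodes;
    }

    public static void main(String[] args) {
        String servers = "redis://10.0.0.1:6390,redis://10.0.0.2:6391";
        System.out.println(parseNodes(servers));
    }
}
```

    The resulting host/port pairs would typically be fed to whatever cluster client the project's RedisUtil wraps.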
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    1.31.6.project-config.properties

    # local RocketMQ name server address
    #rocketmq.name.server.addr=localhost:9876
    # dev-environment RocketMQ
    rocketmq.name.server.addr=xxx.xxx.xxx.xxx:9876
    # test environment
    # rocketmq.name.server.addr=xxx.xxx.xxx.xxx:9876
    rocketmq.topics=issue_sync_message_1##issue_sync_message_2
    
    ####################################Flink configuration start###########################################
    # checkpoint interval in milliseconds (here, a checkpoint every 120 seconds)
    flink.checkpoint.interval=120000
    # minimum pause between two checkpoints, in milliseconds
    flink.checkpoint.minPauseBetweenCheckpoints=1000
    # a checkpoint must complete within one minute or it is discarded (checkpoint timeout)
    flink.checkpoint.checkpointTimeout=60000
    # allow only one checkpoint at a time
    flink.checkpoint.maxConcurrentCheckpoints=1
    # RocketMQ source parallelism
    flink.rockeqmq.source.parallelism=1
    # Redis sink parallelism
    flink.redis.sink.parallelism=1
    # number of restart attempts
    flink.fixedDelayRestart.times=3
    # delay between restart attempts, in seconds
    flink.fixedDelayRestart.interval=5
    ####################################Flink configuration end  ###########################################
    
    ####################################Redis configuration start###########################################
    # keep for 10 days by default (in seconds)
    redis.default.expiration.time=864000
    ####################################Redis configuration end  ###########################################
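
    Note that `rocketmq.topics` joins multiple topic names with a `##` delimiter, so the reading code must split them back apart. A small, stdlib-only sketch of how PropertiesUtils-style code might load and split these values (the demo class itself is illustrative; it loads the properties from an in-memory string rather than the classpath):

```java
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class ProjectConfigDemo {

    /** rocketmq.topics packs several topic names into one value, delimited by "##". */
    public static List<String> parseTopics(String raw) {
        return Arrays.asList(raw.split("##"));
    }

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // In the real project this would be loaded from project-config.properties on the classpath.
        props.load(new StringReader(
                "rocketmq.topics=issue_sync_message_1##issue_sync_message_2\n" +
                "flink.checkpoint.interval=120000\n"));

        List<String> topics = parseTopics(props.getProperty("rocketmq.topics"));
        long checkpointInterval = Long.parseLong(props.getProperty("flink.checkpoint.interval"));

        System.out.println(topics);              // two topics to subscribe to
        System.out.println(checkpointInterval);  // milliseconds, passed to env.enableCheckpointing(...)
    }
}
```

    Each topic in the list can then be subscribed to individually, as the Consumer class below does.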
    

    1.31.7.IssueAcceptSimpleProducer.java

    package com.xxxxx.issue.producer;
    
    import com.alibaba.fastjson.JSON;
    import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
    import com.xxxxx.doraemon.service.issue.domain.IssueSyncMessageBody;
    import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
    import com.xxxxx.issue.utils.PropertiesUtils;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Date;
    
    /**
     * Commands to start RocketMQ locally:
     * .\bin\mqnamesrv.cmd
     * .\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
     *
     * @author tuzuoquan
     * @version 1.0
     * @date 2020/9/14 15:29
     **/
    public class IssueAcceptSimpleProducer {
        private static final Logger LOG = LoggerFactory.getLogger(IssueAcceptSimpleProducer.class);
    
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("p003");
            producer.setNamesrvAddr("localhost:9876");
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        long i = 1000L;
            while (i <= 100000000L) {
                IssueSyncMessageBody body = new IssueSyncMessageBody();
                body.setIssueId(i);
                body.setSerialNumber("" + i);
                body.setCreateDate(new Date());
    
            long flag = i % 10;
                String userName = "user" + flag;
                String mobile = "1981715866" + flag;
                body.setHandleUserName(userName);
                body.setHandleMobile(mobile);
    
            String tenantId = i % 10 + "";
            body.setTenantId(tenantId);
    
                String oneLevel = i % 32 + "";
                String twoLevel = i % 22 + "";
                String threeLevel = i % 30 + "";
                String fourLevel = i % 15 + "";
                String fiveLevel = i % 25 + "";
                String sixLevel = i % 5 + "";
                String sevenLevel = i % 20 + "";
    
            long tmp = i % 7;

            String handlerOrgCode = null;
            if (tmp == 0) {
                handlerOrgCode = oneLevel;
            } else if (tmp == 1) {
                handlerOrgCode = oneLevel + "." + twoLevel;
            } else if (tmp == 2) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel;
            } else if (tmp == 3) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel + "." + fourLevel;
            } else if (tmp == 4) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel + "." + fourLevel + "." + fiveLevel;
            } else if (tmp == 5) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel + "." + fourLevel + "." + fiveLevel + "." + sixLevel;
            } else if (tmp == 6) {
                handlerOrgCode = oneLevel + "." + twoLevel + "." + threeLevel + "." + fourLevel + "." + fiveLevel + "." + sixLevel + "." + sevenLevel;
            }
                body.setHandleOrgCode(handlerOrgCode);
    
            // 1. acceptance count
            String tag = "issue_accept_operat";
            Integer operatType = 61;

            // 2. event-occurrence count:
            // TAG: issue_accept_operat    operatType: 61 +
            // TAG: issue_add_operat       operatType: 2 -
            // TAG: issue_delete_operat    operatType: 0   org_code: area scope   createDate: today

            // 3. completion count:
            // TAG: issue_inspect_pass_operat   operatType: 30  + TAG: issue_complete_operat   operatType: 31
                IssueSyncMessageBodyVO issueSyncMessageBodyVO = new IssueSyncMessageBodyVO();
                issueSyncMessageBodyVO.setBody(body);
                issueSyncMessageBodyVO.setOperatType(operatType);
    
                //TQMessage msg = new TQMessage("issue_sync_message", "issue_add_operat", ObjectConvertUtil.objectToByte(issueSyncMessageBodyVO));
                Message msg = new Message("issue_sync_message_1", tag, "id_" + i, ObjectConvertUtil.objectToByte(issueSyncMessageBodyVO));
                //Message msg = new Message("issue_sync_message5", "issue_add_operat", "id_" + i, JSON.toJSONString(issueSyncMessageBodyVO).getBytes());
                try {
                    producer.send(msg);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                LOG.info("send :" + i + " content: " + JSON.toJSONString(issueSyncMessageBodyVO));
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                i++;
            }
        // Shut down once the producer instance is no longer in use.
            producer.shutdown();
        }
    }
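
    The seven-branch if/else above simply joins the first (i % 7) + 1 level codes with dots. The same logic can be sketched more compactly; this standalone class (name and method are illustrative, not part of the project) reproduces the producer's org-code construction:

```java
import java.util.Arrays;

public class OrgCodeDemo {

    /**
     * Builds the same handlerOrgCode as the producer's if/else chain:
     * i % 7 selects how many of the seven level codes (1..7) are joined with ".".
     */
    public static String orgCode(long i) {
        String[] levels = {
                String.valueOf(i % 32), String.valueOf(i % 22), String.valueOf(i % 30),
                String.valueOf(i % 15), String.valueOf(i % 25), String.valueOf(i % 5),
                String.valueOf(i % 20)
        };
        int depth = (int) (i % 7) + 1;
        return String.join(".", Arrays.copyOfRange(levels, 0, depth));
    }

    public static void main(String[] args) {
        System.out.println(orgCode(7L));    // 7 % 7 == 0 → depth 1, prints "7"
        System.out.println(orgCode(1000L)); // 1000 % 7 == 6 → all seven levels joined
    }
}
```

    Collapsing the chain this way keeps the branch logic in one place and makes the depth rule (i % 7 plus one) explicit.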
    

    1.31.8.Consumer.java

    package com.xxxxx.issue.consumer;
    
    import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
    import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            // Instantiate with specified consumer group name.
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("abcdefsssss");
    
            // Specify name server addresses.
            //consumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
            consumer.setNamesrvAddr("localhost:9876");
            //consumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
    
            // Subscribe to one or more topics to consume.
            //consumer.subscribe(PropertiesUtils.getInstance().getRocketMqTopic_1(), "issue_accept_operat || issue_add_operat || issue_delete_operat");
            //consumer.subscribe(PropertiesUtils.getInstance().getRocketMqTopic_1(), "*");
            consumer.subscribe("issue_sync_message_1", "*");
            consumer.subscribe("issue_sync_message_2", "*");
            // Register callback to execute on arrival of messages fetched from brokers.
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
    
                    for (MessageExt msg : msgs) {
                        IssueSyncMessageBodyVO issueSyncMessageBodyVO = (IssueSyncMessageBodyVO) ObjectConvertUtil.byteToObject(msg.getBody());
                        System.out.println(issueSyncMessageBodyVO.getBody());
                        //byte[] value = JSON.toJSONString(issueSyncMessageBodyVO).getBytes();
    //                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), value.toString());
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            //Launch the consumer instance.
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }
    }
    

    1.31.9.DefaultTopicSelector.java

    package org.apache.rocketmq.flink.common.selector;
    
    public class DefaultTopicSelector<T> implements TopicSelector<T> {
        private final String topicName;
        private final String tagName;
    
        public DefaultTopicSelector(final String topicName, final String tagName) {
            this.topicName = topicName;
            this.tagName = tagName;
        }
    
        public DefaultTopicSelector(final String topicName) {
            this(topicName, "");
        }
    
        @Override
        public String getTopic(T tuple) {
            return topicName;
        }
    
        @Override
        public String getTag(T tuple) {
            return tagName;
        }
    
    }
    

    1.31.10.SimpleTopicSelector.java

    package org.apache.rocketmq.flink.common.selector;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Map;
    
    public class SimpleTopicSelector implements TopicSelector<Map> {
        private static final Logger LOG = LoggerFactory.getLogger(SimpleTopicSelector.class);
    
        private final String topicFieldName;
        private final String defaultTopicName;
    
        private final String tagFieldName;
        private final String defaultTagName;
    
        /**
         * SimpleTopicSelector Constructor.
         * @param topicFieldName field name used for selecting the topic
         * @param defaultTopicName default topic name used when the field is absent or null
         * @param tagFieldName field name used for selecting the tag
         * @param defaultTagName default tag name used when the field is absent or null
         */
        public SimpleTopicSelector(String topicFieldName, String defaultTopicName, String tagFieldName, String defaultTagName) {
            this.topicFieldName = topicFieldName;
            this.defaultTopicName = defaultTopicName;
            this.tagFieldName = tagFieldName;
            this.defaultTagName = defaultTagName;
        }
    
        @Override
        public String getTopic(Map tuple) {
            if (tuple.containsKey(topicFieldName)) {
                Object topic =  tuple.get(topicFieldName);
                return topic != null ? topic.toString() : defaultTopicName;
            } else {
                LOG.warn("Field {} Not Found. Returning default topic {}", topicFieldName, defaultTopicName);
                return defaultTopicName;
            }
        }
    
        @Override
        public String getTag(Map tuple) {
            if (tuple.containsKey(tagFieldName)) {
                Object tag = tuple.get(tagFieldName);
                return tag != null ? tag.toString() : defaultTagName;
            } else {
                LOG.warn("Field {} Not Found. Returning default tag {}", tagFieldName, defaultTagName);
                return defaultTagName;
            }
        }
    }
    

    1.31.11.TopicSelector.java

    package org.apache.rocketmq.flink.common.selector;
    
    import java.io.Serializable;
    
    public interface TopicSelector<T> extends Serializable {
    
        String getTopic(T tuple);
    
        String getTag(T tuple);
    }
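The `TopicSelector` contract above decides, per record, which topic and tag a message is routed to by the sink. A minimal standalone sketch of implementing it for a `Map`-shaped event (the interface is reproduced here so the snippet compiles on its own; the `bizType`/`operation` field names and `BizTypeTopicSelector` class are illustrative, not part of the original code):

```java
import java.io.Serializable;
import java.util.Map;

// Reproduced from the listing above so this sketch compiles standalone.
interface TopicSelector<T> extends Serializable {
    String getTopic(T tuple);
    String getTag(T tuple);
}

// Hypothetical selector: routes an event by its "bizType" field,
// falling back to a default topic when the field is missing.
class BizTypeTopicSelector implements TopicSelector<Map<String, String>> {
    private final String defaultTopic;

    BizTypeTopicSelector(String defaultTopic) {
        this.defaultTopic = defaultTopic;
    }

    @Override
    public String getTopic(Map<String, String> event) {
        return event.getOrDefault("bizType", defaultTopic);
    }

    @Override
    public String getTag(Map<String, String> event) {
        return event.getOrDefault("operation", "");
    }
}

public class TopicSelectorDemo {
    public static void main(String[] args) {
        TopicSelector<Map<String, String>> selector =
                new BizTypeTopicSelector("issue_sync_message_1");
        Map<String, String> event = Map.of("operation", "issue_add_operat");
        // No "bizType" field in the event, so the default topic is used.
        System.out.println(selector.getTopic(event) + "/" + selector.getTag(event));
    }
}
```

Because the selector is `Serializable`, an instance can be constructed on the client and shipped inside the sink to every Flink subtask.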
    

    1.31.12.KeyValueDeserializationSchema.java

    package org.apache.rocketmq.flink.common.serialization;
    
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    
    import java.io.Serializable;
    
    public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
        T deserializeKeyAndValue(byte[] key, byte[] value);
    }
    

    1.31.13.KeyValueSerializationSchema.java

    package org.apache.rocketmq.flink.common.serialization;
    
    import java.io.Serializable;
    
    public interface KeyValueSerializationSchema<T> extends Serializable {
    
        byte[] serializeKey(T tuple);
    
        byte[] serializeValue(T tuple);
    }
    

    1.31.14.SimpleKeyValueDeserializationSchema.java

    package org.apache.rocketmq.flink.common.serialization;
    
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map> {
        public static final String DEFAULT_KEY_FIELD = "key";
        public static final String DEFAULT_VALUE_FIELD = "value";
    
        public String keyField;
        public String valueField;
    
        public SimpleKeyValueDeserializationSchema() {
            this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
        }
    
        /**
         * SimpleKeyValueDeserializationSchema Constructor.
         * @param keyField tuple field for selecting the key
         * @param valueField  tuple field for selecting the value
         */
        public SimpleKeyValueDeserializationSchema(String keyField, String valueField) {
            this.keyField = keyField;
            this.valueField = valueField;
        }
    
        @Override
        public Map deserializeKeyAndValue(byte[] key, byte[] value) {
            HashMap map = new HashMap(2);
            if (keyField != null) {
                String k = key != null ? new String(key, StandardCharsets.UTF_8) : null;
                map.put(keyField, k);
            }
            if (valueField != null) {
                String v = value != null ? new String(value, StandardCharsets.UTF_8) : null;
                map.put(valueField, v);
            }
            return map;
        }
    
        @Override
        public TypeInformation<Map> getProducedType() {
            return TypeInformation.of(Map.class);
        }
    }
    

    1.31.15.SimpleKeyValueSerializationSchema.java

    package org.apache.rocketmq.flink.common.serialization;
    
    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    
    public class SimpleKeyValueSerializationSchema implements KeyValueSerializationSchema<Map> {
        public static final String DEFAULT_KEY_FIELD = "key";
        public static final String DEFAULT_VALUE_FIELD = "value";
    
        public String keyField;
        public String valueField;
    
        public SimpleKeyValueSerializationSchema() {
            this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
        }
    
        /**
         * SimpleKeyValueSerializationSchema Constructor.
         * @param keyField tuple field for selecting the key
         * @param valueField  tuple field for selecting the value
         */
        public SimpleKeyValueSerializationSchema(String keyField, String valueField) {
            this.keyField = keyField;
            this.valueField = valueField;
        }
    
        @Override
        public byte[] serializeKey(Map tuple) {
            if (tuple == null || keyField == null) {
                return null;
            }
            Object key = tuple.get(keyField);
            return key != null ? key.toString().getBytes(StandardCharsets.UTF_8) : null;
        }
    
        @Override
        public byte[] serializeValue(Map tuple) {
            if (tuple == null || valueField == null) {
                return null;
            }
            Object value = tuple.get(valueField);
            return value != null ? value.toString().getBytes(StandardCharsets.UTF_8) : null;
        }
    
    }
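For string payloads the two simple schemas are inverses of each other: serialization pulls the key/value fields out of a `Map` as UTF-8 bytes, and deserialization rebuilds the `Map` from those bytes. A compact standalone round-trip sketch of that logic (the `serialize`/`deserialize` helpers below re-create the schema behavior for the default `"key"`/`"value"` field names so the snippet runs without the classes above):

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class KeyValueRoundTripDemo {

    // Mirrors SimpleKeyValueSerializationSchema.serializeKey/serializeValue for one field.
    static byte[] serialize(Map<String, String> tuple, String field) {
        String v = tuple.get(field);
        return v != null ? v.getBytes(StandardCharsets.UTF_8) : null;
    }

    // Mirrors SimpleKeyValueDeserializationSchema.deserializeKeyAndValue with default field names.
    static Map<String, String> deserialize(byte[] key, byte[] value) {
        Map<String, String> map = new HashMap<>(2);
        map.put("key", key != null ? new String(key, StandardCharsets.UTF_8) : null);
        map.put("value", value != null ? new String(value, StandardCharsets.UTF_8) : null);
        return map;
    }

    public static void main(String[] args) {
        Map<String, String> tuple = new HashMap<>();
        tuple.put("key", "id_1");
        tuple.put("value", "{\"body\":\"hello\"}");

        byte[] k = serialize(tuple, "key");
        byte[] v = serialize(tuple, "value");
        Map<String, String> back = deserialize(k, v);

        // The round trip reproduces the original tuple.
        System.out.println(back.equals(tuple)); // prints "true"
    }
}
```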
    

    1.31.16.RocketMQConfig.java

    package org.apache.rocketmq.flink;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.commons.lang.Validate;
    import org.apache.rocketmq.acl.common.AclClientRPCHook;
    import org.apache.rocketmq.acl.common.SessionCredentials;
    import org.apache.rocketmq.client.ClientConfig;
    import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    
    import java.util.Properties;
    import java.util.UUID;
    
    import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
    
    /**
     * RocketMQConfig for Consumer/Producer.
     */
    public class RocketMQConfig {
        // Server Config
        public static final String NAME_SERVER_ADDR = "nameserver.address"; // Required
    
        public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
        public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds
    
        public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
        public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds
    
    
        // Producer related config
        public static final String PRODUCER_GROUP = "producer.group";
    
        public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
        public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;
    
        public static final String PRODUCER_TIMEOUT = "producer.timeout";
        public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
    
        public static final String ACCESS_KEY = "access.key";
        public static final String SECRET_KEY = "secret.key";
    
    
        // Consumer related config
        public static final String CONSUMER_GROUP = "consumer.group"; // Required
    
        public static final String CONSUMER_TOPIC = "consumer.topic"; // Required
    
        public static final String CONSUMER_TAG = "consumer.tag";
        public static final String DEFAULT_CONSUMER_TAG = "*";
    
        public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to";
        public static final String CONSUMER_OFFSET_LATEST = "latest";
        public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
        public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
        public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp";
    
        public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
        public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds
    
        public static final String CONSUMER_PULL_POOL_SIZE = "consumer.pull.thread.pool.size";
        public static final int DEFAULT_CONSUMER_PULL_POOL_SIZE = 20;
    
        public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
        public static final int DEFAULT_CONSUMER_BATCH_SIZE = 32;
    
        public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
        public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10;
    
        public static final String MSG_DELAY_LEVEL = "msg.delay.level";
        public static final int MSG_DELAY_LEVEL00 = 0; // no delay
        public static final int MSG_DELAY_LEVEL01 = 1; // 1s
        public static final int MSG_DELAY_LEVEL02 = 2; // 5s
        public static final int MSG_DELAY_LEVEL03 = 3; // 10s
        public static final int MSG_DELAY_LEVEL04 = 4; // 30s
        public static final int MSG_DELAY_LEVEL05 = 5; // 1min
        public static final int MSG_DELAY_LEVEL06 = 6; // 2min
        public static final int MSG_DELAY_LEVEL07 = 7; // 3min
        public static final int MSG_DELAY_LEVEL08 = 8; // 4min
        public static final int MSG_DELAY_LEVEL09 = 9; // 5min
        public static final int MSG_DELAY_LEVEL10 = 10; // 6min
        public static final int MSG_DELAY_LEVEL11 = 11; // 7min
        public static final int MSG_DELAY_LEVEL12 = 12; // 8min
        public static final int MSG_DELAY_LEVEL13 = 13; // 9min
        public static final int MSG_DELAY_LEVEL14 = 14; // 10min
        public static final int MSG_DELAY_LEVEL15 = 15; // 20min
        public static final int MSG_DELAY_LEVEL16 = 16; // 30min
        public static final int MSG_DELAY_LEVEL17 = 17; // 1h
        public static final int MSG_DELAY_LEVEL18 = 18; // 2h
    
        /**
         * Build Producer Configs.
         * @param props Properties
         * @param producer DefaultMQProducer
         */
        public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
            buildCommonConfigs(props, producer);
    
            String group = props.getProperty(PRODUCER_GROUP);
            if (StringUtils.isEmpty(group)) {
                group = UUID.randomUUID().toString();
            }
            producer.setProducerGroup(group);
    
            producer.setRetryTimesWhenSendFailed(getInteger(props,
                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
            producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
            producer.setSendMsgTimeout(getInteger(props,
                PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
        }
    
        /**
         * Build Consumer Configs.
         * @param props Properties
         * @param consumer DefaultMQPullConsumer
         */
        public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) {
            buildCommonConfigs(props, consumer);
    
            consumer.setMessageModel(MessageModel.CLUSTERING);
    
            consumer.setPersistConsumerOffsetInterval(getInteger(props,
                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
        }
    
        /**
         * Build Common Configs.
         * @param props Properties
         * @param client ClientConfig
         */
        public static void buildCommonConfigs(Properties props, ClientConfig client) {
            String nameServers = props.getProperty(NAME_SERVER_ADDR);
            Validate.notEmpty(nameServers);
            client.setNamesrvAddr(nameServers);
    
            client.setPollNameServerInterval(getInteger(props,
                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
            client.setHeartbeatBrokerInterval(getInteger(props,
                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
        }
    
    
        /**
         * Build ACL credentials hook for the client.
         * @param props properties that may contain access.key and secret.key
         * @return an AclClientRPCHook when both keys are present, otherwise null
         */
        public static AclClientRPCHook buildAclRPCHook(Properties props) {
            String accessKey = props.getProperty(ACCESS_KEY);
            String secretKey = props.getProperty(SECRET_KEY);
            if (!StringUtils.isEmpty(accessKey) && !StringUtils.isEmpty(secretKey)) {
                AclClientRPCHook aclClientRPCHook = new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
                return aclClientRPCHook;
            }
            return null;
        }
    }
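RocketMQConfig reads everything from plain `java.util.Properties` keyed by the constants above. A minimal sketch of assembling producer properties, including the default-lookup behavior that `RocketMQUtils.getInteger` provides (the helper is re-implemented here so the snippet runs standalone; the address and group values are placeholders):

```java
import java.util.Properties;

public class ProducerConfigDemo {

    // Same contract as RocketMQUtils.getInteger: parse the property or fall back to the default.
    static int getInteger(Properties props, String key, int defaultValue) {
        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("nameserver.address", "localhost:9876");   // NAME_SERVER_ADDR, required
        props.setProperty("producer.group", "issue_producer_group"); // PRODUCER_GROUP
        props.setProperty("producer.retry.times", "5");              // PRODUCER_RETRY_TIMES

        // producer.timeout is not set, so the default (3000 ms) applies.
        int retries = getInteger(props, "producer.retry.times", 3);
        int timeout = getInteger(props, "producer.timeout", 3000);
        System.out.println(retries + "," + timeout); // prints "5,3000"
    }
}
```

These same `Properties` would then be handed to `RocketMQConfig.buildProducerConfigs(props, producer)` in the sink's `open()` method.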
    

    1.31.17.RocketMQSink.java

    package org.apache.rocketmq.flink;
    
    import org.apache.commons.lang.Validate;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.flink.common.selector.TopicSelector;
    import org.apache.rocketmq.flink.common.serialization.KeyValueSerializationSchema;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.nio.charset.StandardCharsets;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Properties;
    import java.util.UUID;
    
    /**
     * The RocketMQSink provides at-least-once reliability guarantees when
     * checkpoints are enabled and batchFlushOnCheckpoint(true) is set.
     * Otherwise, the sink's reliability guarantees depend on the RocketMQ producer's retry policy.
     */
    public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    
        private static final long serialVersionUID = 1L;
    
        private static final Logger LOG = LoggerFactory.getLogger(RocketMQSink.class);
    
        private transient DefaultMQProducer producer;
        private boolean async; // false by default
    
        private Properties props;
        private TopicSelector<IN> topicSelector;
        private KeyValueSerializationSchema<IN> serializationSchema;
    
        private boolean batchFlushOnCheckpoint; // false by default
        private int batchSize = 1000;
        private List<Message> batchList;
    
        private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
    
        public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
            this.serializationSchema = schema;
            this.topicSelector = topicSelector;
            this.props = props;
    
            if (this.props != null) {
                this.messageDeliveryDelayLevel  = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL,
                        RocketMQConfig.MSG_DELAY_LEVEL00);
                if (this.messageDeliveryDelayLevel  < RocketMQConfig.MSG_DELAY_LEVEL00) {
                    this.messageDeliveryDelayLevel  = RocketMQConfig.MSG_DELAY_LEVEL00;
                } else if (this.messageDeliveryDelayLevel  > RocketMQConfig.MSG_DELAY_LEVEL18) {
                    this.messageDeliveryDelayLevel  = RocketMQConfig.MSG_DELAY_LEVEL18;
                }
            }
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            Validate.notEmpty(props, "Producer properties can not be empty");
            Validate.notNull(topicSelector, "TopicSelector can not be null");
            Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");
    
            producer = new DefaultMQProducer(RocketMQConfig.buildAclRPCHook(props));
            producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
            RocketMQConfig.buildProducerConfigs(props, producer);
    
            batchList = new LinkedList<>();
    
            if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
                LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
                batchFlushOnCheckpoint = false;
            }
    
            try {
                producer.start();
            } catch (MQClientException e) {
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public void invoke(IN input, Context context) throws Exception {
            Message msg = prepareMessage(input);
    
            if (batchFlushOnCheckpoint) {
                batchList.add(msg);
                if (batchList.size() >= batchSize) {
                    flushSync();
                }
                return;
            }
    
            if (async) {
                try {
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            LOG.debug("Async send message success! result: {}", sendResult);
                        }
    
                        @Override
                        public void onException(Throwable throwable) {
                            if (throwable != null) {
                                LOG.error("Async send message failure!", throwable);
                            }
                        }
                    });
                } catch (Exception e) {
                    LOG.error("Async send message failure!", e);
                }
            } else {
                try {
                    SendResult result = producer.send(msg);
                    LOG.debug("Sync send message result: {}", result);
                    if (result.getSendStatus() != SendStatus.SEND_OK) {
                        throw new RemotingException(result.toString());
                    }
                } catch (Exception e) {
                    LOG.error("Sync send message failure!", e);
                    throw e;
                }
            }
        }
    
        private Message prepareMessage(IN input) {
            String topic = topicSelector.getTopic(input);
            String rawTag = topicSelector.getTag(input);
            String tag = rawTag != null ? rawTag : "";
    
            byte[] k = serializationSchema.serializeKey(input);
            String key = k != null ? new String(k, StandardCharsets.UTF_8) : "";
            byte[] value = serializationSchema.serializeValue(input);
    
            Validate.notNull(topic, "the message topic is null");
            Validate.notNull(value, "the message body is null");
    
            Message msg = new Message(topic, tag, key, value);
            if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) {
                msg.setDelayTimeLevel(this.messageDeliveryDelayLevel);
            }
            return msg;
        }
    
        public RocketMQSink<IN> withAsync(boolean async) {
            this.async = async;
            return this;
        }
    
        public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) {
            this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;
            return this;
        }
    
        public RocketMQSink<IN> withBatchSize(int batchSize) {
            this.batchSize = batchSize;
            return this;
        }
    
        @Override
        public void close() throws Exception {
            if (producer != null) {
                try {
                    flushSync();
                } catch (Exception e) {
                    LOG.error("FlushSync failure!", e);
                }
                // make sure producer can be shutdown, thus current producerGroup will be unregistered
                producer.shutdown();
            }
        }
    
        private void flushSync() throws Exception {
            if (batchFlushOnCheckpoint) {
                synchronized (batchList) {
                    if (batchList.size() > 0) {
                        producer.send(batchList);
                        batchList.clear();
                    }
                }
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            flushSync();
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // Nothing to do
        }
    }
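The at-least-once path of the sink hinges on `batchFlushOnCheckpoint`: messages are buffered and flushed either when the buffer reaches `batchSize` (in `invoke`) or when a checkpoint triggers `snapshotState`. A standalone sketch of just that buffering policy, with the actual `producer.send(batchList)` call simulated by a counter (no RocketMQ client needed; `BatchFlushDemo` is an illustrative class, not part of the connector):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchFlushDemo {
    private final int batchSize;
    private final List<String> batch = new ArrayList<>();
    int flushedMessages = 0; // stand-in for messages actually sent to the broker

    BatchFlushDemo(int batchSize) {
        this.batchSize = batchSize;
    }

    // Mirrors invoke(): buffer the message, flush when the threshold is hit.
    void invoke(String msg) {
        batch.add(msg);
        if (batch.size() >= batchSize) {
            flush();
        }
    }

    // Mirrors snapshotState(): a checkpoint always drains the buffer,
    // which is what upgrades the guarantee to at-least-once.
    void snapshotState() {
        flush();
    }

    private void flush() {
        flushedMessages += batch.size(); // stand-in for producer.send(batchList)
        batch.clear();
    }

    public static void main(String[] args) {
        BatchFlushDemo sink = new BatchFlushDemo(3);
        for (int i = 0; i < 7; i++) {
            sink.invoke("msg_" + i);              // two full batches of 3 get flushed
        }
        sink.snapshotState();                     // the checkpoint drains the last message
        System.out.println(sink.flushedMessages); // prints "7"
    }
}
```

Without the checkpoint-driven flush, the last partial batch would sit in memory and be lost on failure, which is why `open()` disables `batchFlushOnCheckpoint` when checkpointing is off.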
    

    1.31.18.RocketMQSource.java

    package org.apache.rocketmq.flink;
    
    import com.alibaba.fastjson.JSON;
    import com.xxxxx.caterpillar.sdk.util.ObjectConvertUtil;
    import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
    import org.apache.commons.collections.map.LinkedMap;
    import org.apache.commons.lang.Validate;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.CheckpointListener;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    import org.apache.rocketmq.client.consumer.*;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.message.MessageQueue;
    import org.apache.rocketmq.flink.common.serialization.KeyValueDeserializationSchema;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.nio.charset.StandardCharsets;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    
    import static org.apache.rocketmq.flink.RocketMQConfig.*;
    import static org.apache.rocketmq.flink.RocketMQUtils.getInteger;
    import static org.apache.rocketmq.flink.RocketMQUtils.getLong;
    
    /**
     * The RocketMQSource is based on the RocketMQ pull-consumer mode. It provides exactly-once
     * reliability guarantees when checkpoints are enabled; otherwise, the source provides no
     * reliability guarantees.
     */
    public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<OUT> {
    
        private static final long serialVersionUID = 1L;
    
        private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);
    
        private transient MQPullConsumerScheduleService pullConsumerScheduleService;
        private DefaultMQPullConsumer consumer;
    
        private KeyValueDeserializationSchema<OUT> schema;
    
        private RunningChecker runningChecker;
    
        private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
        private Map<MessageQueue, Long> offsetTable;
        private Map<MessageQueue, Long> restoredOffsets;
        /** Data for pending but uncommitted offsets. */
        private LinkedMap pendingOffsetsToCommit;
    
        private Properties props;
        private String topic;
        private String group;
    
        private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
    
        private transient volatile boolean restored;
        private transient boolean enableCheckpoint;
    
        public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
            this.schema = schema;
            this.props = props;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            LOG.debug("source open....");
            Validate.notEmpty(props, "Consumer properties can not be empty");
            Validate.notNull(schema, "KeyValueDeserializationSchema can not be null");
    
            this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
            this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);
    
            Validate.notEmpty(topic, "Consumer topic can not be empty");
            Validate.notEmpty(group, "Consumer group can not be empty");
    
            this.enableCheckpoint = ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled();
    
            if (offsetTable == null) {
                offsetTable = new ConcurrentHashMap<>();
            }
            if (restoredOffsets == null) {
                restoredOffsets = new ConcurrentHashMap<>();
            }
            if (pendingOffsetsToCommit == null) {
                pendingOffsetsToCommit = new LinkedMap();
            }
    
            runningChecker = new RunningChecker();
    
            //Wait for lite pull consumer
            //pullConsumerScheduleService = new MQPullConsumerScheduleService(group, RocketMQConfig.buildAclRPCHook(props));
            pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
            consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();
    
            consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()) + "_" + UUID.randomUUID());
            RocketMQConfig.buildConsumerConfigs(props, consumer);
        }
    
        @Override
        public void run(SourceContext context) throws Exception {
            LOG.debug("source run....");
            // The lock that guarantees that record emission and state updates are atomic,
            // from the view of taking a checkpoint.
            final Object lock = context.getCheckpointLock();
    
            int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
                RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);
    
            String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);
    
            int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
                RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);
    
            int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE,
                RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
    
            pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
            pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
    
                @Override
                public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
                    try {
                        long offset = getMessageQueueOffset(mq);
                        if (offset < 0) {
                            return;
                        }
    
                        PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
                        boolean found = false;
                        switch (pullResult.getPullStatus()) {
                            case FOUND:
                                List<MessageExt> messages = pullResult.getMsgFoundList();
                                for (MessageExt msg : messages) {
                                    byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
                                    //byte[] value = msg.getBody();
    
    
                                    //IssueSyncMessageBodyVO bodyVO = JSON.parseObject(new String(value),IssueSyncMessageBodyVO.class);
    
                                    IssueSyncMessageBodyVO issueSyncMessageBodyVO = (IssueSyncMessageBodyVO) ObjectConvertUtil.byteToObject(msg.getBody());
                                    //LOG.info(JSON.toJSONString("issueSyncMessageBodyVO = " + issueSyncMessageBodyVO));
                                    byte[] value = JSON.toJSONString(issueSyncMessageBodyVO).getBytes(StandardCharsets.UTF_8);
    
                                    OUT data = schema.deserializeKeyAndValue(key, value);
    
                                    // output and state update are atomic
                                    synchronized (lock) {
                                        context.collectWithTimestamp(data, msg.getBornTimestamp());
                                    }
                                }
                                found = true;
                                break;
                            case NO_MATCHED_MSG:
                                LOG.debug("No matched message after offset {} for queue {}", offset, mq);
                                break;
                            case NO_NEW_MSG:
                                break;
                            case OFFSET_ILLEGAL:
                                LOG.warn("Offset {} is illegal for queue {}", offset, mq);
                                break;
                            default:
                                break;
                        }
    
                        synchronized (lock) {
                            putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                        }
    
                        if (found) {
                            pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found
                        } else {
                            pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
                        }
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
    
            try {
                pullConsumerScheduleService.start();
            } catch (MQClientException e) {
                throw new RuntimeException(e);
            }
    
            runningChecker.setRunning(true);
    
            awaitTermination();
    
        }
    
        private void awaitTermination() throws InterruptedException {
            while (runningChecker.isRunning()) {
                Thread.sleep(50);
            }
        }
    
        private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
            Long offset = offsetTable.get(mq);
            // restoredOffsets(unionOffsetStates) is the restored global union state;
            // should only snapshot mqs that actually belong to us
            if (restored && offset == null) {
                offset = restoredOffsets.get(mq);
            }
            if (offset == null) {
                offset = consumer.fetchConsumeOffset(mq, false);
                if (offset < 0) {
                    String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
                    switch (initialOffset) {
                        case CONSUMER_OFFSET_EARLIEST:
                            offset = consumer.minOffset(mq);
                            break;
                        case CONSUMER_OFFSET_LATEST:
                            offset = consumer.maxOffset(mq);
                            break;
                        case CONSUMER_OFFSET_TIMESTAMP:
                            offset = consumer.searchOffset(mq, getLong(props,
                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
                            break;
                        default:
                            throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
                    }
                }
            }
            offsetTable.put(mq, offset);
            return offsetTable.get(mq);
        }
    
        private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
            offsetTable.put(mq, offset);
            if (!enableCheckpoint) {
                consumer.updateConsumeOffset(mq, offset);
            }
        }
    
        @Override
        public void cancel() {
            LOG.debug("cancel ...");
            runningChecker.setRunning(false);
    
            if (pullConsumerScheduleService != null) {
                pullConsumerScheduleService.shutdown();
            }
            if (offsetTable != null) {
                offsetTable.clear();
            }
            if (restoredOffsets != null) {
                restoredOffsets.clear();
            }
            if (pendingOffsetsToCommit != null) {
                pendingOffsetsToCommit.clear();
            }
        }
    
        @Override
        public void close() throws Exception {
            LOG.debug("close ...");
            // pretty much the same logic as cancelling
            try {
                cancel();
            } finally {
                super.close();
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // called when a snapshot for a checkpoint is requested
    
            if (!runningChecker.isRunning()) {
                LOG.debug("snapshotState() called on a closed source; skipping.");
                return;
            }
    
            if (LOG.isDebugEnabled()) {
                LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
            }
    
            unionOffsetStates.clear();
    
            HashMap<MessageQueue, Long> currentOffsets = new HashMap<>(offsetTable.size());
    
            // remove the unassigned queues to avoid reading a wrong offset when the source restarts
            Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
            offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));
    
            for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
                unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
                currentOffsets.put(entry.getKey(), entry.getValue());
            }
    
            pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
    
            if (LOG.isDebugEnabled()) {
                LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                    offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // called every time the user-defined function is initialized,
            // be that when the function is first initialized or be that
            // when the function is actually recovering from an earlier checkpoint.
            // Given this, initializeState() is not only the place where different types of state are initialized,
            // but also where state recovery logic is included.
            LOG.debug("initialize State ...");
    
            this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
                    OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
    
                    })));
            this.restored = context.isRestored();
    
            if (restored) {
                if (restoredOffsets == null) {
                    restoredOffsets = new ConcurrentHashMap<>();
                }
                for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                    if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
                        restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
                    }
                }
                LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
            } else {
                LOG.info("No restore state for the consumer.");
            }
        }
    
        @Override
        public TypeInformation<OUT> getProducedType() {
            return schema.getProducedType();
        }
    
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // callback when checkpoint complete
            if (!runningChecker.isRunning()) {
                LOG.debug("notifyCheckpointComplete() called on a closed source; skipping.");
                return;
            }
    
            final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
            if (posInMap == -1) {
                LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
                return;
            }
    
            Map<MessageQueue, Long> offsets = (Map<MessageQueue, Long>) pendingOffsetsToCommit.remove(posInMap);
    
            // remove older checkpoints in map
            for (int i = 0; i < posInMap; i++) {
                pendingOffsetsToCommit.remove(0);
            }
    
            if (offsets == null || offsets.size() == 0) {
                LOG.debug("Checkpoint state was empty.");
                return;
            }
    
            for (Map.Entry<MessageQueue, Long> entry : offsets.entrySet()) {
                consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
            }
    
        }
    }
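A note on the restore path: `initializeState()` reads the union list state and keeps only the largest offset recorded for each `MessageQueue`, because with union state every subtask sees every entry from every other subtask. A stdlib-only sketch of that merge, using `String` keys as a hypothetical stand-in for `MessageQueue`:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OffsetMergeSketch {

    // Mirrors the loop in initializeState(): every subtask sees the full union state,
    // so keep only the largest offset recorded per queue key.
    static Map<String, Long> mergeRestoredOffsets(List<Map.Entry<String, Long>> unionState) {
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> e : unionState) {
            restored.merge(e.getKey(), e.getValue(), Math::max);
        }
        return restored;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> state = new ArrayList<>();
        state.add(new SimpleEntry<>("topicA-q0", 100L));
        state.add(new SimpleEntry<>("topicA-q0", 250L)); // duplicate entry from another subtask
        state.add(new SimpleEntry<>("topicA-q1", 80L));
        // Keeps 250 for topicA-q0 and 80 for topicA-q1.
        System.out.println(mergeRestoredOffsets(state));
    }
}
```

Taking the maximum is what makes the restore idempotent: replaying duplicate or stale union-state entries can never move an offset backwards.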
    

    1.31.19.RocketMQUtils.java

    package org.apache.rocketmq.flink;
    
    import java.util.Properties;
    
    public final class RocketMQUtils {
    
        public static int getInteger(Properties props, String key, int defaultValue) {
            return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
        }
    
        public static long getLong(Properties props, String key, long defaultValue) {
            return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)));
        }
    
        public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
            return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
        }
    }
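These helpers simply layer typed parsing over `Properties.getProperty(key, default)`. A quick usage sketch, re-implementing `getInteger` locally so the snippet is self-contained:

```java
import java.util.Properties;

public class PropsDefaults {

    // Same pattern as RocketMQUtils.getInteger: parse the property value, or fall back
    // to the stringified default when the key is absent.
    static int getInteger(Properties props, String key, int defaultValue) {
        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("consumer.batch.size", "64");
        System.out.println(getInteger(props, "consumer.batch.size", 32));     // key present: 64
        System.out.println(getInteger(props, "consumer.pull.pool.size", 20)); // key absent: 20
    }
}
```

Note that the default only covers a missing key: a present-but-malformed value such as `"abc"` still throws `NumberFormatException` rather than falling back.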
    

    1.31.20.RunningChecker.java

    package org.apache.rocketmq.flink;
    
    import java.io.Serializable;
    
    public class RunningChecker implements Serializable {
        private volatile boolean isRunning = false;
    
        public boolean isRunning() {
            return isRunning;
        }
    
        public void setRunning(boolean running) {
            isRunning = running;
        }
    }
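The `volatile` flag is what makes `cancel()` (invoked from a different thread) visible to the polling loop in the source's `run()`. A minimal two-thread sketch of the same stop-flag pattern:

```java
public class RunningCheckerDemo {

    // Same shape as the RunningChecker above: a volatile flag shared across threads.
    static class RunningChecker implements java.io.Serializable {
        private volatile boolean isRunning = false;
        public boolean isRunning() { return isRunning; }
        public void setRunning(boolean running) { isRunning = running; }
    }

    public static void main(String[] args) throws InterruptedException {
        RunningChecker checker = new RunningChecker();
        checker.setRunning(true);
        Thread worker = new Thread(() -> {
            // Mirrors awaitTermination(): poll the flag until another thread flips it.
            while (checker.isRunning()) {
                try { Thread.sleep(10); } catch (InterruptedException e) { return; }
            }
        });
        worker.start();
        checker.setRunning(false); // what cancel() does from the task thread
        worker.join(1000);
        System.out.println(worker.isAlive()); // false: the worker observed the volatile write
    }
}
```

Without `volatile` the worker could legally cache the flag and spin forever; the volatile write/read pair establishes the happens-before edge that guarantees the loop exits.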
    

    1.31.21.DateUtils.java

    package com.xxxxx.issue.utils;
    
    import org.apache.commons.lang.StringUtils;
    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.*;
    
    /**
     * Created by Administrator on 2017/3/7.
     *
     * @author Administrator
     */
    public final class DateUtils {
        private static final Logger logger = LoggerFactory.getLogger(DateUtils.class);
    
        /**
         * Date-time pattern: yyyy-MM-dd HH:mm:ss
         */
        public static final String PATTERN_DATE_TIME = "yyyy-MM-dd HH:mm:ss";
    
        /**
         * Compact date-time pattern: yyyyMMddHHmmss
         */
        public static final String PATTERN_DATE_TIME_SIMPLE = "yyyyMMddHHmmss";
    
        /**
         * Millisecond-precision pattern: yyyyMMddHHmmssSSS
         * (HH is the 24-hour field; the original lowercase "hh" was a 12-hour clock,
         * which is ambiguous without an AM/PM marker.)
         */
        public static final String PATTERN_DATE_TIME_ALL = "yyyyMMddHHmmssSSS";
    
    
        /**
         * Date pattern: yyyy-MM-dd
         */
        public static final String PATTERN_DATE = "yyyy-MM-dd";
    
        /**
         * Compact date pattern: yyyyMMdd
         */
        public static final String PATTERN_DATE_SIMPLE = "yyyyMMdd";
    
        /**
         * Compact year-month pattern: yyMM
         */
        public static final String PATTERN_DATE_SIMPLE_YYMM = "yyMM";
    
        /**
         * Time pattern: HH:mm:ss
         */
        public static final String PATTERN_TIME = "HH:mm:ss";
    
        /**
         * Compact time pattern: HHmmss
         */
        public static final String PATTERN_TIME_SIMPLE = "HHmmss";
    
        /**
         * Year pattern: yyyy
         */
        public static final String PATTERN_YYYY = "yyyy";
    
        /**
         * Month pattern: MM
         */
        public static final String PATTERN_MM = "MM";
    
        /**
         * Day-of-month pattern: dd
         */
        public static final String PATTERN_DAY = "dd";
    
        /**
         * Upper-bound timestamp value (milliseconds since the epoch).
         **/
        public static final Long TIME_MAX = 999999999999L;
    
        /**
         * Day-of-week pattern: E
         */
        public static final String PATTERN_WEEK = "E";
    
        private static final String DATE_STR = "yyyy-MM-dd";
        private static final String TIME_STR = " 00:00:00";
        private static final String DATE_TIME_STR = "yyyy-MM-dd HH:mm:ss";
    
        /**
         * @return the current system time in milliseconds, as a Long.
         */
        public static Long getTimeMillisLong() {
            return System.currentTimeMillis();
        }
    
        /**
         * @return the current system time in seconds, as a Long.
         */
        public static Long getTimeSecondLong() {
            return System.currentTimeMillis() / 1000;
        }
    
        /**
         * @return the current system time in milliseconds, as a String.
         */
        public static String getTimeMillisString() {
            return System.currentTimeMillis() + "";
        }
    
        /**
         * @return the current date-time formatted as yyyy-MM-dd HH:mm:ss.
         */
        public static String getNowDateTime() {
            return formateDateTime(PATTERN_DATE_TIME);
        }
    
        /**
         * @return the current date-time formatted as yyyyMMddHHmmss.
         */
        public static String getNowDateTimeSimple() {
            return formateDateTime(PATTERN_DATE_TIME_SIMPLE);
        }
    
    
        /**
         * @return the current date-time with milliseconds, formatted with PATTERN_DATE_TIME_ALL.
         */
        public static String getNowDateTimeAll() {
            return formateDateTime(PATTERN_DATE_TIME_ALL);
        }
    
        /**
         * @return the current date formatted as yyyy-MM-dd.
         */
        public static String getNowDate() {
            return formateDateTime(PATTERN_DATE);
        }
    
        /**
         * @return the current date formatted as yyyyMMdd.
         */
        public static String getNowDateSimple() {
            return formateDateTime(PATTERN_DATE_SIMPLE);
        }
    
        /**
         * @return the current year-month formatted as yyMM.
         */
        public static String getNowDateSimpleYymm() {
            return formateDateTime(PATTERN_DATE_SIMPLE_YYMM);
        }
    
        /**
         * @return the current time formatted as HH:mm:ss.
         */
        public static String getNowTime() {
            return formateDateTime(PATTERN_TIME);
        }
    
        /**
         * @return the current time formatted as HHmmss.
         */
        public static String getNowTimeSimple() {
            return formateDateTime(PATTERN_TIME_SIMPLE);
        }
    
        /**
         * @return the current year (yyyy).
         */
        public static String getNowYear() {
            return formateDateTime(PATTERN_YYYY);
        }
    
        /**
         * @return the current month (MM).
         */
        public static String getNowMonth() {
            return formateDateTime(PATTERN_MM);
        }
    
        /**
         * @return the current day of month (dd).
         */
        public static String getNowDay() {
            return formateDateTime(PATTERN_DAY);
        }
    
        /**
         * @return the current day of week, e.g. 星期一 (Monday) in a Chinese locale.
         */
        public static String getNowWeek() {
            return formateDateTime(PATTERN_WEEK);
        }
    
        /**
         * @param pattern a custom format pattern
         * @return the current time formatted with the given pattern, or "" if the pattern is blank.
         */
        public static String getNowDateTime(String pattern) {
            if (StringUtils.isNotBlank(pattern)) {
                return formateDateTime(pattern);
            } else {
                return "";
            }
        }
    
        /**
         * @return the first day of the current month (yyyy-MM-dd).
         */
        public static String getStartDayOfMonth() {
            DateTime dateTime = new DateTime();
            return dateTime.dayOfMonth().withMinimumValue().withTimeAtStartOfDay().toString(PATTERN_DATE);
        }
    
        /**
         * @return the last day of the current month (yyyy-MM-dd).
         */
        public static String getEndDayOfMonth() {
            DateTime dateTime = new DateTime();
            return dateTime.dayOfMonth().withMaximumValue().millisOfDay().withMaximumValue().toString(PATTERN_DATE);
        }
    
        /**
         * @param month the number of months to add
         * @return the resulting time as epoch milliseconds, in String form
         * @throws Exception propagated parse exception
         */
        public static String addMonth(int month) throws Exception {
            // Format and re-parse "now" to truncate sub-second precision, then add the months.
            SimpleDateFormat df = new SimpleDateFormat(PATTERN_DATE_TIME);
            String validatetime = df.format(new Date());
            Date now = df.parse(validatetime);
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(now);
            calendar.add(Calendar.MONTH, month);
            return calendar.getTime().getTime() + "";
        }
    	
        /**
         * Adds or subtracts the given number of days.
         *
         * @param date the base date
         * @param num  the number of days to add (positive) or subtract (negative)
         * @return the shifted date
         */
        public static Date addOrMinusDay(Date date, int num) {
            Calendar calendar = Calendar.getInstance();
            // Bug fix: use the supplied date; the original called setTime(new Date()),
            // silently ignoring the parameter.
            calendar.setTime(date);
    
            calendar.add(Calendar.DAY_OF_MONTH, num);
            return calendar.getTime();
        }
    
        /**
         * @param date    a date string
         * @param pattern the pattern the string is in
         * @return the date string converted to epoch milliseconds; the string must match the pattern.
         */
        public static Long getStringToLong(String date, String pattern) {
            if (StringUtils.isBlank(date) || StringUtils.isBlank(pattern)) {
                return 0L;
            } else {
                DateTime dateTime = DateTimeFormat.forPattern(pattern).parseDateTime(date);
                return dateTime.getMillis();
            }
        }
    
        /**
         * @param date    a date string
         * @param pattern the pattern the string is in
         * @return the date string converted to epoch seconds; the string must match the pattern.
         */
        public static int getStringToIntSeconds(String date, String pattern) {
            if (StringUtils.isBlank(date) || StringUtils.isBlank(pattern)) {
                return 0;
            } else {
                DateTime dateTime = DateTimeFormat.forPattern(pattern).parseDateTime(date);
                return Integer.parseInt(dateTime.getMillis() / 1000 + "");
            }
        }
    
        /**
         * @param millis  an epoch-millisecond value as a String
         * @param pattern the target pattern
         * @return the milliseconds formatted as a date-time string.
         */
        public static String getLongToString(String millis, String pattern) {
            DateTime dateTime = new DateTime(Long.parseLong(millis));
            return dateTime.toString(pattern);
        }
    
        /**
         * @param millis  an epoch-millisecond value
         * @param pattern the target pattern
         * @return the milliseconds formatted as a date-time string.
         */
        public static String getLongToString(long millis, String pattern) {
            DateTime dateTime = new DateTime(millis);
            return dateTime.toString(pattern);
        }
    
        /**
         * @param seconds a duration in seconds
         * @return the duration broken down into days/hours/minutes/seconds.
         */
        public static String getRuntimeBySecond(int seconds) {
            long diffSeconds = seconds % 60;
            long diffMinutes = seconds / 60 % 60;
            long diffHours = seconds / (60 * 60) % 24;
            long diffDays = seconds / (24 * 60 * 60);
    
            StringBuffer buffer = new StringBuffer();
            buffer.append(diffDays + "天" + diffHours + "小时" + diffMinutes + "分钟" + diffSeconds + "秒");
            return buffer.toString();
        }
    
        /**
         * @param millis a duration in milliseconds
         * @return the duration broken down into days/hours/minutes/seconds.
         */
        public static String getRuntimeByMillis(long millis) {
            long diffSeconds = millis / 1000 % 60;
            long diffMinutes = millis / (60 * 1000) % 60;
            long diffHours = millis / (60 * 60 * 1000) % 24;
            long diffDays = millis / (24 * 60 * 60 * 1000);
    
            StringBuffer buffer = new StringBuffer();
            buffer.append(diffDays + "天" + diffHours + "小时" + diffMinutes + "分钟" + diffSeconds + "秒");
            return buffer.toString();
        }
    
        /**
         * @param pattern the format pattern
         * @return the current time formatted with the pattern.
         */
        private static String formateDateTime(String pattern) {
            DateTime dateTime = new DateTime();
            return dateTime.toString(pattern);
        }
    
        /**
         * @param time a date string in yyyy-MM-dd form
         * @return the string converted to a Calendar
         * @throws ParseException propagated parse exception
         */
        public static Calendar changecal(String time) throws ParseException {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Date date = sdf.parse(time);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            return cal;
        }
    
        /**
         * @param cal 日期对象
         * @return 转Calendar为String
         */
        public static String changestr(Calendar cal) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            String time = sdf.format(cal.getTime());
            return time;
        }
    
        /**
         * @param cal 日期对象
         * @return 根据日期获取当月第一天和最后一天的索引
         */
        public static List<Integer> flmonthdate(Calendar cal) {
            //获取本月第一天和最后一天(以DAY_OF_YEAR索引表示)
            cal.set(Calendar.DAY_OF_MONTH, 1);
            int firstDayInThisMonth = cal.get(Calendar.DAY_OF_YEAR);
            //用当月实际天数推算最后一天,避免12月份跨年时"下月第一天-1"得到0
            int lastDayInThisMonth = firstDayInThisMonth + cal.getActualMaximum(Calendar.DAY_OF_MONTH) - 1;
            List<Integer> list = new ArrayList<Integer>();
            list.add(0, firstDayInThisMonth);
            list.add(1, lastDayInThisMonth);
            return list;
        }
    
        /**
         * @param cal 指定的日期对象
         * @return 根据日期获取当天和当月最后一天的索引
         */
        public static List<Integer> twoDay(Calendar cal) {
            int firstDay = cal.get(Calendar.DAY_OF_YEAR);
            //月末索引 = 当天索引 - 当天日号 + 当月实际天数,避免12月份跨年时索引归零
            int lastDayInThisMonth = firstDay - cal.get(Calendar.DAY_OF_MONTH) + cal.getActualMaximum(Calendar.DAY_OF_MONTH);
            List<Integer> list = new ArrayList<Integer>();
            list.add(0, firstDay);
            list.add(1, lastDayInThisMonth);
            return list;
        }
    
        /**
         * @param i 日期索引值
         * @return 日期索引转毫秒数
         */
        public static String changeday(int i) {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Calendar cal = Calendar.getInstance();
            //赋值日期
            cal.set(Calendar.DAY_OF_YEAR, i);
            //日期转毫秒
            String time = Long.toString(DateUtils.getStringToLong(sdf.format(cal.getTime()), DateUtils.PATTERN_DATE));
            return time;
        }
    
        /**
         * @param date 指定时间
         * @return 获取当月最大天数
         * @throws ParseException 向外抛出异常
         */
        public static int getMaxDay(String date) throws ParseException {
            Calendar cal = changecal(date);
            int day = cal.getActualMaximum(Calendar.DATE);
            return day;
        }
    
        /**
         * @param date 指定的时间
         * @return  获取当月除星期天天数
         * @throws ParseException 向外抛出异常
         */
        public static int getDayNoSunday(String date) throws ParseException {
            Calendar cal = changecal(date);
            List<Integer> day = flmonthdate(cal);
            int days = 0;
            for (int i = day.get(0); i <= day.get(1); i++) {
                cal.set(Calendar.DAY_OF_YEAR, i);
                int weekDay = cal.get(Calendar.DAY_OF_WEEK);
                if (weekDay != 1) {
                    days++;
                }
            }
            return days;
        }
    
        /**
         * @param date 指定的时间字符串
         * @return 获取当月某日到月底中除星期天的天数
         * @throws ParseException 向外抛出异常
         */
        public static int getDayNoSundayBewToDay(String date) throws ParseException {
            Calendar cal = changecal(date);
            List<Integer> day = twoDay(cal);
            int days = 0;
            for (int i = day.get(0); i <= day.get(1); i++) {
                cal.set(Calendar.DAY_OF_YEAR, i);
                int weekDay = cal.get(Calendar.DAY_OF_WEEK);
                if (weekDay != 1) {
                    days++;
                }
            }
            return days;
        }
    
        /**
         * @param date 指定的时间字符串
         * @return 获取当月某日到月底中除星期六星期天的天数
         * @throws ParseException 向外抛出异常
         */
        public static int getDayNoWeekendBewToDay(String date) throws ParseException {
            Calendar cal = changecal(date);
            List<Integer> day = twoDay(cal);
            int days = 0;
            for (int i = day.get(0); i <= day.get(1); i++) {
                cal.set(Calendar.DAY_OF_YEAR, i);
                int weekDay = cal.get(Calendar.DAY_OF_WEEK);
                if (weekDay != 1 && weekDay != 7) {
                    days++;
                }
            }
            return days;
        }
    
        /**
         * @param date 指定的时间字符串
         * @return 获取当月某日到月底中的天数
         * @throws ParseException 向外抛出异常
         */
        public static int getDayNoWeekend(String date) throws ParseException {
            Calendar cal = changecal(date);
            List<Integer> day = twoDay(cal);
            //注意:方法名虽含NoWeekend,实际返回的是包含周末在内的总天数
            return day.get(1) - day.get(0) + 1;
        }
    
        /**
         * @param i 索引值
         * @return 日期索引转String
         */
        public static String getday(int i) {
            Calendar cal = Calendar.getInstance();
            //赋值日期
            cal.set(Calendar.DAY_OF_YEAR, i);
            return changestr(cal);
        }
    
        /**
         * @param time 时间的字符串
         * @return 时间 转 毫秒  time格式:yyyy-MM-dd HH:mm:ss
         * @throws ParseException 向外抛出异常
         */
        public static long dateChangeMillisecond(String time) throws ParseException {
            //注意使用HH(24小时制);若用hh(12小时制)会把下午时间解析错
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return sdf.parse(time).getTime();
        }
    
        /**
         * @param time 时间的字符串
         * @return 时间 转 毫秒    time格式:yyyyMMddHHmmss
         * @throws ParseException 向外抛出异常
         */
        public static long dateChangeMillisecond1(String time) throws ParseException {
            //同样使用24小时制HH,否则"000000"这类凌晨时间会解析异常
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
            return sdf.parse(time).getTime();
        }
    
        /**
         * @return 当前日期开始时间(当天00:00:00)的秒级时间戳
         * @throws ParseException 向外抛出异常
         */
        public static long getTodayStartLongSecond() throws ParseException {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            //毫秒
            String nowDate = sdf.format(new Date()) + "000000";
            return dateChangeMillisecond1(nowDate) / 1000;
        }
    
        /**
         * @return 当前日期结束时间(次日00:00:00)的秒级时间戳
         * @throws ParseException 向外抛出异常
         */
        public static long getTodayEndLongSecond() throws ParseException {
            return getTodayStartLongSecond() + 24 * 60 * 60;
        }
    
        /**
         * @param times   long时间戳(秒或毫秒,通过TIME_MAX自动判断)
         * @param dateFor 日期格式
         * @return 时间戳格式化后的时间字符串
         */
        public static String getDateStrLongtoString(Long times, String dateFor) {
    
            SimpleDateFormat sdf = new SimpleDateFormat(dateFor);
            String str = "";
            if (times > TIME_MAX) {
                Date date = new Date(times);
                str = sdf.format(date);
            } else {
                Date date = new Date(times * 1000);
                str = sdf.format(date);
            }
    
            return str;
        }
    
        /**
         * @param year  :指定年
         * @param month :指定月
         * @param flag  :true 取该月最早开始时间;false 取该月最晚结束时间
         * @return 获取指定月的最早开始时间 和 指定月的最晚时间
         */
        public static Date getStartOrEndDateInTargetMonth(int year, int month, boolean flag) {
            GregorianCalendar ca = new GregorianCalendar();
            ca.clear();
            ca.set(Calendar.YEAR, year);
            ca.set(Calendar.MONTH, month - 1);
    
            Calendar calendar = Calendar.getInstance();
            //设置时间
            calendar.setTime(ca.getTime());
    
            //取该月第一天的最早时间(00:00:00.000)
            if (flag) {
                //设置"日"
                calendar.set(Calendar.DAY_OF_MONTH, 1);
                //设置"时"
                calendar.set(Calendar.HOUR_OF_DAY, 0);
                //设置"分"
                calendar.set(Calendar.MINUTE, 0);
                //设置"秒"
                calendar.set(Calendar.SECOND, 0);
                //设置"毫秒"
                calendar.set(Calendar.MILLISECOND, 0);
            } else {
                //设置"日"
                calendar.set(Calendar.DAY_OF_MONTH, calendar.getActualMaximum(Calendar.DAY_OF_MONTH));
                //设置"时"
                calendar.set(Calendar.HOUR_OF_DAY, 23);
                //设置"分"
                calendar.set(Calendar.MINUTE, 59);
                //设置"秒"
                calendar.set(Calendar.SECOND, 59);
                //设置"毫秒"
                calendar.set(Calendar.MILLISECOND, 999);
            }
    
            return calendar.getTime();
        }
    	
    	/**
    	 * <p class="detail">
    	 * 功能:日期格式化为字符串
    	 * </p>
    	 * @author tangy
    	 * @param date 时间
    	 * @param format 格式
    	 * @return 日期格式化为字符串
    	 */
    	public static String dateFormat(Date date,String format){
    		return new SimpleDateFormat(format).format(date);
    	}
    
    //    public static void main(String[] args) {
    //        LOG.info(DateUtils.dateFormat(new Date(),DateUtils.PATTERN_DATE_SIMPLE));
    //    }
    
        public static Date parseDate(String datestr,String format) throws ParseException{
            return new SimpleDateFormat(format).parse(datestr);
        }
    
        /**
         * <p class="detail">
         * 功能:日期查询时根据类型获得日期查询范围的开始时间
         * </p>
         * @author tangy
         * @param dateType 10今天,20昨天,30过去7天,35过去14天,40过去30天,50过去3个月,60过去6个月,70过去一年
         * @return 日期扣减后的当天凌晨时间
         */
        public static Date getStartDateByType(Integer dateType){
            if (dateType == null) {
                return new Date();
            }
            Calendar rightNow = Calendar.getInstance();
            if (dateType == 10) {
                //今天,不做偏移
            } else if (dateType == 20) {
                //日期减一天
                rightNow.add(Calendar.DAY_OF_YEAR, -1);
            } else if (dateType == 30) {
                rightNow.add(Calendar.DAY_OF_YEAR, -6);
            } else if (dateType == 35) {
                rightNow.add(Calendar.DAY_OF_YEAR, -13);
            } else if (dateType == 40) {
                rightNow.add(Calendar.MONTH, -1);
            } else if (dateType == 50) {
                rightNow.add(Calendar.MONTH, -3);
            } else if (dateType == 60) {
                rightNow.add(Calendar.MONTH, -6);
            } else if (dateType == 70) {
                rightNow.add(Calendar.MONTH, -12);
            } else {
                //未知类型保持原逻辑,返回null
                return null;
            }
            try {
                return parseDate(getMorningToString(rightNow.getTime()), DATE_TIME_STR);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        /**
         * <p class="detail">
         * 功能:获得传入的时间凌晨时间(即00:00:00)
         * </p>
         * @author tangy
         * @param date 时间对象
         * @return 获得传入的时间凌晨时间(即00:00:00)
         */
        public static String getMorningToString(Date date){
            return dateFormat(date,DATE_TIME_STR).substring(0,10)+TIME_STR;
        }
    
        /**
         * <p class="detail">
         * 功能:获得传入的时间午夜时间(即23:59:59)
         * </p>
         * @author tangy
         * @param date 时间对象
         * @return 获得传入的时间午夜时间(即23:59:59)
         */
        public static String getNightToString(Date date){
            return dateFormat(date,DATE_TIME_STR).substring(0,10)+" 23:59:59";
        }
    
        /**
         * <p class="detail">
         * 功能:获取上月第一天日期
         * </p>
         * @author tangy
         * @return 获取上月第一天日期
         */
        public static Date getLastMonthFirstDay(){
            Calendar calendar = Calendar.getInstance();
            calendar.add(Calendar.MONTH, -1);
            calendar.set(Calendar.DAY_OF_MONTH, 1);
            return calendar.getTime();
        }
        /**
         * <p class="detail">
         * 功能:获得上月最后一天日期
         * </p>
         * @author tangy
         * @return 获得上月最后一天日期
         */
        public static Date getLastMonthEndDay(){
            Calendar calendar = Calendar.getInstance();
            calendar.set(Calendar.DAY_OF_MONTH, 1);
            calendar.add(Calendar.DATE, -1);
            return calendar.getTime();
        }
        /**
         * <p class="detail">
         * 功能:获取当月第一天
         * </p>
         * @author zhanghl
         * @return 获取当月第一天
         */
        public static String getCurrentMonthFirstDay(){
            Calendar calendar = Calendar.getInstance();
            calendar.set(Calendar.DAY_OF_MONTH, 1);
            return dateFormat(calendar.getTime(),DATE_TIME_STR).substring(0,10)+TIME_STR;
        }
    
        /**
         * <p class="detail">
         * 功能:获得给定的时间所在月份第一天的时间
         * </p>
         * @author tangy
         * @param date 时间
         * @return 返回给定时间第一天00:00:00点时间
         */
        public static String getMonthFirstDay(Date date){
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(date);
            calendar.set(Calendar.DAY_OF_MONTH, 1);
            return dateFormat(calendar.getTime(),DATE_STR)+TIME_STR;
        }
    
        /**
         * <p class="detail">
         * 功能:获得给定的时间所在月份最后一天的时间
         * </p>
         * @author tangy
         * @param date 时间
         * @return 返回给定时间最后一天23:59:59点时间
         */
        public static String getMonthEndDay(Date date){
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(date);
            calendar.set(Calendar.DATE, calendar.getActualMaximum(Calendar.DATE));
            return dateFormat(calendar.getTime(),DATE_STR)+" 23:59:59";
        }
    
        /**
         * <p class="detail">
         * 功能:按格式格式化时间字符串,如果时间格式不对则返回空
         * </p>
         * @author tangy
         * @param dateStr 时间字符串
         * @param dateFormat (如:yyyy-MM-dd HH:mm:ss)
         * @return 按格式格式化时间字符串,如果时间格式不对则返回空
         */
        public static String getDateFormat(String dateStr,String dateFormat){
            String result = null;
            if(dateStr!=null && dateFormat!=null){
                try{
                    result=dateFormat(parseDate(dateStr,dateFormat),dateFormat);
                }catch(Exception e){
                    logger.info(e.getMessage());
                }
            }
            return result;
        }
    
        /**
         * <p class="detail">
         * 功能:时间加减某个天数后的时间
         * </p>
         * @author tangy
         * @param date 要加减的时间
         * @param day 要加减的天数,减天数传负数
         * @return 时间加减某个天数后的时间
         */
        public static Date addSubtractDate(Date date,int day){
            Calendar resultCalendar = Calendar.getInstance();
            resultCalendar.setTime(date);
            resultCalendar.add(Calendar.DAY_OF_YEAR,day);
            return resultCalendar.getTime();
        }
    
        /**
         * @return 当前时间加2小时后的时间(精确到分钟)
         */
        public static Date timeAddTwoHour(){
            Date now = new Date();
            try {
                return parseDate(dateFormat(new Date(now.getTime()+7200000),"yyyy-MM-dd HH:mm"),"yyyy-MM-dd HH:mm");
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return now;
        }
    
        /**
         * <p class="detail">
         * 功能:时间戳转换成字符串
         * </p>
         * @author zhangqi
         * @param time 时间戳
         * @return 时间戳转换成的字符串
         */
        public static String getDateToString(long time) {
            Date d = new Date(time);
            SimpleDateFormat sf = new SimpleDateFormat(DATE_TIME_STR);
            return sf.format(d);
        }
    
        /**
         * <p class="detail">
         * 功能:字符串转换成时间戳
         * </p>
         * @author zhangqi
         * @param time 时间的字符串
         * @return 字符串转换成时间戳
         */
        public static long getStringToDate(String time) {
            SimpleDateFormat sdf = new SimpleDateFormat(DATE_TIME_STR);
            Date date = new Date();
            try{
                date = sdf.parse(time);
            } catch(ParseException e) {
                e.printStackTrace();
            }
            return date.getTime();
        }
    
        /**
         * <p >
         * 功能:获取当天时间的时间戳 (精确时分秒)
         * </p>
         * @author chenyx
         * @return 当前时间的毫秒时间戳
         */
        public static Long getNowDateToDate(){
            SimpleDateFormat sf = new SimpleDateFormat(DATE_TIME_STR);
            String nowDateStr = sf.format(new Date());
            Date date = new Date();
            try {
                date = sf.parse(nowDateStr);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return date.getTime();
        }
    
        /**
         * <p >
         * 功能:获取当天时间的时间戳 (年月日)
         * </p>
         * @author chenyx
         * @return Long型年月日时间戳
         */
        public static Long getYearMonthDayTimeStamp(){
            SimpleDateFormat sf = new SimpleDateFormat(DATE_STR);
            String nowDateStr = sf.format(new Date());
            Date date = new Date();
            try {
                date = sf.parse(nowDateStr);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return date.getTime();
        }
    
        /**
         * <p >
         * 功能:两个时间之间的秒数
         * </p>
         *
         * @param time 第一个时间
         * @param bigTime 第二个时间
         * @author zhangq
         * @return 两个时间相差的秒数
         */
        public static int countTimes(Date time, Date bigTime){
            long timeNum = time.getTime();
            long bigTimeNum = bigTime.getTime();
            int count = (int)((bigTimeNum - timeNum) / 1000);
            return count;
        }
    }
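上面的DateUtils大量使用SimpleDateFormat和Calendar,二者都不是线程安全的。下面给出一个基于java.time(JDK 8+)的等价写法草稿,仅作示意:类名DateUtilsModernSketch是本文为演示假设的名字,并非工程中已有的类,演示的是getRuntimeByMillis以及"取某月第一天/最后一天"的现代实现方式。

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.YearMonth;

public class DateUtilsModernSketch {

    /** 根据毫秒数获取"天小时分钟秒",与getRuntimeByMillis等价 */
    public static String formatRuntime(long millis) {
        Duration d = Duration.ofMillis(millis);
        return d.toDays() + "天" + (d.toHours() % 24) + "小时"
                + (d.toMinutes() % 60) + "分钟" + (d.getSeconds() % 60) + "秒";
    }

    /** 指定年月的第一天,YearMonth自动处理大小月 */
    public static LocalDate monthStart(int year, int month) {
        return YearMonth.of(year, month).atDay(1);
    }

    /** 指定年月的最后一天,自动处理闰年,无需手工"下月第一天减一" */
    public static LocalDate monthEnd(int year, int month) {
        return YearMonth.of(year, month).atEndOfMonth();
    }

    public static void main(String[] args) {
        //90061000毫秒 = 1天1小时1分钟1秒
        System.out.println(formatRuntime(90061000L));
        //2021年2月(非闰年)的最后一天
        System.out.println(monthEnd(2021, 2));
    }
}
```

与SimpleDateFormat不同,java.time中的DateTimeFormatter是不可变且线程安全的,可以放心声明为static常量在多线程(例如Flink算子)中复用。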
    

    1.31.22.PropertiesUtils.java

    package com.xxxxx.issue.utils;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.InputStream;
    import java.util.Properties;
    
    /**
     * 读取project-config.properties配置文件的工具类(单例)
     *
     * @author tuzuoquan
     * @version 1.0
     * @date 2020/9/23 9:23
     **/
    public final class PropertiesUtils {
        private static final Logger LOG = LoggerFactory.getLogger(PropertiesUtils.class);
        private static PropertiesUtils instance = null;
    
        /** 间隔xxx秒产生checkpoint **/
        private Integer flinkCheckpointsInterval = null;
        /** 确保检查点之间有至少xxx ms的间隔 **/
        private Integer flinkMinPauseBetweenCheckpoints = null;
        /** 检查点必须在一分钟内完成,或者被丢弃【checkpoint的超时时间】 **/
        private Integer flinkCheckpointTimeout = null;
        /** 同一时间只允许进行一个检查点 **/
        private Integer flinkMaxConcurrentCheckpoints = null;
        /** 尝试重启次数 **/
        private Integer flinkFixedDelayRestartTimes = null;
        /** 每次尝试重启时之间的时间间隔 **/
        private Integer flinkFixedDelayRestartInterval = null;
    
        private String rocketmqNameServer = null;
        private String rocketMqTopics = null;
    
        /** rocketmq source 的并行度 **/
        private Integer rockeqMqSourceParallelism = null;
        /** redis sink 的并行度 **/
        private Integer redisSinkParallelism = null;
    
        private Integer redisDefaultExpirationTime = null;
    
        /**
         * 私有构造方法:读取配置文件并初始化各项配置
         */
        private PropertiesUtils() {
            try {
                // 读取配置文件,通过类加载器的方式读取属性文件
                InputStream in = PropertiesUtils.class.getClassLoader().getResourceAsStream("project-config.properties");
                Properties prop = new Properties();
                prop.load(in);
    
                rocketmqNameServer = prop.getProperty("rocketmq.name.server.addr").trim();
                rocketMqTopics = prop.getProperty("rocketmq.topics").trim();
    
                flinkCheckpointsInterval = Integer.parseInt(prop.getProperty("flink.checkpoint.interval").trim());
                flinkMinPauseBetweenCheckpoints = Integer.parseInt(prop.getProperty("flink.checkpoint.minPauseBetweenCheckpoints").trim());
                flinkCheckpointTimeout = Integer.parseInt(prop.getProperty("flink.checkpoint.checkpointTimeout").trim());
                flinkMaxConcurrentCheckpoints = Integer.parseInt(prop.getProperty("flink.checkpoint.maxConcurrentCheckpoints").trim());
                flinkFixedDelayRestartTimes = Integer.parseInt(prop.getProperty("flink.fixedDelayRestart.times").trim());
                flinkFixedDelayRestartInterval = Integer.parseInt(prop.getProperty("flink.fixedDelayRestart.interval").trim());
    
                rockeqMqSourceParallelism = Integer.parseInt(prop.getProperty("flink.rockeqmq.source.parallelism").trim());
                redisSinkParallelism = Integer.parseInt(prop.getProperty("flink.redis.sink.parallelism").trim());
    
                redisDefaultExpirationTime = Integer.parseInt(prop.getProperty("redis.default.expiration.time").trim());
    
                in.close();
                in = null;
            } catch (Exception e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    
        //synchronized so that concurrent first calls cannot create two instances
        public static synchronized PropertiesUtils getInstance() {
            if (instance == null) {
                instance = new PropertiesUtils();
            }
            return instance;
        }
    
        public Integer getFlinkCheckpointsInterval() {
            return flinkCheckpointsInterval;
        }
    
        public Integer getFlinkMinPauseBetweenCheckpoints() {
            return flinkMinPauseBetweenCheckpoints;
        }
    
        public Integer getFlinkCheckpointTimeout() {
            return flinkCheckpointTimeout;
        }
    
        public Integer getFlinkMaxConcurrentCheckpoints() {
            return flinkMaxConcurrentCheckpoints;
        }
    
        public String getRocketmqNameServer() {
            return rocketmqNameServer;
        }
    
        public String getRocketMqTopics() {
            return rocketMqTopics;
        }
    
        public Integer getRockeqMqSourceParallelism() {
            return rockeqMqSourceParallelism;
        }
    
        public Integer getRedisSinkParallelism() {
            return redisSinkParallelism;
        }
    
        public Integer getFlinkFixedDelayRestartTimes() {
            return flinkFixedDelayRestartTimes;
        }
    
        public Integer getFlinkFixedDelayRestartInterval() {
            return flinkFixedDelayRestartInterval;
        }
    
        public Integer getRedisDefaultExpirationTime() {
            return redisDefaultExpirationTime;
        }
    }
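`getInstance()` above is lazily initialized, and the stream is closed by hand rather than with try-with-resources. A minimal sketch of the same configuration-loading idea using the initialization-on-demand holder idiom — the class name, demo keys, and the in-memory source are illustrative stand-ins, not part of the project:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/** Sketch: thread-safe, lazily initialized properties holder. */
public final class ConfigHolder {
    private final Properties props = new Properties();

    private ConfigHolder(InputStream in) {
        // try-with-resources closes the stream even if load() throws
        try (InputStream stream = in) {
            props.load(stream);
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /** The JVM initializes Holder.INSTANCE exactly once, on first use. */
    private static class Holder {
        static final ConfigHolder INSTANCE = new ConfigHolder(source());
    }

    private static InputStream source() {
        // A real job would use
        //   ConfigHolder.class.getClassLoader().getResourceAsStream("project-config.properties");
        // an in-memory stream keeps this sketch self-contained.
        String demo = "flink.checkpoint.interval=5000\nflink.fixedDelayRestart.times=3\n";
        return new ByteArrayInputStream(demo.getBytes(StandardCharsets.UTF_8));
    }

    public static ConfigHolder getInstance() {
        return Holder.INSTANCE;
    }

    public int getInt(String key) {
        return Integer.parseInt(props.getProperty(key).trim());
    }
}
```

With the holder idiom no `synchronized` is needed on `getInstance()`: the class loader guarantees the one-time initialization.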
    

    1.31.23.RedisUtil.java

    package com.xxxxx.issue.utils;
    
    import com.xxxxx.tmc.cache.service.impl.TqCacheServiceImpl;
    import com.xxxxx.tmc.commons.constant.CacheLevel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author tuzuoquan
     * @version 1.0
     * @ClassName RedisUtil
     * @description Utility exposing a shared TqCacheServiceImpl instance
     * @date 2020/10/28 16:06
     **/
    public final class RedisUtil {
        private static Logger LOG = LoggerFactory.getLogger(RedisUtil.class);
        private static TqCacheServiceImpl cacheService = new TqCacheServiceImpl("");
    
        public static TqCacheServiceImpl getCacheServiceInstance() {
            return cacheService;
        }
    
        public static void main(String[] args) throws InterruptedException {
            TqCacheServiceImpl cacheService = RedisUtil.getCacheServiceInstance();
            LOG.info("=================================");
            String code = "issue:2:20210113:000000:0:4:1.1.10.1.";
            System.out.println(cacheService.getWithCacheLevel("default", code, CacheLevel.REMOTE));
            LOG.info("=================================");
        }
    
    }
    

    1.31.24.IssueConstants.java

    package com.xxxxx.issue.constant;
    
    /**
     * @author tuzuoquan
     * @version 1.0
     * @ClassName IssueConstants
     * @description Key prefixes, handling dimensions, RocketMQ consumer groups and tags for the issue indicators
     * @date 2020/9/21 8:55
     **/
    public final class IssueConstants {
        /** Prefix of every indicator key **/
        public static final String ISSUE_CODE_PREFIX = "issue:";
        public static final String ISSUE_CODE_COLON = ":";
        public static final String TOPIC_SPLITTER = "##";

        /** Handling dimensions **/
        /** Department and its subordinates **/
        public static final String DIMENSION_DEPARTMENT_ALL = "0";
        /** Only the department itself **/
        public static final String DIMENSION_DEPARTMENT_ONLY = "1";
        /** Single user **/
        public static final String DIMENSION_DEPARTMENT_USER = "2";

        /** 1. Accepted-issue count **/
        public static final String ROCKETMQ_ACCEPT_CONSUMER_GROUP_1 = "group_1";
        public static final String ROCKETMQ_ACCEPT_TAG = "issue_accept_operat";
        /** Indicator type **/
        public static final String ISSUE_ACCEPT_TYPE = "1";

        /** 2. Occurred-issue count **/
        public static final String ROCKETMQ_HAPPEN_CONSUMER_GROUP_2 = "group_2";
        public static final String ROCKETMQ_HAPPEN_TAG = "issue_accept_operat || issue_add_operat || issue_delete_operat";
        /** Indicator type **/
        public static final String ISSUE_HAPPEN_TYPE = "2";

        /** 3. Closed-issue TAG **/
        public static final String ROCKETMQ_PASS_COMPLETE_CONSUMER_GROUP_3 = "group_3";
        public static final String ROCKETMQ_PASS_COMPLETE_TAG = "issue_inspect_pass_operat || issue_complete_operat";
        /** Indicator type **/
        public static final String ISSUE_PASS_COMPLETE_TYPE = "3";

        /** 4. Signed-for count **/
        public static final String ROCKETMQ_SIGNFOR_CONSUMER_GROUP_4 = "group_4";
        public static final String ROCKETMQ_SIGNFOR_TAG = "issue_signfor_operat";
        /** Indicator type **/
        public static final String ISSUE_SIGNFOR_TYPE = "4";

        /** 5. Handled-count TAG **/
        public static final String ROCKETMQ_HANDLE_CONSUMER_GROUP_5 = "group_5";
        public static final String ROCKETMQ_HANDLE_TAG = "issue_comment_operat || issue_assignReply_operat || issue_complete_operat "
                + " || issue_report_operat || issue_assign_operat";
        /** Indicator type **/
        public static final String ISSUE_HANDLE_TYPE = "5";
    
    }
    

    1.31.25.IssueAcceptRedisSink.java

    package com.xxxxx.issue.redissink;
    
    import com.xxxxx.issue.constant.IssueConstants;
    import com.xxxxx.issue.utils.PropertiesUtils;
    import com.xxxxx.tmc.cache.service.impl.TqCacheServiceImpl;
    import com.xxxxx.tmc.commons.constant.CacheLevel;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    
    /**
     * @author tuzuoquan
     * @version 1.0
     * @ClassName IssueAcceptRedisSink
     * @description Custom Redis sink that maintains the accepted-issue counters per org level
     * @date 2020/9/22 14:49
     **/
    public class IssueAcceptRedisSink extends RichSinkFunction<Tuple4<String, Integer,String, String>> {
        private static final Logger LOG = LoggerFactory.getLogger(IssueAcceptRedisSink.class);
        TqCacheServiceImpl cacheService;
    
        public IssueAcceptRedisSink() {
            // cacheService is created in open(), once per parallel sink instance
        }
    
        @Override
        public void invoke(Tuple4<String, Integer, String, String> input) {
            try {
                generateKeyAndData(input.f0, input.f1, input.f2, input.f3);
            } catch (Exception e) {
                LOG.error("Failed to process accepted-issue count: {}", e.getMessage(), e);
                throw e;
            }
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            try {
                LOG.info("=======================cacheService init=============================");
                this.cacheService = new TqCacheServiceImpl("");
                LOG.info("=======================cacheService end=============================");
            } catch (Exception var3) {
                LOG.error("Redis has not been properly initialized: ", var3);
                throw var3;
            }
        }
    
        @Override
        public void close() throws IOException {
            try {
                LOG.info("=======================cacheService close start =============================");
                this.cacheService.destroy();
                this.cacheService = null;
                LOG.info("=======================cacheService close end  =============================");
            } catch (Exception e) {
                LOG.error("Failed to destroy Redis cacheService", e);
            }
        }
    
        private synchronized void generateKeyAndData(String handlerOrgCode, Integer operatType, String dayTime, String tenantId) {
            if (StringUtils.isNotBlank(handlerOrgCode)) {
                String[] codeSegment = handlerOrgCode.split("\\.");
                int length = codeSegment.length;

                //"Only the department itself" dimension
                String theirOwnCode = new StringBuilder(IssueConstants.ISSUE_CODE_PREFIX)  //issue namespace
                        .append(IssueConstants.ISSUE_ACCEPT_TYPE)                          //accepted-issue count
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(dayTime)                                                   //date, e.g. 20200918
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(tenantId)                                                  //tenant id, 0-9
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(IssueConstants.DIMENSION_DEPARTMENT_ONLY)                  //"only itself" dimension
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(length)                                                    //org level
                        .append(IssueConstants.ISSUE_CODE_COLON)
                        .append(handlerOrgCode)                                            //the org's own code
                        .toString();
                Long theirOwnNum = 1L;
                //Does this key already exist in Redis? Fetch once and reuse the value.
                Object existing = this.cacheService.getWithCacheLevel("default", theirOwnCode, CacheLevel.REMOTE);
                if (null != existing) {
                    theirOwnNum = (Long) existing + 1;

                    LOG.info("theirOwnNum=" + theirOwnNum);
                }
                this.cacheService.setWithCacheLevel("default", theirOwnCode,
                        PropertiesUtils.getInstance().getRedisDefaultExpirationTime(), theirOwnNum,
                        CacheLevel.REMOTE);
    
                for (int level = 1; level <= length; level++) {
                    //issue:type:date:tenant:dimension:level:code
                    String codePrefix = new StringBuilder(IssueConstants.ISSUE_CODE_PREFIX)  //issue namespace
                            .append(IssueConstants.ISSUE_ACCEPT_TYPE)                        //indicator type: accepted count
                            .append(IssueConstants.ISSUE_CODE_COLON)
                            .append(dayTime)                                                 //date, e.g. 20200918
                            .append(IssueConstants.ISSUE_CODE_COLON)
                            .append(tenantId)                                                //tenant id, 0-9
                            .append(IssueConstants.ISSUE_CODE_COLON)
                            .append(IssueConstants.DIMENSION_DEPARTMENT_ALL)                 //department dimension
                            .append(IssueConstants.ISSUE_CODE_COLON)
                            .append(level)                                                   //org level
                            .append(IssueConstants.ISSUE_CODE_COLON)
                            .toString();
                    StringBuilder codeSuffix = new StringBuilder();
                    for (int j = 0; j < level; j++) {
                        if (StringUtils.isBlank(codeSuffix.toString())) {
                            codeSuffix.append(codeSegment[j]);
                            continue;
                        }
                        codeSuffix.append(".").append(codeSegment[j]);
                    }
                    codeSuffix.append(".");

                    String code = codePrefix + codeSuffix.toString();
                    LOG.info(code);

                    Long num = 1L;
                    Object existingNum = this.cacheService.getWithCacheLevel("default", code, CacheLevel.REMOTE);
                    if (null != existingNum) {
                        num = (Long) existingNum + 1;
                    }
                    this.cacheService.setWithCacheLevel("default", code,
                            PropertiesUtils.getInstance().getRedisDefaultExpirationTime(), num,
                            CacheLevel.REMOTE);
                }
            }
        }
    
    }
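The per-level key construction inside `generateKeyAndData` can be read as a small pure function, which makes the key format easy to test in isolation. A sketch — the class and method names are made up for illustration; the key layout follows the `issue:type:date:tenant:dimension:level:code` rule used above:

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of the hierarchical key expansion used by the sink:
 *  an org code like "1.1.10.1." yields one department-dimension key per level. */
public final class IssueKeyExpander {

    public static List<String> departmentKeys(String type, String dayTime, String tenantId,
                                              String handlerOrgCode) {
        List<String> keys = new ArrayList<>();
        String[] segments = handlerOrgCode.split("\\.");
        for (int level = 1; level <= segments.length; level++) {
            StringBuilder suffix = new StringBuilder();
            for (int j = 0; j < level; j++) {
                if (suffix.length() > 0) {
                    suffix.append('.');
                }
                suffix.append(segments[j]);
            }
            suffix.append('.');
            // issue:type:date:tenant:dimension(0 = department):level:code
            keys.add("issue:" + type + ":" + dayTime + ":" + tenantId + ":0:" + level + ":" + suffix);
        }
        return keys;
    }
}
```

For `"1.1.10.1."` this produces four department-dimension keys, from `issue:1:20200918:0:0:1:1.` up to `issue:1:20200918:0:0:4:1.1.10.1.`, matching the loop in the sink.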
    

    1.31.26.IssueAcceptFlinkHandlerByCustomRedisSink.java

    package com.xxxxx.issue.flink.handler;
    
    import com.alibaba.fastjson.JSON;
    import com.xxxxx.doraemon.service.issue.vo.IssueSyncMessageBodyVO;
    import com.xxxxx.issue.constant.IssueConstants;
    import com.xxxxx.issue.redissink.IssueAcceptRedisSink;
    import com.xxxxx.issue.utils.DateUtils;
    import com.xxxxx.issue.utils.PropertiesUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.rocketmq.flink.RocketMQConfig;
    import org.apache.rocketmq.flink.RocketMQSource;
    import org.apache.rocketmq.flink.common.serialization.SimpleKeyValueDeserializationSchema;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.Date;
    import java.util.Map;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author jun
     * @version 1.1
     * @ClassName IssueAcceptFlinkHandlerByCustomRedisSink
     * @description Real-time handler for today's "accepted issues" indicator (type = 1)
     *     Meaning: number of issues accepted by the pre-acceptance center
     *     Scope: own department, own level and subordinates (including peer functional departments)
     *     Time window: today
     *     Source: TAG issue_accept_operat, operatType 61
     *
     * Key format:
     * issue:type:date:tenant_id:dimension:level:code
     * issue:              namespace for issue indicators
     * type                indicator type  1: accepted  2: occurred  3: closed
     * date                date, e.g. 20200918
     * tenant_id           tenant id
     * dimension           0: department  1: only itself
     * level               org level
     * code                trailing code segment
     *
     * With a 6-segment code:
     *     department dimension (digit 0)   one key per level   issue:type:date:dimension:level:code
     *     only itself (digit 1)            a single key        issue:type:date:dimension:level:code
     *
     * With a user id: issue:type:date:dimension:userid, key is id:code
     *     (the accepted count currently has no per-user statistics); the single-user dimension is 2
     *
     * @date 2020/9/14 16:37
     **/
    public class IssueAcceptFlinkHandlerByCustomRedisSink {
        private static final Logger LOG = LoggerFactory.getLogger(IssueAcceptFlinkHandlerByCustomRedisSink.class);
        private static final Integer OPERATTYPE_1 = 61;
    
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            PropertiesUtils instance = PropertiesUtils.getInstance();
    
            //Restart strategy: fixed delay
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                    instance.getFlinkFixedDelayRestartTimes(),
                    Time.of(instance.getFlinkFixedDelayRestartInterval(), TimeUnit.MINUTES)));

            //Interval between checkpoints
            env.enableCheckpointing(instance.getFlinkCheckpointsInterval());
            //Exactly-once mode (the default)
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
            //Minimum pause guaranteed between two checkpoints
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(instance.getFlinkMinPauseBetweenCheckpoints());
            //A checkpoint must finish within the timeout or be discarded
            env.getCheckpointConfig().setCheckpointTimeout(instance.getFlinkCheckpointTimeout());
            //Only one checkpoint may run at a time
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(instance.getFlinkMaxConcurrentCheckpoints());
            //Retain checkpoint data when the job is cancelled, so it can be restored from later
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    
            Properties consumerProps = new Properties();
            consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, instance.getRocketmqNameServer());
            consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, IssueConstants.ROCKETMQ_ACCEPT_CONSUMER_GROUP_1);
            consumerProps.setProperty(RocketMQConfig.CONSUMER_TAG, IssueConstants.ROCKETMQ_ACCEPT_TAG);
    
            String topic = instance.getRocketMqTopics();
            if (StringUtils.isBlank(topic)) {
                return;
            }
            String[] topics = topic.split(IssueConstants.TOPIC_SPLITTER);
            //Union of all per-topic sources
            DataStream<Map> finalDataStreamSource = null;
            for(String topicItem : topics) {
                consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, topicItem.trim());
                DataStream<Map> dataStreamSource = env.addSource(new RocketMQSource(new SimpleKeyValueDeserializationSchema("id", "issueInfo"), consumerProps));
                finalDataStreamSource = (null == finalDataStreamSource) ? dataStreamSource : finalDataStreamSource.union(dataStreamSource);
            }
    
            if (null == finalDataStreamSource) {
                return;
            }
    
            SingleOutputStreamOperator<Tuple4<String,Integer,String,String>> mainDataStream = finalDataStreamSource
                    .process(new ProcessFunction<Map, Tuple4<String,Integer,String,String>>() {
    
                        @Override
                        public void processElement(Map in, Context ctx, Collector<Tuple4<String,Integer,String,String>> out) throws Exception {
                            String issueInfo = in.get("issueInfo").toString();
                            LOG.info(issueInfo);
                            try {
                                IssueSyncMessageBodyVO issueMsgVO = JSON.parseObject(issueInfo, IssueSyncMessageBodyVO.class);

                                //Today's date, e.g. 20200918
                                String dayTime = DateUtils.dateFormat(new Date(), DateUtils.PATTERN_DATE_SIMPLE);

                                //Fields used to build the key
                                String handlerOrgCode = null;
                                Integer operatType = null;
                                String tenantId = null;
                                if (null != issueMsgVO && null != issueMsgVO.getBody()) {
                                    //Org code of the handling department
                                    handlerOrgCode = issueMsgVO.getBody().getHandleOrgCode();
                                    operatType = issueMsgVO.getOperatType();
                                    tenantId = issueMsgVO.getBody().getTenantId();
                                }
                                if (null != operatType && 0 == operatType.compareTo(IssueAcceptFlinkHandlerByCustomRedisSink.OPERATTYPE_1)) {
                                    Tuple4<String,Integer,String,String> outVal = new Tuple4<>(handlerOrgCode, operatType, dayTime, tenantId);
                                    out.collect(outVal);
                                }
                            } catch (Exception e) {
                                LOG.error("Failed to consume event message: {}", issueInfo, e);
                            }

                        }
    
                    }).name("issueAccept-mq-source")
                    .uid("issueAccept")
                    .setParallelism(instance.getRockeqMqSourceParallelism());
    
            //Redis sink with the configured sink parallelism
            mainDataStream.addSink(new IssueAcceptRedisSink())
                    .name("IssueAcceptRedisSink")
                    .uid("IssueAcceptRedisSink")
                    .setParallelism(instance.getRedisSinkParallelism());
    
            try {
                String jobName = null;
                if (args.length == 0) {
                    jobName = IssueAcceptFlinkHandlerByCustomRedisSink.class.getSimpleName();
                } else {
                    jobName = args[0];
                }
    
                env.execute(jobName);
            } catch (Exception e) {
                LOG.error("Job execution failed", e);
            }
        }
    
    }
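The topic handling in `main` reduces to splitting the configured string on `##` and trimming each entry before one `RocketMQSource` is created per topic. A sketch of just that parsing step (the class name is illustrative):

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch: parsing the "##"-separated topic list from project-config.properties,
 *  as the handler above does before creating one RocketMQSource per topic. */
public final class TopicListParser {

    public static List<String> parse(String topics) {
        List<String> result = new ArrayList<>();
        if (topics == null || topics.trim().isEmpty()) {
            return result;  // the job returns early when no topic is configured
        }
        for (String t : topics.split("##")) {
            String trimmed = t.trim();
            if (!trimmed.isEmpty()) {
                result.add(trimmed);
            }
        }
        return result;
    }
}
```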
    

    1.32.Flink其它案例

    1.32.1.使用DataGen生成数据

    package com.toto.demo.test;
    
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.datagen.DataGenerator;
    import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
    import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.factories.datagen.types.RowDataGenerator;
    
    public class DataGeneratorSourceDemo {
    
        public static void main(String[] args) {
            test1();
        }
    
        private static void test1() {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            String[] fieldNames = new String[] { "id", "state", "score" };
            DataGenerator<?>[] fieldGenerators = new DataGenerator[] { //
                    RandomGenerator.intGenerator(0, 100), //
                    RandomGenerator.booleanGenerator(), //
                    RandomGenerator.intGenerator(0, 100) //
            };
            //First argument: the DataGenerator array; second argument: the field names
            RowDataGenerator rowDataGenerator = new RowDataGenerator(fieldGenerators, fieldNames);
            DataStreamSource<RowData> source =
                    //DataGeneratorSource(generator, rowsPerSecond, numberOfRows): 20 rows at 10 rows/second
                    env.addSource(new DataGeneratorSource<>(rowDataGenerator, 10, 20L), TypeInformation.of(RowData.class))
                            .setParallelism(1);
            source.print().setParallelism(2);
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    Sample output:
    1> +I(73,true,43)
    2> +I(35,true,99)
    1> +I(44,false,19)
    2> +I(47,true,8)
    1> +I(93,false,37)
    2> +I(38,false,79)
    1> +I(40,false,16)
    2> +I(70,false,57)
    1> +I(78,false,50)
    2> +I(57,false,71)
    1> +I(58,false,56)
    2> +I(78,true,68)
    1> +I(78,true,67)
    2> +I(51,true,3)
    1> +I(22,false,89)
    2> +I(83,false,0)
    1> +I(42,false,32)
    2> +I(74,false,18)
    1> +I(99,true,73)
    2> +I(84,false,89)
    

    1.32.2.使用value state进行存储临时数据

    package com.toto.demo.test;
    
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    
    public class StateDemo {
    
        private static final List<Integer> data = new ArrayList<>(Arrays.asList(1, 2, 3));
    
        public static void main(String[] args) {
            state();
        }
    
        public static void state() {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromCollection(data, TypeInformation.of(Integer.class)).keyBy(v -> v % 2)
                .process(new KeyedProcessFunction<Integer, Integer, Integer>() {
                    private ValueState<Integer> sumState;
    
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Integer> vsdesc = new ValueStateDescriptor<>("sum", Integer.class);
                        sumState = getRuntimeContext().getState(vsdesc);
                    }
    
                    @Override
                    public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                        int sum = sumState.value() == null ? 0 : sumState.value();
                        System.out.println("oldSum:\t" + sum);
                        System.out.println("value:\t" + value);
                        sum += value;
                        sumState.update(sum);
                        out.collect(sum);
                    }
                }).print().setParallelism(2);
            try {
                System.out.println(env.getExecutionPlan());
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
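Conceptually, the keyed `ValueState` above acts like a map from key to running sum, scoped by `keyBy(v -> v % 2)`. A plain-Java sketch of that behavior — the class name is illustrative, and unlike Flink state a `HashMap` is of course not checkpointed:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch: per-key running sums, mirroring the ValueState logic above. */
public final class KeyedSumSketch {

    public static List<Integer> runningSums(List<Integer> input) {
        Map<Integer, Integer> stateByKey = new HashMap<>(); // stands in for ValueState
        List<Integer> out = new ArrayList<>();
        for (int value : input) {
            int key = value % 2;                                // keyBy(v -> v % 2)
            int sum = stateByKey.getOrDefault(key, 0) + value;  // null state counts as 0
            stateByKey.put(key, sum);                           // sumState.update(sum)
            out.add(sum);                                       // out.collect(sum)
        }
        return out;
    }
}
```

For the input [1, 2, 3], key 1 receives 1 and 3 while key 0 receives 2, so the emitted sums are [1, 2, 4].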
    
  • Original article: https://blog.csdn.net/toto1297488504/article/details/125629723