• flinksql kafka到mysql累计指标练习


    flinksql 累计指标练习

    数据流向:kafka ->kafka ->mysql

    模拟写数据到kafka topic:wxt中

    import com.alibaba.fastjson.JSONObject;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    public class KafkaProducerExample {
        public static void main(String[] args) throws Exception {
            // 设置kafka服务器地址和端口号
            String kafkaServers = "localhost:9092";
    
            // 设置producer属性
            Properties properties = new Properties();
            properties.put("bootstrap.servers", kafkaServers);
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            // 创建Kafka producer对象
            KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    
            // 发送消息
            String topic = "wxt";
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("id", 9);
            jsonObject.put("name", "王大大");
            jsonObject.put("age", 11);
    
            // 将JSON对象转换成字符串
            String jsonString = jsonObject.toString();
    
            // 输出JSON字符串
            System.out.println("JSON String: " + jsonString);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, jsonString);
            producer.send(record);
    
            // 关闭producer
            producer.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    kafka topic :wxt1
    在这里插入图片描述

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    
    public class KafkaToMysqlJob {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            TableEnvironment tEnv = TableEnvironment.create(settings);
    
            // 定义Kafka连接属性
            String kafkaBootstrapServers = "localhost:9092";
            String kafkaTopic = "wxt";
            String groupId = "wxt1";
    
            // 注册Kafka表
            tEnv.executeSql("CREATE TABLE kafka_table (\n" +
                    "  id INT,\n" +
                    "  name STRING,\n" +
                    "  age INT,\n" +
                    "  proctime as PROCTIME()\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = '" + kafkaTopic + "',\n" +
                    "  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +
                    "  'properties.group.id' = '" + groupId + "',\n" +
                    "  'format' = 'json',\n" +
                    "  'scan.startup.mode' = 'earliest-offset'\n" +
                    ")");
    
            // 注册Kafka表
            // latest-offset
            //earliest-offset
            tEnv.executeSql("CREATE TABLE kafka_table2 (\n" +
                    "  window_start STRING,\n" +
                    "  window_end STRING,\n" +
                    "  name STRING,\n" +
                    "  age INT\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'wxt2',\n" +
                    "  'properties.bootstrap.servers' = '" + kafkaBootstrapServers + "',\n" +
                    "  'properties.group.id' = 'kafka_table2',\n" +
                    "  'format' = 'json',\n" +
                    "  'scan.startup.mode' = 'latest-offset',\n" +
                    "  'value.format' = 'csv'\n" +
                    ")");
    
    
            tEnv.executeSql("CREATE TABLE mysql_sink_table (\n" +
                    "  window_start String,\n" +
                    "   window_end String,\n" +
                    "    name String,\n" +
                    "    age INT\n" +
                    ") WITH (\n" +
                    "   'connector' = 'jdbc',\n" +
                    "   'url' = 'jdbc:mysql://localhost:3306/tests?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',\n" +
                    "   'username' = 'root',\n" +
                    "   'password' = '12345678',\n" +
                    "   'table-name' = 'leiji_age'\n" +
                    ")");
    
             tEnv.executeSql("insert into kafka_table2 select cast(window_start as string) as window_start,cast(window_end as string) as window_end,name,sum(age) as age\n" +
                     "from TABLE( CUMULATE( TABLE kafka_table, DESCRIPTOR(proctime), INTERVAL '20' SECOND, INTERVAL '1' DAY))\n" +
                     "group by  window_start,window_end,name");
    
            tEnv.executeSql("insert into mysql_sink_table select window_start,window_end,name,age from kafka_table2");
    
            env.execute("KafkaToMysqlJob");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    kafka topic :wxt2
    在这里插入图片描述
    mysql结果数据:
    在这里插入图片描述
    pom文件

    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
    
        <groupId>org.examplegroupId>
        <artifactId>flinksqlartifactId>
        <version>1.0-SNAPSHOTversion>
    
        <properties>
            <maven.compiler.source>8maven.compiler.source>
            <maven.compiler.target>8maven.compiler.target>
            <scala.binary.version>2.12scala.binary.version>
            <flink.version>1.14.3flink.version>
        properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-table-api-java-bridge_${scala.binary.version}artifactId>
                <version>${flink.version}version>
                
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-table-planner_${scala.binary.version}artifactId>
                <version>${flink.version}version>
                
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-streaming-scala_${scala.binary.version}artifactId>
                <version>${flink.version}version>
                
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-table-commonartifactId>
                <version>${flink.version}version>
                
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-clients_${scala.binary.version}artifactId>
                <version>${flink.version}version>
            dependency>
            
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
                <version>1.18.22version>
                <scope>providedscope>
            dependency>
            
            <dependency>
                <groupId>com.alibabagroupId>
                <artifactId>fastjsonartifactId>
                <version>1.2.73version>
            dependency>
            <dependency>
                <groupId>org.slf4jgroupId>
                <artifactId>slf4j-simpleartifactId>
                <version>1.7.15version>
            dependency>
    
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-csvartifactId>
                <version>${flink.version}version>
            dependency>
    
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-jsonartifactId>
                <version>${flink.version}version>
            dependency>
    
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-jsonartifactId>
                <version>${flink.version}version>
            dependency>
    
            <dependency>
                <groupId>org.slf4jgroupId>
                <artifactId>slf4j-simpleartifactId>
                <version>1.7.15version>
            dependency>
    
            <dependency>
                <groupId>org.apache.commonsgroupId>
                <artifactId>commons-math3artifactId>
                <version>3.5version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-kafka_${scala.binary.version}artifactId>
                <version>${flink.version}version>
            dependency>
    
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-connector-jdbc_2.11artifactId>
                <version>${flink.version}version>
            dependency>
            <dependency>
                <groupId>com.mysqlgroupId>
                <artifactId>mysql-connector-jartifactId>
                <version>8.0.31version>
            dependency>
            <dependency>
                <groupId>mysqlgroupId>
                <artifactId>mysql-connector-javaartifactId>
                <version>5.1.38version>
            dependency>
    
            
            <dependency>
                <groupId>commons-iogroupId>
                <artifactId>commons-ioartifactId>
                <version>2.11.0version>
            dependency>
    
            <dependency>
                <groupId>org.antlrgroupId>
                <artifactId>antlr-runtimeartifactId>
                <version>3.5.2version>
            dependency>
    
    
            <dependency>
                <groupId>org.apache.thriftgroupId>
                <artifactId>libfb303artifactId>
                <version>0.9.3version>
            dependency>
    
            <dependency>
                <groupId>org.slf4jgroupId>
                <artifactId>slf4j-simpleartifactId>
                <version>1.7.15version>
            dependency>
    
            <dependency>
                <groupId>org.slf4jgroupId>
                <artifactId>slf4j-log4j12artifactId>
                <version>1.7.7version>
                <scope>runtimescope>
            dependency>
            <dependency>
                <groupId>log4jgroupId>
                <artifactId>log4jartifactId>
                <version>1.2.17version>
                <scope>runtimescope>
            dependency>
    
        dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-compiler-pluginartifactId>
                    <version>3.5.1version>
                    <configuration>
                        <source>1.8source>
                        <target>1.8target>
                    configuration>
                plugin>
    
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-shade-pluginartifactId>
                    <version>3.1.1version>
                    <configuration>
                        
                    configuration>
                    <executions>
                        <execution>
                            <phase>packagephase>
                            <goals>
                                <goal>shadegoal>
                            goals>
                        execution>
                    executions>
                plugin>
    
                <plugin>
                    <groupId>org.apache.maven.pluginsgroupId>
                    <artifactId>maven-dependency-pluginartifactId>
                    <version>2.10version>
                    <executions>
                        <execution>
                            <id>copy-dependenciesid>
                            <phase>packagephase>
                            <goals>
                                <goal>copy-dependenciesgoal>
                            goals>
                            <configuration>
                                <outputDirectory>${project.build.directory}/liboutputDirectory>
                            configuration>
                        execution>
                    executions>
                plugin>
                <plugin>
                    <groupId>org.scala-toolsgroupId>
                    <artifactId>maven-scala-pluginartifactId>
                    <version>2.15.2version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compilegoal>
                                <goal>testCompilegoal>
                            goals>
                        execution>
                    executions>
                plugin>
            plugins>
        build>
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198
    • 199
    • 200
    • 201
    • 202
    • 203
    • 204
    • 205
    • 206
    • 207
    • 208
    • 209
    • 210
    • 211
    • 212
    • 213
    • 214
    • 215
    • 216
    • 217
    • 218
    • 219
    • 220
    • 221
  • 相关阅读:
    WebMagic轻量级爬虫框架实战-根据关键词爬取某网站数据
    深度学习Course5第四周Transformers习题整理
    性能测试知识科普(二)
    在x86虚拟机搭建arm64交叉编译环境记录
    树莓派 RaspBerryPi 网络配置相关与改造usb网络摄像头
    CF1535F String Distance
    每日刷题|回溯法解决全排列问题
    LNMP架构
    【虹科干货】Redis Enterprise 自动分层技术:大数据集高性能解决方案
    Python--使用selenium通过chromedriver调用谷歌浏览器
  • 原文地址:https://blog.csdn.net/weixin_47699191/article/details/134059249