• flink重温笔记(十六): flinkSQL 顶层 API ——实时数据流结合外部系统


    Flink学习笔记

    前言:今天是学习 flink 的第 16 天啦!学习了 flinkSQL 与企业级常用外部系统结合,主要是解决大数据领域数据计算后,写入到文件,kafka,还是mysql等 sink 的问题,即数据计算完后保存到哪里的问题!结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

    Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

    喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


    二、FlinkSQL 连接外部系统

    1. 输出到文件

    例子:将表结果输出到文件系统中

    package cn.itcast.day01.sink;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.ConnectTableDescriptor;
    import org.apache.flink.table.descriptors.Csv;
    import org.apache.flink.table.descriptors.FileSystem;
    import org.apache.flink.table.descriptors.Schema;
    
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.types.Row;
    
    import static org.apache.flink.table.api.Expressions.*;
    
    
    /**
     * @author lql
     * @time 2024-03-12 15:25:14
     * @description TODO
     */
    public class FsSinkTest {
        public static void main(String[] args) throws Exception {
            // todo 1) 配置 table 环境
            // 1. 配置流处理环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 2. 配置设置环境
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
    
            // 3. 配置表环境
            StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
    
            // todo 2) 从文件中读取数据
            String filePath = FsSinkTest.class.getClassLoader().getResource("order.csv").getPath();
            DataStreamSource<String> inputStream = env.readTextFile(filePath);
    
            // todo 3) 将表数据转化类型
            SingleOutputStreamOperator<OrderInfo> stream = inputStream.map(new MapFunction<String, OrderInfo>() {
                @Override
                public OrderInfo map(String data) throws Exception {
                    String[] dataArrary = data.split(",");
                    return new OrderInfo(
                            dataArrary[0],
                            dataArrary[1],
                            Double.parseDouble(dataArrary[2]),
                            dataArrary[3]);
                }
            });
    
            // todo 4) 将数据流转化为表
            Table table = bsTableEnv.fromDataStream(stream);
    
            // todo 5) 调用 api 方式
            Table result = table
                    .select($("id"), $("timestamp"), $("money"), $("category"))
                    .filter($("category").isEqual("电脑"));
    
            // todo 6) 将表转化为流打印
            bsTableEnv.toAppendStream(result, Row.class).print("结果数据>>>");
    
            // todo 7) 将查询的结果写入到文件中
            ConnectTableDescriptor connectTableDescriptor = bsTableEnv
                    .connect(new FileSystem().path("D:\\IDEA_Project\\BigData_Java\\flinksql_pro\\data\\output\\order.txt"))
                    .withFormat(new Csv())
                    .withSchema(
                            new Schema()
                                    .field("id", DataTypes.STRING())
                                    .field("name", DataTypes.STRING())
                                    .field("money", DataTypes.DOUBLE())
                                    .field("category", DataTypes.STRING())
                    );
    
            // todo 8) 将通过connect创建的输出文件注册为表对象
            connectTableDescriptor.createTemporaryTable("outputOrder");
    
            // todo 9) 将表查询的结果插入到临时表中
            table.executeInsert("outputOrder");
    
            // todo 10) 执行程序
            env.execute();
        }
    
        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        public static class OrderInfo {
            private String id;
            private String timestamp;
            private Double money;
            private String category;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105

    结果:在 output 目录下生成了一个 order.txt 的文件

    总结:

    • 1- connect 方法:存储在哪里,存储什么地方,存储什么格式
    • 2- createTemporaryTable():创建临时表
    • 3- 将之前结果表插入到临时表中

    2. 更新模式(Update Mode)

    2.1 追加模式(Append Mode)
    • 表(动态表)和外部连接器只交换插入(Insert)消息。

    2.2 撤回模式(Retract Mode)
    • 表和外部连接器交换的是添加(Add)和撤回(Retract)消息。
    • 应用场景:
      • 插入数据时,它会被编码为添加消息。
      • 删除数据时,它会被编码为撤回消息;
      • 更新数据时,会先发送一个已更新行的撤回消息,然后再发送一个更新行的添加消息。
    • 这种模式允许对表中的数据进行修改和删除,但需要注意的是,它不能定义key

    2.3 更新插入模式(Upsert Mode)
    • 动态表和外部连接器交换 Upsert 和 Delete 消息。
    • 这种模式需要一个唯一的 key,通过这个 key 可以传递更新消息。
    • 应用场景:
      • 插入数据时,它会使用 Upsert 消息。
      • 删除数据时,它会使用 Delete 消息。
      • 更新数据时,它也会使用 Upsert 消息,并通过 key 来标识要更新的行。
    • 这种模式在效率上更高,因为它只需要发送一条消息即可完成更新操作。

    案例演示:从 kafka 读取数据,实时聚合操作,撤回模式

    package cn.itcast.day01.sink;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Csv;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.types.Row;
    
    import static org.apache.flink.table.api.Expressions.*;
    
    /**
     * @author lql
     * @time 2024-03-12 17:33:03
     * @description TODO
     */
    public class KafkaSinkTest {
        public static void main(String[] args) throws Exception {
            // todo 1) 初始化 flinkSQL 环境
            // 1.1 配置流处理环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 1.2 配置setting环境
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
    
            // 1.3 配置表表环境
            StreamTableEnvironment tbEnv = StreamTableEnvironment.create(env, bsSettings);
    
            // todo 2) 读取 kafka 数据源
            tbEnv.connect(
                    new Kafka()
                    .version("universal") // 指定 kafka 版本
                    .topic("order") // 定义主题
                    .property("bootstrap.servers","node1:9092")
            ).withFormat(new Csv())
                    .withSchema(new Schema()
                            .field("id", DataTypes.STRING())
                            .field("timestamp", DataTypes.STRING())
                            .field("money", DataTypes.DOUBLE())
                            .field("category", DataTypes.STRING())
                            .field("pt", DataTypes.TIMESTAMP(3))
                            // 使用 protime,指定字段名定义处理时间字段
                            // 这个 proctime 属性只能通过附加逻辑字段,进行扩展物理 schema
                            .proctime()
                    ).createTemporaryTable("kafkaInputTable");
    
            // todo 3) 表的查询操作
            // 3.1 通过表环境获取到数据表:from
            Table orderTable = tbEnv.from("kafkaInputTable");
    
            // 3.2 将表转为 stream 后打印
            tbEnv.toAppendStream(orderTable, Row.class).print("Table API>>>>");
    
            // 3.2 调用 table api 进行聚合操作
            Table aggResultTable = orderTable.groupBy($("category"))
                    .select($("category"),
                            $("money").sum().as("totalMoney"),
                            $("id").count().as("cnt")
                    );
    
            tbEnv.toRetractStream(aggResultTable,Row.class).print("agg result:>>>");
    
            // todo 4) 启动程序,将数据写入到kafka的时候,可以不加execute代码
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    结果:显示 false 即撤回,显示 true 即添加

    Table API>>>>> +I[user_001, 1621718199, 10.1, 电脑, 2024-03-12T10:42:18.335Z]
    agg result:>>>> (true,+I[电脑, 10.1, 1])
    Table API>>>>> +I[user_001, 1621718201, 14.1, 手机, 2024-03-12T10:42:33.626Z]
    agg result:>>>> (true,+I[手机, 14.1, 1])
    Table API>>>>> +I[user_002, 1621718202, 82.5, 手机, 2024-03-12T10:42:50.130Z]
    agg result:>>>> (false,-U[手机, 14.1, 1])
    agg result:>>>> (true,+U[手机, 96.6, 2])
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    总结:

    • 实时聚合操作结果不可以简单 toAppendStream 打印,需要使用更新模式toRetractStream
    • 这种聚合结果更新操作暂时不适合写入 kafka!

    3. 写入到 Kafka

    例子:将查询的结果数据写入到 kafka 中

    package cn.itcast.day01.sink;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Csv;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.types.Row;
    
    import static org.apache.flink.table.api.Expressions.*;
    
    /**
     * @author lql
     * @time 2024-03-12 19:29:56
     * @description TODO
     */
    public class KafkaSinkTest1 {
        public static void main(String[] args) throws Exception {
            // Todo 1) 配置 flink SQL 环境
            // 1. 配置流处理环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 2. 配置 flink settings 环境
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
    
            // 3. 配置表环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
    
            // Todo 2) kafka 数据源
            tableEnv.connect(new Kafka()
                    .version("universal")
                    .topic("order")
                    .property("bootstrap.servers","node1:9092")
            ).withFormat(new Csv())
                    .withSchema(new Schema()
                            .field("id", DataTypes.STRING())
                            .field("timestamp", DataTypes.STRING())
                            .field("money", DataTypes.DOUBLE())
                            .field("category", DataTypes.STRING())
                            .field("pt", DataTypes.TIMESTAMP(3))
                            // 使用 protime,指定字段名定义处理时间字段
                            // 这个 proctime 属性只能通过附加逻辑字段,进行扩展物理 schema
                            .proctime()
                    ).createTemporaryTable("kafkaInputTable");
    
            // Todo 3) table API方式提取数据
            // 3.1 通过表环境获取数据表
            Table ordertable = tableEnv.from("kafkaInputTable");
            tableEnv.toAppendStream(ordertable, Row.class).print("Table API>>>");
    
            // 3.2 编写逻辑提取数据
            Table tableResult = ordertable
                    .select($("id"), $("timestamp"), $("money"), $("category"))
                    .filter($("category").isEqual("电脑"));
    
            // 3.3 将表数据转化为流数据打印
            tableEnv.toAppendStream(tableResult, Row.class).printToErr("API 抽取的数据>>>");
    
            // Todo 4) 将抽取的数据写入到 kafka 中
            tableEnv.connect(new Kafka()
                    .version("universal") //指定版本
                    .topic("orderResult")//定义主题
                    .property("bootstrap.servers", "node1:9092")
            ).withFormat(new Csv())
                    .withSchema(new Schema()
                            .field("id", DataTypes.STRING())
                            .field("timestamp", DataTypes.STRING())
                            .field("money", DataTypes.DOUBLE())
                            .field("category", DataTypes.STRING())
                    ).createTemporaryTable("kafkaOutputTable");
    
            //todo 5)将查询结果输出到kafka中
            tableResult.executeInsert("kafkaOutputTable");
    
            //todo 6) 注意:将数据写入到kafka的时候,可以不加execute代码
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    结果:kafka 的 orderResult 这个主题中保存了数据!

    总结:

    • 结果表,调用 executeInsert 写入到 sink 表中!
    • 在读取数据源的时候,添加了一个字段 pt,调用 proctime 方法,作为处理时间!

    4. 写入到 MySQL

    样本数据:

    {"id":1,"timestamp":"2020-05-08T01:03.00Z","category":"电脑","areaName":"石家庄","money":"1450"}
    {"id":2,"timestamp":"2020-05-08T01:01.00Z","category":"手机","areaName":"北京","money":"1450"}
    {"id":3,"timestamp":"2020-05-08T01:03.00Z","category":"手机","areaName":"北京","money":"8412"}
    {"id":4,"timestamp":"2020-05-08T05:01.00Z","category":"电脑","areaName":"上海","money":"1513"}
    {"id":5,"timestamp":"2020-05-08T01:03.00Z","category":"家电","areaName":"北京","money":"1550"}
    {"id":6,"timestamp":"2020-05-08T01:01.00Z","category":"电脑","areaName":"深圳","money":"1550"}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    mysql 数据表:

    DROP TABLE IF EXISTS `order_test`;
    CREATE TABLE `order_test` (
      `id` varchar(255) NOT NULL,
      `timestamp` varchar(255) DEFAULT NULL,
      `category` varchar(255) DEFAULT NULL,
      `areaName` varchar(255) DEFAULT NULL,
      `money` double DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    案例演示:

    package cn.itcast.day01.sink;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    /**
     * @author lql
     * @time 2024-03-12 20:35:31
     * @description TODO
     */
    public class MySQLSinkTest {
        public static void main(String[] args) throws Exception {
            // Todo 1) 配置 flink SQL 环境
            // 1. 配置流处理环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 2. 配置 flink settings 环境
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();
    
            // 3. 配置表环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
    
            // Todo 2) 配置 kafka 数据源,使用建表的方式,注意指定数据源是 json
            String sourceTable = "CREATE TABLE KafkaInputTable (\n" +
                    "  `id` varchar,\n" +
                    "  `timestamp` varchar,\n" +
                    "  `category` varchar,\n" +
                    "  `areaName` varchar,\n" +
                    "  `money` double\n" +
                    ") WITH (\n" +
                    "  'connector' = 'kafka',\n" +
                    "  'topic' = 'order',\n" +
                    "  'properties.bootstrap.servers' = 'node1:9092',\n" +
                    "  'properties.group.id' = 'testGroup',\n" +
                    "  'scan.startup.mode' = 'earliest-offset',\n" +
                    "  'format' = 'json'\n" +
                    ")";
    
            // 执行建立数据源表语句
            tableEnv.executeSql(sourceTable);
    
            // Todo 3) 表的查询
            Table orderTable = tableEnv.from("KafkaInputTable");
    
            // 将表数据转化为 datastream,并打印出来
            tableEnv.toAppendStream(orderTable, Row.class).print("SQL>>>");
    
            // Todo 4) 将结果数据写入到 mysql中
            String sinkTable = "CREATE TABLE order_test (\n" +
                    "  `id` varchar,\n" +
                    "  `timestamp` varchar,\n" +
                    "  `category` varchar,\n" +
                    "  `areaName` varchar,\n" +
                    "  `money` double\n" +
                    ") WITH (\n" +
                    "   'connector' = 'jdbc',\n" +
                    "   'url' = 'jdbc:mysql://node1:3306/test?characterEncoding=utf-8&useSSL=false',\n" +
                    "   'table-name' = 'order_test'," +
                    "   'driver'='com.mysql.jdbc.Driver'," +
                    "   'username' = 'root'," +
                    "   'password' = '123456'," +
                    "   'sink.buffer-flush.interval'='1s'," +
                    "   'sink.buffer-flush.max-rows'='1'," +
                    "   'sink.max-retries' = '5'" +
                    ")";
    
            // 执行建表数据
            tableEnv.executeSql(sinkTable);
    
            // 插入语句逻辑
            //定义sql语句
            String insert = "INSERT INTO order_test SELECT * FROM KafkaInputTable";
    
            //todo 5)将源表的数据写入到目标表中
            tableEnv.executeSql(insert);
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85

    结果:json 数据源源不断,解析到 mysql 里面

    总结:

    • kafka 作为数据源,mysql 作为 sink,都可以用建表的方式解决!
    • 原理就是查询一个表,插入到另一个表中!

  • 相关阅读:
    开放平台架构指南
    C#判断字符串的显示宽度
    教你在批量将视频逆时针旋转90度的同时添加马赛克
    产品外观设计公司怎么选?这篇文章告诉你
    LlamaIndex使用指南
    C++交换a和b的方法
    项目时间管理-架构真题(二十四)
    零基础html学习/刷题-第二期
    直播回顾 | 论道原生:云原生大数据建设实践
    go语言面试(第一轮)请你说说 TCP 和 UDP 的区别
  • 原文地址:https://blog.csdn.net/m0_60732994/article/details/136718118