• 2.2 如何使用FlinkSQL读取&写入到文件系统(HDFS\Local\Hive)


    目录

    1、文件系统 SQL 连接器

    2、如何指定文件系统类型

    3、如何指定文件格式

    4、读取文件系统

    4.1 开启 目录监控 

    4.2 可用的 Metadata

    5、写出文件系统

    5.1 创建分区表

    5.2 滚动策略、文件合并、分区提交

    5.3 指定 Sink Parallelism

    6、示例_通过FlinkSQL读取kafka在写入hive表

    6.1、创建 kafka source表用于读取kafka

    6.2、创建 hdfs sink表用于写出到hdfs

    6.3、insert into 写入到 hdfs_sink_table

    6.4、查询 hdfs_sink_table

    6.5、创建hive表,指定local


    1、文件系统 SQL 连接器

    文件系统连接器允许从本地分布式文件系统进行读写数据

    官网链接:文件系统 SQL 连接器


    2、如何指定文件系统类型

    创建表时通过 'path' = '协议名称:///path' 来指定 文件系统类型

    参考官网:文件系统类型

    1. CREATE TABLE filesystem_table (
    2. id INT,
    3. name STRING,
    4. ds STRING
    5. ) partitioned by (ds) WITH (
    6. 'connector' = 'filesystem',
    7. -- 本地文件系统
    8. 'path' = 'file:///URI',
    9. -- HDFS文件系统
    10. 'path' = 'hdfs://URI',
    11. -- 阿里云对象存储
    12. 'path' = 'oss://URI',
    13. 'format' = 'json'
    14. );

    3、如何指定文件格式

    FlinkSQL 文件系统连接器支持多种format,来读取和写入文件

    比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型

    来对数据进行解析后映射到表中的字段中

    1. CREATE TABLE filesystem_table_file_format (
    2. id INT,
    3. name STRING,
    4. ds STRING
    5. ) partitioned by (ds) WITH (
    6. 'connector' = 'filesystem',
    7. -- 指定文件格式类型
    8. 'format' = 'json|csv|orc|raw'
    9. );

    4、读取文件系统

    FlinkSQL可以将单个文件或整个目录的数据读取到单个表中

    注意:

            1、当读取目录时,对目录中的文件进行 无序的读取

            2、默认情况下,读取文件时为批处理模式,只会扫描配置路径一遍后就会停止

                 当开启目录监控(source.monitor-interval)时,才是流处理模式

    4.1 开启 目录监控 

    通过设置 source.monitor-interval 属性来开启目录监控,以便在新文件出现时继续扫描

    注意:

            只会对指定目录内新增文件进行读取,不会读取更新后的旧文件

    1. -- 目录监控
    2. drop table filesystem_source_table;
    3. CREATE TABLE filesystem_source_table (
    4. id INT,
    5. name STRING,
    6. `file.name` STRING NOT NULL METADATA
    7. ) WITH (
    8. 'connector' = 'filesystem',
    9. 'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
    10. 'format' = 'json',
    11. 'source.monitor-interval' = '3' -- 开启目录监控,设置监控时间间隔
    12. );
    13. -- 持续读取
    14. select * from filesystem_source_table;

    4.2 可用的 Metadata

    使用FLinkSQL读取文件系统中的数据时,支持对 metadata 进行读取

    注意: 所有 metadata 都是只读的

    1. -- 可用的Metadata
    2. drop table filesystem_source_table_read_metadata;
    3. CREATE TABLE filesystem_source_table_read_metadata (
    4. id INT,
    5. name STRING,
    6. `file.path` STRING NOT NULL METADATA,
    7. `file.name` STRING NOT NULL METADATA,
    8. `file.size` BIGINT NOT NULL METADATA,
    9. `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
    10. ) WITH (
    11. 'connector' = 'filesystem',
    12. 'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
    13. 'format' = 'json'
    14. );
    15. select * from filesystem_source_table_read_metadata;

    运行结果:


    5、写出文件系统

    5.1 创建分区表

    FlinkSQL支持创建分区表,并且通过 insert into(追加) insert overwrite(覆盖) 写入数据

    1. -- 创建分区表
    2. drop table filesystem_source_table_partition;
    3. CREATE TABLE filesystem_source_table_partition (
    4. id INT,
    5. name STRING,
    6. ds STRING
    7. ) partitioned by (ds) WITH (
    8. 'connector' = 'filesystem',
    9. 'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
    10. 'partition.default-name' = 'default_partition',
    11. 'format' = 'json'
    12. );
    13. -- 动态分区写入
    14. insert into filesystem_source_table_partition
    15. SELECT * FROM (VALUES
    16. (1,'a','20231010')
    17. , (2,'b','20231010')
    18. , (3,'c','20231011')
    19. , (4,'d','20231011')
    20. , (5,'e','20231012')
    21. , (6,'f','20231012')
    22. ) AS user1 (id,name,ds);
    23. -- 静态分区写入
    24. insert into filesystem_source_table_partition partition(ds = '20231010')
    25. SELECT * FROM (VALUES
    26. (1,'a')
    27. , (2,'b')
    28. , (3,'c')
    29. , (4,'d')
    30. , (5,'e')
    31. , (6,'f')
    32. ) AS user1 (id,name);
    33. -- 查询分区表数据
    34. select * from filesystem_source_table_partition where ds = '20231010';

    5.2 滚动策略、文件合并、分区提交

    可以看之前的博客:flink写入文件时分桶策略

    官网链接:官网分桶策略


    5.3 指定 Sink Parallelism

    当使用FlinkSQL写出到文件系统时,可以通过 sink.parallelism 设置sink算子的并行度

    注意:当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常

    1. CREATE TABLE hdfs_sink_table (
    2. `log` STRING,
    3. `dt` STRING, -- 分区字段,天
    4. `hour` STRING -- 分区字段,小时
    5. ) partitioned by (dt,`hour`) WITH (
    6. 'connector' = 'filesystem',
    7. 'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
    8. 'sink.parallelism' = '2', -- 指定sink算子并行度
    9. 'format' = 'raw'
    10. );

    6、示例_通过FlinkSQL读取kafka在写入hive表

    需求:

            使用FlinkSQL将kafka数据写入到hdfs指定目录中

            根据kafka的timestamp进行分区(按小时分区)

    6.1、创建 kafka source表用于读取kafka

    1. -- TODO 创建读取kafka表时,同时读取kafka元数据字段
    2. drop table kafka_source_table;
    3. CREATE TABLE kafka_source_table(
    4. `log` STRING,
    5. `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
    6. ) WITH (
    7. 'connector' = 'kafka',
    8. 'topic' = '20231017',
    9. 'properties.bootstrap.servers' = 'worker01:9092',
    10. 'properties.group.id' = 'FlinkConsumer',
    11. 'scan.startup.mode' = 'earliest-offset',
    12. 'format' = 'raw'
    13. );

    6.2、创建 hdfs sink表用于写出到hdfs

    1. drop table hdfs_sink_table;
    2. CREATE TABLE hdfs_sink_table (
    3. `log` STRING,
    4. `dt` STRING, -- 分区字段,天
    5. `hour` STRING -- 分区字段,小时
    6. ) partitioned by (dt,`hour`) WITH (
    7. 'connector' = 'filesystem',
    8. 'path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
    9. 'sink.parallelism' = '2', -- 指定sink算子并行度
    10. 'format' = 'raw'
    11. );

    6.3、insert into 写入到 hdfs_sink_table

    1. -- 流式 sql,插入文件系统表
    2. insert into hdfs_sink_table
    3. select
    4. log
    5. ,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt
    6. ,DATE_FORMAT(`timestamp`,'HH') as `hour`
    7. from kafka_source_table;

    6.4、查询 hdfs_sink_table

    1. -- 批式 sql,使用分区修剪进行选择
    2. select * from hdfs_sink_table;

    6.5、创建hive表,指定local

    1. create table `kafka_to_hive` (
    2. `log` string comment '日志数据'
    3. comment '埋点日志数据' PARTITIONED BY (dt string,`hour` string)
    4. row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
    5. LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';

  • 相关阅读:
    一文带你深入理解【Java基础】· 泛型
    [onnxrumtime]onnxruntime和cuda对应关系表
    YOLOv6、PP-YOLOE、PicoDet选择TOOD
    生存分析的图你也要拼接 图形拼接r 不同的图形组合在一起
    第28期 | GPTSecurity周报
    若依前后端分离版开源项目学习
    南瓜科学新品上线 开辟益智玩具新世界
    12108 - Extraordinarily Tired Students (UVA)
    内核中自旋锁的使用
    力扣每日一题:805. 数组的均值分割【折半查找+二进制枚举】
  • 原文地址:https://blog.csdn.net/weixin_42845827/article/details/133862535