• Flink SQL -- 命令行的使用


    1、启动Flink SQL
    1. 首先启动Flink的集群,选择独立集群模式或者是session的模式。此处选择是时session的模式:
    2. yarn-session.sh -d
    3. 在启动Flink SQL的client:
    4. sql-client.sh
    2、kafka SQL 连接器
    1. 在使用kafka作为数据源的时候需要上传jar包到flnik的lib下:
    2. /usr/local/soft/flink-1.15.2/lib
    3. 可以去官网找对应的版本下载上传。

     

    1. 1、创建表:
    2. 再流上定义表
    3. 再flink中创建表相当于创建一个视图(视图中不存数据,只有查询视图时才会去原表中读取数据)
    4. CREATE TABLE students (
    5. sid STRING,
    6. name STRING,
    7. age INT,
    8. sex STRING,
    9. clazz STRING
    10. ) WITH (
    11. 'connector' = 'kafka',
    12. 'topic' = 'student',
    13. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    14. 'properties.group.id' = 'testGroup',
    15. 'scan.startup.mode' = 'earliest-offset',
    16. 'format' = 'csv'
    17. )
    18. 2、查询数据(连续查询):
    19. select clazz,count(1) as c from students group by clazz;
    3、客户端为维护和可视化结果提供了三种的模式:

            1、表格模式(默认使用的模式),(table mode),在内存中实体化结果,并将结果用规则的分页表格可视化展示出来

    SET 'sql-client.execution.result-mode' = 'table';

            2、变更日志模式,(changelog mode),不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

    SET 'sql-client.execution.result-mode' = 'changelog';

            3、Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):

    SET 'sql-client.execution.result-mode' = 'tableau';

    4、 Flink SQL流批一体:
            1、流处理:

                    a、流处理即可以处理有界流也可以处理无界流

                    b、流处理的输出的结果是连续的结果

                    c、流处理的底层是持续流的模型,上游的Task和下游的Task同时启动等待数据的到达

    SET 'execution.runtime-mode' = 'streaming'; 
            2、批处理:

                    a、批处理只能用于处理有界流

                    b、输出的是最终的结果

                    c、批处理的底层是MapReduce模型,会先执行上游的Task,在执行下游的Task 

    SET 'execution.runtime-mode' = 'batch';
    1. Flink做批处理,读取一个文件:
    2. -- 创建一个有界流的表
    3. CREATE TABLE students_hdfs (
    4. sid STRING,
    5. name STRING,
    6. age INT,
    7. sex STRING,
    8. clazz STRING
    9. )WITH (
    10. 'connector' = 'filesystem', -- 必选:指定连接器类型
    11. 'path' = 'hdfs://master:9000/data/spark/stu/students.txt', -- 必选:指定路径
    12. 'format' = 'csv' -- 必选:文件系统连接器指定 format
    13. );
    14. select clazz,count(1) as c from
    15. students_hdfs
    16. group by clazz
    5、Flink SQL的连接器:
            1、kafka SQL 连接器

    对于一些参数需要从官网进行了解。

                    1、kafka source 

    1. -- 创建kafka 表
    2. CREATE TABLE students_kafka (
    3. `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    4. `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    5. sid STRING,
    6. name STRING,
    7. age INT,
    8. sex STRING,
    9. clazz STRING
    10. ) WITH (
    11. 'connector' = 'kafka',
    12. 'topic' = 'students', -- 数据的topic
    13. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
    14. 'properties.group.id' = 'testGroup', -- 消费者组
    15. 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
    16. 'format' = 'csv' -- 读取数据的格式
    17. );

                    2、kafka sink 

    1. -- 创建kafka 表
    2. CREATE TABLE students_kafka_sink (
    3. sid STRING,
    4. name STRING,
    5. age INT,
    6. sex STRING,
    7. clazz STRING
    8. ) WITH (
    9. 'connector' = 'kafka',
    10. 'topic' = 'students_sink', -- 数据的topic
    11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
    12. 'properties.group.id' = 'testGroup', -- 消费者组
    13. 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
    14. 'format' = 'csv' -- 读取数据的格式
    15. );
    16. -- 将查询结果保存到kafka中
    17. insert into students_kafka_sink
    18. select * from students_hdfs;
    19. kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink

            3、将更新的流写入到kafka中 

    因为在Kafka是一个消息队列,是不会去重的。所以只需要将读取数据的格式改成canal-json。当数据被读取回来还是原来的流模式。

    1. CREATE TABLE clazz_num_kafka (
    2. clazz STRING,
    3. num BIGINT
    4. ) WITH (
    5. 'connector' = 'kafka',
    6. 'topic' = 'clazz_num', -- 数据的topic
    7. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
    8. 'properties.group.id' = 'testGroup', -- 消费者组
    9. 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
    10. 'format' = 'canal-json' -- 读取数据的格式
    11. );
    12. -- 将更新的数据写入kafka需要使用canal-json格式,数据中会带上操作类型
    13. {"data":[{"clazz":"文科一班","num":71}],"type":"INSERT"}
    14. {"data":[{"clazz":"理科三班","num":67}],"type":"DELETE"}
    15. insert into clazz_num_kafka
    16. select clazz,count(1) as num from
    17. students
    18. group by clazz;
    19. kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
            2、 hdfs SQL 连接器

                    1、hdfs source

                            Flink读取文件可以使用有界流的方式,也可以是无界流方式。

    1. -- 有界流
    2. CREATE TABLE students_hdfs_batch (
    3. sid STRING,
    4. name STRING,
    5. age INT,
    6. sex STRING,
    7. clazz STRING
    8. )WITH (
    9. 'connector' = 'filesystem', -- 必选:指定连接器类型
    10. 'path' = 'hdfs://master:9000/data/student', -- 必选:指定路径
    11. 'format' = 'csv' -- 必选:文件系统连接器指定 format
    12. );
    13. select * from students_hdfs_batch;
    14. -- 无界流
    15. -- 基于hdfs做流处理,读取数据是以文件为单位,延迟比kafka大
    16. CREATE TABLE students_hdfs_stream (
    17. sid STRING,
    18. name STRING,
    19. age INT,
    20. sex STRING,
    21. clazz STRING
    22. )WITH (
    23. 'connector' = 'filesystem', -- 必选:指定连接器类型
    24. 'path' = 'hdfs://master:9000/data/student', -- 必选:指定路径
    25. 'format' = 'csv' , -- 必选:文件系统连接器指定 format
    26. 'source.monitor-interval' = '5000' -- 每隔一段时间扫描目录,生成一个无界流
    27. );
    28. select * from students_hdfs_stream;

                    2、hdfs sink

    1. -- 1、批处理模式(使用方式和底层原理和hive类似)
    2. SET 'execution.runtime-mode' = 'batch';
    3. -- 创建表
    4. CREATE TABLE clazz_num_hdfs (
    5. clazz STRING,
    6. num BIGINT
    7. )WITH (
    8. 'connector' = 'filesystem', -- 必选:指定连接器类型
    9. 'path' = 'hdfs://master:9000/data/clazz_num', -- 必选:指定路径
    10. 'format' = 'csv' -- 必选:文件系统连接器指定 format
    11. );
    12. -- 将查询结果保存到表中
    13. insert into clazz_num_hdfs
    14. select clazz,count(1) as num
    15. from students_hdfs_batch
    16. group by clazz;
    17. -- 2、流处理模式
    18. SET 'execution.runtime-mode' = 'streaming';
    19. -- 创建表,如果查询数据返回的十更新更改的流需要使用canal-json格式
    20. CREATE TABLE clazz_num_hdfs_canal_json (
    21. clazz STRING,
    22. num BIGINT
    23. )WITH (
    24. 'connector' = 'filesystem', -- 必选:指定连接器类型
    25. 'path' = 'hdfs://master:9000/data/clazz_num_canal_json', -- 必选:指定路径
    26. 'format' = 'canal-json' -- 必选:文件系统连接器指定 format
    27. );
    28. insert into clazz_num_hdfs_canal_json
    29. select clazz,count(1) as num
    30. from students_hdfs_stream
    31. group by clazz;
    3、MySQL SQL 连接器

            1、整合:

    1. # 1、上传依赖包到flink 的lib目录下/usr/local/soft/flink-1.15.2/lib
    2. flink-connector-jdbc-1.15.2.jar
    3. mysql-connector-java-5.1.49.jar
    4. # 2、需要重启flink集群
    5. yarn application -kill [appid]
    6. yarn-session.sh -d
    7. # 3、重新进入sql命令行
    8. sql-client.sh

             2、mysql   source 

    1. -- 有界流
    2. -- flink中表的字段类型和字段名需要和mysql保持一致
    3. CREATE TABLE students_jdbc (
    4. id BIGINT,
    5. name STRING,
    6. age BIGINT,
    7. gender STRING,
    8. clazz STRING,
    9. PRIMARY KEY (id) NOT ENFORCED -- 主键
    10. ) WITH (
    11. 'connector' = 'jdbc',
    12. 'url' = 'jdbc:mysql://master:3306/student',
    13. 'table-name' = 'students',
    14. 'username' ='root',
    15. 'password' ='123456'
    16. );
    17. select * from students_jdbc;

            3、mysql sink 

    1. -- 创建kafka 表
    2. CREATE TABLE students_kafka (
    3. `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    4. `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    5. sid STRING,
    6. name STRING,
    7. age INT,
    8. sex STRING,
    9. clazz STRING
    10. ) WITH (
    11. 'connector' = 'kafka',
    12. 'topic' = 'students', -- 数据的topic
    13. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
    14. 'properties.group.id' = 'testGroup', -- 消费者组
    15. 'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
    16. 'format' = 'csv' -- 读取数据的格式
    17. );
    18. -- 创建mysql sink表
    19. CREATE TABLE clazz_num_mysql (
    20. clazz STRING,
    21. num BIGINT,
    22. PRIMARY KEY (clazz) NOT ENFORCED -- 主键
    23. ) WITH (
    24. 'connector' = 'jdbc',
    25. 'url' = 'jdbc:mysql://master:3306/student',
    26. 'table-name' = 'clazz_num',
    27. 'username' ='root',
    28. 'password' ='123456'
    29. );
    30. --- 再mysql创建接收表
    31. CREATE TABLE clazz_num (
    32. clazz varchar(10),
    33. num BIGINT,
    34. PRIMARY KEY (clazz) -- 主键
    35. ) ;
    36. -- 将sql查询结果实时写入mysql
    37. -- 将更新更改的流写入mysql,flink会自动按照主键更新数据
    38. insert into clazz_num_mysql
    39. select
    40. clazz,
    41. count(1) as num from
    42. students_kafka
    43. group by clazz;
    44. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students 插入一条数据
            4、DataGen:用于生成随机数据,一般用在高性能测试上
    1. -- 创建包(只能用于source表)
    2. CREATE TABLE students_datagen (
    3. sid STRING,
    4. name STRING,
    5. age INT,
    6. sex STRING,
    7. clazz STRING
    8. ) WITH (
    9. 'connector' = 'datagen',
    10. 'rows-per-second'='5', -- 每秒随机生成的数据量
    11. 'fields.age.min'='1',
    12. 'fields.age.max'='100',
    13. 'fields.sid.length'='10',
    14. 'fields.name.length'='2',
    15. 'fields.sex.length'='1',
    16. 'fields.clazz.length'='4'
    17. );

            5、print:用于高性能测试 只能用于sink表
    1. CREATE TABLE print_table (
    2. sid STRING,
    3. name STRING,
    4. age INT,
    5. sex STRING,
    6. clazz STRING
    7. ) WITH (
    8. 'connector' = 'print'
    9. );
    10. insert into print_table
    11. select * from students_datagen;
    12. 结果需要在提交的任务中查看。
            6、BlackHole :是用于高性能测试使用,在后面可以用于Flink的反压的测试。
    1. CREATE TABLE blackhole_table (
    2. sid STRING,
    3. name STRING,
    4. age INT,
    5. sex STRING,
    6. clazz STRING
    7. ) WITH (
    8. 'connector' = 'blackhole'
    9. );
    10. insert into blackhole_table
    11. select * from students_datagen;
    6、SQL 语法
            1、Hints:

                   用于提示执行,在Flink中可以动态的修改表中的属性,在Spark中可以用于广播。在修改动态表中属性后,不需要在重新建表,就可以读取修改后的需求。

    1. CREATE TABLE students_kafka (
    2. `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    3. `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    4. sid STRING,
    5. name STRING,
    6. age INT,
    7. sex STRING,
    8. clazz STRING
    9. ) WITH (
    10. 'connector' = 'kafka',
    11. 'topic' = 'students', -- 数据的topic
    12. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
    13. 'properties.group.id' = 'testGroup', -- 消费者组
    14. 'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
    15. 'format' = 'csv' -- 读取数据的格式
    16. );
    17. -- 动态修改表属性,可以在查询数据时修改读取kafka数据的位置,不需要重新创建表
    18. select * from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
    19. -- 有界流
    20. CREATE TABLE students_hdfs (
    21. sid STRING,
    22. name STRING,
    23. age INT,
    24. sex STRING,
    25. clazz STRING
    26. )WITH (
    27. 'connector' = 'filesystem', -- 必选:指定连接器类型
    28. 'path' = 'hdfs://master:9000/data/student', -- 必选:指定路径
    29. 'format' = 'csv' -- 必选:文件系统连接器指定 format
    30. );
    31. -- 可以在查询hdfs时,不需要再重新的创建表就可以动态改成无界流
    32. select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' ) */;
             2、WITH:

                    当一段SQL语句在被多次使用的时候,就将通过with给这个SQL起一个别名,类似于封装起来,就是为这个SQL创建一个临时的视图(并不是真正的视图),方便下次使用。

    1. CREATE TABLE students_hdfs (
    2. sid STRING,
    3. name STRING,
    4. age INT,
    5. sex STRING,
    6. clazz STRING
    7. )WITH (
    8. 'connector' = 'filesystem', -- 必选:指定连接器类型
    9. 'path' = 'hdfs://master:9000/data/student', -- 必选:指定路径
    10. 'format' = 'csv' -- 必选:文件系统连接器指定 format
    11. );
    12. -- 可以在查询hdfs时,不需要再重新的创建表就可以动态改成无界流
    13. select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' ) */;
    14. -- tmp别名代表的时子查询的sql,可以在后面的sql中多次使用
    15. with tmp as (
    16. select * from students_hdfs
    17. /*+ OPTIONS('source.monitor-interval' = '5000' ) */
    18. where clazz='文科一班'
    19. )
    20. select * from tmp
    21. union all
    22. select * from tmp;
            3、DISTINCT:

    在flink 的流处理中,使用distinct,flink需要将之前的数据保存在状态中,如果数据一直增加,状态会越来越大 状态越来越大,checkpoint时间会增加,最终会导致flink任务出问题

    1. select
    2. count(distinct sid)
    3. from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
    4. select
    5. count(sid)
    6. from (
    7. select
    8. distinct *
    9. from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
    10. );

    注意事项:

           1、 当Flink Client客户端退出来以后,里面创建的动态表就不存在了。这些表结构是元数据,是存储在内存中的。

            2、当在进行where过滤的时候,字符串会出现三种情况:空的字符串、空格字符串、null的字符串,三者是有区别的:

            这三者是不同的概念,在进行where过滤的时候过滤的条件是不同的。

    1. 1、过滤空的字符串:
    2. where s!= ‘空字符串’
    3. 2、过滤空格字符串:
    4. where s!= ‘空格’
    5. 3、过滤null字符串:
    6. where s!= null
    1. Flink SQL中常见的函数:
    2. from_unixtime:
    3. 以字符串格式 string 返回数字参数 numberic 的表示形式(默认为 ‘yyyy-MM-dd HH:mm:ss’
    4. to_timestamp:
    5. 将格式为 string2(默认为:‘yyyy-MM-dd HH:mm:ss’)的字符串 string1 转换为 timestamp

  • 相关阅读:
    Google Earth Engine —— 1986-2020年植被覆盖度一元线性回归分析(黄河流域上游为例)
    电脑蓝屏怎么办 七大原因及解决办法来帮你
    图片的懒加载
    el-tree 获取过滤后的树结构
    GaussDB数据库SQL系列:DROP & TRUNCATE & DELETE
    入门数据库Days5
    const char *p,char const *p和char *const p区别
    如何在Android项目中制作和使用三方包(jar文件)
    IceRPC之调用管道与传出请求->快乐的RPC
    从原理到应用,人人都懂的ChatGPT指南
  • 原文地址:https://blog.csdn.net/m0_62078954/article/details/134321431