• Flink SQL 语法(hint,with,select,分组窗口聚合,时间属性(处理,事件))


    1、查询语句

    1、hint

    在对表进行查询的时候动态修改表的属性

    1. -- 创建表
    2. CREATE TABLE word (
    3. lines STRING
    4. )
    5. WITH (
    6. 'connector' = 'kafka',
    7. 'topic' = 'word',
    8. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    9. 'properties.group.id' = 'testGroup',
    10. 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据
    11. 'format' = 'csv',
    12. 'csv.field-delimiter'='\t'
    13. )
    14. -- 加载hive函数
    15. LOAD MODULE hive WITH ('hive-version' = '1.2.1');
    16. --统计单词的数量
    17. --不动态指定开始读取的参数
    18. select word,count(1) from
    19. word,
    20. lateral table(explode(split(lines,','))) as t(word)
    21. group by word
    22. -- OPTIONS 动态指定参数
    23. select word,count(1) from
    24. word /*+ OPTIONS('scan.startup.mode'='latest-offset') */ ,
    25. lateral table(explode(split(lines,','))) as t(word)
    26. group by word

    3、WITH
    1. -- temp可以在后面的sql中使用多次
    2. with temp as (
    3. select word from word,
    4. lateral table(explode(split(lines,','))) as t(word)
    5. )
    6. select * from temp
    7. union all
    8. select * from temp

    4、SELECT
    1. SELECT order_id, price
    2. FROM
    3. (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)

    5、分组窗口聚合

    老版本语法,新版本中不推荐使用

    1. -- PROCTIME(): 获取处理时间的函数
    2. CREATE TABLE words_window (
    3. lines STRING,
    4. proc_time as PROCTIME()
    5. ) WITH (
    6. 'connector' = 'kafka',
    7. 'topic' = 'words',
    8. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    9. 'properties.group.id' = 'testGroup',
    10. 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据
    11. 'format' = 'csv',
    12. 'csv.field-delimiter'='\t'
    13. )
    14. -- TUMBLE:滚动窗口
    15. -- HOP: 滑动窗口
    16. -- SESSION: 会话窗口
    17. --TUMBLE:处理时间的滚动窗口
    18. select
    19. word,
    20. TUMBLE_START(proc_time, INTERVAL '5' SECOND) as s, -- 窗口开始时间
    21. TUMBLE_END(proc_time, INTERVAL '5' SECOND) as e, -- 窗口结束时间
    22. count(1) as c
    23. from
    24. words_window,
    25. lateral table(explode(split(lines,','))) as t(word)
    26. group by
    27. word,
    28. TUMBLE(proc_time, INTERVAL '5' SECOND) -- 每5秒计算一次

    • 会话窗口

      一段时间没有数据开始计算

      暂时只能在老版本api中使用

    1. CREATE TABLE words_window (
    2. lines STRING,
    3. proc_time as PROCTIME()
    4. ) WITH (
    5. 'connector' = 'kafka',
    6. 'topic' = 'words',
    7. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    8. 'properties.group.id' = 'testGroup',
    9. 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据
    10. 'format' = 'csv',
    11. 'csv.field-delimiter'='\t'
    12. )
    13. select
    14. word,
    15. SESSION_START(proc_time, INTERVAL '5' SECOND) as s, -- 窗口开始时间
    16. SESSION_END(proc_time, INTERVAL '5' SECOND) as e, -- 窗口结束时间
    17. count(1) as c
    18. from
    19. words_window,
    20. lateral table(explode(split(lines,','))) as t(word)
    21. group by
    22. word,
    23. SESSION(proc_time, INTERVAL '5' SECOND) -- 会话超过5秒中没有发送消息,就开始进行计算

    6、TVFs(重点)
    • 滚动窗口函数
    1. CREATE TABLE words_window (
    2. lines STRING,
    3. proc_time as PROCTIME()
    4. ) WITH (
    5. 'connector' = 'kafka',
    6. 'topic' = 'words',
    7. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    8. 'properties.group.id' = 'testGroup',
    9. 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据
    10. 'format' = 'csv',
    11. 'csv.field-delimiter'='\t'
    12. )
    13. -- TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
    14. -- TUMBLE: 窗口函数,可以给原表增加窗口开始时间,窗口的结束时间,窗口时间
    15. -- TABLE words_window : 指定原表
    16. -- DESCRIPTOR(proc_time) 指定时间字段,可以处理时间,也可以是事件时间
    17. -- INTERVAL '5' SECOND 指定窗口大小
    18. SELECT lines,proc_time,window_start,window_end,window_time FROM TABLE(
    19. TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
    20. );
    21. -- 在划分窗口之后进行聚合计算
    22. SELECT word,window_start,count(1) as c FROM
    23. TABLE(
    24. TUMBLE(TABLE words_window, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
    25. ),
    26. lateral table(explode(split(lines,','))) as t(word)
    27. group by word,window_start
    • 滑动窗口函数

      一条数据会出现在多个窗口中,所以输入一条数据,会输出多条数据

    1. CREATE TABLE words_window (
    2. lines STRING,
    3. proc_time as PROCTIME()
    4. ) WITH (
    5. 'connector' = 'kafka',
    6. 'topic' = 'words',
    7. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    8. 'properties.group.id' = 'testGroup',
    9. 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据
    10. 'format' = 'csv',
    11. 'csv.field-delimiter'='\t'
    12. )
    13. -- HOP: 滑动窗口函数,需要指定窗口大小和滑动时间
    14. -- 输入一条数据会输出多条数据
    15. with temp as (
    16. select * from words_window /*+ OPTIONS('scan.startup.mode'='latest-offset') */
    17. )
    18. SELECT * FROM
    19. TABLE(
    20. HOP(TABLE temp , DESCRIPTOR(proc_time), INTERVAL '5' SECOND, INTERVAL '15' SECOND)
    21. )
    22. ;
    23. -- 窗口之后进行聚合
    24. with temp as (
    25. select * from words_window /*+ OPTIONS('scan.startup.mode'='latest-offset') */
    26. )
    27. SELECT word ,window_start,count(1) as c FROM
    28. TABLE(
    29. HOP(TABLE temp, DESCRIPTOR(proc_time), INTERVAL '5' SECOND, INTERVAL '15' SECOND)),
    30. lateral table(explode(split(lines,','))) as t(word)
    31. group by word,window_start
    32. ;

    7、时间属性

    1、处理时间

    使用PROCTIME()函数给表增加一个时间字段

    1. CREATE TABLE student_kafka_proc_time (
    2. id STRING,
    3. name STRING,
    4. age INT,
    5. gender STRING,
    6. clazz STRING,
    7. proc as PROCTIME() -- 处理时间字段
    8. ) WITH (
    9. 'connector' = 'kafka',
    10. 'topic' = 'student',
    11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    12. 'properties.group.id' = 'testGroup',
    13. 'scan.startup.mode' = 'earliest-offset',
    14. 'format' = 'csv',
    15. 'csv.field-delimiter'=',', -- csv格式数据的分隔符
    16. 'csv.ignore-parse-errors'='true', -- 如果出现脏数据,补null
    17. 'csv.allow-comments'='true'--跳过#注释行
    18. )
    19. -- 使用处理时间可以做窗口统计
    20. SELECT clazz,window_start,count(1) as c FROM
    21. TABLE(
    22. TUMBLE(TABLE student_kafka_proc_time, DESCRIPTOR(proc), INTERVAL '5' SECOND)
    23. )
    24. group by clazz,window_start

    2、事件时间

    • 测试数据

      1. 1500100001,施笑槐,22,,文科六班,2022-07-20 16:44:10
      2. 1500100001,施笑槐,22,,文科六班,2022-07-20 16:44:11
      3. 1500100001,施笑槐,22,,文科六班,2022-07-20 16:44:12
      4. 1500100001,施笑槐,22,,文科六班,2022-07-20 16:44:20
      5. 1500100001,施笑槐,22,,文科六班,2022-07-20 16:44:15
      6. 1500100001,施笑槐,22,,文科六班,2022-07-20 16:44:25
    • 创建表指定时间字段和水位线

      1. -- TIMESTAMP(3) flink的时间戳类型
      2. -- ts - INTERVAL '5' SECOND 水位线前移5秒
      3. CREATE TABLE student_kafka_event_time (
      4. id STRING,
      5. name STRING,
      6. age INT,
      7. gender STRING,
      8. clazz STRING,
      9. ts TIMESTAMP(3),
      10. WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 指定时间字段和水位线
      11. ) WITH (
      12. 'connector' = 'kafka',
      13. 'topic' = 'student_event_time',
      14. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
      15. 'properties.group.id' = 'testGroup',
      16. 'scan.startup.mode' = 'earliest-offset',
      17. 'format' = 'csv'
      18. )
      19. -- 使用事件时间 做窗口函数统计
      20. -- 每一条数据都会计算出一个结果,会去更新之前已经输出的结果
      21. -- 不存在数据丢失问题
      22. -- 需要将统计结果保存在状态中
      23. SELECT clazz,window_start,count(1) as c FROM
      24. TABLE(
      25. TUMBLE(TABLE student_kafka_event_time, DESCRIPTOR(ts), INTERVAL '5' SECOND)
      26. )
      27. group by clazz,window_start
      28. -- 分钟窗口统计
      29. -- 如果数据乱序可能会丢失数据
      30. -- 不需要将统计的结果保存在状态中
      31. select
      32. clazz,
      33. TUMBLE_START(ts, INTERVAL '5' SECOND) as s, -- 窗口开始时间
      34. TUMBLE_END(ts, INTERVAL '5' SECOND) as e, -- 窗口结束时间
      35. count(1) as c
      36. from
      37. student_kafka_event_time
      38. group by
      39. clazz,
      40. TUMBLE(ts, INTERVAL '5' SECOND) -- 每5秒计算一次
      41. -- 生产数据
      42. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_event_time

    练习

    统计单词的数量,
    每隔5秒统计一次
    每个窗口中取单词数量最多的两个单词

    1. CREATE TABLE words_window_demo (
    2. lines STRING,
    3. proc_time as PROCTIME()
    4. ) WITH (
    5. 'connector' = 'kafka',
    6. 'topic' = 'words',
    7. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    8. 'properties.group.id' = 'testGroup',
    9. 'scan.startup.mode' = 'earliest-offset',-- 读取所有的数据
    10. 'format' = 'csv',
    11. 'csv.field-delimiter'='\t'
    12. )
    13. -- 在 Flink SQL 流处理中 row_number() 必须要取topN
    14. select * from (
    15. select
    16. word,
    17. window_start,
    18. c,
    19. row_number() over(partition by window_start order by c desc) as r
    20. from (
    21. select word,window_start,count(1) as c from
    22. TABLE(
    23. TUMBLE(TABLE words_window_demo, DESCRIPTOR(proc_time), INTERVAL '5' SECOND)
    24. ),
    25. lateral table(explode(split(lines,','))) as t(word)
    26. group by word,window_start
    27. ) as a
    28. ) as b
    29. where r <= 2
    • 统计每个城市中每个区县的车流量
    • 每隔5分钟统计一次,统计最近15分钟的数据
    • 每个城市中取车流量最大的前2个区县
    • 将统计好的结果保存到数据库中
    1. -- 数据
    2. {
    3. "car": "皖AK0H90",
    4. "city_code": "340100",
    5. "county_code": "340111",
    6. "card": 117303031813010,
    7. "camera_id": "00004",
    8. "orientation": "北",
    9. "road_id": 34130440,
    10. "time": 1614799929,
    11. "speed": 84.51
    12. }
    13. -- TIMESTAMP(3) flink的时间戳类型
    14. -- ts - INTERVAL '5' SECOND 水位线前移5秒
    15. -- 创建表读取kafka中的json数据
    16. CREATE TABLE cars_kafka_event_time (
    17. car STRING,
    18. city_code STRING,
    19. county_code STRING,
    20. card BIGINT,
    21. camera_id STRING,
    22. orientation STRING,
    23. road_id BIGINT,
    24. `time` BIGINT,
    25. speed DOUBLE,
    26. ts_ltz AS TO_TIMESTAMP_LTZ(`time`, 3),
    27. WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 指定时间字段和水位线
    28. ) WITH (
    29. 'connector' = 'kafka',
    30. 'topic' = 'car_test',
    31. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    32. 'properties.group.id' = 'carGroup',
    33. 'scan.startup.mode' = 'earliest-offset',
    34. 'format' = 'json'
    35. )
    36. -- 测试一下是否存在数据
    37. select * from cars_kafka_event_time
    38. -- 统计每个城市中每个区县的车流量,每隔5分钟统计一次,统计最近15分钟的数据,每个城市中取车流量最大的前2个区县
    39. select *
    40. from (
    41. select
    42. county_code
    43. ,city_code
    44. ,window_start
    45. , c
    46. ,row_number() over(partition by window_start order by c desc) as r
    47. from
    48. (
    49. with temp as (
    50. select * from cars_kafka_event_time /*+ OPTIONS('scan.startup.mode'='latest-offset') */
    51. )
    52. SELECT
    53. county_code
    54. ,city_code
    55. ,window_start
    56. ,count(1) as c
    57. FROM
    58. TABLE(
    59. HOP(TABLE temp, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND, INTERVAL '15' SECOND))
    60. group by county_code,city_code,window_start
    61. ) as b ) as h
    62. where r <= 2;
    63. -- 创建mysql的sink表
    64. CREATE TABLE clazz_num_mysql (
    65. country_city_r_count STRING,
    66. window_start STRING,
    67. PRIMARY KEY (country_city_r_count) NOT ENFORCED -- 按照主键更新数据
    68. ) WITH (
    69. 'connector' = 'jdbc',
    70. 'url' = 'jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8',
    71. 'table-name' = 'city_top_2', -- 需要手动到数据库中创建表
    72. 'username' = 'root',
    73. 'password' = '123456'
    74. );
    75. -- 发送到mysql中
    76. insert into clazz_num_mysql
    77. select concat_ws('_',county_code,city_code,r,c) country_city_r_count ,window_start
    78. from (
    79. select
    80. cast(county_code as STRING) county_code
    81. ,cast(city_code as STRING) city_code
    82. ,cast(window_start as STRING) window_start
    83. ,cast(c as STRING) c
    84. ,cast(row_number() over(partition by window_start order by c desc) as STRING) as r
    85. from
    86. (
    87. with temp as (
    88. select * from cars_kafka_event_time
    89. )
    90. SELECT
    91. county_code
    92. ,city_code
    93. ,window_start
    94. ,count(1) as c
    95. FROM
    96. TABLE(
    97. HOP(TABLE temp, DESCRIPTOR(ts_ltz), INTERVAL '5' SECOND, INTERVAL '15' SECOND))
    98. group by county_code,city_code,window_start
    99. ) as b ) as h
    100. where r <= 2;
    101. -- mysql 中的查询方法如下(笨方法)
    102. select SUBSTRING_INDEX(country_city_r_count,'_',1) as country ,SUBSTRING_INDEX(SUBSTRING_INDEX(country_city_r_count,'_',2),'_',1)as city,SUBSTRING_INDEX(SUBSTRING_INDEX(country_city_r_count,'_',3) ,'_',-1) as topn , SUBSTRING_INDEX(country_city_r_count,'_',-1) as count_car ,window_start from city_top_2

  • 相关阅读:
    TypeScript和JavaScript有什么不同?
    报表生成器FastReport .Net用户指南:关于脚本(上)
    LeetCode 面试题 16.05. 阶乘尾数
    python通用日志使用
    高项 10 项目沟通和干系人管理
    【无标题】
    第1章 初识MyBatis框架
    数据结构之常见排序算法
    Java基础面试题突击系列6
    【机器学习300问】26、什么是SVM支持向量机?
  • 原文地址:https://blog.csdn.net/weixin_48370579/article/details/126091927