Common Flink SQL Job Scripts


    Common Flink SQL settings

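    Set the result display mode (tableau prints results directly in the terminal, like a conventional database CLI)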
    SET sql-client.execution.result-mode=tableau;
    
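A few other SET options that are often used in the SQL client. This is a sketch: the option keys are standard Flink configuration keys, but the values (parallelism 1, a 10-second checkpoint interval, the job name `kafka_to_mysql`) are illustrative assumptions. The quoted `'key' = 'value'` form is the syntax documented since Flink 1.13.

```sql
-- Streaming is the default runtime mode; use 'batch' for bounded inputs
SET 'execution.runtime-mode' = 'streaming';
-- Default parallelism for jobs submitted from this session (illustrative value)
SET 'parallelism.default' = '1';
-- Checkpoint every 10 seconds; the Kafka source commits offsets back to Kafka on completed checkpoints
SET 'execution.checkpointing.interval' = '10s';
-- Name shown in the Flink Web UI for the next submitted job (hypothetical name)
SET 'pipeline.name' = 'kafka_to_mysql';
```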

    Kafka source to MySQL sink

    Kafka topic: bop_log_realtime
    Message format (one JSON object per message):
    {"timestamp":"2023-10-31 14:26:02.528","serverip":"10.13.177.209","level":"INFO","servicename":"bop-fms-query-info","traceid":"","spanid":"","parent":"","message":"Resolving eureka endpoints via configuration"}
    
    MySQL table (database: flink_test):
    CREATE TABLE `bop_log_realtime_warning` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `serverip` varchar(255) NOT NULL DEFAULT '',
      `timestamp` varchar(255) NOT NULL DEFAULT '',
      `level` varchar(255) NOT NULL DEFAULT '',
      `servicename` varchar(255) NOT NULL DEFAULT '',
      `traceid` varchar(255) NOT NULL DEFAULT '',
      `spanid` varchar(255) NOT NULL DEFAULT '',
      `parent` varchar(255) NOT NULL DEFAULT '',
      `message` varchar(255) NOT NULL DEFAULT '',
      `updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    
    CREATE TABLE kafka_log_realtime_json (
      -- each top-level JSON field is mapped to the column with the same name
      `serverip` STRING
      ,`timestamp` STRING
      ,`level` STRING
      ,`servicename` STRING
      ,`traceid` STRING
      ,`spanid` STRING
      ,`parent` STRING
      ,`message` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'bop_log_realtime',
      'properties.bootstrap.servers' = '10.2.25.221:9092,10.2.25.221:9093',
      'properties.group.id' = 'testGroup2',
      'format' = 'json',
      -- start from the latest offsets: only messages written after the job starts are read
      'scan.startup.mode' = 'latest-offset'
    );
    
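    CREATE TABLE bop_log_realtime_warning (
        -- `id` and `updateTime` are left out: MySQL fills them (AUTO_INCREMENT and CURRENT_TIMESTAMP)
        `serverip` STRING
        ,`timestamp` STRING
        ,`level` STRING
        ,`servicename` STRING
        ,`traceid` STRING
        ,`spanid` STRING
        ,`parent` STRING
        ,`message` STRING
    ) WITH (
    -- the flink-connector-jdbc jar and the MySQL JDBC driver must be on the classpath, e.g. in Flink's lib directory
    'connector' = 'jdbc'
    ,'url' = 'jdbc:mysql://m3309i.hebe.grid.xx.com.cn:3309/flink_test?zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai'
    ,'username' = 'super_mis'
    ,'password' = 'mis_password'
    -- no PRIMARY KEY is declared, so the sink runs in append mode and issues plain INSERTs
    ,'table-name' = 'bop_log_realtime_warning'
    );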
    
    insert into bop_log_realtime_warning 
    SELECT
        `serverip` 
        ,`timestamp` 
        ,`level` 
        ,`servicename` 
        ,`traceid` 
        ,`spanid` 
        ,`parent` 
        ,`message` 
     FROM kafka_log_realtime_json;
    
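The MySQL table is named `..._warning`, which suggests that only warning-level logs are meant to be stored. Below is a minimal sketch of a filtered variant. It assumes the `level` field carries values such as `WARN` and `ERROR`, which is a guess because the sample message only shows `INFO`. It also previews the source in the SQL client before the insert job is submitted.

```sql
-- Preview the Kafka source (tableau mode prints rows as they arrive)
SELECT * FROM kafka_log_realtime_json;

-- Hypothetical variant: forward only warning/error logs to MySQL
INSERT INTO bop_log_realtime_warning
SELECT
  `serverip`, `timestamp`, `level`, `servicename`,
  `traceid`, `spanid`, `parent`, `message`
FROM kafka_log_realtime_json
WHERE `level` IN ('WARN', 'ERROR');
```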

    Window functions (windowing table-valued functions)

    datagen: a table that generates test data automatically

    CREATE TABLE ws (
      id INT,
      vc INT,
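      pt AS PROCTIME(), -- processing time attribute
      et AS cast(CURRENT_TIMESTAMP as timestamp(3)), -- event time attribute (derived from the current time, for testing only)
      WATERMARK FOR et AS et - INTERVAL '5' SECOND -- watermark: tolerates up to 5 seconds of out-of-order data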
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.id.min' = '1',
      'fields.id.max' = '3',
      'fields.vc.min' = '1',
      'fields.vc.max' = '100'
    );
    
    CREATE TABLE sink (
      id INT,
      ts BIGINT,
      vc INT
    ) WITH (
      'connector' = 'print'
    );
    
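The `sink` table above is declared but none of the queries below use it. Here is a minimal sketch of writing to it. It assumes that `ts` should hold the current Unix time in seconds. The `print` connector writes each row to the TaskManager's standard output, which you can see in the `.out` log files or in the Web UI.

```sql
INSERT INTO sink
SELECT
  id,
  UNIX_TIMESTAMP() AS ts, -- current Unix time in seconds, returned as BIGINT
  vc
FROM ws;
```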

    TUMBLE: tumbling window

    Tumbling window with a window size of 5 seconds (each row belongs to exactly one window):
    select
      id,
      sum(vc) vcSum,
      window_start,
      window_end
     from table(
     	TUMBLE(table ws, descriptor(et), INTERVAL '5' SECOND)
     )
     group by id, window_start, window_end;
    
    
    
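For comparison, Flink versions before 1.13 had no window TVFs and expressed the same tumbling aggregation with the legacy group-window syntax. Newer versions deprecate that syntax but still accept it. This sketch reuses the `ws` table and the 5-second size from the query above.

```sql
SELECT
  id,
  SUM(vc) AS vcSum,
  TUMBLE_START(et, INTERVAL '5' SECOND) AS window_start,
  TUMBLE_END(et, INTERVAL '5' SECOND) AS window_end
FROM ws
GROUP BY id, TUMBLE(et, INTERVAL '5' SECOND);
```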

    HOP: hopping (sliding) window

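    Hopping window with a slide of 5 seconds and a window size of 10 seconds. HOP takes the slide first, then the size.
    Note: the window size must be an integer multiple of the slide (under the hood, Flink optimizes it into multiple small tumbling windows).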
    select
      id,
      sum(vc) vcSum,
      window_start,
      window_end
     from table(
     	hop(table ws, descriptor(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)
     )
     group by id, window_start, window_end;
    
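The following worked example shows how rows are assigned to hopping windows. It assumes the 5-second slide and 10-second size above, with windows aligned to the epoch (no offset). Each row falls into size/slide windows, so a row at t = 12 s lands in two of them.

```latex
\text{windows per row} = \frac{\text{size}}{\text{slide}} = \frac{10\,\text{s}}{5\,\text{s}} = 2,
\qquad
t = 12\,\text{s} \;\Rightarrow\; [5\,\text{s},\,15\,\text{s}) \text{ and } [10\,\text{s},\,20\,\text{s})
```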

    CUMULATE: cumulative window

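    Note: the maximum window size must be an integer multiple of the step. CUMULATE takes the step first, then the maximum window size.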
    select
      id,
      sum(vc) vcSum,
      window_start,
      window_end
     from table(
     	CUMULATE(table ws, descriptor(et), INTERVAL '2' SECOND, INTERVAL '6' SECOND) -- step 2 s, max size 6 s (illustrative values)
     )
     group by id, window_start, window_end;
    
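A typical use of CUMULATE is a running total that resets once a day but is refreshed much more often. Here is a sketch on the same `ws` table, assuming a 1-minute step and a 1-day maximum size. All windows of one day share the same `window_start`, and each step emits a window with a later `window_end`.

```sql
SELECT
  id,
  SUM(vc) AS vcSum,
  window_start,
  window_end
FROM TABLE(
  -- step = 1 minute, max size = 1 day (assumed values)
  CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '1' MINUTE, INTERVAL '1' DAY)
)
GROUP BY id, window_start, window_end;
```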

    GROUPING SETS: multi-dimensional analysis

    select
      id,
      sum(vc) vcSum,
      window_start,
      window_end
     from table(
     	TUMBLE(table ws, descriptor(et), INTERVAL '5' SECOND)
     )
     group by window_start, window_end,
     grouping sets ( (id) );
    
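The query above has a single grouping set `(id)`, so it is equivalent to a plain `GROUP BY id` within each window. Grouping sets become useful for multi-dimensional analysis when there are several of them. The sketch below adds the empty set `()`, which also produces a per-window total across all ids (those rows have `id` = NULL). `ROLLUP (id)` is shorthand for the same two sets.

```sql
SELECT
  id,
  SUM(vc) AS vcSum,
  window_start,
  window_end
FROM TABLE(
  TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND)
)
-- same as: GROUP BY window_start, window_end, ROLLUP (id)
GROUP BY window_start, window_end, GROUPING SETS ((id), ());
```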

    OVER aggregation
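Here is a minimal sketch of an OVER aggregation on the `ws` table. For every incoming row, it sums the `vc` values of the same `id` over the preceding 10 seconds of event time, and the 10-second range is an assumed value. In streaming mode, Flink requires the ORDER BY of an OVER window to be an ascending time attribute, which here is `et`.

```sql
SELECT
  id,
  et,
  vc,
  SUM(vc) OVER (
    PARTITION BY id
    ORDER BY et
    -- RANGE bounds by time; ROWS BETWEEN 5 PRECEDING AND CURRENT ROW would bound by row count
    RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW
  ) AS vcSum10s
FROM ws;
```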

    Top-N
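Here is a minimal Top-N sketch, assuming we want the three largest `vc` values per `id`. Flink recognizes the pattern of `ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...)` followed by a filter on the row number. The ranking changes as new rows arrive, so the result is an updating (changelog) stream. In tableau mode this shows up as `-U`/`+U` rows.

```sql
SELECT id, vc, rownum
FROM (
  SELECT
    id,
    vc,
    ROW_NUMBER() OVER (PARTITION BY id ORDER BY vc DESC) AS rownum
  FROM ws
)
-- keep only the top 3 rows per id (assumed N = 3)
WHERE rownum <= 3;
```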

  • Original article: https://blog.csdn.net/fzy629442466/article/details/134218926