flink sql常用配置
设置输出结果格式
-- Display query results inline in table (tableau) format in the Flink SQL client.
-- The pasted key was broken apart by whitespace; the real option key has no spaces,
-- and Flink 1.14+ SQL client expects the key quoted as a string literal.
SET 'sql-client.execution.result-mode' = 'tableau';
kafka source to mysql sink
kafka
topic: bop_log_realtime
数据结构:
{"timestamp" :"2023-10-31 14:26:02.528" , "serverip" :"10.13.177.209" , "level" :"INFO" , "servicename" :"bop-fms-query-info" , "traceid" :"" , "spanid" :"" , "parent" :"" , "message" :"Resolving eureka endpoints via configuration" }
mysql表:
库名:flink_test
-- MySQL target table (database: flink_test) that receives log records from Flink.
-- The pasted DDL had a space after every opening backtick, which would create
-- identifiers with leading spaces (e.g. ` id`); all identifiers are fixed here.
CREATE TABLE `bop_log_realtime_warning` (
    `id`          bigint(20)   NOT NULL AUTO_INCREMENT,
    `serverip`    varchar(255) NOT NULL DEFAULT '',
    -- `timestamp` and `level` are reserved/keyword-like names and must stay quoted.
    `timestamp`   varchar(255) NOT NULL DEFAULT '',
    `level`       varchar(255) NOT NULL DEFAULT '',
    `servicename` varchar(255) NOT NULL DEFAULT '',
    `traceid`     varchar(255) NOT NULL DEFAULT '',
    `spanid`      varchar(255) NOT NULL DEFAULT '',
    `parent`      varchar(255) NOT NULL DEFAULT '',
    -- NOTE(review): log messages can exceed 255 chars and would be rejected/truncated;
    -- consider TEXT if long messages are expected — confirm with producers.
    `message`     varchar(255) NOT NULL DEFAULT '',
    -- Auto-maintained modification timestamp.
    `updateTime`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`id`)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
-- Flink source table: reads JSON log events from Kafka topic bop_log_realtime.
-- Field names match the JSON keys shown in the sample record so the json format
-- can map them positionally by name. Backtick-space mangling from the paste is fixed.
CREATE TABLE kafka_log_realtime_json (
    `serverip`    STRING,
    `timestamp`   STRING,  -- reserved word in Flink SQL, must stay quoted
    `level`       STRING,
    `servicename` STRING,
    `traceid`     STRING,
    `spanid`      STRING,
    `parent`      STRING,
    `message`     STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'bop_log_realtime',
    'properties.bootstrap.servers' = '10.2.25.221:9092,10.2.25.221:9093',
    'properties.group.id' = 'testGroup2',
    'format' = 'json',
    -- Start from the newest offsets; historical records are not replayed.
    'scan.startup.mode' = 'latest-offset'
);
-- Flink sink table: JDBC connector writing into MySQL flink_test.bop_log_realtime_warning.
-- Column names/types mirror the MySQL table (id and updateTime are omitted here and
-- filled by MySQL via AUTO_INCREMENT / DEFAULT CURRENT_TIMESTAMP).
CREATE TABLE bop_log_realtime_warning (
    `serverip`    STRING,
    `timestamp`   STRING,  -- reserved word, must stay quoted
    `level`       STRING,
    `servicename` STRING,
    `traceid`     STRING,
    `spanid`      STRING,
    `parent`      STRING,
    `message`     STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://m3309i.hebe.grid.xx.com.cn:3309/flink_test?zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai',
    'username' = 'super_mis',
    -- NOTE(review): credentials are committed in plain text; move to a secrets store.
    'password' = 'mis_password',
    'table-name' = 'bop_log_realtime_warning'
);
-- Continuous streaming job: copy every Kafka log record into the MySQL sink.
-- NOTE(review): despite the sink's "warning" name, there is no filter on `level`
-- here, so records of ALL levels are written — confirm whether a
-- WHERE `level` = 'WARN' predicate was intended.
INSERT INTO bop_log_realtime_warning
SELECT
    `serverip`,
    `timestamp`,
    `level`,
    `servicename`,
    `traceid`,
    `spanid`,
    `parent`,
    `message`
FROM kafka_log_realtime_json;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
窗口函数 开窗
datagen 自动生成数据表
-- Demo source table backed by the datagen connector: 10 rows/second,
-- id in [1, 3], vc in [1, 100].
CREATE TABLE ws (
    id INT,
    vc INT,
    -- Processing-time attribute.
    pt AS PROCTIME(),
    -- Event-time attribute derived from the wall clock (millisecond precision).
    et AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    -- Tolerate up to 5 seconds of out-of-orderness.
    WATERMARK FOR et AS et - INTERVAL '5' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.id.min' = '1',
    'fields.id.max' = '3',
    'fields.vc.min' = '1',
    'fields.vc.max' = '100'
);
-- Debug sink: prints every incoming row to the TaskManager stdout.
CREATE TABLE sink (
    id INT,
    ts BIGINT,
    vc INT
) WITH (
    'connector' = 'print'
);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
tumble 滚动窗口
滚动窗口 窗口大小5 秒
-- Tumbling window (size 5s): one sum of vc per id per window.
SELECT
    id,
    SUM(vc) AS vcSum,
    window_start,
    window_end
FROM TABLE(
    TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND)
)
GROUP BY id, window_start, window_end;
hop 滑动窗口
滑动窗口 滑动步长5 秒 窗口大小10 秒
注意:窗口大小 = 滑动步长的整数倍(底层会优化成多个小的滚动窗口切片)
-- Hopping window: slide 5s, size 10s (size must be an integer multiple of the
-- slide; Flink internally splits it into small tumbling panes).
SELECT
    id,
    SUM(vc) AS vcSum,
    window_start,
    window_end
FROM TABLE(
    HOP(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND)
)
GROUP BY id, window_start, window_end;
cumulate 累积窗口
注意:窗口大小= 累积步长的整数倍
-- Cumulative window: CUMULATE requires FOUR arguments —
-- (data, time column descriptor, step, max window size); the original only
-- supplied the step, which is a syntax error. The max size must be an integer
-- multiple of the step (here: step 5s, max size 30s — adjust as needed).
SELECT
    id,
    SUM(vc) AS vcSum,
    window_start,
    window_end
FROM TABLE(
    CUMULATE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND, INTERVAL '30' SECOND)
)
GROUP BY id, window_start, window_end;
grouping sets 多维分析
-- Multi-dimensional aggregation over tumbling windows.
-- The original GROUPING SETS ((id)) is equivalent to a plain GROUP BY id and
-- produced no extra dimension; adding the empty set () also emits a per-window
-- total (id is NULL in those rows), which is what "多维分析" implies.
SELECT
    id,
    SUM(vc) AS vcSum,
    window_start,
    window_end
FROM TABLE(
    TUMBLE(TABLE ws, DESCRIPTOR(et), INTERVAL '5' SECOND)
)
GROUP BY window_start, window_end,
    GROUPING SETS ((id), ());
over 函数
TopN