Flink SQL
- Flink official documentation
- https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/datagen/
WordCount in Flink SQL
- package com.wt.flink.sql
- import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
- object Demo1WordCount {
-   def main(args: Array[String]): Unit = {
-     // environment settings
-     val settings: EnvironmentSettings = EnvironmentSettings
-       .newInstance()       // new instance
-       .inStreamingMode()   // streaming mode
-       //.inBatchMode()     // batch mode
-       .build()
-
-     /**
-      * the Flink SQL table environment
-      */
-     val tableEnv: TableEnvironment = TableEnvironment.create(settings)
-
-     /**
-      * 1. create the Kafka source table
-      */
-     tableEnv.executeSql(
-       """
-         |CREATE TABLE words (
-         |  `word` STRING
-         |) WITH (
-         |  'connector' = 'kafka',
-         |  'topic' = 'word',
-         |  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
-         |  'properties.group.id' = 'asdasd',
-         |  'scan.startup.mode' = 'earliest-offset',
-         |  'format' = 'csv'
-         |)
-         |""".stripMargin)
-
-     /**
-      * 2. create a print sink table
-      */
-     tableEnv.executeSql(
-       """
-         |CREATE TABLE print_table (
-         |  word STRING,
-         |  c BIGINT
-         |) WITH (
-         |  'connector' = 'print'
-         |)
-         |""".stripMargin)
-
-     /**
-      * 3. count the number of occurrences of each word
-      */
-     tableEnv.executeSql(
-       """
-         |insert into print_table
-         |select word, count(1) as c
-         |from words
-         |group by word
-         |""".stripMargin)
-   }
- }
View the result:

1. Connectors
1. DataGen
A connector that generates random data.
It can only be used for source tables.
- CREATE TABLE datagen (
- id STRING,
- name STRING,
- age INT
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '5', -- number of rows generated per second
- 'fields.id.length' = '5', -- length of the generated string field
- 'fields.name.length'='3',
- 'fields.age.min' ='1', -- minimum value
- 'fields.age.max'='100' -- maximum value
- )
-
- -- ts AS localtimestamp : a computed column; localtimestamp returns the current timestamp
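For example, this computed column can be added to a datagen table like so (a minimal sketch; the table name and extra columns are only for illustration, the same pattern appears again in the FileSystem section below):
- CREATE TABLE datagen_with_ts (
-     id STRING,
-     age INT,
-     ts AS localtimestamp -- computed column: the current timestamp
- ) WITH (
-     'connector' = 'datagen',
-     'rows-per-second' = '5',
-     'fields.id.length' = '5',
-     'fields.age.min' = '1',
-     'fields.age.max' = '100'
- )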
2. Print
A print table prints the result of a continuous query.
Print can only be used for sink tables.
- Create a print table based on an existing table's schema
- -- LIKE: create a table based on an existing table
- CREATE TABLE print_table
- WITH ('connector' = 'print')
- LIKE datagen (EXCLUDING ALL)
- Print the data
- insert into print_table
- select * from datagen

- Define the fields manually
- CREATE TABLE age_num (
- age INT,
- num BIGINT
- )
- WITH ('connector' = 'print')
- Count the number of people per age
- insert into age_num
- select age, count(1) as num
- from datagen
- group by age
3. BlackHole
A sink that discards all input; used for Flink performance testing.
- CREATE TABLE blackhole_table
- WITH ('connector' = 'blackhole')
- LIKE datagen (EXCLUDING ALL)
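A minimal usage sketch, assuming the datagen table defined above: writing the generated rows into the blackhole table exercises the pipeline with no sink overhead, which is useful for throughput testing.
- insert into blackhole_table
- select * from datagen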

4. Kafka
- Kafka connector dependency
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-connector-kafka</artifactId>
-     <version>1.15.0</version>
- </dependency>
- CSV format dependency
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-csv</artifactId>
-     <version>1.15.0</version>
- </dependency>
- kafka source
- CREATE TABLE student_kafka (
- id STRING,
- name STRING,
- age INT,
- gender STRING,
- clazz STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'student',
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
- 'properties.group.id' = 'testGroup',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv',
- 'csv.field-delimiter'=',', -- field delimiter for CSV data
- 'csv.ignore-parse-errors'='true', -- fill fields with null when a row cannot be parsed
- 'csv.allow-comments'='true' -- skip lines starting with #
- )
Parameter description
scan.startup.mode:
earliest-offset: read all data from the earliest offset
latest-offset: read only the latest data, i.e. data produced after the job starts
group-offsets (default): resume from the committed offsets of the consumer group; if the group has no committed offsets, read the latest data
timestamp: start reading from a given timestamp
specific-offsets: start reading from explicitly given offsets
format:
csv: text format; fields are mapped by position and parsed automatically by Flink SQL
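As a sketch of the timestamp mode: it needs the extra option scan.startup.timestamp-millis giving the starting point in epoch milliseconds (the table name and timestamp value below are only illustrative):
- CREATE TABLE student_kafka_ts (
-     id STRING,
-     name STRING,
-     age INT,
-     gender STRING,
-     clazz STRING
- ) WITH (
-     'connector' = 'kafka',
-     'topic' = 'student',
-     'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
-     'scan.startup.mode' = 'timestamp',
-     'scan.startup.timestamp-millis' = '1658000000000', -- illustrative start point in epoch milliseconds
-     'format' = 'csv'
- )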

- Kafka sink: there are two cases when writing data to Kafka with Flink,
- Writing an append-only stream to Kafka
- -- create the Kafka source table
- CREATE TABLE student_kafka (
- id STRING,
- name STRING,
- age INT,
- gender STRING,
- clazz STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'student',
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
- 'properties.group.id' = 'testGroup',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv',
- 'csv.field-delimiter'=',', -- field delimiter for CSV data
- 'csv.ignore-parse-errors'='true', -- fill fields with null when a row cannot be parsed
- 'csv.allow-comments'='true' -- skip lines starting with #
- )
- -- create the Kafka sink table
- CREATE TABLE student_kafka_sink (
- id STRING,
- name STRING,
- age INT,
- gender STRING,
- clazz STRING
- ) WITH (
- 'connector' = 'kafka', -- only supports append-only streams
- 'topic' = 'student_nan',
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
- 'format' = 'csv',
- 'csv.field-delimiter'='\t' -- field delimiter for CSV data
- )
-
- -- execute the SQL
- -- a non-aggregating continuous query produces an append-only dynamic table, which can be written to Kafka
- insert into student_kafka_sink
- select * from
- student_kafka
- where gender ='男'
-
- -- view the data with the console consumer
- kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic student_nan

- Writing an updating stream to Kafka
- -- create the Kafka source table
- CREATE TABLE student_kafka (
- id STRING,
- name STRING,
- age INT,
- gender STRING,
- clazz STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'student',
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
- 'properties.group.id' = 'testGroup',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv',
- 'csv.field-delimiter'=',', -- field delimiter for CSV data
- 'csv.ignore-parse-errors'='true', -- fill fields with null when a row cannot be parsed
- 'csv.allow-comments'='true' -- skip lines starting with #
- )
- -- create the Kafka (upsert) sink table
- CREATE TABLE gender_num_sink (
- gender STRING,
- num BIGINT,
- PRIMARY KEY (gender) NOT ENFORCED -- define a unique primary key
- ) WITH (
- 'connector' = 'upsert-kafka',
- 'topic' = 'gender_num',
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
- 'key.format' = 'csv',
- 'value.format' = 'csv'
- )
- -- execute the SQL
- -- write the updating stream to Kafka
- -- the unique primary key is used as the Kafka message key
- -- the row data is used as the Kafka message value
- insert into gender_num_sink
- select gender,count(1) as num
- from student_kafka
- where gender is not null
- group by gender
-
- -- view the data with the console consumer
- kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic gender_num
-
To run on the cluster, first upload the Kafka connector dependency to the Flink lib directory:
flink-sql-connector-kafka-1.15.0.jar

Produce one record into the student topic:
1500101005,小核弹,21,女,文科八班
The result is shown below:

5. JDBC
- Add the dependency
- <dependency>
-     <groupId>org.apache.flink</groupId>
-     <artifactId>flink-connector-jdbc</artifactId>
-     <version>1.15.0</version>
- </dependency>
-
JDBC source --- a bounded stream
JDBC fields are mapped by name and type, so the field names and types of the Flink SQL table must match the columns in the database.
- -- create the JDBC source table
- CREATE TABLE student_mysql (
- id BIGINT,
- name STRING,
- age BIGINT,
- gender STRING,
- clazz STRING
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://master:3306/bigdata',
- 'table-name' = 'students',
- 'username' = 'root',
- 'password' = '123456'
- )
-
- -- create the print sink table
- CREATE TABLE print_table
- WITH ('connector' = 'print')
- LIKE student_mysql (EXCLUDING ALL)
-
- -- execute the SQL
- insert into print_table
- select * from student_mysql

-
JDBC sink
Read the student records from Kafka in real time, continuously count the number of students in each class, and write the result to MySQL.
- -- Flink SQL Kafka source table: students
- CREATE TABLE student_kafka (
- id STRING,
- name STRING,
- age INT,
- gender STRING,
- clazz STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'student',
- 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
- 'properties.group.id' = 'testGroup',
- 'scan.startup.mode' = 'earliest-offset',
- 'format' = 'csv',
- 'csv.field-delimiter'=',', -- field delimiter for CSV data
- 'csv.ignore-parse-errors'='true', -- fill fields with null when a row cannot be parsed
- 'csv.allow-comments'='true' -- skip lines starting with #
- )
- -- create the target table in MySQL
- CREATE TABLE `clazz_num` (
- `clazz` varchar(255) NOT NULL,
- `num` bigint(20) DEFAULT NULL,
- PRIMARY KEY (`clazz`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-
- -- Flink SQL JDBC sink table
- CREATE TABLE clazz_num_mysql (
- clazz STRING,
- num BIGINT,
- PRIMARY KEY (clazz) NOT ENFORCED -- rows are upserted by primary key
- ) WITH (
- 'connector' = 'jdbc',
- 'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
- 'table-name' = 'clazz_num', -- the table must be created manually in the database
- 'username' = 'root',
- 'password' = '123456'
- )
-
- -- the following query produces an updating stream; Flink SQL automatically upserts rows by the primary key
- insert into clazz_num_mysql
- select clazz,count(1) as num from
- student_kafka
- where clazz is not null
- group by clazz
-
- -- produce test data
- kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student
- 1500100001,施笑槐,22,女,文科六班
If no primary key is set in MySQL, the result looks like this:

After setting the primary key, whenever new data is written to Kafka the corresponding row in MySQL is updated in real time, giving a real-time count.
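To spot-check the result, the sink table can be queried directly in MySQL (a minimal sketch using the table created above):
- select * from clazz_num;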

The continuously updated result looks like this:

- Submitting the JDBC job to the cluster
- # 1. upload the flink-connector-jdbc-1.15.0.jar dependency to the Flink lib directory
- # 2. upload the MySQL driver mysql-connector-java-5.1.49.jar to the Flink lib directory
-
- # when using yarn-session mode, the yarn-session must be restarted first
- # stop it
- yarn application -kill application_1658546198162_0005
- # start it again
- yarn-session.sh -d
-
- # package the code, upload it to the server and submit the job
- flink run -t yarn-session -Dyarn.application.id=application_1658546198162_0007 -c com.shujia.flink.sql.Demo9JDBCSink flink-1.0.jar
7. FileSystem
Local files, HDFS and other file systems.
-
Reading files
Bounded files can also be processed in batch execution mode (see the sketch below).
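A minimal sketch of enabling batch execution mode, assuming the statements are submitted through the Flink SQL client; in code, the same effect comes from EnvironmentSettings.inBatchMode() as in the WordCount example above:
- SET 'execution.runtime-mode' = 'batch';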
- -- file source
- CREATE TABLE student_file (
- id STRING,
- name STRING,
- age INT,
- gender STRING,
- clazz STRING
- ) WITH (
- 'connector' = 'filesystem', -- required: connector type
- 'path' = 'data/students.txt', -- required: file path
- 'format' = 'csv' -- required: the filesystem connector must specify a format
- )
- -- for CSV, fields are mapped by position
-
- --print sink
- CREATE TABLE print_table
- (
- clazz STRING,
- num BIGINT
- )
- WITH ('connector' = 'print')
-
- -- execute the SQL
- insert into print_table
- select clazz,count(1) as num from
- student_file
- group by clazz
View the result:

- Writing to files
- -- create the source table
- CREATE TABLE datagen (
- id STRING,
- name STRING,
- age INT,
- gender STRING,
- clazz STRING,
- ts AS localtimestamp
- ) WITH (
- 'connector' = 'datagen',
- 'rows-per-second' = '500', -- number of rows generated per second
- 'fields.id.length' = '5', -- length of the generated string field
- 'fields.name.length'='3',
- 'fields.gender.length'='1',
- 'fields.clazz.length'='5',
- 'fields.age.min' ='1', -- minimum value
- 'fields.age.max'='100' -- maximum value
- )
- -- create the file sink table
- CREATE TABLE file_sink (
- id STRING,
- name STRING,
- age INT,
- gender STRING,
- clazz STRING,
- `day` STRING,
- `hour` STRING
- ) PARTITIONED BY (`day`,`hour`) WITH (
- 'connector'='filesystem',
- 'path'='data/flink_sink',
- 'format'='csv',
- 'sink.rolling-policy.file-size' ='100kb' -- roll over to a new file once this size is reached (default 128MB)
- )
-
- -- execute the SQL
- insert into file_sink
- select
- id,name,age,gender,clazz,
- DATE_FORMAT(ts, 'yyyy-MM-dd') as `day`,
- DATE_FORMAT(ts, 'HH') as `hour`
- from
- datagen
The output files look like this:

Opening one of the files, the data looks like this:

