• Flink SQL-连接器


    1. flink 官网
    2. https://nightlies.apache.org/flink/flink-docsmaster/zh/docs/connectors/table/datagen/
    1. package com.wt.flink.sql
    2. import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
    3. object Demo1WoreCount {
    4. def main(args: Array[String]): Unit = {
    5. //环境设置对象
    6. val settings: EnvironmentSettings = EnvironmentSettings
    7. .newInstance() //新实例
    8. .inStreamingMode() // 在流处理模式下
    9. //.inBatchMode() // 在批处理迷失下
    10. .build()
    11. /**
    12. * flink sql 的环境
    13. *
    14. */
    15. val table: TableEnvironment = TableEnvironment.create(settings)
    16. /**
    17. * 1. 创建kafka source 表
    18. *
    19. */
    20. table.executeSql(
    21. """
    22. |CREATE TABLE words (
    23. | `word` STRING
    24. |) WITH (
    25. | 'connector' = 'kafka',
    26. | 'topic' = 'word',
    27. | 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    28. | 'properties.group.id' = 'asdasd',
    29. | 'scan.startup.mode' = 'earliest-offset',
    30. | 'format' = 'csv'
    31. |)
    32. |
    33. |""".stripMargin)
    34. /**
    35. * 创建一个print sink表
    36. *
    37. */
    38. table.executeSql(
    39. """
    40. |CREATE TABLE print_table (
    41. |word STRING,
    42. |c BIGINT
    43. |) WITH (
    44. | 'connector' = 'print'
    45. |)
    46. |
    47. |""".stripMargin)
    48. /**
    49. * 统计单词的数量
    50. */
    51. table.executeSql(
    52. """
    53. |insert into print_table
    54. |select word,count(1) as c
    55. |from words
    56. |group by word
    57. |""".stripMargin)
    58. }
    59. }
    查看结果:

    1、连接器

    1、DataGen

    用于生成随机数据的工具

    只能用于source表

    1. CREATE TABLE datagen (
    2. id STRING,
    3. name STRING,
    4. age INT
    5. ) WITH (
    6. 'connector' = 'datagen',
    7. 'rows-per-second' = '5', -- 每秒生成的数据行数据
    8. 'fields.id.length' = '5', --字段长度限制
    9. 'fields.name.length'='3',
    10. 'fields.age.min' ='1', -- 最小值
    11. 'fields.age.max'='100' -- 最大值
    12. )
    13. -- ts AS localtimestamp, : localtimestamp获取当前时间戳,

    2、Print

    print用于打印连续查询的结果的表

    print只能用于sink表

    • 基于已有的表结构创建print表
    1. -- LIKE: 基于已有的表创建表
    2. CREATE TABLE print_table
    3. WITH ('connector' = 'print')
    4. LIKE datagen (EXCLUDING ALL)
    • 打印数据
    1. insert into print_table
    2. |select * from datagen

    • 手动设置字段
    1. CREATE TABLE print_table (
    2. age INT,
    3. num BIGINT
    4. )
    5. WITH ('connector' = 'print')
    • 统计年龄额人数
    1. insert into age_num
    2. select age ,count(1) as num
    3. from datagen
    4. group by age

    3、BlackHole

    用于flink性能测试

    1. CREATE TABLE blackhole_table
    2. WITH ('connector' = 'blackhole')
    3. LIKE datagen (EXCLUDING ALL)

    4、Kafka

    • kafka依赖
    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-kafkaartifactId>
    4. <version>1.15.0version>
    5. dependency>
    • csv依赖
    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-csvartifactId>
    4. <version>1.15.0version>
    5. dependency>
    • kafka source
    1. CREATE TABLE student_kafka (
    2. id STRING,
    3. name STRING,
    4. age INT,
    5. gender STRING,
    6. clazz STRING
    7. ) WITH (
    8. 'connector' = 'kafka',
    9. 'topic' = 'student',
    10. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    11. 'properties.group.id' = 'testGroup',
    12. 'scan.startup.mode' = 'earliest-offset',
    13. 'format' = 'csv',
    14. 'csv.field-delimiter'=',', -- csv格式数据的分隔符
    15. 'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null
    16. 'csv.allow-comments'='true'--跳过#注释行
    17. )

    参选介绍

    scan.startup.mode:
    earliest-offset: 读取所有的数据
    latest-offset:读取最新的数据,只能读取到任务启动之后生产的数据
    group-offsets(默认值): 基于以消费者组读取数据,如果消费者组不存在读取最新的数据
    timestamp :指定时间戳读取数据
    specific-offsets:指定偏移量读取数据
    format:
    csv: 文本格式,指定字段时需要按照顺序映射,flink sql会自动解析

    • kafka sink, 使用flink向kafka中写数据存在两种情况,
      • 将append only流写入kafka
    1. -- 创建kafka source 表
    2. CREATE TABLE student_kafka (
    3. id STRING,
    4. name STRING,
    5. age INT,
    6. gender STRING,
    7. clazz STRING
    8. ) WITH (
    9. 'connector' = 'kafka',
    10. 'topic' = 'student',
    11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    12. 'properties.group.id' = 'testGroup',
    13. 'scan.startup.mode' = 'earliest-offset',
    14. 'format' = 'csv',
    15. 'csv.field-delimiter'=',', -- csv格式数据的分隔符
    16. 'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null
    17. 'csv.allow-comments'='true'--跳过#注释行
    18. )
    19. -- 创建 kafka sink表
    20. CREATE TABLE student_kafka_sink (
    21. id STRING,
    22. name STRING,
    23. age INT,
    24. gender STRING,
    25. clazz STRING
    26. ) WITH (
    27. 'connector' = 'kafka',-- 只支持追加的流
    28. 'topic' = 'student_nan',
    29. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    30. 'format' = 'csv',
    31. 'csv.field-delimiter'='\t' -- csv格式数据的分隔符
    32. )
    33. --执行sql
    34. -- 非聚合类的连续查询返回的动态表是一个append only表,可以可以写入到kafka中
    35. insert into student_kafka_sink
    36. select * from
    37. student_kafka_source
    38. where gender ='男'
    39. --使用控制台查看数据
    40. kafka-console-consumer.sh --bootstrap-server master:9092,node2:9092,node2:9092 --from-beginning --topic student_nan

    • 更新的流写入kafka
    1. -- 创建kafka source 表
    2. CREATE TABLE student_kafka (
    3. id STRING,
    4. name STRING,
    5. age INT,
    6. gender STRING,
    7. clazz STRING
    8. ) WITH (
    9. 'connector' = 'kafka',
    10. 'topic' = 'student',
    11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    12. 'properties.group.id' = 'testGroup',
    13. 'scan.startup.mode' = 'earliest-offset',
    14. 'format' = 'csv',
    15. 'csv.field-delimiter'=',', -- csv格式数据的分隔符
    16. 'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null
    17. 'csv.allow-comments'='true'--跳过#注释行
    18. )
    19. -- 创建 kafka sink表
    20. CREATE TABLE gender_num_sink (
    21. gender STRING,
    22. num BIGINT,
    23. PRIMARY KEY (gender) NOT ENFORCED-- 设置唯一主键
    24. ) WITH (
    25. 'connector' = 'upsert-kafka',
    26. 'topic' = 'gender_num',
    27. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    28. 'key.format' = 'csv',
    29. 'value.format' = 'csv'
    30. )
    31. --执行sql
    32. -- 将更新的流写入kafka
    33. -- 已唯一的主键作为kafka中key
    34. -- 已数据作为kafkavalue
    35. insert into gender_num_sink
    36. select gender,count(1) as num
    37. from student_kafka
    38. where gender is not null
    39. group by gender
    40. --使用控制台查看数据
    41. kafka-console-consumer.sh --bootstrap-server master:9092,node2:9092,node2:9092 --from-beginning --topic gender_num
    • 提交到集群运行需要先将kafka依赖包上传到flink lib目录下

      flink-sql-connector-kafka-1.15.0.jar

    向student的topic中输入一条数据

    1. 1500101005,小核弹,21,女,文科八班
    2. 查看结果:如下

    5、JDBC

    • 增加依赖
    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-jdbcartifactId>
    4. <version>1.15.0version>
    5. dependency>
    • jdbc source --- 有界流

      jdbc 字段按照名称和类型进行映射的,flink sql中表的字段和类型必须和数据库中保持一致

    1. -- 创建jdbc source表
    2. CREATE TABLE student_mysql (
    3. id BIGINT,
    4. name STRING,
    5. age BIGINT,
    6. gender STRING,
    7. clazz STRING
    8. ) WITH (
    9. 'connector' = 'jdbc',
    10. 'url' = 'jdbc:mysql://master:3306/bigdata',
    11. 'table-name' = 'students',
    12. 'username' = 'root',
    13. 'password' = '123456'
    14. )
    15. -- 创建print sink 表
    16. CREATE TABLE print_table
    17. WITH ('connector' = 'print')
    18. LIKE student_mysql (EXCLUDING ALL)
    19. -- 执行sql
    20. insert into print_table
    21. select * from student_mysql

    • jdbc sink

      实时读取kafka中学生表的胡数据,实时统计每个班级学生的人数,将统计的结果保存到mysql中

    1. -- flink sql kafka source表 学生表
    2. CREATE TABLE student_kafka (
    3. id STRING,
    4. name STRING,
    5. age INT,
    6. gender STRING,
    7. clazz STRING
    8. ) WITH (
    9. 'connector' = 'kafka',
    10. 'topic' = 'student',
    11. 'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    12. 'properties.group.id' = 'testGroup',
    13. 'scan.startup.mode' = 'earliest-offset',
    14. 'format' = 'csv',
    15. 'csv.field-delimiter'=',', -- csv格式数据的分隔符
    16. 'csv.ignore-parse-errors'='true', -- 如果出现脏数据据,补null
    17. 'csv.allow-comments'='true'--跳过#注释行
    18. )
    19. -- 在mysql中创建表
    20. CREATE TABLE `clazz_num` (
    21. `clazz` varchar(255) NOT NULL,
    22. `num` bigint(20) DEFAULT NULL,
    23. PRIMARY KEY (`clazz`)
    24. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    25. -- flink sql jdbc sink表
    26. CREATE TABLE clazz_num_mysql (
    27. clazz STRING,
    28. num BIGINT,
    29. PRIMARY KEY (clazz) NOT ENFORCED -- 按照主键更新数据
    30. ) WITH (
    31. 'connector' = 'jdbc',
    32. 'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
    33. 'table-name' = 'clazz_num', -- 需要手动到数据库中创建表
    34. 'username' = 'root',
    35. 'password' = '123456'
    36. )
    37. -- 以下查询返回的是一个更新流,flinksql会自动按照主键更新数据
    38. insert into clazz_num_mysql
    39. select clazz,count(1) as num from
    40. student_kafka
    41. where clazz is not null
    42. group by clazz
    43. -- 生产数据
    44. kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student
    45. 1500100001,施笑槐,22,女,文科六班

    如果不在mysql中设置主键,如下:

    设置主键之后,而且我们在kafka中打入数据,在mysql中就会实时更新。做到实时统计

    实时更新的结果如下:

    • 将包含jdbc代码提交到集群运行
    1. # 1、将flink-connector-jdbc-1.15.0.jar依赖上传到flink lib目录下
    2. # 2、将mysql-connector-java-5.1.49.jar mysql 驱动上传到flink lib目录下
    3. # 如果是使用yarn-session模式徐娅偶先重启yarn-session
    4. # 关闭
    5. yarm application -kill application_1658546198162_0005
    6. # 启动
    7. yarn-session-.sh -d
    8. # 将代码打包上传到服务器提交任务
    9. flink run -t yarn-session -Dyarn.application.id=application_1658546198162_0007 -c com.shujia.flink.sql.Demo9JDBCSink flink-1.0.jar

    7、FIleSystem

    本地文件,hdfs 其它的文件系统

    • 读取文件

      可以使用batch模式处理数据

    1. -- 文件 source
    2. CREATE TABLE student_file (
    3. id STRINg,
    4. name STRING,
    5. age INT,
    6. gender STRING,
    7. clazz STRING
    8. ) WITH (
    9. 'connector' = 'filesystem', -- 必选:指定连接器类型
    10. 'path' = 'data/students.txt', -- 必选:指定路径
    11. 'format' = 'csv' -- 必选:文件系统连接器指定 format
    12. )
    13. -- 读取csv格式字段需要按照顺序映射
    14. --print sink
    15. CREATE TABLE print_table
    16. (
    17. clazz STRING,
    18. num BIGINT
    19. )
    20. WITH ('connector' = 'print')
    21. --执行sql
    22. insert into print_table
    23. select clazz,count(1) as num from
    24. student_file
    25. group by clazz

    看结果:

    • 写入文件
    1. -- 创建source 表
    2. CREATE TABLE datagen (
    3. id STRING,
    4. name STRING,
    5. age INT,
    6. gender STRING,
    7. clazz STRING,
    8. ts AS localtimestamp
    9. ) WITH (
    10. 'connector' = 'datagen',
    11. 'rows-per-second' = '500', -- 每秒生成的数据行数据
    12. 'fields.id.length' = '5', --字段长度限制
    13. 'fields.name.length'='3',
    14. 'fields.gender.length'='1',
    15. 'fields.clazz.length'='5',
    16. 'fields.age.min' ='1', -- 最小值
    17. 'fields.age.max'='100' -- 最大值
    18. )
    19. -- 创建file sink表
    20. CREATE TABLE file_sink (
    21. id STRING,
    22. name STRING,
    23. age INT,
    24. gender STRING,
    25. clazz STRING,
    26. `day` STRING,
    27. `hour` STRING
    28. ) PARTITIONED BY (`day`,`hour`) WITH (
    29. 'connector'='filesystem',
    30. 'path'='data/flink_sink',
    31. 'format'='csv',
    32. 'sink.rolling-policy.file-size' ='100kb'--滚动生成新的文件的大小,默认128M
    33. )
    34. -- 执行sql
    35. insert into file_sink
    36. select
    37. id,name,age,gender,clazz,
    38. DATE_FORMAT(ts, 'yyyy-MM-dd') as `day`,
    39. DATE_FORMAT(ts, 'HH') as `hour`
    40. from
    41. datagen

    数据所在的文件如下:

    打开文件,数据结果如下:

  • 相关阅读:
    Python二进制文件转换为文本文件
    [Java] Spring Boot Auto Configure(Spring Boot自动装配)的原理
    数字转型新动力,开源创新赋能数字经济高质量发展
    java计算机毕业设计高校实习实训管理系统源码+mysql数据库+系统+lw文档+部署
    java 中文繁简体转换工具 opencc4j 使用介绍 1.8.0
    HTML仿腾讯微博首页(Dreamweaver网页作业)
    Python二分查找详解
    『现学现忘』Git后悔药 — 33、revert撤销(二)
    curl verbose模式有什么用
    不同场景下的JMETER设置
  • 原文地址:https://blog.csdn.net/weixin_48370579/article/details/126065421