• Flink joins (stream table joined with stream table, stream table joined with dimension table), user-defined functions, how SQL keeps results unique, and the benefits of executing a set of SQL statements...


    1、Regular Joins

    Both input tables are kept in Flink state indefinitely, so any record can always find a match later.
    This causes the state to grow without bound.
    The join result is the same as it would be in batch processing.
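
    To keep regular-join state from growing forever, idle state can be given a retention time; a minimal sketch, assuming a one-hour TTL suits the workload (the value is an example, not from the original):

    -- expire join state that has not been accessed for one hour;
    -- expired rows can no longer be matched
    SET 'table.exec.state.ttl' = '1 h';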

    -- Create the student stream table; the data is in Kafka
    drop table student_join;
    CREATE TABLE student_join (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'student_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
    );
    -- Score table
    drop table score_join;
    CREATE TABLE score_join (
    s_id STRING,
    c_id STRING,
    sco INT
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
    );
    -- inner join
    select a.id,a.name,b.sco from
    student_join as a
    inner join
    score_join as b
    on a.id=b.s_id;
    -- left outer join
    select a.id,a.name,b.sco from
    student_join as a
    left join
    score_join as b
    on a.id=b.s_id;
    -- full outer join
    select a.id,a.name,b.sco from
    student_join as a
    full join
    score_join as b
    on a.id=b.s_id;
    -- Create producers to write data into the two topics
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
    1500100001,1000001,98
    1500100001,1000002,5
    1500100001,1000003,0
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
    1500100001,施笑槐,22,女,文科六班
    1500100002,吕金鹏,24,男,文科七班
    (Result screenshots for the inner join and left join are omitted here.)
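
    For intuition, a streaming left join emits a changelog rather than a static result. A hypothetical trace for the sample data above, assuming the student record arrives before its first score (exact operation markers vary by Flink version):

    -- +I (1500100001, 施笑槐, NULL)   student arrives, no score yet
    -- -D (1500100001, 施笑槐, NULL)   null-padded row retracted once a score arrives
    -- +I (1500100001, 施笑槐, 98)     joined result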

    2、Interval Joins

    -- Create the student stream table; the data is in Kafka
    CREATE TABLE student_join_proc (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING,
    stu_time as PROCTIME()
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'student_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
    );
    -- Score table
    CREATE TABLE score_join_proc (
    s_id STRING,
    c_id STRING,
    sco INT,
    sco_time as PROCTIME()
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
    );
    -- Interval join
    select a.id,a.name,b.sco from
    student_join_proc as a, score_join_proc as b
    where a.id=b.s_id
    and a.stu_time BETWEEN b.sco_time - INTERVAL '15' SECOND AND b.sco_time;
    -- Create producers to write data into the two topics
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
    1500100001,1000001,98
    1500100001,1000002,5
    1500100002,1000003,0
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic student_join
    1500100001,施笑槐,22,女,文科六班
    1500100002,吕金鹏,24,男,文科七班
    -- Enter a student record first; a score entered within 15 seconds will be joined to it.
    -- A matching score entered more than 15 seconds later will not produce a result.
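
    Interval joins also work on event time. A minimal sketch of an event-time version of the score table, assuming a new table name score_join_event and a fourth timestamp field in the CSV records (both assumptions, not from the original):

    CREATE TABLE score_join_event (
    s_id STRING,
    c_id STRING,
    sco INT,
    sco_time TIMESTAMP(3),
    -- tolerate up to 5 seconds of out-of-order data
    WATERMARK FOR sco_time AS sco_time - INTERVAL '5' SECOND
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
    );

    With event time, the 15-second window is measured on the timestamps carried in the data rather than on arrival time.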

    3、Temporal Joins

    -- Orders table
    CREATE TABLE orders (
    order_id STRING, -- order id
    price DECIMAL(32,2), -- order price
    currency STRING, -- foreign key into the currency-rates table
    order_time TIMESTAMP(3), -- time the order occurred
    WATERMARK FOR order_time AS order_time -- event-time column and watermark
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
    );
    -- Currency-rates table
    CREATE TABLE currency_rates (
    currency STRING, -- primary key of the rates table
    conversion_rate DECIMAL(32, 2), -- conversion rate
    update_time TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL, -- time the rate was updated
    WATERMARK FOR update_time AS update_time, -- event-time column and watermark
    PRIMARY KEY(currency) NOT ENFORCED -- primary key
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'bigdata17.currency_rates',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'canal-json',
    'scan.startup.mode' = 'earliest-offset',
    'canal-json.ignore-parse-errors' = 'true'
    );
    -- Query the rates table
    select
    currency,
    conversion_rate,
    update_time
    from
    currency_rates;
    -- Temporal join: each order is joined with the rate that was valid at order_time
    SELECT
    order_id,
    price,
    orders.currency,
    conversion_rate,
    order_time
    FROM orders
    LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
    ON orders.currency = currency_rates.currency;
    -- Orders data
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic orders
    001,1000.0,1001,2022-08-02 23:13:52
    001,1000.0,1001,2022-08-02 23:13:54
    001,1000.0,1001,2022-08-02 23:33:36
    001,1000.0,1001,2022-08-02 23:46:01
    The changes to the table in MySQL are as follows (screenshot omitted).

    The result of joining the two tables is as follows (screenshot omitted).
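
    The bigdata17.currency_rates topic carries change records that Canal captured from MySQL. A minimal sketch of what a single canal-json message might look like (field values are illustrative, not from the original):

    {"data":[{"currency":"1001","conversion_rate":"7.10"}],"old":null,"type":"INSERT","database":"bigdata17","table":"currency_rates","es":1659452032000,"ts":1659452032123,"isDdl":false}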

    4、Stream table (Kafka) joined with a dimension table (HBase, MySQL)

    1、Regular join

    Using a regular join for dimension-table lookups has a flaw: when the dimension table is updated in the database, Flink does not pick up the change. The join only ever matches the data that was read when the job started.

    -- Create a JDBC dimension table -- a bounded stream
    CREATE TABLE student_mysql (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata17',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456'
    );
    select
    cast(id as STRING)
    from
    student_mysql;
    -- Score table -- an unbounded stream
    CREATE TABLE score_join (
    s_id STRING,
    c_id STRING,
    sco INT
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
    );
    -- Join the unbounded stream with the bounded stream
    select b.id,b.name,a.sco from
    score_join as a
    join
    student_mysql as b
    on
    a.s_id=cast(b.id as STRING);
    -- Create a producer to write data into the topic
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
    1500100003,1000001,98
    1500100004,1000002,5
    1500100001,1000003,0
    1500101000,1000003,12
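
    These examples assume a students table already exists in the bigdata17 MySQL database. A minimal DDL sketch of that table (column lengths are assumptions):

    CREATE TABLE `students` (
    `id` bigint(20) NOT NULL,
    `name` varchar(255) DEFAULT NULL,
    `age` bigint(20) DEFAULT NULL,
    `gender` varchar(255) DEFAULT NULL,
    `clazz` varchar(255) DEFAULT NULL,
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;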

    The shortcoming described above is what the lookup join solves:

    2、Lookup Join
    -- Create a JDBC dimension table -- a bounded stream
    CREATE TABLE student_mysql (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata17',
    'table-name' = 'students',
    'username' = 'root',
    'password' = '123456',
    'lookup.cache.max-rows' = '100', -- enable caching and cap the cached rows to improve join performance
    'lookup.cache.ttl' = '30s' -- cache expiry, usually set to match how often the dimension table is updated
    );
    -- Score table -- an unbounded stream
    CREATE TABLE score_join (
    s_id STRING,
    c_id STRING,
    sco INT,
    pro_time as PROCTIME() -- a lookup join requires a processing-time attribute on the stream table
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
    );
    SELECT
    b.id,b.name,b.age,a.sco
    FROM score_join as a
    LEFT JOIN student_mysql FOR SYSTEM_TIME AS OF a.pro_time as b
    ON cast(a.s_id as BIGINT) = b.id;
    -- Create a producer to write data into the topic
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic score_join
    1500100003,1000001,98
    1500100004,1000002,5
    1500100001,1000003,0
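
    Note: 'lookup.cache.max-rows' and 'lookup.cache.ttl' are the JDBC connector option names in the Flink version this post targets. Newer releases (roughly 1.16 onward) rename them; a sketch of the equivalent options (verify against your connector version):

    'lookup.cache' = 'PARTIAL',
    'lookup.partial-cache.max-rows' = '100',
    'lookup.partial-cache.expire-after-write' = '30s'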

    11、User-defined functions

    1、Write the user-defined function

    import org.apache.flink.table.functions.ScalarFunction

    class SubstringFunction extends ScalarFunction {
      /**
       * Cuts a substring out of a string.
       * The evaluation method of a ScalarFunction must be named eval.
       */
      def eval(s: String, begin: Integer, end: Integer): String = {
        s.substring(begin, end)
      }
    }
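
    For reference, the function can also be registered directly from the Table API instead of the SQL client; a minimal sketch, assuming tableEnv is an existing TableEnvironment (not shown in the original):

    // register the UDF under a name usable in SQL
    tableEnv.createTemporarySystemFunction("substringFunction", classOf[SubstringFunction])
    // call it in a query
    tableEnv.sqlQuery("select substringFunction(clazz, 0, 2) from student_join")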

    2、Package the project and upload it to the cluster

    3、Start the sql-client with the jar specified

    Alternatively, put the jar into Flink's lib directory.

    sql-client.sh  -j flink-1.0-SNAPSHOT.jar

    4、Register the function

    CREATE TEMPORARY SYSTEM FUNCTION
    substringFunction
    AS 'com.wt.flink.sql.SubstringFunction' -- fully qualified name of the UDF class above
    LANGUAGE SCALA;
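
    Once registered, the function can be called like a built-in function, for example against the student_join table created earlier:

    select id, substringFunction(clazz, 0, 2) as subject from student_join;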
    -- Source table
    CREATE TABLE words (
    `word` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'w_exactly_once',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasd',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
    );
    -- Create the table in MySQL
    CREATE TABLE `word_count` (
    `word` varchar(255) NOT NULL,
    `c` bigint(20) DEFAULT NULL,
    PRIMARY KEY (`word`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    -- Flink SQL JDBC sink table
    CREATE TABLE word_count (
    word STRING,
    c BIGINT,
    PRIMARY KEY (word) NOT ENFORCED -- rows are upserted by primary key, which keeps one row per key
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata17?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'word_count', -- the table must be created manually in the database
    'username' = 'root',
    'password' = '123456'
    );
    -- Count the words and save the result to the database
    insert into word_count
    select word,count(1) as c from words
    group by word;
    -- Produce data
    kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic w_exactly_once
    • If the SQL job fails and you simply restart it from scratch, backpressure can occur because all the historical data has to be reprocessed.

    • Enable checkpointing

      • It can be enabled globally in the Flink configuration file (see the flink-conf.yaml sketch at the end of this subsection).
    • Create a SQL file and put all the statements in it:

    vim word_count.sql

    CREATE TABLE words (
    `word` STRING
    ) WITH (
    'connector' = 'kafka',
    'topic' = 'words_exactly_once',
    'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
    'properties.group.id' = 'asdasd',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'csv'
    );
    -- Flink SQL JDBC sink table
    CREATE TABLE word_count (
    word STRING,
    c BIGINT,
    PRIMARY KEY (word) NOT ENFORCED -- upsert rows by primary key
    ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/bigdata?useUnicode=true&characterEncoding=UTF-8',
    'table-name' = 'word_count', -- the table must be created manually in the database
    'username' = 'root',
    'password' = '123456'
    );
    -- Before restarting, point the job at the last retained checkpoint
    SET 'execution.savepoint.path' = 'hdfs://master:9000/flink/checkpoint/8cbd9fb08bfbac13d3fd2dc58b1a6de7/chk-46';
    -- Restart the query
    insert into word_count
    select word,count(1) as c from words
    group by word;
    • Restart the SQL job:
    sql-client.sh -f word_count.sql
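
    As referenced above, checkpointing can be enabled globally in flink-conf.yaml; a minimal sketch (the interval, state backend, and directory are assumptions, not from the original):

    # take a checkpoint every minute
    execution.checkpointing.interval: 1min
    # retain completed checkpoints so the job can be restored after cancellation or failure
    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    state.backend: hashmap
    state.checkpoints.dir: hdfs://master:9000/flink/checkpoint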

    12、Executing a set of SQL statements

    CREATE TABLE print_table (
    word STRING,
    c BIGINT
    )
    WITH ('connector' = 'print');
    -- Execute several insert into statements as one job;
    -- the source table only needs to be read once
    EXECUTE STATEMENT SET
    BEGIN
    insert into print_table
    select word,count(1) as c from words
    group by word;
    insert into word_count
    select word,count(1) as c from words
    group by word;
    END;
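
    To confirm that both INSERTs share a single scan of the source, you can inspect the plan; a sketch using EXPLAIN STATEMENT SET (available in newer Flink versions; verify against yours):

    EXPLAIN STATEMENT SET
    BEGIN
    insert into print_table
    select word,count(1) as c from words
    group by word;
    insert into word_count
    select word,count(1) as c from words
    group by word;
    END;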
  • Original article: https://blog.csdn.net/weixin_48370579/article/details/126146859