• flink sql 使用


    1.准备工作

    安装flink 1.16.2 

    将以下jar包放到/data/cmpt/flink-1.16.2/lib 目录下

    1. antlr-runtime-3.5.2.jar   
    2. flink-connector-hive_2.12-1.16.2.jar
    3. flink-connector-jdbc-1.16.2.jar
    4. mysql-connector-java-6.0.6.jar
    5. hive-exec-3.1.3.jar             
    6. libfb303-0.9.3.jar 

    将flink-table-planner_2.12-1.16.2.jar 移动到/data/cmpt/flink-1.16.2/lib目录下

    mv /data/cmpt/flink-1.16.2/opt/flink-table-planner_2.12-1.16.2.jar /data/cmpt/flink-1.16.2/lib/

    将flink-table-loader_2.12-1.16.2.jar 删除

    修改flink配置

    1. jobmanager.rpc.address: localhost
    2. jobmanager.rpc.port: 6123
    3. jobmanager.bind-host: localhost
    4. jobmanager.memory.process.size: 6g
    5. jobmanager.execution.failover-strategy: region
    6. taskmanager.bind-host: localhost
    7. taskmanager.host: localhost
    8. taskmanager.memory.process.size: 7g
    9. taskmanager.numberOfTaskSlots: 50
    10. parallelism.default: 1
    11. rest.port: 8080
    12. rest.address: 0.0.0.0
    13. rest.bind-port: 8080-8090
    14. rest.bind-address: 0.0.0.0
    15. web.submit.enable: false
    16. web.cancel.enable: false

    启动flink

    1. ./start-cluster.sh 
    2. ./sql-client.sh 

    2.使用mysql外部表

    mysql中建表

    1. CREATE TABLE `mysql_user`  (
    2.   `id` varchar(32CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
    3.   `name` varchar(255CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL,
    4.   `age` int(11NULL DEFAULT NULL,
    5.   PRIMARY KEY (`id`) USING BTREE
    6. ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci ROW_FORMAT = Dynamic;

    flink sql 建立mysql外部表

    1. CREATE TABLE t_user (
    2.   `id` varchar(32),
    3.   `name` varchar(255),
    4.   `age` int
    5. WITH (
    6. 'connector' = 'jdbc',
    7. 'url' = 'jdbc:mysql://10.16.60.31:3306/datasets?useUnicode=true&characterEncoding=UTF-8&useSSL=false',
    8. 'table-name' = 'mysql_user', -- 需要手动到数据库中创建表
    9. 'username' = 'root',
    10. 'password' = '123456'
    11. )

    flink sql插入数据

    1. Flink SQL> insert into t_user values('A1''name_1'10);
    2. [INFO] Submitting SQL update statement to the cluster...
    3. [INFO] SQL update statement has been successfully submitted to the cluster:
    4. Job ID: 5d0ea93304314d3ca20b240b150bd0ad

    mysql中查看数据

    1. mysql> select * from mysql_user;
    2. +----+--------+------+
    3. | id | name   | age  |
    4. +----+--------+------+
    5. | A1 | name_1 |   10 |
    6. +----+--------+------+
    7. 1 row in set (0.00 sec)

    3.使用mysql catalog

    创建mysql_catalog

    1. Flink SQL> CREATE CATALOG mysql_catalog WITH(
    2. >     'type' = 'jdbc',
    3. >     'default-database' = 'datasets',
    4. >     'username' = 'root',
    5. >     'password' = '123456',
    6. >     'base-url' = 'jdbc:mysql://10.16.60.31:3306'
    7. > );
    8. Flink SQL> use catalog mysql_catalog;
    9. Flink SQL> show databases;
    10. +---------------+
    11. | database name |
    12. +---------------+
    13. |      datasets |
    14. |      dataxweb |
    15. |         dinky |
    16. |      hivemeta |
    17. +---------------+
    18. 4 rows in set
    19. Flink SQL> use datasets;
    20. Flink SQL> show tables;
    21. +---------------------+
    22. |          table name |
    23. +---------------------+
    24. |       uk_price_paid |
    25. |       user_behavior |
    26. | user_behavior_mysql |
    27. +---------------------+
    28. 3 rows in set
    29. Flink SQL> select * from uk_price_paid limit 10;

    64ba9e5ac3884ceba6bb1077ae9f5f66.png

    默认输出结果模式是表模式table。

    设置输出结果模式

    1. SET 'sql-client.execution.result-mode' = 'table';
    2. SET 'sql-client.execution.result-mode' = 'changelog';
    3. SET 'sql-client.execution.result-mode' = 'tableau';
    1. Flink SQL> SET 'sql-client.execution.result-mode' = 'tableau';
    2. Flink SQL> select * from uk_price_paid limit 10;

    2b48e638cd39463db4be3ee0e1122552.png

    4.使用hive catalog

    hive中建表

    1. hive (datasets)> CREATE TABLE `hive_user` (
    2.                >   `id` varchar(32),
    3.                >   `name` varchar(255),
    4.                >   `age` int
    5.                > );

    hive中插入数据(hive不支持记录级别的数据插入和更新,插入一条记录需要几秒到几十秒,生产环境不这样使用,这里只是为了演示)

    hive (datasets)> insert into hive_user values('A1''name_1'10);

    hive中查询

    1. hive (datasets)> select * from hive_user;
    2. OK
    3. hive_user.id    hive_user.name  hive_user.age
    4. A1      name_1  10
    5. Time taken: 0.205 seconds, Fetched: 1 row(s)

    flinksql 中创建catalog

    1. Flink SQL> CREATE CATALOG hive_catalog WITH (
    2. >     'type' = 'hive',
    3. >     'default-database' = 'datasets',
    4. >     'hive-conf-dir' = '/data/olap/hive-3.1.3/conf'
    5. > );
    6. Flink SQL> show catalogs;
    7. +-----------------+
    8. |    catalog name |
    9. +-----------------+
    10. default_catalog |
    11. |    hive_catalog |
    12. +-----------------+
    13. 2 rows in set
    14. Flink SQL> use catalog hive_catalog;
    15. Flink SQL> select * from hive_user;

    ec744f21a5544e20a2ebd816f3d4e3eb.png

    flinksql中插入数据

    1. Flink SQL> insert into hive_user values('A2''name_2'20);
    2. [INFO] Submitting SQL update statement to the cluster...
    3. [INFO] SQL update statement has been successfully submitted to the cluster:
    4. Job ID: 01cd773a57997afe9e43cd91fccad99a
    5. Flink SQL> select * from hive_user;

    3200728f5a734bbaa0a5b5b8a91b9681.png

    hive数据更新

    1. hive (datasets)> select * from hive_user;
    2. OK
    3. hive_user.id    hive_user.name  hive_user.age
    4. A1      name_1  10
    5. A2      name_2  20
    6. Time taken: 0.113 seconds, Fetched: 2 row(s)

     

     

     

  • 相关阅读:
    083-OCP题库日记
    C语言面试题(拓展)
    JIRA 在 2024 年完全停止服务器版本支持
    jQuery元素的筛选
    css 渐变下划线实现 移入移出 动画
    如此简单易懂的方式 让网站支持PWA
    IDEA代码替换
    【Java练习生】每日面试题学习打卡!
    Spring全家桶简介
    Baize_h1mini六足机器人零件准备
  • 原文地址:https://blog.csdn.net/shangjg03/article/details/133318715