• 2.3 如何使用FlinkSQL读取&写入到JDBC(MySQL)


    1、JDBC SQL 连接器

    FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据

    添加Maven依赖

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-jdbcartifactId>
    4. <version>3.1.0-1.17version>
    5. dependency>

    注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包

     相关jar可以通过官网下载:JDBC SQL 连接器 


    2、读取 MySQL

    FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被做维表来使用

    1. -- 在FlinkSQL中创建 MySQL Source 表
    2. drop table mysql_source_table;
    3. CREATE TABLE mysql_source_table (
    4. `id` INT,
    5. `title` STRING,
    6. `author` STRING,
    7. `price` DOUBLE,
    8. `qty` INT
    9. ) WITH (
    10. 'connector' = 'jdbc',
    11. 'url' = 'jdbc:mysql://worker01/flink',
    12. 'driver' = 'com.mysql.jdbc.Driver', -- 【可选】不设置时,将自动从url中推导
    13. 'username' = 'xxxx',
    14. 'password' = 'xxxx',
    15. 'table-name' = 'books'
    16. );
    17. -- 批式 sql,查看 JDBC 表中的数据
    18. select * from mysql_source_table;

    运行结果:


    3、写入MySQL

    3.1 何时批量写入MySQL呢?

    FlinkSQL往MySQL写入数据时,默认会在客户端缓存数据,当触发设置的阈值后,才会向服务端发送数据

    开启checkpoint :

    1. # TODO 开启checkpoint,当checkpoint后,会触发jdbc的flush操作
    2. set execution.checkpointing.interval=300sec;

    设置 flush 前缓存记录的最大值 、flush 间隔时间:

    1. -- TODO 创建sink mysql table
    2. drop table mysql_sink_table;
    3. CREATE TABLE mysql_sink_table (
    4. `id` INT,
    5. `title` STRING,
    6. `author` STRING,
    7. `price` DOUBLE,
    8. `qty` INT
    9. ) WITH (
    10. 'connector' = 'jdbc',
    11. 'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
    12. 'username' = 'xxxx',
    13. 'password' = 'xxxx',
    14. 'table-name' = 'books',
    15. 'sink.buffer-flush.max-rows' = '100', -- flush 前缓存记录的最大值,默认值为100,设置为0时,表示不缓存数据(来一条写入一条)
    16. 'sink.buffer-flush.interval' = '50s' -- flush 间隔时间,超过该时间后异步线程将 flush 数据。默认为1s
    17. );

    使用说明:

    FLinkSQL写入MySQL时,常通过 sink.buffer-flush.max-rows、sink.buffer-flush.interval 来控制写入数据的延迟程度

            当 对写入实时性要求较高时,可以将 sink.buffer-flush.max-rows = 0 ,表示到来一条数据后立即写入MySQL,但带来的后果是 长时间占有mysql连接

            当 数据量大且对实时要求不高时,可根据业务需求调大配置,可使实时行和性能最优


    3.2 sink mysql table 中主键的作用

    在FLinkSQL中创建sink mysql table时,如果表中定义了主键,则连接器将以 upsert 模式工作

    否则连接器将以 append 模式工作

             upsert 模式:Flink 将根据主键判断插入新行或者更新已存在的行

                                   使用这种模式时,确保MySQL中的底表定义主键和添加唯一性约束

           append 模式:对MySQL库中底表做insert操作

     upsert 模式:

    1. -- TODO 创建MySQL 表
    2. CREATE TABLE `books` (
    3. `id` int(11) NOT NULL,
    4. `title` varchar(99) DEFAULT NULL,
    5. `author` varchar(99) DEFAULT NULL,
    6. `price` double DEFAULT NULL,
    7. `qty` int(11) DEFAULT NULL,
    8. PRIMARY KEY (`id`)
    9. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    10. -- TODO 创建FLinkSQL表(sink mysql table)
    11. drop table mysql_sink_table;
    12. CREATE TABLE mysql_sink_table (
    13. `id` INT,
    14. `title` STRING,
    15. `author` STRING,
    16. `price` DOUBLE,
    17. `qty` INT,
    18. PRIMARY KEY (id) NOT ENFORCED -- 指定主键字段
    19. ) WITH (
    20. 'connector' = 'jdbc',
    21. 'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
    22. 'username' = 'root',
    23. 'password' = 'xxxx',
    24. 'table-name' = 'books',
    25. 'sink.buffer-flush.max-rows' = '0' -- 实时写入
    26. );
    27. -- TODO 往 mysql中写入数据(相同key的数据写入后,会做upsert操作)
    28. insert into mysql_sink_table
    29. SELECT * FROM (VALUES
    30. (5,'A Dream in Red Mansions','y', 3.0,1)
    31. , (6,'Journey to the West','y', 3.0,1)
    32. , (7,'Water Margin','y', 3.0,1)
    33. ) AS books (id, title,author,price,qty);

    append 模式:

    1. -- TODO 创建FLinkSQL表(sink mysql table)
    2. drop table mysql_sink_table;
    3. CREATE TABLE mysql_sink_table (
    4. `id` INT,
    5. `title` STRING,
    6. `author` STRING,
    7. `price` DOUBLE,
    8. `qty` INT
    9. ) WITH (
    10. 'connector' = 'jdbc',
    11. 'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
    12. 'username' = 'root',
    13. 'password' = 'xxx',
    14. 'table-name' = 'books',
    15. 'sink.buffer-flush.max-rows' = '0' -- 实时写入
    16. );
    17. -- TODO 往 mysql中写入数据(相同key的数据写入后,会做操作)
    18. insert into mysql_sink_table
    19. SELECT * FROM (VALUES
    20. (5,'A Dream in Red Mansions','y', 3.0,1)
    21. , (6,'Journey to the West','y', 3.0,1)
    22. , (7,'Water Margin','y', 3.0,1)
    23. ) AS books (id, title,author,price,qty);

    注意:使用 append模式时,如果MySQL底表中存在主键或唯一性约束时,INSERT 插入可能会失败

    insert into 失败:

  • 相关阅读:
    uniapp同步将本地图片转换为base64,支持微信、H5、APP
    Apache ActiveMQ 远程代码执行漏洞影响范围
    Python 全栈系列200 Redis Agent
    Java和前端都不好找工作,计算机毕业可以干嘛?
    apachectl: line 79: 20233 Segmentation fault (core dumped) $HTTPD “$@“
    Cannot resolve symbol ‘TimeUnit‘
    vscode中设置vue用户代码片段
    常见Web漏洞危害及整改建议
    Google Earth Engine(GEE)——ImageCollection (Error)遍历影像集合产生的错误
    数据可视化(Python)
  • 原文地址:https://blog.csdn.net/weixin_42845827/article/details/133890527