• 【Flink系列】开启jdbc批量写入


    背景

    开发Flink应用要求计算结果实时写入数据库的,一般业务写入TPS在600-800,如果生产同时跑十几个任务,数据库写入TPS接近一万,对数据库造成了较大压力,使用窗口的优化方向不可行:

    1. 计算任务的key值较为分散(如用户,商户维度),小窗口(1分钟、5分钟)计算无法减少写入次数,大窗口(10分钟、1小时)实时性太差;

    2. 无法保证上游流水100%有序准时到达,使用窗口计算容易漏算流水;


    优化思路

    使用Flink应用对流水进行计算和统计,结果一般字段较少,每条计算结果的数据量不大,如果开启批量写入,可以降低和数据库之间的网络交换次数,也可提升数据库的数据写入效率;


    数据库连接配置

    • rewriteBatchedStatements        

    配置此参数为true以后,jdbc驱动会在executeBatch时将SQL改写,将多条Insert语句合并为一条,效果如下:

    改写前:

    1. INSERT INTO `t` (`a`) VALUES(10);
    2. INSERT INTO `t` (`a`) VALUES(11);
    3. INSERT INTO `t` (`a`) VALUES(12);

    改写后:

    INSERT INTO `t` (`a`) VALUES(10),(11),(12);
    • useServerPrepStmts

    这个参数是用来节省数据库SQL解析和执行计划消耗的,开启useServerPrepStmts以后,服务器可以将已经解析的SQL反复使用,只在每次客户端提交新的写入请求时填入'?'占位符的参数;

    值得注意的是,useServerPrepStmts开启后,客户端不再对SQL预编译,也就是说上面说的SQL改写也不会发生,所以在批量写入时,把此参数关闭,或保持默认配置;

    关于rewriteBatchedStatements和useServerPrepStmts配合使用的性能测试,参考:

    MySql PreparedStatement executeBatch过慢问题 - 灰信网(软件开发博客聚合)

    • cachePrepStmts

    虽然 useServerPrepStmts = true 能让服务端执行预处理语句,但默认情况下客户端每次执行完后会 close 预处理语句,并不会复用,这样预处理的效率甚至不如文本执行。所以建议开启 useServerPrepStmts = true 后同时配置 cachePrepStmts = true,这会让客户端缓存预处理语句。

    • prepStmtCacheSqlLimit

    在配置 cachePrepStmts 后还需要注意 prepStmtCacheSqlLimit 配置(默认为 256),该配置控制客户端缓存预处理语句的最大长度,超过该长度将不会被缓存。

    在一些场景 SQL 的长度可能超过该配置,导致预处理 SQL 不能复用,建议根据应用 SQL 长度情况决定是否需要调大该值。

    • prepStmtCacheSqlLimit

    控制缓存的预处理语句数目(默认为 25),如果应用需要预处理的 SQL 种类很多且希望复用预处理语句,可以调大该值。


    配置参考

    jdbc:mysql://localhost:3306/mydb?charset=utf8mb4&useSSL=false&useConfigs=maxPerformance& cachePrepStmts=true&prepStmtCacheSqlLimit=8192&prepStmtCacheSize=1024&rewriteBatchedStatements=true&allowMultiQueries=true

    SQL改写

    原先SQL是这么写的:

    1. INSERT INTO
    2. rts_clear_branch_day_trans(stl_branch_id,
    3. stl_org_id,
    4. time_date,
    5. trans_count,
    6. trans_amount,
    7. mer_fee,
    8. acq_shr_amt)
    9. VALUES(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE
    10. trans_count = ?,
    11. trans_amount = ?,
    12. mer_fee = ? ,
    13. acq_shr_amt = ?

    改写后:

    1. INSERT INTO
    2. rts_clear_branch_day_trans(stl_branch_id,
    3. stl_org_id,
    4. time_date,
    5. trans_count,
    6. trans_amount,
    7. mer_fee,
    8. acq_shr_amt)
    9. VALUES(?,?,?,?,?,?,?) ON DUPLICATE KEY UPDATE
    10. trans_count = VALUES(`trans_count`),
    11. trans_amount = VALUES(`trans_amount`),
    12. mer_fee = VALUES(`mer_fee`),
    13. acq_shr_amt = VALUES(`acq_shr_amt`)

    Batch配置

    new JdbcExecutionOptions.Builder().withBatchSize(100).withBatchIntervalMs(1000).withMaxRetries(3).build(),
    
  • 相关阅读:
    算法排序基础(全版)
    立可得_第9章_统计分析
    Java -- 每日一问:Java常见的垃圾收集器有哪些?
    863. All Nodes Distance K in Binary Tree
    OpenCV自学笔记十五:图像轮廓
    Vue watch computed 生命周期执行顺序
    Net 高级调试之六:对象检查之值类型、引用类型、数组和异常的转储
    CSRF和XSS是什么?
    基于单片机的红外遥控解码程序设计与实现
    七月day26——二叉树
  • 原文地址:https://blog.csdn.net/RL_LEEE/article/details/127769802