• 记一次flink postgres cdc丢数据


    空指针异常

    业务库使用的是postgresql, 使用flink cdc同步几个表到hudi时,发现其中有一个表在同步过程中一直报空指针异常

    Caused by: java.lang.NullPointerException
    at org.postgresql.jdbc.FieldMetadata.getSize(FieldMetadata.java:79)
    at org.postgresql.util.LruCache.put(LruCache.java:128)
    at org.postgresql.util.LruCache.putAll(LruCache.java:154)
    at org.postgresql.jdbc.PgResultSetMetaData.fetchFieldMetaData(PgResultSetMetaData.java:267)
    at org.postgresql.jdbc.PgResultSetMetaData.isAutoIncrement(PgResultSetMetaData.java:57)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    异常分析

    org.postgresql.jdbc.PgResultSetMetaData.fetchFieldMetaData打断点调试,其实也就通过sql去查表的元数据而已,但是观察查询结果ResultSet有两个列元数据的schema值为null,这才导致了上述的空指针出现:
    在这里插入图片描述
    结果集中有57条记录,但是我的表只有55列,一开始以为这个表有什么虚拟列之类的,但是观察两条多出来的数据不太像属于schema的内容,首先tableOid就不对了,tableName也感觉是随机生成的。最终发现多出的两条记录是表的数据记录,tableOid为表的主键值,多出的两条是相邻的主键(1033629,1033630),schema对应的是第5列也恰恰为null.

    • 查询表的列元数据时,为什么会有表的数据记录混在里面?sql单独拎出来查询是可以得到正常结果55列的。
    • 表的数据很多,为什么每次这里都固定多出相同的两条?

    异常解决

    从表面看,像是使用jdbc的过程中由于并发而引发的数据脏乱问题。这种问题很难调试,也没有太多时间去发追踪,所以把结果集中schema为null的数据过滤作为解决方法。

    if(null!=schemaName)
    	md.put(key, fieldMetadata);
    
    • 1
    • 2

    数据丢失

    flink cdc job无异常运行一段时间后,产生了另一个问题,sink到hudi的表少了2条数据。通过二分查询的方式将source/sink表做比较,所幸表的数据并不多,最终也确定了这两条数据就是上面的1033629,1033630

    Exported {} of {} records for table
    
    • 1

    从程序日志看到,cdc snapshot阶段读取出来后数据就少了,而不是写入hudi后丢失的。

    分析与解决

    io.debezium.relational.RelationalSnapshotChangeEventSource#createDataEventsForTable方法就是全量把表数据查询到ResultSet中,然后循环将每行Row数据转为Object[]发往下游的flink算子,调试发现问题是出现在下面的代码:

    final Object[] row = jdbcConnection.rowToArray(table, schema(), rs, columnArray);
    
    • 1

    正好,rowToArray会垂直向下调用到开头的空指针异常的堆栈,即在将行转为数组过程中需要获取该表的元数据,PostgresConnection#getColumnValue

    final ResultSetMetaData metaData = rs.getMetaData();
    final String columnTypeName = metaData.getColumnTypeName(columnIndex);// 主要方法,获取列的类型名称,int8->bigserial类型映射
    final PostgresType type = ((PostgresSchema) schema).getTypeRegistry().get(columnTypeName);  //类型名称对应的PgType
    
    • 1
    • 2
    • 3

    在获取列的类型名称时,会进一步去获取列的元数据,对于自动递增的列会做 int8->bigserial类型映射,bigserial对应的PostgresType是int8,没必要做多一次无用的转化,直接改了以上的代码避免进入org.postgresql.jdbc.PgResultSetMetaData.fetchFieldMetaData

    final ResultSetMetaData metaData = rs.getMetaData();
    final String columnTypeName =( (PgResultSetMetaData) metaData)._getPGType(columnIndex-1); //直接从ResultSetMetaData获取类型
    final PostgresType type = ((PostgresSchema) schema).getTypeRegistry().get(columnTypeName);
    
    • 1
    • 2
    • 3

    getPGType为protected,需要放大访问权限为public,为了安全增加了_getPGType

    总结

    目前这样的方式解决了空指针和数据丢失的问题,这应该算并发引起的两种不同现象。不清楚产生并发的根是在哪里,所以这种方法解决了当前场景的异常,但是不确定是否还会发生在其它地方,同步在github上提了issue:https://github.com/ververica/flink-cdc-connectors/issues/1480,期待更好的解决办法 。

  • 相关阅读:
    汽车4S集团数据分析
    如果我们是那晚负责修复 B 站崩了的开发人员
    关于Selemium那些事
    大数据运维实战第二十九课 Hadoop 跨集群数据迁移应用实践
    notepad++ 设置文件名标签在左边
    facebook怎么加好友
    雅思学习总结
    uni-app连接蓝牙多次回调
    Set接口的实现类---TreeSet
    羊孒个羊过关思路,让你在好友榜单排名第一
  • 原文地址:https://blog.csdn.net/czmacd/article/details/126462372