• Flink CDC MySQL-to-PostgreSQL real-time sync fails — how do I keep `id` out of the trailing DO UPDATE SET?


    RanFps · 2024-06-20 19:14 · acceptance rate 0% · 10 views

    Flink CDC is syncing MySQL into PostgreSQL in real time. The upsert that reaches PostgreSQL always carries `id` inside the DO UPDATE SET clause — how can I get it excluded?

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create the MySQL CDC source table
String sourceDDL = "CREATE TABLE mysql_source (" +
        "  id INT," +
        "  name STRING," +
        "  age STRING," +
        "  sex STRING," +
        "  phone STRING," +
        "  email STRING," +
        "  birthday STRING," +
        "  PRIMARY KEY (id) NOT ENFORCED" +
        ") WITH (" +
        "  'server-time-zone' = 'UTC'," +
        "  'connector' = 'mysql-cdc'," +
        "  'hostname' = '*'," +
        "  'port' = '3306'," +
        "  'username' = '*'," +
        "  'password' = '*'," +
        "  'scan.startup.mode' = 'latest-offset'," +
        "  'database-name' = '*'," +
        "  'table-name' = '*'," +
        "  'scan.incremental.snapshot.chunk.key-column' = 'id'" +
        ")";
// Register the source table
tableEnv.executeSql(sourceDDL);

// Configure the PostgreSQL sink
String postgresSinkDDL = "CREATE TABLE postgres_sink (" +
        "  id INT PRIMARY KEY," +
        "  name STRING," +
        "  age STRING," +
        "  sex STRING," +
        "  phone STRING," +
        "  email STRING," +
        "  birthday STRING" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:postgresql://*:5432/*'," +
        "  'table-name' = '*'," +
        "  'username' = '*'," +
        "  'password' = '*'" +
        ")";
tableEnv.executeSql(postgresSinkDDL);

// Insert statement: copy the MySQL rows into PostgreSQL
tableEnv.executeSql("INSERT INTO postgres_sink SELECT * FROM mysql_source").print();

// Trigger Flink job execution
env.execute("MySQL to PostgreSQL CDC Example");
```

    The job fails with:

```text
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.checkFlushException(JdbcOutputFormat.java:181)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.writeRecord(JdbcOutputFormat.java:187)
	at org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction.invoke(GenericJdbcSinkFunction.java:57)
	at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
	... (operator-chain frames elided)
	at com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:144)
	at com.ververica.cdc.connectors.mysql.source.reader.MySqlRecordEmitter.emitRecord(MySqlRecordEmitter.java:55)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
	... (task runtime frames elided)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Writing records to JDBC failed.
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.checkFlushException(JdbcOutputFormat.java:181)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:212)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.lambda$open$0(JdbcOutputFormat.java:155)
	... 8 more
Caused by: java.io.IOException: java.sql.BatchUpdateException: Batch entry 0
	INSERT INTO ods.ods_cdc_user1(id, name, age, sex, phone, email, birthday)
	VALUES (455, 'df', '24', '1', '139', '123@321.com', '1696953132000')
	ON CONFLICT (id) DO UPDATE SET id=EXCLUDED.id, name=EXCLUDED.name, age=EXCLUDED.age,
	  sex=EXCLUDED.sex, phone=EXCLUDED.phone, email=EXCLUDED.email, birthday=EXCLUDED.birthday
	was aborted: ERROR: modification of distribution columns in OnConflictUpdate is not supported
	Call getNextException to see other errors in the batch.
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.flush(JdbcOutputFormat.java:222)
	... 8 more
Caused by: java.sql.BatchUpdateException: (same statement and message as above)
	at org.postgresql.jdbc.BatchResultHandler.handleCompletion(BatchResultHandler.java:199)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:532)
	at org.postgresql.jdbc.PgStatement.internalExecuteBatch(PgStatement.java:870)
	at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:893)
	at org.postgresql.jdbc.PgPreparedStatement.executeBatch(PgPreparedStatement.java:1644)
	at org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatementImpl.executeBatch(FieldNamedPreparedStatementImpl.java:65)
	at org.apache.flink.connector.jdbc.internal.executor.TableSimpleStatementExecutor.executeBatch(TableSimpleStatementExecutor.java:64)
	at org.apache.flink.connector.jdbc.internal.JdbcOutputFormat.attemptFlush(JdbcOutputFormat.java:246)
	... 8 more
Caused by: org.postgresql.util.PSQLException: ERROR: modification of distribution columns in OnConflictUpdate is not supported
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:521)
	... 16 more
```
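The failing statement in the trace is the upsert that Flink's JDBC connector generates for a Postgres sink: every column, including the conflict key, appears in the DO UPDATE SET list. The following is a minimal plain-Java sketch (no Flink dependencies; `UpsertSqlBuilder` is a hypothetical helper, and the table and column names are taken from the stack trace) of how such an upsert can be assembled so that the key column is filtered out of the SET list:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class UpsertSqlBuilder {

    // Build an upsert in which the conflict-key (distribution) columns are
    // excluded from the DO UPDATE SET list, which is what distribution-aware
    // PostgreSQL derivatives require.
    static String buildUpsert(String table, List<String> columns, List<String> keys) {
        String colList = String.join(", ", columns);
        String placeholders = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));
        String setClause = columns.stream()
                .filter(c -> !keys.contains(c))   // drop the distribution key here
                .map(c -> c + " = EXCLUDED." + c)
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + colList + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT (" + String.join(", ", keys) + ") DO UPDATE SET " + setClause;
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("id", "name", "age", "sex", "phone", "email", "birthday");
        // Prints an upsert whose SET list starts at "name" and never touches "id".
        System.out.println(buildUpsert("ods.ods_cdc_user1", cols, Arrays.asList("id")));
    }
}
```

As far as I know the Table API offers no switch to change the generated statement; with the DataStream API a hand-written statement like the one above can be passed to `JdbcSink.sink(...)` together with your own statement builder, or the connector's Postgres dialect can be forked and patched.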

    2 answers (sorted: default | newest)

    • 专家-赤兔[在线] (quality full-stack creator) · 2024-06-20 19:15

      Quoted from the free WeChat mini-program 皆我百晓生:

      Judging from the error you posted, the failure happens while rows are being written to the PostgreSQL sink: the batch raises a BatchUpdateException whose root message is "modification of distribution columns in OnConflictUpdate is not supported". The generated upsert uses ON CONFLICT (id) DO UPDATE SET and lists every column, including id, in the SET clause — and updating a distribution column is exactly what the target database forbids. Note that vanilla PostgreSQL has no distribution columns; this message points to a distributed PostgreSQL derivative (Greenplum and similar MPP systems), where a table's distribution key decides which segment each row lives on and cannot be modified in place.

      First, make sure the primary key of your sink table (id in your example) is correctly defined as a unique or primary key, and check whether it is also the table's distribution key. Then look at the DO UPDATE SET part of the generated statement: it must not touch any distribution column.

      Also double-check the JDBC connection settings — URL, username, and password in particular — and confirm you are connected to the intended database and table, and that the target database version supports the features you rely on.

      On the Flink side, verify that the source and sink tables were created correctly, that the DDL statements are valid, and that the account has sufficient privileges on the target tables.

      If the problem persists, debug step by step:

      1. Confirm the PostgreSQL-compatible database itself behaves as expected: run the failing INSERT ... ON CONFLICT statement by hand (psql, PgAdmin) and see whether it produces the same error.
      2. Re-check the Flink configuration, in particular the JDBC connection parameters and the table schemas, against the actual database configuration.
      3. Read the Flink logs for additional warnings or errors that narrow down the root cause.

      If you can share more about your database structure, the sink table definition (including its distribution clause, if any), and the exact INSERT statement, more specific advice is possible.
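Concretely, the statement from the stack trace becomes acceptable to a distribution-aware PostgreSQL once `id=EXCLUDED.id` is dropped from the SET list — a sketch only, reusing the sample row from the error message:

```sql
INSERT INTO ods.ods_cdc_user1 (id, name, age, sex, phone, email, birthday)
VALUES (455, 'df', '24', '1', '139', '123@321.com', '1696953132000')
ON CONFLICT (id) DO UPDATE SET
    name     = EXCLUDED.name,
    age      = EXCLUDED.age,
    sex      = EXCLUDED.sex,
    phone    = EXCLUDED.phone,
    email    = EXCLUDED.email,
    birthday = EXCLUDED.birthday;
```

Since the conflict target is `id`, the matched row already has the desired `id` value, so leaving it out of the SET list loses nothing.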

    • 阿里嘎多学长 · 2024-06-20 19:21

      The following was generated jointly by ChatGPT and 阿里嘎多学长; please accept if it helps:


      From the error you posted, the problem occurs when the OnConflictUpdate statement tries to modify the id field, which is a distribution column. On distributed PostgreSQL variants, a table created with a distribution key does not allow the key's columns to be modified inside the DO UPDATE clause of an ON CONFLICT statement.

      This is a known limitation of ON CONFLICT ... DO UPDATE on such systems. Some possible approaches:

      1. Check the table's distribution key: make sure you know how the sink table is distributed, and that nothing attempts to modify those columns in the DO UPDATE clause.

      2. Use a different sync strategy: if possible, restructure the synchronization logic so the distribution key never appears in the DO UPDATE clause.

      3. Adjust the Flink configuration: the sink configuration may need tuning so that the connector does not attempt the unsupported operation.

      4. Use custom logic: in Flink, write the upsert SQL yourself (custom SQL logic or a custom sink function) instead of relying on the generated OnConflictUpdate statement.

      5. Upgrade Flink: older versions may carry known bugs; updating to the latest release of Flink and the JDBC connector may resolve the issue.

      6. Community support: if none of the above helps, ask in the Flink community or on related technical forums.
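For checking the distribution key (item 1 above), on Greenplum-compatible systems it can be read from the catalog. This query assumes a Greenplum 6-style catalog, where `gp_distribution_policy.distkey` holds the key columns (older releases expose `attrnums` instead); the table name is the one from the stack trace:

```sql
-- Which column(s) is ods.ods_cdc_user1 distributed by?
SELECT a.attname AS distribution_column
FROM gp_distribution_policy p
JOIN pg_attribute a
  ON a.attrelid = p.localoid
 AND a.attnum   = ANY (p.distkey)
WHERE p.localoid = 'ods.ods_cdc_user1'::regclass;
```

If this returns `id`, the DO UPDATE SET clause must not assign to `id`, which matches the error in the trace.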



  • Original question: https://ask.csdn.net/questions/8121832