• A big pitfall I hit when syncing MySQL binlog data with canal


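    Background

    We needed to sync MySQL data to Elasticsearch (ES) and chose the canal component to do it.

    Problem description

    During syncing, I found that when the sql statement in the canal-adapter config file canal-adapter/conf/es7/product.yml joins tables, the data in Elasticsearch sometimes fails to update. The log reports no exception (an error log only appears when the adapter is started from IDEA), which was baffling.

    Analysis

    My first guess was that canal failed while parsing the yml internally, but the only way to find the actual cause was to debug the source code.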

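    Download the source code

    GitHub repository

    Import the code into IDEA

    The project structure is shown below:

    Configuration file: application.yml (comments starting with ## mark the places to pay attention to or modify)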

```yaml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  ## Mode
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    ## Kafka address; use the internal (container) IP
    kafka.bootstrap.servers: 192.168.0.107:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  ## Database config
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.0.107:3306/test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  ## Kafka topic name
  - instance: canal_manager # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      ## Elasticsearch config; since canal 1.1.5 the name is es7 (using es may cause problems)
      - name: es7
        hosts: http://192.168.0.107:9200 # 127.0.0.1:9200 for rest mode
        properties:
          ## Use rest mode
          mode: rest # or transport
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
```

    customer.yml (comments starting with ## mark the places to pay attention to or modify)

```yaml
dataSourceKey: defaultDS
## Kafka topic
destination: canal_manager
groupId: g1
esMapping:
  ## Elasticsearch index
  _index: product
  ## Primary key
  _id: _id
  _type: _doc
  upsert: true
#  relations:
#    customer_order:
#      name: customer
  ## Correct sql
  #sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, r.Pro_No proNo, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"
  ## sql that fails
  sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"
  etlCondition: "where p.c_time>={}"
  commitBatch: 3000
```

    Entry point: CanalAdapterApplication. REST class for full (ETL) sync: CommonRest. Sample request:

```
POST http://127.0.0.1:8081/etl/es7/customer.yml
```
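To trigger the same full sync from code instead of a REST client, here is a minimal Java 8 sketch that uses only `HttpURLConnection`. The class and method names (`EtlTrigger`, `etlUrl`, `formBody`, `trigger`) are hypothetical. It also assumes, based on our reading of canal's CommonRest, that the endpoint accepts an optional form field `params` whose value fills the `{}` placeholder in `etlCondition`; verify that against your canal version before relying on it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

final class EtlTrigger {

    private EtlTrigger() {
    }

    /** Builds e.g. http://127.0.0.1:8081/etl/es7/customer.yml */
    static String etlUrl(String baseUrl, String adapterType, String task) {
        return baseUrl + "/etl/" + adapterType + "/" + task;
    }

    /** Form body carrying the (assumed) "params" field; empty when no condition value is given. */
    static String formBody(String params) throws UnsupportedEncodingException {
        return params == null ? "" : "params=" + URLEncoder.encode(params, "UTF-8");
    }

    /** POSTs the ETL request and returns "<status> <response body>". */
    static String trigger(String url, String params) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(formBody(params).getBytes(StandardCharsets.UTF_8));
        }
        int status = conn.getResponseCode();
        // Error responses are read from the error stream, which may be null
        InputStream in = status < 400 ? conn.getInputStream() : conn.getErrorStream();
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        if (in != null) {
            try (InputStream body = in) {
                byte[] chunk = new byte[4096];
                int n;
                while ((n = body.read(chunk)) != -1) {
                    buf.write(chunk, 0, n);
                }
            }
        }
        conn.disconnect();
        return status + " " + new String(buf.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        String url = etlUrl("http://127.0.0.1:8081", "es7", "customer.yml");
        // Example value (an assumption) for the {} in etlCondition "where p.c_time>={}"
        System.out.println(trigger(url, "2022-03-18 00:00:00"));
    }
}
```

Keeping URL building and body encoding in separate methods makes them easy to check without a running adapter; only `trigger` needs the service on port 8081.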

    Finding the problem

    After starting the program, I found that an error log was printed:

```
2022-03-18 09:10:56.537 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2022-03-18 09:10:56.742 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2022-03-18 09:10:56.874 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2022-03-18 09:11:00.028 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException
    at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [classes/:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [classes/:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.__refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.jrLockAndRefresh(AbstractApplicationContext.java:40002) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:41008) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
    at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[classes/:na]
Caused by: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException
    at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    ... 44 common frames omitted
Caused by: com.alibaba.druid.sql.parser.ParserException: null
    at com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser.parse(SqlParser.java:71) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:143) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    ... 45 common frames omitted
2022-03-18 09:11:00.046 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /Users/Desktop/workspace/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin
2022-03-18 09:11:00.101 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: canal_manager-g1 succeed
2022-03-18 09:11:00.101 [Thread-35] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: canal_manager <=============
2022-03-18 09:11:00.101 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2022-03-18 09:11:00.113 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2022-03-18 09:11:00.126 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-03-18 09:11:00.128 [Thread-35] INFO o
```

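    The exception in the log looks like an sql parsing error: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException. Yet the same sql runs fine in Navicat and other tools, so canal presumably applies its own parsing rules.

    Setting breakpoints according to the error log showed that the failure happens while the ES7xAdapter is being initialized. The underlying exception message is "Relation condition column must in select columns.", i.e. a column used in a join condition must be one of the selected columns.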

    SqlParser.java, which parses the sql format required for ES sync:

```java
/**
 * Parse the sql
 *
 * @param sql sql
 * @return the schema (view) object
 */
public static SchemaItem parse(String sql) {
    try {
        SQLStatementParser parser = new MySqlStatementParser(sql);
        SQLSelectStatement statement = (SQLSelectStatement) parser.parseStatement();
        MySqlSelectQueryBlock sqlSelectQueryBlock = (MySqlSelectQueryBlock) statement.getSelect().getQuery();
        SchemaItem schemaItem = new SchemaItem();
        schemaItem.setSql(SQLUtils.toMySqlString(sqlSelectQueryBlock));
        SQLTableSource sqlTableSource = sqlSelectQueryBlock.getFrom();
        List<TableItem> tableItems = new ArrayList<>();
        SqlParser.visitSelectTable(schemaItem, sqlTableSource, tableItems, null);
        tableItems.forEach(tableItem -> schemaItem.getAliasTableItems().put(tableItem.getAlias(), tableItem));
        List<FieldItem> fieldItems = collectSelectQueryFields(sqlSelectQueryBlock);
        fieldItems.forEach(fieldItem -> schemaItem.getSelectFields().put(fieldItem.getFieldName(), fieldItem));
        schemaItem.init();
        if (schemaItem.getAliasTableItems().isEmpty() || schemaItem.getSelectFields().isEmpty()) {
            throw new ParserException("Parse sql error");
        }
        return schemaItem;
    } catch (Exception e) {
        // The caught exception e (and its message) is discarded here,
        // which is why the log only shows "ParserException: null"
        throw new ParserException();
    }
}
```
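The `catch` block above is why the real message never reaches the log: a new `ParserException` is thrown without the original exception as its cause. The sketch below contrasts that pattern with cause chaining. `ParserException` and `init()` here are local, hypothetical stand-ins rather than the druid/canal classes, and treating the failure inside `init()` as an `UnsupportedOperationException` is an assumption; only the message text is taken from the debugging session above.

```java
class CauseChainingDemo {

    // Hypothetical stand-in for com.alibaba.druid.sql.parser.ParserException
    static class ParserException extends RuntimeException {
        ParserException() {
            super();
        }

        ParserException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for the join-column check that fails during schema initialization
    static void init() {
        throw new UnsupportedOperationException("Relation condition column must in select columns.");
    }

    // Same pattern as SqlParser.parse: the caught exception is dropped
    static void parseSwallowing() {
        try {
            init();
        } catch (Exception e) {
            throw new ParserException();
        }
    }

    // Alternative: chain the cause so it shows up as "Caused by: ..." in the stack trace
    static void parseChaining(String sql) {
        try {
            init();
        } catch (Exception e) {
            throw new ParserException("Parse sql error: " + sql, e);
        }
    }

    public static void main(String[] args) {
        try {
            parseSwallowing();
        } catch (ParserException e) {
            System.out.println(e.getMessage()); // prints null
        }
        try {
            parseChaining("SELECT ...");
        } catch (ParserException e) {
            // prints Relation condition column must in select columns.
            System.out.println(e.getCause().getMessage());
        }
    }
}
```

With the chained version, the stack trace would have ended in the real message, without a debugger.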

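    Solution

    From the exception message and testing, we learned that when the sql in a canal ES mapping file joins tables, the columns used in the join conditions must be selected as well; in other words, the fields linking table A and table B have to appear in the SELECT list. In customer.yml above, the failing sql omits r.Pro_No, the column that the LEFT JOIN on p_pro_info is keyed on; adding r.Pro_No proNo fixes it.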

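```sql
-- Correct: the join keys of both joined tables (a.id, b.id) are selected, aliased so the ES field names stay distinct
SELECT a.id _id, b.id bId FROM a INNER JOIN c ON a.id = c.a_id LEFT JOIN b ON c.b_id = b.id
-- Wrong: no column of the LEFT JOIN condition (c.b_id or b.id) is selected
SELECT a.id _id FROM a INNER JOIN c ON a.id = c.a_id LEFT JOIN b ON c.b_id = b.id
```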
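To catch this before restarting the adapter, a rough pre-flight lint can flag join conditions whose columns are missing from the select list. This is a hypothetical helper (`JoinColumnCheck` is not part of canal) and a deliberately simplified, regex-based check, not canal's parser. The rule it enforces, "at least one column of every `ON x.col = y.col` condition is selected", is inferred from the working versus failing sql in this article; canal's actual check may be stricter. Subqueries, `AND`/`OR` in `ON`, and unqualified columns are out of scope.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class JoinColumnCheck {

    // Splits "SELECT <select list> FROM <rest>" at the first FROM
    private static final Pattern SELECT_FROM = Pattern.compile(
            "^\\s*SELECT\\s+(.*?)\\s+FROM\\s+(.*)$", Pattern.CASE_INSENSITIVE | Pattern.DOTALL);
    // Alias-qualified column such as s.Sales_No
    private static final Pattern QUALIFIED = Pattern.compile("\\w+\\.\\w+");
    // Single equality join condition such as "ON r.Pro_No = p.Pro_No"
    private static final Pattern ON_EQ = Pattern.compile(
            "\\bON\\s+(\\w+\\.\\w+)\\s*=\\s*(\\w+\\.\\w+)", Pattern.CASE_INSENSITIVE);

    private JoinColumnCheck() {
    }

    /** Returns every ON condition for which neither column appears in the select list. */
    static List<String> uncoveredJoinConditions(String sql) {
        Matcher parts = SELECT_FROM.matcher(sql);
        if (!parts.matches()) {
            throw new IllegalArgumentException("expected SELECT ... FROM ...");
        }
        // Qualified columns in the select list, compared case-insensitively
        Set<String> selected = new HashSet<>();
        Matcher col = QUALIFIED.matcher(parts.group(1));
        while (col.find()) {
            selected.add(col.group().toLowerCase(Locale.ROOT));
        }
        List<String> uncovered = new ArrayList<>();
        Matcher on = ON_EQ.matcher(parts.group(2));
        while (on.find()) {
            String left = on.group(1);
            String right = on.group(2);
            if (!selected.contains(left.toLowerCase(Locale.ROOT))
                    && !selected.contains(right.toLowerCase(Locale.ROOT))) {
                uncovered.add(left + " = " + right);
            }
        }
        return uncovered;
    }

    public static void main(String[] args) {
        String from = " FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No"
                + " LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No";
        String working = "SELECT s.Sales_No _id, s.Sales_Name salesName, r.Pro_No proNo, p.Pro_Type proType" + from;
        String failing = "SELECT s.Sales_No _id, s.Sales_Name salesName, p.Pro_Type proType" + from;
        System.out.println(uncoveredJoinConditions(working)); // prints []
        System.out.println(uncoveredJoinConditions(failing)); // prints [r.Pro_No = p.Pro_No]
    }
}
```

Run against the two sql statements from customer.yml, it passes the working one and points at exactly the LEFT JOIN condition that the failing one leaves uncovered.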

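    Summary

    Some problems can only be found by reading the source. In this case the log only reported an sql parsing exception while the sql itself looked fine; only by reading and debugging the code did the root cause show up. This is a summary of my own experience; if anything is wrong, corrections are welcome.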

  • Original article: https://blog.csdn.net/Trouvailless/article/details/128160443