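To sync MySQL data into Elasticsearch (ES), I chose canal.
While syncing, I found that when the SQL in canal-adapter's mapping file canal-adapter/conf/es7/product.yml joins several tables, the data in Elasticsearch sometimes fails to update. The log shows no exception (an error log does appear when the adapter is started from IDEA), which was baffling.
My first guess was that something went wrong while canal parsed the yml internally, but finding the exact cause meant debugging the source.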
The project structure is as follows:
Config file application.yml (comments starting with `##` mark the places to pay attention to and modify):
```yaml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  ## mode
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    ## Kafka address; use the internal (container) IP
    kafka.bootstrap.servers: 192.168.0.107:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  ## database config
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.0.107:3306/test?useUnicode=true
      username: root
      password: root
  canalAdapters:
  ## Kafka topic name
  - instance: canal_manager # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
      ## Elasticsearch config: from canal 1.1.5 on, use name: es7 (using es may cause problems)
      - name: es7
        hosts: http://192.168.0.107:9200 # 127.0.0.1:9200 for rest mode
        properties:
          ## rest mode
          mode: rest # or rest
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
```
Mapping file customer.yml (comments starting with `##` mark the places to pay attention to and modify):
```yaml
dataSourceKey: defaultDS
## Kafka topic
destination: canal_manager
groupId: g1
esMapping:
  ## Elasticsearch index
  _index: product
  ## primary key
  _id: _id
  _type: _doc
  upsert: true
#  relations:
#    customer_order:
#      name: customer
  ## working sql
  #sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, r.Pro_No proNo, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"
  ## failing sql
  sql: "SELECT s.Sales_No _id, s.Sales_Name salesName, p.Pro_Type proType FROM p_sales s INNER JOIN p_salespro_rela r ON r.Sales_No = s.Sales_No LEFT JOIN p_pro_info p ON r.Pro_No = p.Pro_No"
  etlCondition: "where p.c_time>={}"
  commitBatch: 3000
```
Entry point: `CanalAdapterApplication`. Full-sync (ETL) endpoint class: `CommonRest`. Example request:
```
POST http://127.0.0.1:8081/etl/es7/customer.yml
```
After starting the program, an error was logged:
```
2022-03-18 09:10:56.537 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2022-03-18 09:10:56.742 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2022-03-18 09:10:56.874 [main] INFO c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2022-03-18 09:11:00.028 [main] ERROR c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 failed
java.lang.RuntimeException: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [classes/:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [classes/:na]
	at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [classes/:na]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1694) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:579) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:501) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$1(AbstractBeanFactory.java:353) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.cloud.context.scope.GenericScope$BeanLifecycleWrapper.getBean(GenericScope.java:390) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.cloud.context.scope.GenericScope.get(GenericScope.java:184) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:350) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:1089) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.cloud.context.scope.refresh.RefreshScope.eagerlyInitialize(RefreshScope.java:126) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at org.springframework.cloud.context.scope.refresh.RefreshScope.start(RefreshScope.java:117) ~[spring-cloud-context-2.0.0.RELEASE.jar:2.0.0.RELEASE]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_271]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_271]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_271]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_271]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:264) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:182) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:144) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:400) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:354) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:888) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.__refresh(AbstractApplicationContext.java:553) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.jrLockAndRefresh(AbstractApplicationContext.java:40002) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:41008) ~[spring-context-5.0.5.RELEASE.jar:5.0.5.RELEASE]
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:759) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:395) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:327) ~[spring-boot-2.0.1.RELEASE.jar:2.0.1.RELEASE]
	at com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication.main(CanalAdapterApplication.java:19) ~[classes/:na]
Caused by: java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:83) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:52) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	... 44 common frames omitted
Caused by: com.alibaba.druid.sql.parser.ParserException: null
	at com.alibaba.otter.canal.client.adapter.es.core.config.SqlParser.parse(SqlParser.java:71) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.addSyncConfigToCache(ESAdapter.java:143) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	at com.alibaba.otter.canal.client.adapter.es.core.ESAdapter.init(ESAdapter.java:75) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
	... 45 common frames omitted
2022-03-18 09:11:00.046 [main] INFO c.alibaba.otter.canal.connector.core.spi.ExtensionLoader - extension classpath dir: /Users/Desktop/workspace/canal-canal-1.1.5/client-adapter/launcher/target/canal-adapter/plugin
2022-03-18 09:11:00.101 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: canal_manager-g1 succeed
2022-03-18 09:11:00.101 [Thread-35] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: canal_manager <=============
2022-03-18 09:11:00.101 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2022-03-18 09:11:00.113 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2022-03-18 09:11:00.126 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-03-18 09:11:00.128 [Thread-35] INFO o
```
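The exception looks like an SQL parsing error (java.lang.RuntimeException: com.alibaba.druid.sql.parser.ParserException), yet the same SQL runs fine in Navicat and other tools, so canal presumably applies parsing rules of its own.
Setting breakpoints along the stack trace shows that the failure happens while the ES7xAdapter adapter initializes. The underlying exception message is "Relation condition column must in select columns." (a join-condition column must be one of the selected columns). That message never reaches the log because `SqlParser.parse()` catches every exception and rethrows a bare `new ParserException()`, dropping both the message and the cause; this is why the log only shows `ParserException: null`.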
SqlParser.java, which parses the SQL format used by the ES sync mapping:
```java
/**
 * Parse the SQL.
 *
 * @param sql sql
 * @return schema (view) object
 */
public static SchemaItem parse(String sql) {
    try {
        SQLStatementParser parser = new MySqlStatementParser(sql);
        SQLSelectStatement statement = (SQLSelectStatement) parser.parseStatement();
        MySqlSelectQueryBlock sqlSelectQueryBlock = (MySqlSelectQueryBlock) statement.getSelect().getQuery();

        SchemaItem schemaItem = new SchemaItem();
        schemaItem.setSql(SQLUtils.toMySqlString(sqlSelectQueryBlock));
        SQLTableSource sqlTableSource = sqlSelectQueryBlock.getFrom();
        List<TableItem> tableItems = new ArrayList<>();
        SqlParser.visitSelectTable(schemaItem, sqlTableSource, tableItems, null);
        tableItems.forEach(tableItem -> schemaItem.getAliasTableItems().put(tableItem.getAlias(), tableItem));

        List<FieldItem> fieldItems = collectSelectQueryFields(sqlSelectQueryBlock);
        fieldItems.forEach(fieldItem -> schemaItem.getSelectFields().put(fieldItem.getFieldName(), fieldItem));

        schemaItem.init();

        if (schemaItem.getAliasTableItems().isEmpty() || schemaItem.getSelectFields().isEmpty()) {
            throw new ParserException("Parse sql error");
        }
        return schemaItem;
    } catch (Exception e) {
        // Any exception thrown above (the relation-column error included) lands here.
        // A new ParserException is thrown without the original message or cause,
        // so the log only shows "ParserException: null".
        throw new ParserException();
    }
}
```
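To see why the log only said `ParserException: null`, here is a minimal, self-contained Java 8 sketch of the same catch-and-rethrow pattern. It is an illustration, not canal's code: it uses plain `IllegalStateException` and `UnsupportedOperationException` instead of druid's `ParserException`, and `validate()` is a hypothetical stand-in for the relation-column check that fails inside the parser.

```java
public class SwallowedCauseDemo {

    // Hypothetical stand-in for the check that actually fails while parsing the mapping SQL.
    static void validate() {
        throw new UnsupportedOperationException("Relation condition column must in select columns.");
    }

    // Same pattern as the catch block in SqlParser.parse(): the original exception is dropped.
    static void parseLikeCanal() {
        try {
            validate();
        } catch (Exception e) {
            throw new IllegalStateException();
        }
    }

    // Alternative pattern: keep the message and chain the original exception as the cause.
    static void parsePreservingCause() {
        try {
            validate();
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    public static void main(String[] args) {
        try {
            parseLikeCanal();
        } catch (IllegalStateException e) {
            System.out.println("swallowed: " + e.getMessage() + ", cause=" + e.getCause());
        }
        try {
            parsePreservingCause();
        } catch (IllegalStateException e) {
            System.out.println("preserved: " + e.getMessage());
        }
    }
}
```

Run as-is, it should print `swallowed: null, cause=null` followed by `preserved: Relation condition column must in select columns.`. Passing the cause along, as in the second variant, is what would have put the real reason into the log.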
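From the exception message and some testing: when the mapping SQL joins tables, canal requires the join-condition columns to appear among the selected columns. Judging by the two statements in customer.yml, at least one of the two columns in each ON condition has to be selected: in the working SQL, s.Sales_No covers `r.Sales_No = s.Sales_No` and r.Pro_No covers `r.Pro_No = p.Pro_No`, whereas the failing SQL selects neither r.Pro_No nor p.Pro_No.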
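```sql
-- Works: a.id covers the first ON condition (a.id = c.a_id), b.id covers the second (c.b_id = b.id)
SELECT a.id, b.id FROM a INNER JOIN c ON a.id = c.a_id LEFT JOIN b ON c.b_id = b.id;

-- Fails: no column of the second ON condition (c.b_id = b.id) is selected
SELECT a.id FROM a INNER JOIN c ON a.id = c.a_id LEFT JOIN b ON c.b_id = b.id;
```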
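The rule can be checked by hand before touching the yml. Below is a hypothetical, simplified Java 8 model, not canal's implementation: instead of parsing SQL it takes the ON conditions and the selected source columns as hand-written `alias.column` strings, and flags the first ON condition neither of whose columns is selected. The "at least one column per ON condition" criterion is an assumption that matches the working and failing statements in customer.yml.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class JoinColumnCheck {

    /** One "left = right" equality from an ON clause; columns are written as "alias.column". */
    static final class OnPair {
        final String left;
        final String right;

        OnPair(String left, String right) {
            this.left = left;
            this.right = right;
        }

        @Override
        public String toString() {
            return left + " = " + right;
        }
    }

    /** Returns the first ON pair whose two columns are both missing from the selected columns, or null. */
    static OnPair firstUncovered(Set<String> selected, List<OnPair> onPairs) {
        for (OnPair pair : onPairs) {
            if (!selected.contains(pair.left) && !selected.contains(pair.right)) {
                return pair;
            }
        }
        return null;
    }

    /** Convenience helper to build a set of "alias.column" names. */
    static Set<String> columns(String... names) {
        return new HashSet<>(Arrays.asList(names));
    }

    public static void main(String[] args) {
        // ON conditions shared by both statements in customer.yml
        List<OnPair> joins = Arrays.asList(
                new OnPair("r.Sales_No", "s.Sales_No"),
                new OnPair("r.Pro_No", "p.Pro_No"));

        // Source columns behind the SELECT list of each statement
        Set<String> working = columns("s.Sales_No", "s.Sales_Name", "r.Pro_No", "p.Pro_Type");
        Set<String> failing = columns("s.Sales_No", "s.Sales_Name", "p.Pro_Type");

        System.out.println("working sql, uncovered ON: " + firstUncovered(working, joins));
        System.out.println("failing sql, uncovered ON: " + firstUncovered(failing, joins));
    }
}
```

For the two statements it reports no uncovered condition for the working SQL and `r.Pro_No = p.Pro_No` for the failing one, which is the join canal rejected.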
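Some problems can only be found in the source code. Here, the log only reported an SQL parsing error while the SQL itself looked fine; only stepping through the code revealed the root cause. This is a summary of my own experience; if anything is wrong, corrections are welcome.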