业务中经常出现一些千万乃至亿级别的大表,此时可能考虑分库分表(Sharding-JDBC、MyCat等方案),也常同步数据进入ES中;同步数据这一业务场景中,Flink CDC是一个很不错的解决方案。
如mysql、postgresql、sqlserver等,flink cdc通过读取binlog日志(注意:请先开启binlog日志),进行数据同步,实时性较好。
对数据的解析和消费进行了二次封装,使用者只需增加简单的配置,实现FlinkConsumerListener接口,关注编写业务代码即可。
show coding
创建一个springboot项目
- <dependency>
- <groupId>com.kwingroupId>
- <artifactId>flinkartifactId>
- <version>0.0.1-SNAPSHOTversion>
- dependency>
flink:
pipeline-name: flinkCDCTest
mysqlDataSource:
- port: 3306
hostname: 127.0.0.1
databaseList:
- flinktest
tableList:
- flinktest.student
username: root
password: 123456
如上,针对flinktest数据库的student表进行binlog监听。
student实体
- import lombok.Data;
-
- /**
- * @author kwin
- * @Date 2022/7/25 18:27
- **/
- @Data
- public class Student {
- private Long id;
-
- private String name;
-
- private Integer age;
-
- private Integer maxInx;
- }
消费者
- import com.kwin.demo.server.module.flink.test.entity.Student;
- import com.kwin.flink.sink.FlinkConsumerListener;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
- /**
- * @author kwin
- * @Date 2022/7/25 18:29
- **/
- @Slf4j
- @Component
- public class StudentConsumerListener implements FlinkConsumerListener
{ - @Override
- public String getDBName() {
- return "flinktest";
- }
-
- @Override
- public String getTable() {
- return "student";
- }
-
- @Override
- public void insert(Student data) {
- System.out.println("insert: " + data);
- }
-
- @Override
- public void update(Student srcData, Student destData) {
- System.out.println("update: \nsrc:" + srcData + "\ndest:" + destData);
- }
-
- @Override
- public void delete(Student data) {
- System.out.println("delete:"+data);
- }
- }
启动项目
flinktest.student修改数据时:

flinktest.student插入数据时:

flinktest.student删除数据时:

如上,使用者只需实现FlinkConsumerListener接口,即可对指定表的数据进行消费和业务逻辑操作。