• 基于binlog实现数据加工处理


    场景举例:

    为了查询方便,目前订单表中存在很多冗余字段,例如用户昵称。当昵称所在的表发生变化时,很多同学可能会直接在修改昵称的地方手动调用订单接口去更新昵称,这样不仅代码结构混乱,而且耦合严重。基于binlog监听表数据变化后再统一加工处理,可以把这类同步逻辑从业务代码中解耦出来。

    使用说明:

    下面的举例只是基于单机的简单示例,没有加任务线程池,也没有考虑并发。另外个人建议:如果要同步的是重要数据,不要使用这种同步方式,用不好可能存在丢数据的风险;像我上面说的那些仅用于展示、没有业务用途的字段,可以用这种方式同步。

    具体使用

    引用maven核心依赖,其他spring等依赖可以自己添加

     
    <dependency>
      <groupId>com.github.shyiko</groupId>
      <artifactId>mysql-binlog-connector-java</artifactId>
      <version>0.21.0</version>
    </dependency>
     
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>6.0.6</version>
    </dependency>
    
    <dependency>
      <groupId>com.github.jsqlparser</groupId>
      <artifactId>jsqlparser</artifactId>
      <version>4.5</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    下面举例说明具体的实现代码,用户只需实现BizTableListener接口,自定义想要同步的表的处理逻辑即可。举例中只放了一些核心代码,完整代码已作为附件上传

    代码

    1.配置数据库信息

    package com.data.binlog.config;
    
    import lombok.Data;
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    /**
     * Connection settings for the MySQL instance whose binlog is consumed.
     *
     * <p>Values are bound from configuration keys under the {@code binlog.datasource} prefix;
     * accessors are generated by Lombok's {@code @Data}.
     */
    @Data
    @Component
    @ConfigurationProperties(prefix = "binlog.datasource")
    public class DataSourceConfig {

        /**
         * MySQL server address. Despite the name, the rest of this example passes it as the host
         * part of the connection (it is combined with {@link #port}), not as a full JDBC URL.
         */
        private String url;

        /** TCP port of the MySQL server. */
        private int port;

        /** Account used to connect to MySQL. */
        private String username;

        /** Password of {@link #username}. */
        private String passwd;

        /** Database (schema) name. */
        private String db;

    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    # Logging configuration file, resolved from the classpath.
    logging:
      config: classpath:log4j2/log4j2-dev.yml
    # Settings bound to DataSourceConfig through the "binlog.datasource" prefix.
    # The values below are placeholders and must be replaced with real ones.
    binlog:
      datasource:
        # MySQL server address; the example code uses it as the host name and appends the port itself.
        url: mysql的地址
        # Database (schema) name.
        db: 数据库
        # MySQL server port (bound to an int field, so it must be numeric).
        port: 端口
        # Account used to connect to MySQL.
        username: 用户名
        # Password of the account above.
        passwd: 密码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.加载binlog配置信息

    package com.data.binlog.config;
    
    import com.data.binlog.listener.BinLogEventListener;
    import com.github.shyiko.mysql.binlog.BinaryLogClient;
    import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import javax.annotation.Resource;
    
    /**
     * Creates the {@link BinaryLogClient} that streams binlog events to
     * {@link BinLogEventListener} and starts it during application startup.
     */
    @Configuration
    public class BinaryLogClientConfig {

        /** Maximum time, in milliseconds, to wait for the replication connection at startup. */
        private static final long CONNECT_TIMEOUT_MS = 10_000L;

        @Resource
        private BinLogEventListener binLogEventListener;
        @Resource
        private DataSourceConfig dataSourceConfig;

        /**
         * Builds, connects and exposes the binlog client.
         *
         * <p>The client is returned as a bean (a {@code void} {@code @Bean} method produces no
         * usable bean) and is disconnected when the application context shuts down.
         *
         * @return the connected client
         * @throws Exception if the connection cannot be established within
         *                   {@link #CONNECT_TIMEOUT_MS} milliseconds
         */
        @Bean(destroyMethod = "disconnect")
        public BinaryLogClient BinaryLog() throws Exception {
            BinaryLogClient client = new BinaryLogClient(dataSourceConfig.getUrl(), dataSourceConfig.getPort(),
                    dataSourceConfig.getUsername(), dataSourceConfig.getPasswd());
            EventDeserializer eventDeserializer = new EventDeserializer();
            client.setEventDeserializer(eventDeserializer);
            client.registerEventListener(binLogEventListener);
            // connect() without arguments blocks the calling thread for as long as the client is
            // connected, which would stall context initialisation forever. The timeout variant
            // reads the stream on a separate thread and returns once the connection is up.
            client.connect(CONNECT_TIMEOUT_MS);
            return client;
        }

    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    package com.data.binlog.listener;
    
    import cn.hutool.core.util.ObjectUtil;
    import com.data.binlog.config.DataSourceConfig;
    import com.data.binlog.context.TableContext;
    import com.data.binlog.listener.biz.BizTableRouteProcess;
    import com.data.binlog.param.BinLogItem;
    import com.data.binlog.param.ColumnInfo;
    import com.github.shyiko.mysql.binlog.BinaryLogClient;
    import com.github.shyiko.mysql.binlog.event.*;
    import com.google.common.collect.Lists;
    import com.google.common.collect.Maps;
    import lombok.SneakyThrows;
    import lombok.extern.slf4j.Slf4j;
    import net.sf.jsqlparser.JSQLParserException;
    import net.sf.jsqlparser.parser.CCJSqlParserUtil;
    import net.sf.jsqlparser.statement.Statement;
    import net.sf.jsqlparser.statement.alter.Alter;
    import net.sf.jsqlparser.util.TablesNamesFinder;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.io.Serializable;
    import java.sql.*;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * Main binlog listener: keeps the per-table column cache in {@link TableContext} up to date and
     * turns row events into {@link BinLogItem}s that are handed to {@link BizTableRouteProcess}.
     *
     * <p>Handled event types:
     * <ul>
     *   <li>TABLE_MAP - records the table id/name mapping and loads column metadata on a cache miss;</li>
     *   <li>QUERY - an ALTER statement evicts the cached column metadata of the altered table;</li>
     *   <li>update rows - one item per row, with before/after values and the changed columns;</li>
     *   <li>delete rows - one item per event, carrying only table name, event type and timestamp;</li>
     *   <li>write rows - recognised but currently not routed.</li>
     * </ul>
     */
    @Component
    @Slf4j
    public class BinLogEventListener implements BinaryLogClient.EventListener {

        /** Lists the columns of one table; ORDINAL_POSITION is shifted to a zero-based index. */
        private static final String COLUMN_METADATA_SQL = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +
                "DATA_TYPE, ORDINAL_POSITION -1 as ORDINAL_POSITION , case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" +
                " FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ? order by ordinal_position";

        // Dispatches items to the user-defined per-table handlers.
        @Resource
        private BizTableRouteProcess bizTableRouteProcess;
        @Resource
        private DataSourceConfig dataSourceConfig;

        /**
         * Dispatches a binlog event to the handler for its type; other event types are ignored.
         *
         * @param event the event delivered by the binlog client
         */
        @Override
        public void onEvent(Event event) {
            EventType eventType = event.getHeader().getEventType();
            log.debug("监听的事件类型:{}", eventType);
            if (eventType == EventType.TABLE_MAP) {
                handleTableMap(event);
            } else if (eventType == EventType.QUERY) {
                handleQuery(event);
            } else if (EventType.isWrite(eventType)) {
                // Inserts are intentionally not routed yet; add handling here when needed.
            } else if (EventType.isUpdate(eventType)) {
                handleUpdate(event, eventType);
            } else if (EventType.isDelete(eventType)) {
                handleDelete(event, eventType);
            }
        }

        /** Records the table id/name mapping and loads the column metadata if it is not cached. */
        private void handleTableMap(Event event) {
            TableMapEventData tableData = event.getData();
            String db = tableData.getDatabase();
            String table = tableData.getTable();
            String tableId = String.valueOf(tableData.getTableId());
            TableContext.getTableNameMap().put(table, tableId);
            TableContext.getTableIdMap().put(tableId, table);
            Map<String, Map<String, ColumnInfo>> tableIdColumnMap = TableContext.getTableIdColumnMap();
            // Column metadata already cached for this table id: nothing to load.
            if (tableIdColumnMap.get(tableId) != null) {
                return;
            }
            try {
                tableIdColumnMap.put(tableId, getColMap(db, table));
            } catch (Exception e) {
                // Nothing is cached on failure, so the next TABLE_MAP event retries the lookup.
                throw new RuntimeException("failed to load column metadata for " + db + "." + table, e);
            }
        }

        /** Evicts cached column metadata when the statement is an ALTER on a known table. */
        private void handleQuery(Event event) {
            QueryEventData queryEventData = event.getData();
            String execSql = queryEventData.getSql();
            Statement statement;
            try {
                statement = CCJSqlParserUtil.parse(execSql);
            } catch (JSQLParserException e) {
                // QUERY events also carry statements that are not schema changes and that the
                // parser may reject, so a parse failure is not reported as an error.
                log.debug("{} sql语句格式错误", execSql);
                return;
            }
            if (!(statement instanceof Alter)) {
                return;
            }
            // The parsed name may still carry MySQL backtick quoting, while the cache is keyed by
            // the bare table name taken from TABLE_MAP events.
            String tableName = ((Alter) statement).getTable().getName().replace("`", "");
            String tableId = TableContext.getTableNameMap().get(tableName);
            if (tableId != null) {
                // Structure changed: drop the cached columns so they are reloaded on next use.
                TableContext.getTableIdColumnMap().remove(tableId);
            }
            log.info("数据结构变化: {}", tableName);
        }

        /**
         * Routes one {@link BinLogItem} per updated row.
         *
         * <p>NOTE(review): row values are read by each column's ordinal position, which presumes
         * the binlog rows contain every column of the table - confirm the server's row image setting.
         */
        private void handleUpdate(Event event, EventType eventType) {
            UpdateRowsEventData data = event.getData();
            String tableId = String.valueOf(data.getTableId());
            String tableName = TableContext.getTableIdMap().get(tableId);
            Map<String, ColumnInfo> columnInfoMap = TableContext.getTableIdColumnMap().get(tableId);
            if (columnInfoMap == null) {
                // Happens when the metadata lookup failed or the cache entry was evicted.
                log.warn("no column metadata cached for table {} (id {}), update event skipped", tableName, tableId);
                return;
            }
            for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
                Serializable[] beforeRow = row.getKey();
                Serializable[] afterRow = row.getValue();
                Map<String, Serializable> before = new HashMap<>();
                Map<String, Serializable> after = new HashMap<>();
                Map<String, Object[]> change = new HashMap<>();
                for (Map.Entry<String, ColumnInfo> entry : columnInfoMap.entrySet()) {
                    String column = entry.getKey();
                    int idx = entry.getValue().getIdx();
                    Serializable beforeValue = beforeRow[idx];
                    Serializable afterValue = afterRow[idx];
                    before.put(column, beforeValue);
                    after.put(column, afterValue);
                    if (!sameValue(beforeValue, afterValue)) {
                        change.put(column, new Object[]{beforeValue, afterValue});
                    }
                }
                BinLogItem binLogItem = new BinLogItem();
                binLogItem.setTableName(tableName);
                binLogItem.setEventType(eventType);
                binLogItem.setTimestamp(event.getHeader().getTimestamp());
                binLogItem.setBefore(before);
                binLogItem.setAfter(after);
                binLogItem.setColumnChangeMap(change);
                binLogItem.setColumnInfoMap(columnInfoMap);
                bizTableRouteProcess.route(binLogItem);
            }
            log.debug("{}", data);
        }

        /** Routes a single item for a delete event; no row values are attached. */
        private void handleDelete(Event event, EventType eventType) {
            DeleteRowsEventData data = event.getData();
            BinLogItem binLogItem = new BinLogItem();
            binLogItem.setTableName(TableContext.getTableIdMap().get(String.valueOf(data.getTableId())));
            binLogItem.setEventType(eventType);
            binLogItem.setTimestamp(event.getHeader().getTimestamp());
            bizTableRouteProcess.route(binLogItem);
            log.debug("{}", data);
        }

        /**
         * Compares two column values. Byte arrays are compared by content, because a reference
         * comparison would report every such column as changed on every update.
         */
        private static boolean sameValue(Serializable before, Serializable after) {
            if (before instanceof byte[] && after instanceof byte[]) {
                return java.util.Arrays.equals((byte[]) before, (byte[]) after);
            }
            return ObjectUtil.equals(before, after);
        }

        /**
         * Loads the column metadata of a table from {@code INFORMATION_SCHEMA.COLUMNS}.
         *
         * @param db    schema that owns the table
         * @param table table name
         * @return column name to column info; empty when the query returns no rows
         * @throws Exception if the connection or the query fails. Failures are propagated instead
         *                   of being swallowed, so that an empty result is never mistaken for the
         *                   real table structure and cached.
         */
        public Map<String, ColumnInfo> getColMap(String db, String table) throws Exception {
            Map<String, ColumnInfo> map = new HashMap<>();
            String jdbcUrl = "jdbc:mysql://" + dataSourceConfig.getUrl() + ":" + dataSourceConfig.getPort();
            // No Class.forName: JDBC 4 drivers register themselves with DriverManager.
            // try-with-resources closes connection, statement and result set on every path.
            try (Connection connection = DriverManager.getConnection(jdbcUrl,
                    dataSourceConfig.getUsername(), dataSourceConfig.getPasswd());
                 PreparedStatement ps = connection.prepareStatement(COLUMN_METADATA_SQL)) {
                ps.setString(1, db);
                ps.setString(2, table);
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        String schema = rs.getString("TABLE_SCHEMA");
                        String tableName = rs.getString("TABLE_NAME");
                        String column = rs.getString("COLUMN_NAME");
                        int idx = rs.getInt("ORDINAL_POSITION");
                        String dataType = rs.getString("DATA_TYPE");
                        String isPKC = rs.getString("IS_PKC");
                        map.put(column, new ColumnInfo(idx, schema, tableName, column, dataType, "Y".equals(isPKC)));
                    }
                }
            }
            return map;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    package com.data.binlog.listener.biz;
    
    import cn.hutool.core.util.ObjectUtil;
    import cn.hutool.core.util.StrUtil;
    import com.data.binlog.annotation.BinLogTable;
    import com.data.binlog.param.BinLogItem;
    import com.data.binlog.param.ColumnInfo;
    import com.jdl.edu.core.common.utils.SpringContextHolder;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.Resource;
    import java.util.*;
    
    /**
     * Routes {@link BinLogItem}s to the user-defined {@link BizTableListener}s registered for the
     * item's table.
     */
    @Component
    public class BizTableRouteProcess {

        /** Table name (from {@link BinLogTable#tableName()}) to the listeners registered for it. */
        public static Map<String, List<BizTableListener>> tableIdColumnMap = new HashMap<>();
        @Resource
        private SpringContextHolder springContextHolder;


        /**
         * Builds the routing table from every {@link BizTableListener} bean whose class carries a
         * {@link BinLogTable} annotation with a non-empty table name.
         */
        @PostConstruct
        public void initRoute() {
            List<BizTableListener> listeners = SpringContextHolder.getBeanList(BizTableListener.class);
            if (listeners == null) {
                return;
            }
            for (BizTableListener listener : listeners) {
                BinLogTable table = listener.getClass().getAnnotation(BinLogTable.class);
                if (table == null || ObjectUtil.isEmpty(table.tableName())) {
                    continue;
                }
                // Register the bean instance that was just obtained. Looking it up a second time
                // by a name derived from its class fails for beans with a custom name.
                tableIdColumnMap.computeIfAbsent(table.tableName(), key -> new ArrayList<>()).add(listener);
            }
        }


        /**
         * Hands the item to every listener registered for its table, in registration order.
         * Items for tables without listeners are ignored.
         *
         * @param binLogItem the change to dispatch
         */
        public void route(BinLogItem binLogItem) {
            List<BizTableListener> tableImplList = tableIdColumnMap.get(binLogItem.getTableName());
            if (ObjectUtil.isEmpty(tableImplList)) {
                return;
            }
            for (BizTableListener impl : tableImplList) {
                impl.listener(binLogItem);
            }
        }

    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58

    下面是对user表变化的处理加工类,可以热插拔,直接打注解即可

    package com.data.binlog.listener.biz;
    
    import com.data.binlog.param.BinLogItem;
    
    /**
     * Contract implemented by every custom table handler.
     */
    public interface BizTableListener {

        /**
         * Handles one binlog item.
         *
         * @param binLogItem the item to process
         */
        void listener(BinLogItem binLogItem);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    package com.data.binlog.listener.biz.user;
    
    import com.data.binlog.annotation.BinLogTable;
    import com.data.binlog.listener.biz.BizTableListener;
    import com.data.binlog.param.BinLogItem;
    import com.github.shyiko.mysql.binlog.event.EventType;
    import org.springframework.stereotype.Component;
    
    /**
     * Sample handler for the {@code user} table. To listen to a table, implement
     * {@link BizTableListener} and name the table in the {@link BinLogTable} annotation.
     */
    @BinLogTable(tableName = "user")
    @Component
    public class UserTableListener implements BizTableListener {

        /** Timestamp of the newest item handled so far; {@code null} until one has been seen. */
        private static Long currentTs = null;

        /**
         * Processes an item unless it is older than the newest item already handled.
         *
         * @param binLogItem the change to process
         */
        @Override
        public void listener(BinLogItem binLogItem) {
            Long eventTs = binLogItem.getTimestamp();
            // Only strictly older items are stale. Items with an equal timestamp must pass:
            // a ">=" check would drop them, and several changes can legitimately share one timestamp.
            if (currentTs != null && eventTs != null && currentTs > eventTs) {
                System.out.println("当前消息不是最新数据");
                return;
            }
            if (EventType.isUpdate(binLogItem.getEventType())) {
                System.out.println("自己的逻辑");
            } else {
                // Other event types: nothing to do in this sample.
            }
            // Keep the last known timestamp when the item carries none.
            if (eventTs != null) {
                currentTs = eventTs;
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    总结

    代码附件: https://download.csdn.net/download/zhaoyonghenghcl/89221807
    上面是关于binlog同步基本代码实现,代码已分享链接,有问题随时沟通

  • 相关阅读:
    使用adb shell 命令接收串口发送过来的16进制数据 或者 发送16进制数据
    振弦采集仪应用于隧道安全监测
    将可调用对象转换为 c 风格指针
    【计算机基础】优雅的PPT就应该这样设计
    如何成为fpga工程师
    敦煌网“星云计划”:助商家开拓新流量和新玩法,测评补单快速提高产品排名和销量
    创建vMix虚拟集
    Next.js 热更新 Markdown 文件变更
    spring boot+ vue+ mysql开发的一套厘米级高精度定位系统源码
    OFDM 十六讲 2- OFDM and the DFT
  • 原文地址:https://blog.csdn.net/zhaoyonghenghcl/article/details/138194785