• 使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程


    使用Apache Flink实现实时数据同步与清洗:MySQL和Oracle到目标MySQL的ETL流程

    实现数据同步的ETL(抽取、转换、加载)过程通常涉及从源系统(如数据库、消息队列或文件)中抽取数据,进行必要的转换,然后将数据加载到目标系统(如另一个数据库或数据仓库)。在这里,我们将展示如何使用Apache Flink来实现一个从MySQL或Oracle数据库抽取数据并同步到另一个MySQL数据库的ETL过程。

    • 1. 从源数据库(MySQL和Oracle)实时抽取数据
    • 2. 对数据进行清洗和转换
    • 3. 将转换后的数据写入目标数据库(MySQL)
      请添加图片描述

    我们将使用Apache Flink来实现这个流程。Flink具有强大的数据流处理能力,适合处理实时数据同步和转换任务。

    环境准备

    • 确保MySQL和Oracle数据库运行**,并创建相应的表。
    • 创建Spring Boot项目,并添加Flink、MySQL JDBC、和Oracle JDBC驱动的依赖。

    第一步:创建源和目标数据库表

    假设我们有以下三个表:

    • source_mysql_table(MySQL中的源表)
    • source_oracle_table(Oracle中的源表)
    • target_table(目标MySQL表)

    MySQL源表

    CREATE DATABASE source_mysql_db;
    USE source_mysql_db;
    
    CREATE TABLE source_mysql_table (
        id INT AUTO_INCREMENT PRIMARY KEY,
        user_id VARCHAR(255) NOT NULL,
        action VARCHAR(255) NOT NULL,
        timestamp VARCHAR(255) NOT NULL
    );
    

    Oracle源表

    CREATE TABLE source_oracle_table (
        id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY,
        user_id VARCHAR2(255) NOT NULL,
        action VARCHAR2(255) NOT NULL,
        timestamp VARCHAR2(255) NOT NULL,
        PRIMARY KEY (id)
    );
    

    目标MySQL表

    CREATE DATABASE target_db;
    USE target_db;
    
    CREATE TABLE target_table (
        id INT AUTO_INCREMENT PRIMARY KEY,
        user_id VARCHAR(255) NOT NULL,
        action VARCHAR(255) NOT NULL,
        timestamp VARCHAR(255) NOT NULL
    );
    

    第二步:添加项目依赖

    在pom.xml中添加Flink、MySQL和Oracle相关的依赖:

    <dependencies>
        
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starterartifactId>
        dependency>
    
        
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-streaming-java_2.12artifactId>
            <version>1.14.0version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-clients_2.12artifactId>
            <version>1.14.0version>
        dependency>
    
        
        <dependency>
            <groupId>mysqlgroupId>
            <artifactId>mysql-connector-javaartifactId>
            <version>8.0.23version>
        dependency>
    
        
        <dependency>
            <groupId>com.oracle.database.jdbcgroupId>
            <artifactId>ojdbc8artifactId>
            <version>19.8.0.0version>
        dependency>
    dependencies>
    

    第三步:编写Flink ETL任务

    创建一个Flink任务类来实现ETL逻辑。

    创建一个POJO类表示数据结构

    package com.example.flink;
    
    public class UserAction {
        private int id;
        private String userId;
        private String action;
        private String timestamp;
    
        // Getters and setters
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        public String getUserId() {
            return userId;
        }
    
        public void setUserId(String userId) {
            this.userId = userId;
        }
    
        public String getAction() {
            return action;
        }
    
        public void setAction(String action) {
            this.action = action;
        }
    
        public String getTimestamp() {
            return timestamp;
        }
    
        public void setTimestamp(String timestamp) {
            this.timestamp = timestamp;
        }
    }
    

    编写Flink任务类

    package com.example.flink;
    
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.util.Collector;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    
    @Component
    public class FlinkETLJob implements CommandLineRunner {
    
        @Override
        public void run(String... args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 从MySQL读取数据
            DataStream<UserAction> mysqlDataStream = env.addSource(new MySQLSource());
    
            // 从Oracle读取数据
            DataStream<UserAction> oracleDataStream = env.addSource(new OracleSource());
    
            // 合并两个数据流
            DataStream<UserAction> mergedStream = mysqlDataStream.union(oracleDataStream);
    
            // 清洗和转换数据
            DataStream<UserAction> transformedStream = mergedStream.map(new MapFunction<UserAction, UserAction>() {
                @Override
                public UserAction map(UserAction value) throws Exception {
                    // 进行清洗和转换
                    value.setAction(value.getAction().toUpperCase());
                    return value;
                }
            });
    
            // 将数据写入目标MySQL数据库
            transformedStream.addSink(new MySQLSink());
    
            // 执行任务
            env.execute("Flink ETL Job");
        }
    
        public static class MySQLSource implements SourceFunction<UserAction> {
            private static final String JDBC_URL = "jdbc:mysql://localhost:3306/source_mysql_db";
            private static final String JDBC_USER = "source_user";
            private static final String JDBC_PASSWORD = "source_password";
            private volatile boolean isRunning = true;
    
            @Override
            public void run(SourceContext<UserAction> ctx) throws Exception {
                try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
                    while (isRunning) {
                        String sql = "SELECT * FROM source_mysql_table";
                        try (PreparedStatement statement = connection.prepareStatement(sql);
                             ResultSet resultSet = statement.executeQuery()) {
                            while (resultSet.next()) {
                                UserAction userAction = new UserAction();
                                userAction.setId(resultSet.getInt("id"));
                                userAction.setUserId(resultSet.getString("user_id"));
                                userAction.setAction(resultSet.getString("action"));
                                userAction.setTimestamp(resultSet.getString("timestamp"));
                                ctx.collect(userAction);
                            }
                        }
                        Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次
                    }
                }
            }
    
            @Override
            public void cancel() {
                isRunning = false;
            }
        }
    
        public static class OracleSource implements SourceFunction<UserAction> {
            private static final String JDBC_URL = "jdbc:oracle:thin:@localhost:1521:orcl";
            private static final String JDBC_USER = "source_user";
            private static final String JDBC_PASSWORD = "source_password";
            private volatile boolean isRunning = true;
    
            @Override
            public void run(SourceContext<UserAction> ctx) throws Exception {
                try (Connection connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD)) {
                    while (isRunning) {
                        String sql = "SELECT * FROM source_oracle_table";
                        try (PreparedStatement statement = connection.prepareStatement(sql);
                             ResultSet resultSet = statement.executeQuery()) {
                            while (resultSet.next()) {
                                UserAction userAction = new UserAction();
                                userAction.setId(resultSet.getInt("id"));
                                userAction.setUserId(resultSet.getString("user_id"));
                                userAction.setAction(resultSet.getString("action"));
                                userAction.setTimestamp(resultSet.getString("timestamp"));
                                ctx.collect(userAction);
                            }
                        }
                        Thread.sleep(5000); // 模拟实时数据流,每5秒查询一次
                    }
                }
            }
    
            @Override
            public void cancel() {
                isRunning = false;
            }
        }
    
        public static class MySQLSink extends RichFlatMapFunction<UserAction, Void> {
            private static final String JDBC_URL = "jdbc:mysql://localhost:3306/target_db";
            private static final String JDBC_USER = "target_user";
            private static final String JDBC_PASSWORD = "target_password";
            private transient Connection connection;
            private transient PreparedStatement statement;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
                String sql = "INSERT INTO target_table (user_id, action, timestamp) VALUES (?, ?, ?)";
                statement = connection.prepareStatement(sql);
            }
    
            @Override
            public void flatMap(UserAction value, Collector<Void> out) throws Exception {
                statement.setString(1, value.getUserId());
                statement.setString(2, value.getAction());
                statement.setString(3, value.getTimestamp());
                statement.executeUpdate();
            }
    
            @Override
            public void close() throws Exception {
                super.close();
                if (statement != null) {
                    statement.close();
                }
                if (connection != null) {
                    connection.close();
                }
            }
        }
    }
    

    第四步:配置Spring Boot

    在application.properties中添加必要的配置:

    # Spring Boot configuration
    server.port=8080
    

    第五步:运行和测试

    • 启动MySQL和Oracle数据库:确保你的源和目标数据库已经运行,并且创建了相应的数据库和表。
    • 启动Spring Boot应用:启动Spring Boot应用程序,会自动运行Flink ETL任务。
    • 测试Flink ETL任务:插入一些数据到源数据库的表中,验证数据是否同步到目标数据库的表中。

    总结

    通过上述步骤,你可以在Spring Boot项目中集成Flink并实现实时数据同步和ETL流程。这个示例展示了如何从MySQL和Oracle源数据库实时抽取数据,进行数据清洗和转换,并将结果加载到目标MySQL数据库中。根据你的具体需求,你可以扩展和修改这个示例,处理更复杂的数据转换和加载逻辑。

  • 相关阅读:
    prometheus监控带安全认证的elasticsearch
    嵌入式开发:编写简单协作调度器的7个步骤
    C++的struct和class的区别
    在C#中如何自定义配置上周和本周起始日来查询业务数据?
    基本图形学概念
    工程坐标转换方法C#代码实现
    从预训练损失的角度,理解语言模型的涌现能力
    主线程调用return和pthread_exit有什么区别?
    LabVIEW如何修复或重置NI MAX数据库文件
    【多线程】线程安全以及synchronized锁的总结
  • 原文地址:https://blog.csdn.net/qq_38411796/article/details/139770741