• 再学DataX


    一、DataX简介

    DataX官网文档:https://github.com/alibaba/DataX/blob/master/introduction.md

    DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。

    在这里插入图片描述

    1.1、DataX 3.0框架设计

    在这里插入图片描述

    DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

    Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
    Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
    Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

    1.2、DataX3.0核心架构

    DataX 3.0 开源版本支持单机多线程模式完成同步作业运行

    在这里插入图片描述

    1、Job

    DataX完成单个数据同步的作业。一个Job对应一个进程。Job模块负责task切分,TaskGroup管理等。

    2、Task

    Task是DataX的最小单元,每个task负责一部分数据同步工作。

    3、TaskGroup

    Job在切分完多个Task后,会调用DataX的scheduler模块,根据配置的并发量,将拆分成的多个Task分配到不同的TaskGroup中,每个TaskGroup负责以一定并发运行分配给他的全部Task,每个TaskGroup默认的并发量是5.

    4、Task的执行流程

    每个Task由TaskGroup启动,每个Task对固定启动Reader—>Channel—>Writer的线程来完成数据同步工作。

    DataX Job运行起来后,由Job监控并等待每个TaskGroup的task执行完成,等所有TaskGroup任务执行完成后,Job成功退出。否则,异常退出。

    5、DataX调度流程

    举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张分表的mysql数据同步到odps里面。

    DataX的调度决策思路是:

    1)DataX Job根据分库分表切分成了100个Task。

    2)由于配置了20个并发,每个TaskGroup默认并发度是5,所以需要4个TaskGroup

    3)由4个TaskGroup平均切分100个Task,每个TaskGroup被分到了25个Task,共启动5个并发。

    1.3、DataX优势

    1、可靠的监控

    2、数据转换功能丰富

    3、精准的流控

    4、同步性能好

    5、容错机制健壮

    6、使用体验好

    二、DataX源码解读

    2.1、入口类:Engine

    入口类为com.alibaba.datax.core.Engine.java main函数

    1、解析args入参:

    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");
    
    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    针对命令行参数采用了org.apache.commons的BasicParser解析,针对任务的配置文件则通过其本身的ConfigParser进行解析(可以支持本地和网络文件)。

    2、启动Engine
    参数启动完毕后,调用Engine.start方法启动

    ConfigurationValidate.doValidate(configuration);
    Engine engine = new Engine();
    engine.start(configuration);
    
    • 1
    • 2
    • 3

    然后选择是Job模式还是TaskGroup模式:

    boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
    
    • 1

    实际上基本都是Job模式,后续我们主要以JobContainer为切入点,另一个则为TaskGroupContainer。两者均继承自AbstractContainer基类,并通过调用他们的start方法进行启动。

    2.2、JobContainer

    JobContainer.start方法是入口

    preHander

    Job前置操作,即初始化preHandler插件并执行其preHandler

    1)init
    初始化reader和writer,实际方法中根据读写插件各自执行了对应的初始化方法:

    //必须先Reader ,后Writer
    this.jobReader = this.initJobReader(jobPluginCollector);
    this.jobWriter = this.initJobWriter(jobPluginCollector);
    
    • 1
    • 2
    • 3

    2)prepare
    全局准备工作,比如odpswriter清空目标表。由于读写插件的特殊性质,其方法内部主要也是执行了各类型插件的方法来实现准备工作

    this.prepareJobReader();
    this.prepareJobWriter();
    
    • 1
    • 2

    3)split
    拆分Task,参数adviceNumber为建议的拆分数。除此之外我们还可以通过字节和事务的限速来进行控制,从而决定Channel的数量。

    • job.setting.speed.byte:总BPS限速,如果存在值则单个Channel的BPS不能为空,通过总限速除以单个Channel限速得出Channel的需求数量;
    • core.transport.channel.speed.byte:单个Channel的BPS限速;
    • job.setting.speed.record:总TPS限速,如果存在则单个Channel的TPS不能为空,通过总限速除以单个Channel限速得出Channel的需求数量;
    • core.transport.channel.speed.record:单个Channel的TPS限速;

    4)schedule
    schedule首先完成的工作是把上一步reader和writer split的结果整合到具体taskGroupContainer中, 同时不同的执行模式调用不同的调度策略,将所有任务调度起来

    由于实际任务是由TaskGroupContainer执行,为此我们还需要划分对应TaskGroup需要运行的Task,该参数通过core.container.taskGroup.channel进行配置,默认为5。决定每个Group运行那些Task的则由以下方法进行决定,将直接返回对应任务组的配置参数。

    /**
     * 通过获取配置信息得到每个taskGroup需要运行哪些tasks任务
     */
    List<Configuration> taskGroupConfigs = JobAssignUtil.assignFairly(this.configuration,
    	this.needChannelNumber, channelsPerTaskGroup);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    完成任务分配后我们就需要根据运行模式决定调度器,通过这里的源码可以明显看出其DataX 3.0是经过了阉割,仅保留了单机运行模式。

    executeMode = ExecuteMode.STANDALONE;
    scheduler = initStandaloneScheduler(this.configuration);
    
    • 1
    • 2

    后续我们仅能描述单机模式下关于任务调度的工作原理:
    Step1:调度器初始化的核心方法initStandaloneScheduler,其方法主要是初始化了StandAloneJobContainerCommunicator类用于通信(其中collect由ProcessInnerCollector提供,reporter由ProcessInnerReporter提供),StandAloneScheduler则为实际调度器。

    最终执行:

    scheduler.schedule(taskGroupConfigs);
    
    • 1

    在AbstractScheduler的schedule中通过StandAloneJobContainerCommunicator类调用了其collect方法:

    public Communication collect() {
        return super.getCollector().collectFromTaskGroup();
    }
    
    • 1
    • 2
    • 3

    该类为ProcessInnerCollector类,其对应的方法依然是LocalTGCommunicationManager静态类其中一个静态方法。

    public Communication collectFromTaskGroup() {
        return LocalTGCommunicationManager.getJobCommunication();
    }
    
    • 1
    • 2
    • 3

    其内部也是将之前每个TaskGroup所创建的Communication维护了一个静态字典并在需要的时候进行合并。

    public static Communication getJobCommunication() {
        Communication communication = new Communication();
        communication.setState(State.SUCCEEDED);
    
        for (Communication taskGroupCommunication :
                taskGroupCommunicationMap.values()) {
            communication.mergeFrom(taskGroupCommunication);
        }
    
        return communication;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
  • 相关阅读:
    linux 进程通信 C程序案例
    十进制转二进制的算法代码 ← Python
    [ 动词词组 ] 合集
    Spring boot(2)
    《算法之美》读后感
    C语言考试题库之单选题
    枚举类和集合
    JavaScript-对象、类与面向对象编程(理解对象)
    数组——二分查找
    有关iframe锚点,锚点出现上下偏移,锚点出现页面显示问题.iframe的srcdoc问题
  • 原文地址:https://blog.csdn.net/shijinghan1126/article/details/127906588