• Airflow用于ETL的四种基本运行模式, 2022-11-20


    (2022.11.20 Sun)
    基本运行模式(pattern)是data pipeline使用Airflow的DAG的不同结构,基本模式有如下四种 :

    • 序列Sequence
    • 平行拆分Parallel split
    • 同步Synchronisation
    • 单选Exclusive choice

    序列模式

    序列模式即若干task按先后顺序依次执行,在运行代码上 表示为task_1 >> task_2 >> ...

    1. dag = DAG(
    2. dag_id='sequential_pattern',
    3. default_args={
    4. 'start_date': utils.dates.days_ago(1),
    5. },
    6. schedule_interval=None,
    7. )
    8. with dag:
    9. read_input = DummyOperator(task_id='read_input')
    10. aggregate_data = DummyOperator(task_id='generate_data')
    11. write_to_redshift = DummyOperator(task_id='write_to_redshift')
    12. read_input >> aggregate_data >> write_to_redshift

    Parallel split

    parallel split

    parallel split模式用于在分支的情况。比如当数据集备好之后,需要被加载进入多个不同的tasks,且都是同一个pipeline中,如同数据进入不同的分支。

    分支在DAG中的表示为task_1 >> [task_2, task_3]
    案例如

    1. dag = DAG(
    2. dag_id='pattern_parallel_split',
    3. default_args={
    4. 'start_date': utils.dates.days_ago(1),
    5. },
    6. schedule_interval=None,
    7. )
    8. with dag:
    9. read_input = DummyOperator(task_id='read_input')
    10. aggregate_data = DummyOperator(task_id='generate_data')
    11. convert_to_parquet = DummyOperator(task_id='convert_to_parquet')
    12. convert_to_avro = DummyOperator(task_id='convert_to_avro')
    13. read_input >> aggregate_data >> [convert_to_parquet, convert_to_avro]

    Sychronisation

    与parallel split相似,在同步模式中,不同branch的结果汇聚(reconciliation)在一个task中,不同的branch执行并行计算,并将结果整合。

    synchronization

    DAG的代码表达中,同步模式可拆解为在每个for loop中执行顺序模式,即

    1. for xx in xxx:
    2. task_0 >> task_i >> task_2

    代码实例

    1. dag = DAG(
    2. dag_id='pattern_synchronization',
    3. default_args={
    4. 'start_date': utils.dates.days_ago(1),
    5. },
    6. schedule_interval=None,
    7. )
    8. with dag:
    9. convert_to_parquet = DummyOperator(task_id='convert_to_parquet')
    10. for hour in range(0, 24):
    11. read_input = DummyOperator(task_id='read_input_hour_{}'.format(hour))
    12. aggregate_data = DummyOperator(task_id='generate_data_hour_{}'.format(hour))
    13. read_input >> aggregate_data >> convert_to_parquet

    单选

    根据预先设定的条件,在分支部分选择不同的task执行。

    exclusive choice

    在Apache Airflow中,可通过BranchOpertor对象执行分支单选命令。BranchOperator对象指定的方法,其返回值可用于指定对分支的选择,而task_id用于标识分支的名字。参考如下案例。

    1. dag = DAG(
    2. dag_id='pattern_exclusive_choice',
    3. default_args={
    4. 'start_date': utils.dates.days_ago(1),
    5. },
    6. schedule_interval=None,
    7. )
    8. with dag:
    9. def route_task():
    10. execution_date = context['execution_date']
    11. return 'convert_to_parquet'if execution_date.minute % 2 == 0 else 'convert_to_avro'
    12. read_input = DummyOperator(task_id='read_input')
    13. aggregate_data = DummyOperator(task_id='generate_data')
    14. route_to_format = BranchPythonOperator(task_id='route_to_format', python_callable=route_task)
    15. convert_to_parquet = DummyOperator(task_id='convert_to_parquet')
    16. convert_to_avro = DummyOperator(task_id='convert_to_avro')
    17. read_input >> aggregate_data >> route_to_format >> [convert_to_parquet, convert_to_avro]

    Reference

    1 ETL data patterns with Apache Airflow, waitingforcode, by BARTOSZ KONIECZNY

  • 相关阅读:
    vue 计算属性未重新计算 / computed 未触发 / computed 原理&源码分析
    Git diff 使用 vimdiff 对比差异
    东方博宜OJ——1163 - 相加之和最大,并给出它们的起始位置
    【物理应用】基于Matlab模拟极化雷达回波
    ssm文达学院学生社团招募系统的设计与实现毕业设计源码211633
    深度解析C语言文件操作以及常见问题
    【quartus13.1/Verilog】swjtu西南交大:计组课程设计
    怒刷LeetCode的第9天(Java版)
    C++之Linux syscall实例总结(二百四十六)
    国际版Amazon Lightsail的功能解析
  • 原文地址:https://blog.csdn.net/weixin_45892228/article/details/128066550