• Apache Airflow (八) :DAG任务依赖设置


    🏡 个人主页IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客

     🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。

     🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频


    目录

    1. DAG任务依赖设置一

    2. DAG任务依赖设置二

    3. DAG任务依赖设置三

    4. DAG任务依赖设置四

    5. DAG任务依赖设置五


    1. DAG任务依赖设置一

    • DAG调度流程图

    • task执行依赖
    A >> B >>C
    • 完整代码
    1. '''
    2. airflow 任务依赖关系设置一
    3. '''
    4. from airflow import DAG
    5. from airflow.operators.bash import BashOperator
    6. from datetime import datetime, timedelta
    7. default_args = {
    8.     'owner': 'airflow', # 拥有者名称
    9.     'start_date': datetime(2021, 9, 22),  # 第一次开始执行的时间,为 UTC 时间
    10.     'retries': 1,  # 失败重试次数
    11.     'retry_delay': timedelta(minutes=5),  # 失败重试间隔
    12. }
    13. dag = DAG(
    14.     dag_id = 'dag_relation_1', #DAG id ,必须完全由字母、数字、下划线组成
    15.     default_args = default_args, #外部定义的 dic 格式的参数
    16.     schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
    17. )
    18. A = BashOperator(
    19.     task_id='A',
    20.     bash_command='echo "run A task"',
    21.     dag=dag
    22. )
    23. B = BashOperator(
    24.     task_id='B',
    25.     bash_command='echo "run B task"',
    26.     dag=dag
    27. )
    28. C = BashOperator(
    29.     task_id='C',
    30.     bash_command='echo "run C task"',
    31.     dag=dag,
    32.     retries=3
    33. )
    34. A >> B >>C

    2. DAG任务依赖设置二

    • DAG调度流程图

    • task执行依赖​​​​​​​
    [A,B] >>C >>D
    • 完整代码
    1. '''
    2. airflow 任务依赖关系设置二
    3. '''
    4. from airflow import DAG
    5. from airflow.operators.bash import BashOperator
    6. from datetime import datetime, timedelta
    7. default_args = {
    8. 'owner': 'airflow', # 拥有者名称
    9. 'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
    10. 'retries': 1, # 失败重试次数
    11. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
    12. }
    13. dag = DAG(
    14. dag_id = 'dag_relation_2', #DAG id ,必须完全由字母、数字、下划线组成
    15. default_args = default_args, #外部定义的 dic 格式的参数
    16. schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
    17. )
    18. A = BashOperator(
    19. task_id='A',
    20. bash_command='echo "run A task"',
    21. dag=dag
    22. )
    23. B = BashOperator(
    24. task_id='B',
    25. bash_command='echo "run B task"',
    26. dag=dag
    27. )
    28. C = BashOperator(
    29. task_id='C',
    30. bash_command='echo "run C task"',
    31. dag=dag,
    32. retries=3
    33. )
    34. D = BashOperator(
    35. task_id='D',
    36. bash_command='echo "run D task"',
    37. dag=dag
    38. )
    39. [A,B] >>C >>D

    3. DAG任务依赖设置三

    • DAG调度流程图

    • task执行依赖
    [A,B,C] >>D >>[E,F]
    • 完整代码
    1. '''
    2. airflow 任务依赖关系设置三
    3. '''
    4. from airflow import DAG
    5. from airflow.operators.bash import BashOperator
    6. from datetime import datetime, timedelta
    7. default_args = {
    8. 'owner': 'airflow', # 拥有者名称
    9. 'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
    10. 'retries': 1, # 失败重试次数
    11. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
    12. }
    13. dag = DAG(
    14. dag_id = 'dag_relation_3', #DAG id ,必须完全由字母、数字、下划线组成
    15. default_args = default_args, #外部定义的 dic 格式的参数
    16. schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
    17. )
    18. A = BashOperator(
    19. task_id='A',
    20. bash_command='echo "run A task"',
    21. dag=dag
    22. )
    23. B = BashOperator(
    24. task_id='B',
    25. bash_command='echo "run B task"',
    26. dag=dag
    27. )
    28. C = BashOperator(
    29. task_id='C',
    30. bash_command='echo "run C task"',
    31. dag=dag,
    32. retries=3
    33. )
    34. D = BashOperator(
    35. task_id='D',
    36. bash_command='echo "run D task"',
    37. dag=dag
    38. )
    39. E = BashOperator(
    40. task_id='E',
    41. bash_command='echo "run E task"',
    42. dag=dag
    43. )
    44. F = BashOperator(
    45. task_id='F',
    46. bash_command='echo "run F task"',
    47. dag=dag
    48. )
    49. [A,B,C] >>D >>[E,F]

    ​​​​​​​4. DAG任务依赖设置四

    • DAG调度流程图

    • task执行依赖
    1. A >>B>>C>>D
    2. A >>E>>F
    • 完整代码
    1. '''
    2. airflow 任务依赖关系设置四
    3. '''
    4. from airflow import DAG
    5. from airflow.operators.bash import BashOperator
    6. from datetime import datetime, timedelta
    7. default_args = {
    8. 'owner': 'airflow', # 拥有者名称
    9. 'start_date': datetime(2021, 9, 22), # 第一次开始执行的时间,为 UTC 时间
    10. 'retries': 1, # 失败重试次数
    11. 'retry_delay': timedelta(minutes=5), # 失败重试间隔
    12. }
    13. dag = DAG(
    14. dag_id = 'dag_relation_4', #DAG id ,必须完全由字母、数字、下划线组成
    15. default_args = default_args, #外部定义的 dic 格式的参数
    16. schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
    17. )
    18. A = BashOperator(
    19. task_id='A',
    20. bash_command='echo "run A task"',
    21. dag=dag
    22. )
    23. B = BashOperator(
    24. task_id='B',
    25. bash_command='echo "run B task"',
    26. dag=dag
    27. )
    28. C = BashOperator(
    29. task_id='C',
    30. bash_command='echo "run C task"',
    31. dag=dag,
    32. retries=3
    33. )
    34. D = BashOperator(
    35. task_id='D',
    36. bash_command='echo "run D task"',
    37. dag=dag
    38. )
    39. E = BashOperator(
    40. task_id='E',
    41. bash_command='echo "run E task"',
    42. dag=dag
    43. )
    44. F = BashOperator(
    45. task_id='F',
    46. bash_command='echo "run F task"',
    47. dag=dag
    48. )
    49. A >>[B,C,D]
    50. A >>[E,F]

    5. DAG任务依赖设置五

    • DAG调度流程图

    • task执行依赖
    1. A >>B>>E
    2. C >>D>>E
    • 完整代码
    1. '''
    2. airflow 任务依赖关系设置五
    3. '''
    4. from airflow import DAG
    5. from airflow.operators.bash import BashOperator
    6. from datetime import datetime, timedelta
    7. default_args = {
    8.     'owner': 'airflow', # 拥有者名称
    9.     'start_date': datetime(2021, 9, 22),  # 第一次开始执行的时间,为 UTC 时间
    10.     'retries': 1,  # 失败重试次数
    11.     'retry_delay': timedelta(minutes=5),  # 失败重试间隔
    12. }
    13. dag = DAG(
    14.     dag_id = 'dag_relation_5', #DAG id ,必须完全由字母、数字、下划线组成
    15.     default_args = default_args, #外部定义的 dic 格式的参数
    16.     schedule_interval = timedelta(minutes=1) # 定义DAG运行的频率,可以配置天、周、小时、分钟、秒、毫秒
    17. )
    18. A = BashOperator(
    19.     task_id='A',
    20.     bash_command='echo "run A task"',
    21.     dag=dag
    22. )
    23. B = BashOperator(
    24.     task_id='B',
    25.     bash_command='echo "run B task"',
    26.     dag=dag
    27. )
    28. C = BashOperator(
    29.     task_id='C',
    30.     bash_command='echo "run C task"',
    31.     dag=dag,
    32.     retries=3
    33. )
    34. D = BashOperator(
    35.     task_id='D',
    36.     bash_command='echo "run D task"',
    37.     dag=dag
    38. )
    39. E = BashOperator(
    40.     task_id='E',
    41.     bash_command='echo "run E task"',
    42.     dag=dag
    43. )
    44. A >>B>>E
    45. C >>D>>E

  • 相关阅读:
    javascript【格式化时间日期】
    Vue学习
    requests爬虫详解
    基于.NetCore开发博客项目 StarBlog - (10) 图片瀑布流
    手把手教你如何安装 Elasticsearch
    Redis:send of 37 bytes failed with errno=10054
    【排错/运维】修复HDFS丢失、损坏以及副本数的问题
    Vue3数组重新赋值问题
    Linux深度学习问题汇总
    微服务组件Sentinel (Hystrix)详细分析
  • 原文地址:https://blog.csdn.net/qq_32020645/article/details/134452235