• airflow跨Dag依赖的调度方式


    案例代码

    上游DAG代码

    slave_dag.py:

    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.dates import days_ago
    from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
    import pendulum
    
    local_tz = pendulum.timezone("Asia/Shanghai")
    default_args={
            "owner": "airflow",
            "start_date": datetime(2022, 9, 20, 11, 0,tzinfo=local_tz),
            }
    with DAG(
    dag_id="slave_dag",
    default_args=default_args,
    schedule_interval="10,20,30,40,50 12,13,14,15,16,17 * * *",
    #concurrency=1, 
    #max_active_runs=1, 
    tags=['example2'],
    ) as slave_dag:
     # [START howto_operator_external_task_marker]
        '''
        parent_task = ExternalTaskMarker(
         task_id="slave_task2",
         external_dag_id="master_dag",
         external_task_id="master_task1",
         execution_delta=timedelta(minutes=3)
          )
    parent_task
    '''
        slave_task = BashOperator(
        task_id ="slave_task1",
        bash_command ="echo i am slave!",
        )
    
    slave_task
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    下游DAG代码:

    master_dag.py:

    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.utils.dates import days_ago
    from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
    import pendulum
    
    local_tz = pendulum.timezone("Asia/Shanghai")
    
    default_args={
            "owner": "airflow",
            "start_date": datetime(2022, 9, 20, 11, 0,tzinfo=local_tz),
            }
    with DAG(
    dag_id="master_dag",
    default_args=default_args,
    schedule_interval="13,23,33,43,53 12,13,14,15,16,17 * * *",
    #concurrency=1, 
    #max_active_runs=1, 
    tags=['example1'],
     ) as child_dag:
    # [START howto_operator_external_task_sensor]
         child_task1 = ExternalTaskSensor(
         task_id="master_task1",
         external_dag_id="slave_dag",
         external_task_id="slave_task1",
         timeout=120,
         allowed_states=['success'],
         failed_states=['failed', 'skipped'],
         execution_delta=timedelta(minutes=3),
         mode= "reschedule",
         )
    
     # [END howto_operator_external_task_sensor]
         child_task2 = BashOperator(task_id="master_task2",bash_command="echo i am master!",dag=child_dag)
    child_task1 >> child_task2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    说明:
    跨DAG,一定要注意一个start date和schedule_interval,然后是timedelta(minutes=10),execution_delta的时间一定是他们schedule_interval相差的时间,否则会超时

    如:
    上游DAG的schedule_interval
    schedule_interval=“13 12 * * *”,
    下游DAG的schedule_interval
    schedule_interval=“17 12 * * *”,
    相差为4;
    那么execution_delta=timedelta(minutes=4)

    其他说明:
    主要参数
    external_dag_id (str) – 包含需要等待的外部Task的GAD id
    external_task_id (str or None) – 需要等待的外部Task,如果为None则等待整个DAG(默认为None)
    allowed_states (Iterable) – DAG或Task允许的状态, 默认是 [‘success’]
    failed_states (Iterable) –DAG或Task不允许的状态, 默认是 None
    execution_delta (Optional[datetime.timedelta]) – 与之前执行的DAG或Task的时间差。这里的意思是指需要等待的DAG或Task的执行时间在当前DAG或Task的执行时间之前,也就是说在当前DAG或Task执行的时候需要等待的DAG活Task至少已经开始执行。 默认值是当前DAG或Task的执行时间. 如果需要指定前一天的DAG或Task,可以使用datetime.timedelta(days=1). execution_delta和execution_date_fn都可以作为ExternalTaskSensor的参数,但是不能同时使用.
    execution_date_fn (Optional[Callable]) – 这是一个以当前执行时间作为参数的方法,用来返回期望的执行时间.通俗来说就是需要等待的DAG或Task执行时间在当前DAG的执行时间之后,用来往后倒一段时间,在该时间点的DAG或Task执行成功之前,该Task会一直等待,直到改时间点有DAG或Task执行成功或当前Task超时execution_delta和execution_date_fn都可以作为ExternalTaskSensor的参数,但是不能同时使用.
    check_existence (bool) – 用于检查外部DAG id或者Task id是否存在(当为true时),如果不存在,则立刻停止等待
    timeout (int) – 超时时间,单位为秒,如果在超时时间内还未等到外部DAG或Task成功执行,则抛出失败异常,进入重试(如果有重试次数的话)
    mode (str) – 指定Task在等待期间的模式,有三种模式:
    poke (默认): 在等待期间,Task会一直占用Worker Slot
    reschedule: Task只会在检查外部DAG或Task时才会占用一个Worker Slot,在两次检查之间会进入sleep状态(ExternalTaskSensor会每隔一段时间检查外部DAG或Task是否已执行成功,默认检查间隔时1分钟)
    smart sensor: 可进行批量处理的一种模式,内容较为复杂,具体配置可以参看这里Smart Sensors.

  • 相关阅读:
    2024年最新通信安全员考试题库
    (vue)h5 通过高德地图(原生) 获取当前位置定位
    NeurIPS2022 interesting papers
    c/c++里 对 共用体 union 的内存分配
    高校 Web 站点网络安全面临的主要的威胁
    威联通使用Typecho搭建博客
    健身房如何管理与运营?
    Linux红帽(RHCE)认证学习笔记
    代码随想录二刷 Day 37
    大腿神经网络解剖图片,大腿神经网络解剖图谱
  • 原文地址:https://blog.csdn.net/wuchongyong/article/details/126969038