slave_dag.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
import pendulum
local_tz = pendulum.timezone("Asia/Shanghai")
default_args={
"owner": "airflow",
"start_date": datetime(2022, 9, 20, 11, 0,tzinfo=local_tz),
}
with DAG(
dag_id="slave_dag",
default_args=default_args,
schedule_interval="10,20,30,40,50 12,13,14,15,16,17 * * *",
#concurrency=1,
#max_active_runs=1,
tags=['example2'],
) as slave_dag:
# [START howto_operator_external_task_marker]
'''
parent_task = ExternalTaskMarker(
task_id="slave_task2",
external_dag_id="master_dag",
external_task_id="master_task1",
execution_delta=timedelta(minutes=3)
)
parent_task
'''
slave_task = BashOperator(
task_id ="slave_task1",
bash_command ="echo i am slave!",
)
slave_task
master_dag.py:
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from airflow.sensors.external_task_sensor import ExternalTaskMarker, ExternalTaskSensor
import pendulum
local_tz = pendulum.timezone("Asia/Shanghai")
default_args={
"owner": "airflow",
"start_date": datetime(2022, 9, 20, 11, 0,tzinfo=local_tz),
}
with DAG(
dag_id="master_dag",
default_args=default_args,
schedule_interval="13,23,33,43,53 12,13,14,15,16,17 * * *",
#concurrency=1,
#max_active_runs=1,
tags=['example1'],
) as child_dag:
# [START howto_operator_external_task_sensor]
child_task1 = ExternalTaskSensor(
task_id="master_task1",
external_dag_id="slave_dag",
external_task_id="slave_task1",
timeout=120,
allowed_states=['success'],
failed_states=['failed', 'skipped'],
execution_delta=timedelta(minutes=3),
mode= "reschedule",
)
# [END howto_operator_external_task_sensor]
child_task2 = BashOperator(task_id="master_task2",bash_command="echo i am master!",dag=child_dag)
child_task1 >> child_task2
说明:
跨DAG,一定要注意一个start date和schedule_interval,然后是timedelta(minutes=10),execution_delta的时间一定是他们schedule_interval相差的时间,否则会超时
如:
上游DAG的schedule_interval
schedule_interval=“13 12 * * *”,
下游DAG的schedule_interval
schedule_interval=“17 12 * * *”,
相差为4;
那么execution_delta=timedelta(minutes=4)
其他说明:
主要参数
external_dag_id (str) – 包含需要等待的外部Task的GAD id
external_task_id (str or None) – 需要等待的外部Task,如果为None则等待整个DAG(默认为None)
allowed_states (Iterable) – DAG或Task允许的状态, 默认是 [‘success’]
failed_states (Iterable) –DAG或Task不允许的状态, 默认是 None
execution_delta (Optional[datetime.timedelta]) – 与之前执行的DAG或Task的时间差。这里的意思是指需要等待的DAG或Task的执行时间在当前DAG或Task的执行时间之前,也就是说在当前DAG或Task执行的时候需要等待的DAG活Task至少已经开始执行。 默认值是当前DAG或Task的执行时间. 如果需要指定前一天的DAG或Task,可以使用datetime.timedelta(days=1). execution_delta和execution_date_fn都可以作为ExternalTaskSensor的参数,但是不能同时使用.
execution_date_fn (Optional[Callable]) – 这是一个以当前执行时间作为参数的方法,用来返回期望的执行时间.通俗来说就是需要等待的DAG或Task执行时间在当前DAG的执行时间之后,用来往后倒一段时间,在该时间点的DAG或Task执行成功之前,该Task会一直等待,直到改时间点有DAG或Task执行成功或当前Task超时execution_delta和execution_date_fn都可以作为ExternalTaskSensor的参数,但是不能同时使用.
check_existence (bool) – 用于检查外部DAG id或者Task id是否存在(当为true时),如果不存在,则立刻停止等待
timeout (int) – 超时时间,单位为秒,如果在超时时间内还未等到外部DAG或Task成功执行,则抛出失败异常,进入重试(如果有重试次数的话)
mode (str) – 指定Task在等待期间的模式,有三种模式:
poke (默认): 在等待期间,Task会一直占用Worker Slot
reschedule: Task只会在检查外部DAG或Task时才会占用一个Worker Slot,在两次检查之间会进入sleep状态(ExternalTaskSensor会每隔一段时间检查外部DAG或Task是否已执行成功,默认检查间隔时1分钟)
smart sensor: 可进行批量处理的一种模式,内容较为复杂,具体配置可以参看这里Smart Sensors.