🏡 个人主页:IT贫道_大数据OLAP体系技术栈,Apache Doris,Clickhouse 技术-CSDN博客
🚩 私聊博主:加入大数据技术讨论群聊,获取更多大数据资料。
🔔 博主个人B栈地址:豹哥教你大数据的个人空间-豹哥教你大数据个人主页-哔哩哔哩视频
PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。
关于PythonOperator常用参数如下,更多参数可以查看官网:airflow.operators.python — Airflow Documentation
- python_callable(python callable):调用的python函数
-
- op_kwargs(dict):调用python函数对应的 **args 参数,dict格式,使用参照案例。
-
- op_args(list):调用python函数对应的 *args 参数,多个封装到一个tuple中,list格式,使用参照案例。
PythonOperator调度案例
- import random
- from datetime import datetime, timedelta
- from airflow import DAG
- from airflow.operators.python import PythonOperator
-
- # python中 * 关键字参数允许你传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple。
- # python中 ** 关键字参数允许你传入0个或任意个含参数名的参数,这些关键字参数在函数内部自动组装为一个dict。
- def print__hello1(*a,**b):
- print(a)
- print(b)
- print("hello airflow1")
-
- # 返回的值只会打印到日志中
- return{"sss1":"xxx1"}
-
- def print__hello2(random_base):
- print(random_base)
- print("hello airflow2")
-
- # 返回的值只会打印到日志中
- return{"sss2":"xxx2"}
-
- default_args = {
- 'owner':'maliu',
- 'start_date':datetime(2021, 10, 1),
- 'retries': 1, # 失败重试次数
- 'retry_delay': timedelta(minutes=5) # 失败重试间隔
- }
-
- dag = DAG(
- dag_id = 'execute_pythoncode',
- default_args=default_args,
- schedule_interval=timedelta(minutes=1)
- )
-
- first=PythonOperator(
- task_id='first',
- #填写 print__hello1 方法时,不要加上“()”
- python_callable=print__hello1,
- # op_args 对应 print_hello1 方法中的a参数
- op_args=[1,2,3,"hello","world"],
- # op_kwargs 对应 print__hello1 方法中的b参数
- op_kwargs={"id":"1","name":"zs","age":18},
- dag = dag
- )
-
- second=PythonOperator(
- task_id='second',
- #填写 print__hello2 方法时,不要加上“()”
- python_callable=print__hello2,
- # random_base 参数对应 print_hello2 方法中参数“random_base”
- op_kwargs={"random_base":random.randint(0,9)},
- dag=dag
- )
-
- first >> second