Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows.
With Airflow, a workflow is authored as a directed acyclic graph (DAG) of tasks. The Airflow scheduler executes the tasks on a group of workers while respecting the specified dependencies. Rich command-line utilities make it easy to perform complex operations on DAGs, and the rich user interface makes it straightforward to visualize pipelines running in production, monitor their progress, and troubleshoot problems when they arise.
I. Writing the DAG Task Script
1. Start the Alibaba Cloud server cluster and bring up the Hadoop cluster.
2. Configure passwordless SSH login between the cluster nodes.
[root@airflow airflow]# vim /etc/hosts
172.26.16.78 airflow airflow
172.26.16.41 hadoop101 hadoop101
172.26.16.39 hadoop102 hadoop102
172.26.16.40 hadoop103 hadoop103
[root@airflow ~]# ssh-keygen -t rsa
[root@airflow ~]# ssh-copy-id hadoop101
[root@airflow ~]# ssh-copy-id hadoop102
[root@airflow ~]# ssh-copy-id hadoop103
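Since the BashOperator tasks below submit Spark jobs over a non-interactive ssh session, it is worth confirming that passwordless login really works before continuing. A quick check (this verification command is an extra suggestion, not part of the original steps):
[root@airflow ~]# ssh hadoop103 hostname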
3. Create a work-py directory to hold the Python scheduling scripts, then write the .py script.
[root@airflow ~]# mkdir -p /opt/module/work-py
[root@airflow ~]# cd /opt/module/work-py/
[root@airflow work-py]# vim test.py
#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date': datetime(2020, 12, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2 = BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
Script walkthrough:
default_args: the default arguments applied to every task in the DAG.
depends_on_past: whether a task run depends on past runs; if True, a task instance runs only when the previous scheduled run of that task completed successfully, and if False the outcome of previous runs is ignored.
schedule_interval: scheduling frequency.
retries: number of retries.
start_date: the date the schedule starts from.
BashOperator: the operator that actually executes the task, here by running a bash command.
task_id: unique identifier of the task (required).
bash_command: the command the task executes.
set_upstream: declares a dependency; here the ads task depends on dws, and dws depends on dwd (an equivalent bit-shift form is shown after the note below).
Note: the following imports are required:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
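As a side note (not part of the original script), the same dependencies can also be declared with Airflow's bit-shift operators, which many find more readable than set_upstream:
# equivalent to t2.set_upstream(t1); t3.set_upstream(t2)
t1 >> t2 >> t3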
4. Configure the JDK
Note: the Java environment variables must be configured in /etc/bashrc on the ssh target machine (hadoop103 here); source the file after editing.
(python3)[root@airflow work-py]# vim /etc/bashrc
(python3)[root@airflow work-py]# source /etc/bashrc
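The exact lines depend on where the JDK is installed on that machine; a typical example (the JDK path below is an assumption, adjust it to the actual installation) looks like:
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin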
5. Check the Airflow configuration file to find the directory where DAG files are stored.
(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
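The setting to look for is dags_folder under the [core] section; with AIRFLOW_HOME at /root/airflow it points to the directory used in the next step (excerpt, values as they appear on this machine):
[core]
dags_folder = /root/airflow/dags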
6. Create the DAG directory at the path configured in the file, and place the .py script into it.
(python3)[root@airflow work-py]# mkdir ~/airflow/dags
(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/
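Before waiting for the scheduler to pick the file up, a quick way to catch syntax or import errors is to run the DAG file directly with Python; if it exits without errors, the DAG definition at least parses (this check is an extra suggestion, not one of the original steps):
(python3)[root@airflow work-py]# python ~/airflow/dags/test.py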
7. After a short wait, refresh the DAG list; the test DAG now appears in it.
(python3)[root@airflow work-py]# airflow list_dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial
8. Refresh the Airflow web UI; the test DAG now appears there as well.

9. Click to trigger the test DAG.

10. Click a successful task to view its log.




11. View the DAG graph view and the Gantt chart.


12. View the script code.

II. DAG Task Operations
1. Delete the DAG.

2. The DAG can be re-added by running the following command.
(python3)[root@airflow work-py]# airflow list_tasks test --tree
The 'list_tasks' command is deprecated and removed in Airflow 2.0, please use 'tasks list' instead
[2020-12-15 11:17:08,981] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-15 11:17:08,982] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
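As the deprecation warning indicates, on Airflow 2.0 and later the equivalent command is:
airflow tasks list test --tree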
3. List all DAGs again; the test DAG has been added back.
(python3)[root@airflow work-py]# airflow list_dags
The 'list_dags' command is deprecated and removed in Airflow 2.0, please use 'dags list', or 'dags report' instead
[2020-12-15 11:33:57,106] {__init__.py:50} INFO - Using executor SequentialExecutor
[2020-12-15 11:33:57,106] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
example_bash_operator
example_branch_dop_operator_v3
example_branch_operator
example_complex
example_external_task_marker_child
example_external_task_marker_parent
example_http_operator
example_kubernetes_executor_config
example_nested_branch_dag
example_passing_params_via_test_command
example_pig_operator
example_python_operator
example_short_circuit_operator
example_skip_dag
example_subdag_operator
example_subdag_operator.section-1
example_subdag_operator.section-2
example_trigger_controller_dag
example_trigger_target_dag
example_xcom
latest_only
latest_only_with_trigger
test
test_utils
tutorial
4. The re-added test DAG.

III. Configuring the Email Server
1. First, make sure the SMTP service is enabled for all mailboxes involved.

2. Modify the Airflow configuration file as follows:
(python3)[root@airflow work-py]# vim ~/airflow/airflow.cfg
smtp_host = smtp.qq.com
smtp_starttls = True
smtp_ssl = False
smtp_user = 2473196869@qq.com
#smtp_user =
smtp_password = wjmfbxkfvypdebeg
#smtp_password =
smtp_port = 587
smtp_mail_from = 2473196869@qq.com
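For QQ Mail, smtp_password is the SMTP authorization code generated when the SMTP service is enabled, not the account login password. If delivery fails later, a minimal standalone sketch like the one below (plain smtplib, an extra check rather than part of the original steps) can verify the host, port, and credentials independently of Airflow:
#!/usr/bin/python
# Standalone check of the SMTP settings above: connect with STARTTLS on port 587,
# log in with the authorization code, and send a one-line test mail to ourselves.
import smtplib
from email.mime.text import MIMEText

msg = MIMEText("smtp settings check")
msg["Subject"] = "airflow smtp check"
msg["From"] = "2473196869@qq.com"
msg["To"] = "2473196869@qq.com"

server = smtplib.SMTP("smtp.qq.com", 587)   # matches smtp_host / smtp_port
server.starttls()                           # matches smtp_starttls = True
server.login("2473196869@qq.com", "wjmfbxkfvypdebeg")  # the SMTP authorization code
server.sendmail(msg["From"], [msg["To"]], msg.as_string())
server.quit()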
3. Restart Airflow.
(python3)[root@airflow airflow]# ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
(python3)[root@airflow airflow]# ps -ef |grep airflow
root 745 1 0 09:50 ? 00:00:00 /sbin/dhclient -1 -q -lf /var/lib/dhclient/dhclient--eth0.lease -pf /var/run/dhclient-eth0.pid -H airflow eth0
root 7875 1851 0 12:51 pts/1 00:00:00 grep --color=auto airflow
(python3)[root@airflow airflow]# kill -15 745
(python3)[root@airflow airflow]# airflow webserver -p 8080 -D
(python3)[root@airflow airflow]# airflow scheduler -D
4. Edit the test.py script again and replace the old version.
[root@airflow ~]# cd /opt/module/work-py/
[root@airflow work-py]# vim test.py
#!/usr/bin/python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.email_operator import EmailOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'test_owner',
    'depends_on_past': True,
    'email': ['2473196869@qq.com'],
    'start_date': datetime(2020, 12, 15),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

t1 = BashOperator(
    task_id='dwd',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t2 = BashOperator(
    task_id='dws',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

t3 = BashOperator(
    task_id='ads',
    bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
    retries=3,
    dag=dag)

email = EmailOperator(
    task_id="email",
    to="2473196869@qq.com",
    subject="test-subject",
    html_content="test-content",
    cc="chaosong@qq.com",
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)
email.set_upstream(t3)
(python3)[root@airflow work-py]# cp test.py ~/airflow/dags/
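After the updated file is copied into the dags folder, a single task can be exercised without waiting for the scheduler: on Airflow 1.10.x the test subcommand runs one task instance for a given execution date, which is a convenient way to check that the mail actually goes out (the date below simply reuses the DAG's start_date as an example):
(python3)[root@airflow work-py]# airflow test test email 2020-12-15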
5. Check the web UI to confirm the change has taken effect.


6. Run a test, then check the run status and the email.


