• Using Airflow for Simple Workflow Scheduling - Big Data Training


    Airflow is a platform for programmatically authoring, scheduling, and monitoring workflows.

    With Airflow, a workflow is written as a directed acyclic graph (DAG) of tasks. The Airflow scheduler executes the tasks on a set of workers while respecting the specified dependencies. Rich command-line utilities make complex operations on DAGs straightforward, and a rich user interface makes it easy to see pipelines running in production, monitor progress, and troubleshoot problems when needed.

    I. Writing the DAG Task Script

    1. Start the Alibaba Cloud server cluster and start the Hadoop cluster.

    2. Configure passwordless SSH login between the cluster nodes.

    [root@airflow airflow]# vim /etc/hosts
    172.26.16.78 airflow    airflow
    172.26.16.41 hadoop101  hadoop101
    172.26.16.39 hadoop102  hadoop102
    172.26.16.40 hadoop103  hadoop103

    [root@airflow ~]# ssh-keygen -t rsa
    [root@airflow ~]# ssh-copy-id hadoop101
    [root@airflow ~]# ssh-copy-id hadoop102
    [root@airflow ~]# ssh-copy-id hadoop103
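
    A quick way to confirm that passwordless login works from the airflow host (a minimal check using the hostnames configured in /etc/hosts above; each command should print the remote hostname without asking for a password):

    [root@airflow ~]# for h in hadoop101 hadoop102 hadoop103; do ssh -o BatchMode=yes "$h" hostname; done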

    3. Create a work-py directory to hold the Python scheduling scripts, and write the .py script.

    [root@airflow ~]# mkdir -p /opt/module/work-py
    [root@airflow ~]# cd /opt/module/work-py/
    [root@airflow work-py]# vim test.py

    #!/usr/bin/python
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'test_owner',
        'depends_on_past': True,
        'email': ['2473196869@qq.com'],
        'start_date': datetime(2020, 12, 15),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

    t1 = BashOperator(
        task_id='dwd',
        bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
        retries=3,
        dag=dag)

    t2 = BashOperator(
        task_id='dws',
        bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
        retries=3,
        dag=dag)

    t3 = BashOperator(
        task_id='ads',
        bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
        retries=3,
        dag=dag)

    t2.set_upstream(t1)
    t3.set_upstream(t2)

    Script walkthrough:

    default_args: the default arguments applied to the DAG's tasks

    depends_on_past: whether a task run depends on its previous run; if True, the preceding task instance must have completed successfully before the dependent task runs, if False its success or failure is ignored

    schedule_interval: the scheduling frequency

    retries: the number of retries

    start_date: the start date

    BashOperator: the operator that actually executes a task as a bash command

    task_id: the unique identifier of the task (required)

    bash_command: the command the task executes

    set_upstream: declares dependencies; here the ads task depends on the dws task, which in turn depends on the dwd task (an equivalent bit-shift form is sketched below)
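
    For reference, Airflow also supports declaring the same dependency chain with the >> bit-shift operator instead of set_upstream; using the tasks defined above:

    # equivalent to the set_upstream calls: dwd -> dws -> ads
    t1 >> t2 >> t3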

    Note:

    The following imports are required:

    from airflow import DAG

    from airflow.operators.bash_operator import BashOperator

    4. Configure the JDK

    Note: the Java environment variables must be configured in /etc/bashrc on the SSH target machine (hadoop103 in the script above); source the file after editing.

    (python3) [root@airflow work-py]# vim /etc/bashrc
    (python3) [root@airflow work-py]# source /etc/bashrc
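
    The lines added to /etc/bashrc would look roughly like the following; the JAVA_HOME path here is only an assumption and must point at the actual JDK install directory:

    # assumed JDK location -- adjust to the real install path
    export JAVA_HOME=/opt/module/jdk1.8.0_212
    export PATH=$PATH:$JAVA_HOME/bin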
    

    5. Check the Airflow configuration file to find the directory where DAG files are stored.

    (python3) [root@airflow work-py]# vim ~/airflow/airflow.cfg
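
    The setting to look for is dags_folder under the [core] section; with the default configuration it points at the directory created in the next step (the exact value may differ between installations):

    [core]
    dags_folder = /root/airflow/dags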
    

    6. Create the DAG directory at the path configured in the configuration file and copy the .py script into it.

    (python3) [root@airflow work-py]# mkdir ~/airflow/dags
    (python3) [root@airflow work-py]# cp test.py ~/airflow/dags/
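
    Before waiting for the scheduler, the script can be run through the Python interpreter as a quick sanity check; an import or syntax error in the DAG file will show up immediately (this is plain Python, not an Airflow command):

    (python3) [root@airflow work-py]# python ~/airflow/dags/test.py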

    7. Wait a short while and refresh the task list; the test task now appears in it.

    (python3) [root@airflow work-py]# airflow list_dags
    -------------------------------------------------------------------
    DAGS
    -------------------------------------------------------------------
    example_bash_operator
    example_branch_dop_operator_v3
    example_branch_operator
    example_complex
    example_external_task_marker_child
    example_external_task_marker_parent
    example_http_operator
    example_kubernetes_executor_config
    example_nested_branch_dag
    example_passing_params_via_test_command
    example_pig_operator
    example_python_operator
    example_short_circuit_operator
    example_skip_dag
    example_subdag_operator
    example_subdag_operator.section-1
    example_subdag_operator.section-2
    example_trigger_controller_dag
    example_trigger_target_dag
    example_xcom
    latest_only
    latest_only_with_trigger
    test
    test_utils
    tutorial

    8. Refresh the Airflow web UI; the test task now appears there as well.

    9. Click to run the test task (it can also be triggered from the command line, as shown below).
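
    The run can be started from the command line with the same Airflow 1.x CLI used elsewhere in this article; if the DAG is still paused, unpause it first:

    (python3) [root@airflow work-py]# airflow unpause test
    (python3) [root@airflow work-py]# airflow trigger_dag test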

    10. Click a successful task and view its log.

    11. View the DAG graph and the Gantt chart.

    12. View the script code.

    II. DAG Task Operations

    1. Delete the DAG task.

    2. The DAG task can be re-added by running the following command.

    (python3) [root@airflow work-py]# airflow list_tasks test --tree
    The 'list_tasks' command is deprecated and removed in Airflow 2.0, please use 'tasks list' instead
    [2020-12-15 11:17:08,981] {__init__.py:50} INFO - Using executor SequentialExecutor
    [2020-12-15 11:17:08,982] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
    
        
            

    3. List all current DAG tasks; the test task has been added back.

    (python3) [root@airflow work-py]#
    (python3) [root@airflow work-py]# airflow list_dags
    The 'list_dags' command is deprecated and removed in Airflow 2.0, please use 'dags list', or 'dags report' instead
    [2020-12-15 11:33:57,106] {__init__.py:50} INFO - Using executor SequentialExecutor
    [2020-12-15 11:33:57,106] {dagbag.py:417} INFO - Filling up the DagBag from /root/airflow/dags
     
    -------------------------------------------------------------------
    DAGS
    -------------------------------------------------------------------
    example_bash_operator
    example_branch_dop_operator_v3
    example_branch_operator
    example_complex
    example_external_task_marker_child
    example_external_task_marker_parent
    example_http_operator
    example_kubernetes_executor_config
    example_nested_branch_dag
    example_passing_params_via_test_command
    example_pig_operator
    example_python_operator
    example_short_circuit_operator
    example_skip_dag
    example_subdag_operator
    example_subdag_operator.section-1
    example_subdag_operator.section-2
    example_trigger_controller_dag
    example_trigger_target_dag
    example_xcom
    latest_only
    latest_only_with_trigger
    test
    test_utils
    tutorial

    4. The re-added DAG task is visible again.

    III. Configuring the Mail Server

    1. First, make sure that the SMTP service is enabled for all mailboxes involved.

    2. Modify the Airflow configuration file as follows:

    (python3) [root@airflow work-py]# vim ~/airflow/airflow.cfg
    smtp_host = smtp.qq.com
    smtp_starttls = True
    smtp_ssl = False
    smtp_user = 2473196869@qq.com
    # smtp_user =
    smtp_password = wjmfbxkfvypdebeg
    # smtp_password =
    smtp_port = 587
    smtp_mail_from = 2473196869@qq.com

    3. Restart Airflow.

    (python3) [root@airflow airflow]# ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
    (python3) [root@airflow airflow]# ps -ef | grep airflow
    root       745     1  0 09:50 ?        00:00:00 /sbin/dhclient -1 -q -lf /var/lib/dhclient/dhclient--eth0.lease -pf /var/run/dhclient-eth0.pid -H airflow eth0
    root      7875  1851  0 12:51 pts/1    00:00:00 grep --color=auto airflow
    (python3) [root@airflow airflow]# kill -15 745


    (python3) [root@airflow airflow]# airflow webserver -p 8080 -D
    (python3) [root@airflow airflow]# airflow scheduler -D
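
    After restarting, the same ps pattern as above can confirm that the webserver and scheduler daemons are running again:

    (python3) [root@airflow airflow]# ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep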

    4. Re-edit the test.py script and replace the old copy.

    [root@airflow ~]# cd /opt/module/work-py/
    [root@airflow work-py]# vim test.py

    #!/usr/bin/python
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.email_operator import EmailOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'test_owner',
        'depends_on_past': True,
        'email': ['2473196869@qq.com'],
        'start_date': datetime(2020, 12, 15),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
    }
    dag = DAG('test', default_args=default_args, schedule_interval=timedelta(days=1))

    t1 = BashOperator(
        task_id='dwd',
        bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwdMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
        retries=3,
        dag=dag)

    t2 = BashOperator(
        task_id='dws',
        bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.DwsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
        retries=3,
        dag=dag)

    t3 = BashOperator(
        task_id='ads',
        bash_command='ssh hadoop103 "spark-submit --master yarn --deploy-mode client --driver-memory 1g --num-executors 2 --executor-cores 2 --executor-memory 2g --class com.atguigu.member.controller.AdsMemberController --queue spark /root/work/com_atguigu_warehouse-1.0-SNAPSHOT-jar-with-dependencies.jar"',
        retries=3,
        dag=dag)

    email = EmailOperator(
        task_id="email",
        to="2473196869@qq.com",
        subject="test-subject",
        html_content="test-content",
        cc="chaosong@qq.com",
        dag=dag)

    t2.set_upstream(t1)
    t3.set_upstream(t2)
    email.set_upstream(t3)

    (python3) [root@airflow work-py]# cp test.py ~/airflow/dags/

    5. Check whether the change shows up in the web UI.

    6. Run a test, then check the run status and the email.

  • Original article: https://blog.csdn.net/zjjcchina/article/details/126119361