• dflow入门2——Slices


    为了梳理学习dflow时遇到的知识点,我决定开这一个系列记录自己的学习过程。当然了,最好是去看 官方教程文档
    本文,我们将阅读教程中slices这一节,并在最后写一个应用。

    阅读原教程

    两个OP

    我们首先看源码中的第一个OP

    from typing import List
    
    from dflow import Step, Workflow, argo_range
    from dflow.python import OP, OPIO, Artifact, OPIOSign, PythonOPTemplate, Slices
    
    
    class Hello(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'filename': str
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'out_art': Artifact(str)
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            file_num = int(op_in["filename"].split('.')[0][1:])
            open(op_in["filename"], "w").write("Hello" * file_num)
            op_out = OPIO({
                'out_art': op_in["filename"]
            })
            return op_out
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    结合上文,我们知道,所谓OP就是一个操作手册,撰写OP需要遵循特定的模板,而该模板可以简化为下面这个普通的python函数。

    def aaa(in_msg: str, in_artifact: Artifact):
    	op_in = {"in_msg": in_msg, "in_artifact": in_artifact}
    	op_out = execute(op_in)
    	out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我们对照这两个代码块,首先读前两个类方法:hello OP的输入是filename,输出是Artifact类型,初始化该类型需要一个str
    下面我们看execute,首先是接收了文件名,然后从文件名中获取file_num,然后新建一个文件,往里写Hello,并重复file_num次,最后把该文件名对应的文件作为输出
    结合下文,filename的形式是这样的:

    "filename": [f"f{x}.txt" for x in range(10)]
    
    • 1

    单个的filename是这样的:

    f"f{x}.txt" for x in range(10)
    
    • 1

    所以x是一个数字

    f"f{x}.txt"
    
    • 1

    execute第一句话就是对该文件名进行处理

    op_in["filename"].split('.')[0][1:]
    
    • 1

    下面我们看第二个OP

    class Check(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'filename': Artifact(List[str])
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign()
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            print(op_in["filename"])
            return OPIO()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    再重复一下
    所谓OP就是一个操作手册,撰写OP需要遵循特定的模板,而该模板可以简化为下面这个普通的python函数。

    def aaa(in_msg: str, in_artifact: Artifact):
    	op_in = {"in_msg": in_msg, "in_artifact": in_artifact}
    	op_out = execute(op_in)
    	out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3
    • 4
    • 5

    我们对照这两个代码块看这个OP
    该OP输入是Artifact类型,初始化该类型,需要往Artifact里塞一个list,list里面是文件路径
    也就是说,输入是大包裹,里面有一群小包裹
    输出是一个空的,啥都没有
    execute就是把输入的包裹打印出来

    OP的使用:slice功能

    该功能是template附属的功能,如果说,template是工人,OP是操作手册,那么slice就相当于:工人重复某个操作手册n次(并行)。
    下面是使用方法:

    hello = Step("hello",
                PythonOPTemplate(Hello, image="python:3.8",
                                slices=Slices("{{item}}",
                                            input_parameter=["filename"],
                                            output_artifact=["out_art"]
                                            )
                                ),
                parameters={"filename": [f"f{x}.txt" for x in range(10)]},
                with_param=argo_range(10))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这里的item应该是一个全局变量,应该是slice中的某个item,是一个int
    在这里插入图片描述
    整个step的输入:

    parameters={"filename": [f"f{x}.txt" for x in range(10)]}
    
    • 1

    slice中每步的输入输出是:

    input_parameter=["filename"],
    output_artifact=["out_art"]
    
    • 1
    • 2

    当然了,还可以是其他输入输出,下面是slice的源码:
    在这里插入图片描述
    之所以选择上面两个,是因为我们的hello OP只需要这两个
    下面我们对用check step 检查hello OP的结果

    check = Step("check",
                PythonOPTemplate(Check, image="python:3.8"),
                artifacts={"filename": hello.outputs.artifacts["out_art"]},
                )
    
    • 1
    • 2
    • 3
    • 4

    可以看到,该OP接收的是单个的artifacts,结合check OP的定义,我们看到,这是一个大包裹,里面是一堆小包裹,而hello OP的step输出就是一堆小包裹。

    工作流

    wf = Workflow("slices")
    wf.add(hello)
    wf.add(check)
    wf.submit()
    
    • 1
    • 2
    • 3
    • 4

    我们定义好了两个OP,然后定义了使用两个OP的step,最后将他们组装即可。

    浅试Slice功能

    问题阐述

    我们有6个xyz文件在in_dir目录里
    在这里插入图片描述
    每个文件第二行有能量信息
    在这里插入图片描述
    我们需要做的事情是,打开文件,读取能量数据。将能量数据和文件名称、文件路径对应起来,存到同一个dataframe里面。

    传统的脚本

    import os
    import pandas as pd
    
    
    root = r'H:\dflow\dflow_gym\dummy_slice\in_dir'
    dump = r'H:\dflow\dflow_gym\dummy_slice'
    e_list = []
    name_list = []
    path_list = []
    for a_file in os.listdir(root):
        a_path = os.path.join(root, a_file)
        with open(a_path, 'r') as f:
            f.readline()
            a_e = float((f.readline()).split()[1])
        e_list.append(a_e)
        name_list.append(a_file)
        path_list.append(a_path)
    info = {'e': e_list, 'name': name_list, 'xyz_path': path_list}
    info_df = pd.DataFrame(info)
    os.chdir(dump)
    info_df.to_pickle('info.pickle')
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    我们把这段脚本打包成一个函数:
    (涉及路径的时候加上path,更规范,不容易出错)

    import os
    import pandas as pd
    from pathlib import Path
    
    def get_e(src_root: str, dump_root: str):
        e_list = []
        name_list = []
        path_list = []
        for a_file in os.listdir(Path(src_root)):
            a_path = os.path.join(Path(src_root), a_file)
            with open(a_path, 'r') as f:
                f.readline()
                a_e = float((f.readline()).split()[1])
            e_list.append(a_e)
            name_list.append(a_file)
            path_list.append(a_path)
        info = {'e': e_list, 'name': name_list, 'xyz_path': path_list}
        info_df = pd.DataFrame(info)
        os.chdir(Path(dump_root))
        info_df.to_pickle('info.pickle')
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    dflow v1–non slice

    下面我们把这段函数改成一个template,外包step和workflow
    我们对照一下template的模板:

    def aaa(in_msg: str, in_artifact: Artifact):
    	op_in = {"in_msg": in_msg, "in_artifact": in_artifact}
    	op_out = execute(op_in)
    	out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3
    • 4
    • 5

    运行该函数不需要指令,需要包裹,即in_artifact
    该函数没有输出指令,但是输出了一个包裹info.pickle,我们需要把info.pickle下载下来。

    流程大概是,我们上传in_dir,在云端处理信息以后,把info.pickle下载到指定文件夹。
    我们“借鉴”一下源码中example里面的文件上传和下载,相对路径即可。
    下面是dflow第一版:

    import os
    from pathlib import Path
    from dflow import Step, Workflow, download_artifact, upload_artifact
    from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
                              upload_packages)
    import pandas as pd
    
    if "__file__" in locals():
        upload_packages.append(__file__)
    
    
    class step1(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'in_dir': Artifact(Path),
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'info': Artifact(Path),
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            e_list = []
            name_list = []
            path_list = []
            cwd_ = os.getcwd()
            os.chdir(op_in['in_dir'])
            for a_file in os.listdir('./'):
                with open(a_file, 'r') as f:
                    f.readline()
                    a_e = float((f.readline()).split()[1])
                e_list.append(a_e)
                name_list.append(a_file)
                path_list.append(os.path.abspath(a_file))
            info = {'e': e_list, 'name': name_list, 'xyz_path': path_list}
            info_df = pd.DataFrame(info)
            os.chdir(cwd_)
            info_df.to_pickle(r'info.pickle')
            op_out = OPIO({
                "info": Path(r'info.pickle'),
            })
            return op_out
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    对比二者不难发现,其实就是把程序输入换成了op_in['in_dir'],输出给套了个壳。
    下面我们给这个OP套上template,step,workflow的壳。

    
    def test_dflow_v1():
        wf = Workflow(name='test1')
        art_in = upload_artifact(Path('in_dir'))
        print(art_in)
        step = Step(name='step1',
                    template=PythonOPTemplate(Parse, image="python_diy:3.8"),
                    artifacts={'in_dir': art_in})
        wf.add(step)
        wf.submit()
        while wf.query_status() in ["Pending", "Running"]:
            time.sleep(1)
    
        assert(wf.query_status() == "Succeeded")
        step = wf.query_step(name="step1")[0]
        assert(step.phase == "Succeeded")
        download_artifact(artifact=step.outputs.artifacts["info"])
    
    
    if __name__ == '__main__':
        test_dflow_v1()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    程序运行的时候会遇到如下几个问题:

    1. workflow和step的命名,不能有下划线,可以有中短线,而且不能大写开头
    2. 程序运行需要pandas库,直接upload会出现版本兼容问题,最好在镜像里提前打好,方法参见这篇
      上述这种实现是顺序读取列表
      在这里插入图片描述
      我们只调用了一次step1就完成了3个文件的处理。
      下面我们用slice功能

    dflow v2–slice_para

    使用slice功能,我们需要把OP功能缩减成只读取单个文件的功能,然后重复调用。
    因此我们需要调整OP的输入为:单个文件
    输出为:e, name, path

    import os
    from pathlib import Path
    import pandas as pd
    from dflow import Step, Workflow, download_artifact, upload_artifact
    from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
                              upload_packages)
    import time
    
    
    if "__file__" in locals():
        upload_packages.append(__file__)
    
    
    class getInfo(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'in_f': Artifact(Path),
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'e': float,
                'name': str,
                'xyz_path': Path,
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            with open(op_in['in_f'], 'r') as f:
                f.readline()
                a_e = float((f.readline()).split()[1])
            op_out = OPIO({
                "e": a_e,
                'name': os.path.basename(op_in['in_f']),
                'xyz_path':os.path.abspath(op_in['in_f'])
            })
            return op_out
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    另一方面,我们需要汇总所有的信息到 df 里面,所以增加一个OP

    class getDF(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'in_es': List[float],
                'in_names': List[str],
                'in_paths': List[Path]
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'out_df': Artifact(Path)
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            info = pd.DataFrame({'e': op_in['in_es'], 'names': op_in['in_names'], 'xyz_paths': op_in['in_paths']})
            info.to_pickle('info_v2.pickle')
            op_out = OPIO({
                "out_df": Path('info_v2.pickle'),
            })
            return op_out
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    这两个OP功能加起来是原OP的功能
    下面我们把壳套上,照着模板写:

    def test_dflow_v2():
        wf = Workflow(name='test1')
        art_in = upload_artifact(Path('in_dir'))
        print(art_in)
        step1 = Step(name='step1',
                    template=PythonOPTemplate(getInfo, image="python_diy:3.8", slices=Slices(
                        "{{item}}",
                        input_parameter=['in_f'],
                        output_parameter=['e', 'name', 'xyz_path']
                    )),
        parameters = {'in_f': os.listdir(Path('in_dir'))},
                     artifacts={'in_dir': art_in},
                    with_param = argo_range(len(os.listdir(Path('in_dir')))),
                    key='get-e-{{item}}'
        )
        step2 = Step(name='step2',
                     template=PythonOPTemplate(getDF, image='python_diy:3.8'),
                     parameters={'in_es': step1.outputs.parameters['e'],
                                 'in_names': step1.outputs.parameters['name'],
                                 'in_paths': step1.outputs.parameters['xyz_path']})
        wf.add(step1)
        wf.add(step2)
        wf.submit()
        while wf.query_status() in ["Pending", "Running"]:
            time.sleep(1)
    
        assert(wf.query_status() == "Succeeded")
        step = wf.query_step(name="step2")[0]
        assert(step.phase == "Succeeded")
        download_artifact(artifact=step.outputs.artifacts['out_df'])
    
    
    if __name__ == '__main__':
        test_dflow_v2()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    运行以后即为下图所示:
    在这里插入图片描述

    dflow v3–slice_artifact

    注意到,上文在slice的时候是对parameter进行的slice,每个子节点都传了所有的文件。
    但是我们理想状态是对文件进行slice,每个子任务处理一个文件,因此只需给OP传一个文件即可。
    这个似乎没有很好的方法,只能在上传包裹的时候就完成切片,也就是上传一堆小包裹

    import os
    from pathlib import Path
    import pandas as pd
    from dflow import Step, Workflow, download_artifact, upload_artifact, argo_range
    from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate, Slices,
                              upload_packages)
    import time
    from typing import List
    
    
    if "__file__" in locals():
        upload_packages.append(__file__)
    
    
    class getInfo(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'in_f': Artifact(Path)
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'e': float,
                'name': str,
                'xyz_path': str,
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            with open(op_in['in_f'], 'r') as f:
                f.readline()
                a_e = float((f.readline()).split()[1])
            op_out = OPIO({
                "e": a_e,
                'name': os.path.basename(op_in['in_f']),
                'xyz_path': os.path.abspath(op_in['in_f'])
            })
            return op_out
    
    
    class getDF(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'in_es': List[float],
                'in_names': List[str],
                'in_paths': List[str]
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'out_df': Artifact(Path)
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            info = pd.DataFrame({'e': op_in['in_es'], 'names': op_in['in_names'], 'xyz_paths': op_in['in_paths']})
            info.to_pickle('info_v2.pickle')
            op_out = OPIO({
                "out_df": Path('info_v2.pickle'),
            })
            return op_out
    
    
    def test_dflow_v3():
        wf = Workflow(name='test1')
        upload_list = []
        for a_file in os.listdir('in_dir'):
            upload_list.append(Path(Path('in_dir')/a_file))
        art_in = upload_artifact(upload_list)
        print(art_in)
    
        step1 = Step(name='step1',
                     template=PythonOPTemplate(getInfo, image="python_diy:3.8", slices=Slices(
                         "{{item}}",
                         input_artifact=['in_f'],
                         output_parameter=['e', 'name', 'xyz_path']
                     )),
                     artifacts={'in_f': art_in},
                     with_param = argo_range(len(os.listdir(Path('in_dir')))),
                     key='get-e-{{item}}'
                     )
        step2 = Step(name='step2',
                     template=PythonOPTemplate(getDF, image='python_diy:3.8'),
                     parameters={'in_es': step1.outputs.parameters['e'],
                                 'in_names': step1.outputs.parameters['name'],
                                 'in_paths': step1.outputs.parameters['xyz_path']})
        wf.add(step1)
        wf.add(step2)
        wf.submit()
        while wf.query_status() in ["Pending", "Running"]:
            time.sleep(1)
    
        assert(wf.query_status() == "Succeeded")
        step = wf.query_step(name="step2")[0]
        assert(step.phase == "Succeeded")
        download_artifact(artifact=step.outputs.artifacts['out_df'])
    
    
    if __name__ == '__main__':
        test_dflow_v3()
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118

    完整代码如上,需要注意的是:
    os.path.abspath() 是返回一个str,而云端(linux)上的path和local(win)下面的path不一样
    所以在拼info的时候要用str,而不是path

  • 相关阅读:
    从单机定时到多层分发
    CDR插件开发之Addon插件005 - Corel.Interop.VGCore.dll库文件简介
    java-net-php-python-ssm房车买卖租赁专用网站计算机毕业设计程序
    蓝桥杯实战应用【算法代码篇】-多分支递归:裴波那契序列(附Java、C++和R语言代码)
    creo草绘图形技巧-透视图
    SAP-PP:基础概念笔记-5(物料主数据的MRP1~4视图)
    新生宝宝为何天生过敏体质 婴儿过敏体质的症状
    RocketMQ系列(一) 基本介绍
    【QT】capture.obj:-1: error: LNK2019: 无法解析的外部符号 __imp_htons(解决方法)
    Spring Security——基于前后端分离项目的使用(安全框架)
  • 原文地址:https://blog.csdn.net/frank_haha/article/details/125980146