• dflow入门4——recurse&reuse&conditional


    为了梳理学习dflow时遇到的知识点,我决定开这一个系列记录自己的学习过程。当然了,最好是去看 官方教程文档
    本文,我们将阅读教程中recurse、reuse和conditional这三节

    阅读原教程

    recurse

    recurse是dflow开发人员针对递归循环设计的模块,我们先来看recurse
    首先定义了一个模板

        plus1 = ShellOPTemplate(
            name='plus1',
            image="alpine:3.15",
            script="echo 'This is iter {{inputs.parameters.iter}}' && "
            "echo $(({{inputs.parameters.iter}}+1)) > /tmp/result.txt")
        plus1.inputs.parameters = {"iter": InputParameter()}
        plus1.outputs.parameters = {"iter": OutputParameter(
            value_from_path="/tmp/result.txt")}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    该模板是自增1的操作

        hello = Step(name="hello", template=plus1, parameters={
                     "iter": steps.inputs.parameters["iter"]})
    
    • 1
    • 2

    定义了一个step,该step使用了模板,自增1,输入是其他模板的输出
    下面定义一个steps

        steps = Steps(name="iter", inputs=Inputs(
            parameters={"iter": InputParameter(value=0),
                        "limit": InputParameter(value=3)}))
    
    • 1
    • 2
    • 3

    该steps的输入有两个,一个是iter,一个是limit
    最后定义一个新的step

        next = Step(name="next", template=steps,
                    parameters={"iter": hello.outputs.parameters["iter"]},
                    when="%s < %s" % (
                        hello.outputs.parameters["iter"],
                        steps.inputs.parameters["limit"]))
    
    • 1
    • 2
    • 3
    • 4
    • 5

    翻译成普通的python程序:

    def add1(input: int):
        print(input)
        return input + 1
    
    def loops(input_loop: int, limit_loop: int):
        while input_loop < limit_loop:
            input_loop = add1(input_loop)
            loops(input_loop=input_loop, limit_loop=limit_loop)
    
    loops(input_loop=0, limit_loop=3)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    我们对比来看,自增1操作即为add1函数
    loops函数对应next

    我们从上往下看:

    • wf 里仅有一个 steps
    • 这个 steps add 了两步,add 1 和 loops
    • 此外设定了进入函数的条件,也就是while的条件

    从小往上看是build的过程:

    • 首先定义自增1的模板
    • 定义steps,设定好条件
    • 定义next模板,该模板接收自增1的输出,再调用steps
    • 最后steps添加这两个模板,wf 提交任务

    调用方法:

    • 把自增1替换为我们要循环的模板即可,next 接收自增1的关键输出,steps设定好阈值。这就是一个for循环

    reuse

    下面我们看reuse
    这一节首先将上一节的shelltemplate替换成了python template

    import time
    
    from dflow import InputParameter, Inputs, Step, Steps, Workflow
    from dflow.python import OP, OPIO, OPIOSign, PythonOPTemplate
    
    
    class Plus1(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'iter': int
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'iter': int
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            return OPIO({
                'iter': op_in['iter'] + 1
            })
    
    if __name__ == "__main__":
        steps = Steps(name="iter", inputs=Inputs(
            parameters={"iter": InputParameter(value=0),
                        "limit": InputParameter(value=5)}))
        plus1 = Step(name="plus1",
                     template=PythonOPTemplate(Plus1,
                                               image="python:3.8"),
                     parameters={"iter": steps.inputs.parameters["iter"]},
                     key="iter-%s" % steps.inputs.parameters["iter"])
        steps.add(plus1)
        next = Step(name="next", template=steps,
                    parameters={"iter": plus1.outputs.parameters["iter"]},
                    when="%s < %s" % (
                        plus1.outputs.parameters["iter"],
                        steps.inputs.parameters["limit"]))
        steps.add(next)
    
        wf = Workflow("recurse", steps=steps)
        wf.submit()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    区别在于limit替换成了5
    这样的话,自增1步进行了5次
    此外,每一个自增1步分配了一个key
    重点在后面

        while wf.query_status() in ["Pending", "Running"]:
            time.sleep(1)
    
        assert(wf.query_status() == "Succeeded")
    
        step0 = wf.query_step(key="iter-0")[0]
        step1 = wf.query_step(key="iter-1")[0]
        step1.modify_output_parameter("iter", 3)
    
        wf = Workflow("recurse-resubmit", steps=steps)
        wf.submit(reuse_step=[step0, step1])
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里取出了前两步,修改了第二步的输出(从2修改到了3)
    重新提交后,原来5次的循环,重新使用之前的两次,由于修改了第二次的输出,所以在这次相当于从原来的第4步开始,运行两步后结束。
    这一feature比较有价值的点在于,我们的输入输出文件都是在minio上存着的,只要我们不删容器(e.g. 重启docker),只要有他们的key,这些东西都是可以查询的,可以重用的。

    conditional

    首先定义了一个条件template

    import random
    import time
    
    from dflow import (OutputArtifact, OutputParameter, Outputs, Step, Steps,
                       Workflow, if_expression)
    from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
                              upload_packages)
    
    if "__file__" in locals():
        upload_packages.append(__file__)
    
    
    class Random(OP):
        @classmethod
        def get_input_sign(cls):
            return OPIOSign()
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                "is_head": bool,
                "msg1": str,
                "msg2": str,
                "foo": Artifact(str),
                "bar": Artifact(str)
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            open("foo.txt", "w").write("head")
            open("bar.txt", "w").write("tail")
            if random.random() < 0.5:
                is_head = True
            else:
                is_head = False
            return OPIO({
                "is_head": is_head,
                "msg1": "head",
                "msg2": "tail",
                "foo": "foo.txt",
                "bar": "bar.txt"
            })
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    然后套上壳

        steps = Steps("conditional-steps", outputs=Outputs(
            parameters={"msg": OutputParameter()},
            artifacts={"res": OutputArtifact()}))
    
        random_step = Step(
            name="random",
            template=PythonOPTemplate(Random, image="python:3.8")
        )
        steps.add(random_step)
    
        steps.outputs.parameters["msg"].value_from_expression = if_expression(
            _if=random_step.outputs.parameters["is_head"],
            _then=random_step.outputs.parameters["msg1"],
            _else=random_step.outputs.parameters["msg2"])
    
        steps.outputs.artifacts["res"].from_expression = if_expression(
            _if=random_step.outputs.parameters["is_head"],
            _then=random_step.outputs.artifacts["foo"],
            _else=random_step.outputs.artifacts["bar"])
    
        wf = Workflow(name="conditional", steps=steps)
    
        wf.submit()
        while wf.query_status() in ["Pending", "Running"]:
            time.sleep(1)
    
        assert(wf.query_status() == "Succeeded")
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    结构简单

  • 相关阅读:
    【t5 pytorch版源码学习】t5-pegasus-pytorch源码学习
    使用 Express 设置 GraphQL
    [iOS]-Block
    Java:搞清楚这三类Java面试问题,offer就稳了
    翻译 | Kubernetes Operator 对数据库的重要性
    R语言基于指定规则、条件删除列表中的元素:使用purrr包的discard函数移除列表中的所有NA元素和NULL元素
    吡喃腈衍生物
    [vs2017]_[初级]_[常用快捷键*持续更新]
    Qt学习03 Qt的诞生和本质
    java 泛型
  • 原文地址:https://blog.csdn.net/frank_haha/article/details/125999854