• 散文:dflow 是如何实现slice的


    Slice 是 dflow 的核心功能之一,但仅限于 PythonOP
    如果将一些小的 PythonOP 填到一个 steps 模板里,这些小 OP 就可以共同实现一些复杂的功能,但可惜的是,steps 不支持 slice 这一功能

    为了钻这个牛角尖,我决定看一下 PythonOP 的源码。看了一段时间,没有头绪,于是决定写下自己读代码的过程,梳理思路。所以这篇文章可能会比较乱,大家看一乐就好。

    PythonOP, OP and Steps

    首先复习一下dflow里的一些概念,OP 是最基本的单元,PythonOP是OP的包装,一堆PythonOP线性排列或者循环组合可以组成Steps
    在这里插入图片描述

    在这里插入图片描述
    下面我们来看一个简单的PythonOP

    
    class Duplicate(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'msg': str,
                'num': int,
                'foo': Artifact(Path),
                'idir': Artifact(Path),
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'msg': str,
                'bar': Artifact(Path),
                'odir': Artifact(Path),
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            op_out = OPIO({
                "msg": op_in["msg"] * op_in["num"],
                "bar": Path("output.txt"),
                "odir": Path("todir"),
            })
    
            content = open(op_in['foo'], "r").read()
            open("output.txt", "w").write(content * op_in["num"])
    
            Path(op_out['odir']).mkdir()
            for ii in ['f1', 'f2']:
                (op_out['odir']/ii).write_text(op_in['num']
                                               * (op_in['idir']/ii).read_text())
    
            return op_out
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    这是一个简单的OP模板,他直接继承了OP,实现了类似下面的Python函数:

    def aaa(in_msg: str, in_artifact: Artifact):
    	op_in = {"in_msg": in_msg, "in_artifact": in_artifact}
    	op_out = execute(op_in)
    	out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意到,我们这里尚未涉及PythonOP,但已经默认,这是一个PythonOP了,准确的说,这时候的duplicate还只是一个继承了OP的一个类,还不能直接使用,我们需要套一个壳才能使用:

        step = Step(
            name="step",
            template=PythonOPTemplate(Duplicate, image="python:3.8"),
            parameters={"msg": "Hello", "num": 3},
            artifacts={"foo": artifact0, "idir": artifact1},
        )
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    因此duplicate严格来说,还只是一个半成品,它继承了OP,但又不是OP,需要套一个壳转化一下才能再次变成OP
    我们点进PythonOPTemplate的定义,里面写着:
    在这里插入图片描述


    花开两朵,各表一枝。


    前面小结一下:
    我们平时写PythonOP都是按照固定的格式

    def aaa(in_msg: str, in_artifact: Artifact):
    	op_in = {"in_msg": in_msg, "in_artifact": in_artifact}
    	op_out = execute(op_in)
    	out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3
    • 4
    • 5

    写一个半成品,套上PythonOP后转化成了真正的OP
    那有没有方法绕过八股文格式,直接写“真正的OP”呢?
    注意到,big steps那一节有过类似的应用,只不过当时的重点在于,对小OP串联组成大OP。但如果细心观察可以发现,这个大OP的输入输出和平时的PythonOP不一样。

    因为这是一个原生的OP

    具体来说,他做了如下的变化:

    1. 输入输出全局声明:
        steps = Steps(name="hello-steps")
        steps.inputs.parameters["foo"] = InputParameter()
        steps.outputs.parameters["foo"] = OutputParameter()
    
    • 1
    • 2
    • 3
    1. 整个steps的输入分给各个小OP
        step1 = Step(
            name="step1",
            template=PythonOPTemplate(Duplicate, image="python:3.8"),
            parameters={"foo": steps.inputs.parameters["foo"]},
            key="step1"
        )
        steps.add(step1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    1. 全局的输出源自小OP的输出
        step2 = Step(
            name="step2",
            template=PythonOPTemplate(Duplicate, image="python:3.8"),
            parameters={"foo": step1.outputs.parameters["foo"]},
            key="step2"
        )
        steps.add(step2)
    
        steps.outputs.parameters["foo"].value_from_parameter = \
            step2.outputs.parameters["foo"]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    1. 使用过程中无需PythonOP转换
        big_step = Step(name="big-step", template=steps,
                        parameters={"foo": Hello("hello")})
    
    • 1
    • 2

    我们可以得出如下结论:
    Steps直接集成了OP,虽然看起来是一群小PythonOP的组合,但不是PythonOP
    他的逻辑更直接:拿到输入直接,do something,然后输出

    def execute(in_msg: str, in_artifact: Artifact):
    	# do something
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3

    对比一下之前的PythonOP

    def aaa(in_msg: str, in_artifact: Artifact):
    	op_in = {"in_msg": in_msg, "in_artifact": in_artifact}
    	op_out = execute(op_in)
    	out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3
    • 4
    • 5

    由此可见,PythonOP虽然更复杂,但写起来按照八股文,还是很轻松的,只需要关注execute逻辑,做好变量检查即可。相比之下,steps需要一个个声明输入输出变量类型(很长),使用的时候也是很长一串。
    小结一下:
    PythonOP和steps都是OP,只不过实现的方式不同,最终在容器里,二者都是要转化成普通OP的格式的。
    具体来说,PythonOP把输入输出变量转换成普通OP的输入输出变量,放入执行逻辑里就可以了。而steps则是等到一个个的PythonOP转换成普通的OP,再给套上整个流的输入输出即可,最终也是变成了OP。

    所以,虽然steps是PythonOP的组合,但steps并不是一种PythonOP,而是一种将各OP串起来的大OP

    PythonOP 是怎么转化成 OP 的

    此时,我们大概已经知道答案了。PythonOP把半成品的输入输出解包变成了OP。
    干的事情是这样的,但具体是如何实现的呢?
    我写了简单的程序:

    import time
    from pathlib import Path
    
    from dflow import Step, Workflow, download_artifact, upload_artifact
    from dflow.python import (OP, OPIO, Artifact, OPIOSign, PythonOPTemplate,
                              upload_packages)
    
    
    class Duplicate(OP):
        def __init__(self):
            pass
    
        @classmethod
        def get_input_sign(cls):
            return OPIOSign({
                'msg': str,
                'num': int,
                'foo': Artifact(Path),
                'idir': Artifact(Path),
            })
    
        @classmethod
        def get_output_sign(cls):
            return OPIOSign({
                'msg': str,
                'bar': Artifact(Path),
                'odir': Artifact(Path),
            })
    
        @OP.exec_sign_check
        def execute(
                self,
                op_in: OPIO,
        ) -> OPIO:
            op_out = OPIO({
                "msg": op_in["msg"] * op_in["num"],
                "bar": Path("output.txt"),
                "odir": Path("todir"),
            })
    
            content = open(op_in['foo'], "r").read()
            open("output.txt", "w").write(content * op_in["num"])
    
            Path(op_out['odir']).mkdir()
            for ii in ['f1', 'f2']:
                (op_out['odir']/ii).write_text(op_in['num']
                                               * (op_in['idir']/ii).read_text())
    
            return op_out
    
    a_OP = PythonOPTemplate(op_class=Duplicate, image='python:3.8')
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    下面我用最笨的方法,打断点进去看看:
    进来以后第一块是确认了一下op变量类型,然后把输入输出解包

            op = None
            if isinstance(op_class, OP):
                op = op_class
                op_class = op.__class__
            class_name = op_class.__name__
            input_sign = op_class.get_input_sign()
            output_sign = op_class.get_output_sign()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述
    后面涉及 input_sign 就是直接当 dict 使用了
    解包完,进行了一些附属功能的检查
    在这里插入图片描述
    这里我们暂时用不到
    后面就是upload一些包,对传进来的变量进行redistribution,看起来很长,主要是因为要检查的类型比较多,架构还是很清晰的
    最后通过 render_script 把收进来的变量写成普通的OP脚本

    PythonOP是如何实现slice的

    上一节的核心结论就是,PythonOP 通过繁琐的变量检查对输入输出变量进行了解包,并将其转化为了普通OP
    也就是:

    def aaa(in_msg: str, in_artifact: Artifact):
    	op_in = {"in_msg": in_msg, "in_artifact": in_artifact}
    	op_out = execute(op_in)
    	out_msg, out_artifact = op_out["out_msg"], op_out["out_artifact"]
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3
    • 4
    • 5

    解包,变成了

    def execute(in_msg: str, in_artifact: Artifact):
    	# do something
    	return out_msg, out_artifact
    
    • 1
    • 2
    • 3

    slice 就是在此基础上添加的一些辅助功能。
    dflow的文档里介绍slice时提到,他是通过调用step的with param实现的
    step的with param最基础的用法如下:

    step = Step(
        ...
        parameters={"msg": "{{item}}"},
        with_param=steps.inputs.parameters["msg_list"]
    )
    
    • 1
    • 2
    • 3
    • 4
    • 5

    step运行时,会从 ["msg_list"] 抽出 msg 赋给 "{{item}}" ,成为 steps 这个 OP 中一个切片过了的变量。
    我们需要谨记,PythonOP也是一个普通的OP,因此他实现并行,底层原理也只是调用了这一个可切片的点

    那一个可切片的变量,如何影响一群变量的呢?

    我们以 input_parameter 为例,其余均差不多
    下面是 input_parameter 在 slice 里的部分
    在这里插入图片描述
    回顾一下 slice 的用法:

        hello = Step("hello",
                     PythonOPTemplate(Hello, image="python:3.8",
                                      slices=Slices("{{item}}",
                                                    input_parameter=["filename"],
                                                    output_artifact=["foo"]
                                                    )
                                      ),
                     parameters={"filename": ["f1.txt", "f2.txt"]},
                     with_param=argo_range(2))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    我们对照 slices 部分

                                      slices=Slices("{{item}}",
                                                    input_parameter=["filename"],
                                                    output_artifact=["foo"]
                                                    )
    
    • 1
    • 2
    • 3
    • 4

    第一项 "{{item}}" 赋值给了 slices
    input_parameter=["filename"] ,进一步赋值给了 self.input_parameter
    self.input_parameter 在 template 里就变成了 slices.input_parameter
    注意到,初始化的 slices.input_parameter = ["filename"]
    我们来看下面做了什么:

                if slices.input_parameter:
                    input_parameter_slices = {
                        name: slices.slices for name in slices.input_parameter}
    
    • 1
    • 2
    • 3

    input_parameter_slices 变成了一个 dict, key 是 列表里的每一个元素,value 是 slices.slices 也就是 "{{item}}"
    这一步之后,有了下面的一个变量:

    input_parameter_slices = {"filename": "{{item}}"}
    
    • 1

    下面这个变量被传入了 render_script 函数
    在这里插入图片描述
    再往后就是这句话了:

    for name, sign in input_sign.items():
        slices = self.get_slices(input_parameter_slices, name)
    
    • 1
    • 2

    检查传入的变量,然后以查表的方式得到 slices

        def get_slices(self, slices_dict, name):
            slices = None
            if slices_dict is not None:
                slices = self.render_slices(slices_dict.get(name, None))
            return slices
    
    • 1
    • 2
    • 3
    • 4
    • 5

    get_slices 接收的是一个 dict 和一个 name
    我们注意到,dict 就是 {"filename": "{{item}}"} ,它可以有很多项,这里只涉及了一项
    查表就是

    for name, sign in input_sign.items():
    
    • 1

    要遍历每一个输入的 name 嘛
    如果这个 name 被 slice 了,查表结果就是 "{{item}}" ,如果没有,就是 None
    对于本例,查找到的结果是 "{{item}}"
    下面是如何操作这个 "{{item}}" 呢?

        def render_slices(self, slices=None):
            if slices is None:
                return None
    
            i = slices.find("{{item")
            while i >= 0:
                j = slices.find("}}", i+2)
                var = slices[i:j+2]
                if var not in self.dflow_vars:
                    var_name = "dflow_var_%s" % len(self.dflow_vars)
                    self.inputs.parameters[var_name] = InputParameter(value=var)
                    self.dflow_vars[var] = var_name
                else:
                    var_name = self.dflow_vars[var]
                slices = slices.replace(var, "{{inputs.parameters.%s}}" % var_name)
                i = slices.find("{{item")
            return slices
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    i = 0,后进入 while 循环,j=6
    var = "{{item}}"

    slices = slices.replace(var, "{{inputs.parameters.%s}}" % var_name)
    
    • 1

    随后把 slices 中的 "{{item}}" 替换成了 "{{inputs.parameters.%s}}" % var_name
    while 循环的意思就是支持多处替换
    dflow_vars 是什么呢?

                if var not in self.dflow_vars:
                    var_name = "dflow_var_%s" % len(self.dflow_vars)
                    self.inputs.parameters[var_name] = InputParameter(value=var)
                    self.dflow_vars[var] = var_name
                else:
                    var_name = self.dflow_vars[var]
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    往上看,发现他初始化是一个空的 dict

    var_name = "dflow_var_%s" % len(self.dflow_vars)
    
    • 1

    “var_name = dflow_var_0”

    slices = slices.replace(var, "{{inputs.parameters.%s}}" % var_name)
    
    • 1

    返回值就变成了 "{{inputs.parameters.dflow_var_0}}"

    我们回到 374 行

    slices = self.get_slices(input_parameter_slices, name)
    
    • 1

    查表结果是:

    slices = "{{inputs.parameters.dflow_var_0}}"
    
    • 1

    第二次查表,由于上一次查表后已经给 self.dflow_vars 填充了一个 dict ,所以第二次查表所得值为:

    slices = "{{inputs.parameters.dflow_var_1}}"
    
    • 1

    这个str是怎么用的呢?

                        script += "input['%s'] = handle_input_parameter('%s', "\
                            "r'''{{inputs.parameters.%s}}''', input_sign['%s'], "\
                            "%s, '/tmp')\n" % (name, name, name, name, slices)
    
    
    • 1
    • 2
    • 3
    • 4

    这段话用到了 handle_input_parameter,是这样定义的:

    def handle_input_parameter(name, value, sign, slices=None, data_root="/tmp"):
        if "dflow_list_item" in value:
            dflow_list = []
            for item in jsonpickle.loads(value):
                dflow_list += jsonpickle.loads(item)
            obj = convert_dflow_list(dflow_list)
        elif isinstance(sign, BigParameter):
            with open(data_root + "/inputs/parameters/" + name, "r") as f:
                content = jsonpickle.loads(f.read())
                if sign.type == str:
                    obj = content["value"]
                else:
                    obj = jsonpickle.loads(content["value"])
        else:
            if isinstance(sign, Parameter):
                sign = sign.type
            if sign == str and slices is None:
                obj = value
            else:
                obj = jsonpickle.loads(value)
    
        if slices is not None:
            assert isinstance(
                obj, list), "Only parameters of type list can be sliced, while %s"\
                " is not list" % obj
            if isinstance(slices, list):
                obj = [obj[i] for i in slices]
            else:
                obj = obj[slices]
    
        return obj
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    这段话就是把变量按照切片切好再返回。
    一堆细节,就先这样吧,有空再补

  • 相关阅读:
    docker安装minio
    vue常见的keep-alive问题
    MySQL 教程(三)函数
    java毕业设计——基于java+mysql+socket的即时通讯软件设计与实现(毕业论文+程序源码)——即时通讯软件
    php代码审计之——phpstorm动态调试
    JavaScript中 判断网络状态的几种方法
    Kubernetes IPVS和IPTABLES
    基于Web的个人网页响应式页面设计与实现 HTML+CSS+JavaScript(web前端网页制作课作业)
    3.2 C++高级编程_抽象类界面
    微积分入门书籍(二)
  • 原文地址:https://blog.csdn.net/frank_haha/article/details/126353536