• Writing and Submitting Argo Workflows in Python with the Open-Source Library Couler


    Python is a popular language for writing machine learning workflows on Kubernetes.

    Out of the box, Argo does not offer first-class Python support. Instead, we provide Java, Golang and Python API clients [1].

    For most users, however, this is not enough: many need an abstraction layer that adds components and use-case-specific features.

    Today you have two options.

    KFP compiler + Python client

    Argo Workflows is the engine that executes Kubeflow Pipelines. You can define a Kubeflow pipeline in Python and compile it directly into an Argo Workflow.

    You can then use the Argo Python client [2] to submit the workflow to the Argo Server API.

    This approach lets you reuse existing Kubeflow components.

    Installation (the example uses the KFP SDK 1.x ContainerOp API, which was removed in KFP SDK 2.x):

    pip3 install 'kfp<2'
    pip3 install argo-workflows

    Example:

    import kfp

    # Flip a coin and write "heads" or "tails" to /output.
    def flip_coin():
        return kfp.dsl.ContainerOp(
            name='Flip a coin',
            image='python:alpine3.6',
            command=['python', '-c', """
    import random
    res = "heads" if random.randint(0, 1) == 0 else "tails"
    with open('/output', 'w') as f:
        f.write(res)
    """],
            # Expose the content of /output as this step's output.
            file_outputs={'output': '/output'}
        )

    def heads():
        return kfp.dsl.ContainerOp(name='Heads', image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"'])

    def tails():
        return kfp.dsl.ContainerOp(name='Tails', image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"'])

    @kfp.dsl.pipeline(name='Coin-flip', description='Flip a coin')
    def coin_flip_pipeline():
        flip = flip_coin()
        # Each Condition block becomes a DAG task guarded by a "when" expression.
        with kfp.dsl.Condition(flip.output == 'heads'):
            heads()
        with kfp.dsl.Condition(flip.output == 'tails'):
            tails()

    def main():
        # Compile the pipeline into an Argo Workflow manifest named <this file>.yaml.
        kfp.compiler.Compiler().compile(coin_flip_pipeline, __file__ + ".yaml")

    if __name__ == '__main__':
        main()
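    To see what file_outputs does, the sketch below emulates the flip-a-coin step locally: it runs the same inline script in a separate Python process, then reads the file the script wrote. In the compiled workflow this corresponds to the output parameter with valueFrom: {path: /output}, which Argo reads after the container exits. This is an illustration only; run_and_read_output is a hypothetical helper, and it writes to a temporary file instead of /output so it can run outside a container.

```python
import pathlib
import subprocess
import sys
import tempfile

# Same logic as the inline script in flip_coin(), except the output path is
# passed as an argument (assumption, so it can run outside a container).
SCRIPT = """
import random, sys
res = "heads" if random.randint(0, 1) == 0 else "tails"
with open(sys.argv[1], "w") as f:
    f.write(res)
"""


def run_and_read_output() -> str:
    """Hypothetical helper: run the script, then read the file it wrote,
    mimicking how an output parameter is taken from a file path."""
    with tempfile.TemporaryDirectory() as tmp:
        out_path = pathlib.Path(tmp) / "output"
        # Run the script in its own interpreter, as the container would.
        subprocess.run([sys.executable, "-c", SCRIPT, str(out_path)], check=True)
        # Once the process has exited, the file content is the parameter value.
        return out_path.read_text()


if __name__ == "__main__":
    print(run_and_read_output())
```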

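    Running the pipeline script compiles coin_flip_pipeline into the following workflow, saved next to the script with a .yaml suffix (for example coin-flip.py.yaml):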

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: coin-flip-
      annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0, pipelines.kubeflow.org/pipeline_compilation_time: '2021-01-21T17:17:54.299235',
        pipelines.kubeflow.org/pipeline_spec: '{"description": "Flip a coin", "name":
          "Coin-flip"}'}
      labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.3.0}
    spec:
      entrypoint: coin-flip
      templates:
      - name: coin-flip
        dag:
          tasks:
          - name: condition-1
            template: condition-1
            when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "heads"'
            dependencies: [flip-a-coin]
          - name: condition-2
            template: condition-2
            when: '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "tails"'
            dependencies: [flip-a-coin]
          - {name: flip-a-coin, template: flip-a-coin}
      - name: condition-1
        dag:
          tasks:
          - {name: heads, template: heads}
      - name: condition-2
        dag:
          tasks:
          - {name: tails, template: tails}
      - name: flip-a-coin
        container:
          command:
          - python
          - -c
          - "\nimport random\nres = \"heads\" if random.randint(0, 1) == 0 else \"tails\"\
            \nwith open('/output', 'w') as f:\n    f.write(res)\n"
          image: python:alpine3.6
        outputs:
          parameters:
          - name: flip-a-coin-output
            valueFrom: {path: /output}
          artifacts:
          - {name: flip-a-coin-output, path: /output}
      - name: heads
        container:
          command: [sh, -c, echo "it was heads"]
          image: alpine:3.6
      - name: tails
        container:
          command: [sh, -c, echo "it was tails"]
          image: alpine:3.6
      arguments:
        parameters: []
      serviceAccountName: pipeline-runner
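    The when fields in the coin-flip DAG are evaluated by Argo after it substitutes the {{tasks...}} placeholder with the flip-a-coin output. The sketch below mimics that for these two conditions only: it performs the substitution with a regular expression and handles a single == comparison of optionally quoted strings. Argo's real expression evaluator supports much more, so render and evaluate_when are hypothetical helpers for illustration, and the "heads" value is an example.

```python
import re

# Outputs available once flip-a-coin has finished ("heads" is an example value).
OUTPUTS = {"tasks.flip-a-coin.outputs.parameters.flip-a-coin-output": "heads"}

WHEN_HEADS = '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "heads"'
WHEN_TAILS = '"{{tasks.flip-a-coin.outputs.parameters.flip-a-coin-output}}" == "tails"'


def render(expr: str, outputs: dict) -> str:
    """Replace each {{name}} placeholder with its value from outputs."""
    return re.sub(r"\{\{([^}]+)\}\}", lambda m: outputs[m.group(1).strip()], expr)


def evaluate_when(expr: str, outputs: dict) -> bool:
    """Evaluate a rendered 'a == b' expression, ignoring surrounding quotes.
    Simplified: only a single == comparison is supported."""
    left, right = render(expr, outputs).split("==")
    return left.strip().strip('"') == right.strip().strip('"')


if __name__ == "__main__":
    for name, expr in [("condition-1", WHEN_HEADS), ("condition-2", WHEN_TAILS)]:
        print(name, "runs" if evaluate_when(expr, OUTPUTS) else "is skipped")
```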

    Note that Kubeflow does not support this approach of compiling a pipeline and running it directly on Argo.

    You can submit the workflow above with the client as follows:

    import yaml
    from argo.workflows.client import (ApiClient,
                                       WorkflowServiceApi,
                                       Configuration,
                                       V1alpha1WorkflowCreateRequest)

    def main():
        # Connect to the Argo Server API (2746 is the Argo Server's default port).
        config = Configuration(host="http://localhost:2746")
        client = ApiClient(configuration=config)
        service = WorkflowServiceApi(api_client=client)

        # Load the workflow compiled by KFP.
        with open("coin-flip.py.yaml") as f:
            manifest: dict = yaml.safe_load(f)

        # pipeline-runner is a Kubeflow service account that may not exist
        # in the target namespace, so remove it.
        del manifest['spec']['serviceAccountName']

        # Create the workflow in the "argo" namespace.
        service.create_workflow('argo', V1alpha1WorkflowCreateRequest(workflow=manifest))

    if __name__ == '__main__':
        main()
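    The client is a wrapper around the Argo Server's HTTP API, so the same submission can also be made with only the standard library. The sketch below assumes the server is reachable at http://localhost:2746 without authentication, as in the example above, and uses the Argo Server endpoint POST /api/v1/workflows/{namespace} with a JSON body of the form {"workflow": ...}. build_create_request and submit are hypothetical helper names. Parsing the YAML file still needs PyYAML, so the manifest is taken as an already-loaded dict.

```python
import json
import urllib.request


def build_create_request(manifest: dict, namespace: str = "argo",
                         host: str = "http://localhost:2746") -> urllib.request.Request:
    """Build (but do not send) a create-workflow request for the Argo Server."""
    manifest = dict(manifest)  # shallow copy so the caller's dict is untouched
    spec = dict(manifest.get("spec", {}))
    # Same adjustment as above: drop the Kubeflow service account.
    spec.pop("serviceAccountName", None)
    manifest["spec"] = spec
    body = json.dumps({"workflow": manifest}).encode("utf-8")
    return urllib.request.Request(
        url=f"{host}/api/v1/workflows/{namespace}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit(manifest: dict, namespace: str = "argo") -> dict:
    """Send the request and return the created Workflow as parsed JSON."""
    with urllib.request.urlopen(build_create_request(manifest, namespace)) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Truncated manifest, for illustration only; nothing is sent here.
    example = {
        "apiVersion": "argoproj.io/v1alpha1",
        "kind": "Workflow",
        "metadata": {"generateName": "coin-flip-"},
        "spec": {"entrypoint": "coin-flip", "serviceAccountName": "pipeline-runner"},
    }
    req = build_create_request(example)
    print(req.get_method(), req.full_url)
```

    Building the request separately from sending it keeps the manifest handling testable without a running Argo Server.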

    Couler

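    Couler [3] is a popular project that lets you specify workflows in a platform-agnostic way. It mainly supports Argo Workflows, with support for Kubeflow and Airflow planned for the future.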

    Installation:

    pip3 install git+https://github.com/couler-proj/couler
    

    Example:

    import couler.argo as couler
    from couler.argo_submitter import ArgoSubmitter

    # Runs inside the container; what it prints becomes the step's result.
    def random_code():
        import random
        res = "heads" if random.randint(0, 1) == 0 else "tails"
        print(res)

    def flip_coin():
        # The function body is used as the script source.
        return couler.run_script(image="python:alpine3.6", source=random_code)

    def heads():
        return couler.run_container(
            image="alpine:3.6", command=["sh", "-c", 'echo "it was heads"']
        )

    def tails():
        return couler.run_container(
            image="alpine:3.6", command=["sh", "-c", 'echo "it was tails"']
        )

    result = flip_coin()
    # Run heads() or tails() depending on the coin flip result.
    couler.when(couler.equal(result, "heads"), lambda: heads())
    couler.when(couler.equal(result, "tails"), lambda: tails())

    # Submit the generated workflow to Argo.
    submitter = ArgoSubmitter()
    couler.run(submitter=submitter)
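    With run_script, the generated conditions compare steps.flip-coin-29.outputs.result, which for an Argo script template holds the script's standard output. The sketch below reproduces that locally by capturing what random_code prints with contextlib.redirect_stdout. It only illustrates where the result comes from and does not use Couler; capture_result is a hypothetical helper, and stripping whitespace is a simplifying assumption.

```python
import contextlib
import io


def random_code():
    # Same body as random_code() in the Couler example.
    import random
    res = "heads" if random.randint(0, 1) == 0 else "tails"
    print(res)


def capture_result(func) -> str:
    """Hypothetical helper: call func and return what it printed,
    with surrounding whitespace stripped."""
    buf = io.StringIO()
    with contextlib.redirect_stdout(buf):
        func()
    return buf.getvalue().strip()


if __name__ == "__main__":
    result = capture_result(random_code)
    # Mirror the two couler.when(...) branches: exactly one key matches.
    branches = {
        "heads": lambda: print("it was heads"),
        "tails": lambda: print("it was tails"),
    }
    branches[result]()
```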

    Running the Couler example creates the following workflow:

    apiVersion: argoproj.io/v1alpha1
    kind: Workflow
    metadata:
      generateName: couler-example-
    spec:
      templates:
        - name: couler-example
          steps:
            - - name: flip-coin-29
                template: flip-coin
            - - name: heads-31
                template: heads
                when: '{{steps.flip-coin-29.outputs.result}} == heads'
              - name: tails-32
                template: tails
                when: '{{steps.flip-coin-29.outputs.result}} == tails'
        - name: flip-coin
          script:
            name: ''
            image: 'python:alpine3.6'
            command:
              - python
            source: |
              import random
              res = "heads" if random.randint(0, 1) == 0 else "tails"
              print(res)
        - name: heads
          container:
            image: 'alpine:3.6'
            command:
              - sh
              - '-c'
              - echo "it was heads"
        - name: tails
          container:
            image: 'alpine:3.6'
            command:
              - sh
              - '-c'
              - echo "it was tails"
      entrypoint: couler-example
      ttlStrategy:
        secondsAfterCompletion: 600
      activeDeadlineSeconds: 300
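    In a steps template, the outer list runs in order while each inner list runs in parallel, which is why heads-31 and tails-32 share one inner list and both wait for flip-coin-29. The sketch below builds the same couler-example template as plain Python data and prints it as JSON to make the list-of-lists shape explicit. make_step is a hypothetical helper; this is not how Couler generates the YAML internally.

```python
import json
from typing import Optional


def make_step(name: str, template: str, when: Optional[str] = None) -> dict:
    """Build one step entry; the when condition is optional."""
    step = {"name": name, "template": template}
    if when is not None:
        step["when"] = when
    return step


result = "{{steps.flip-coin-29.outputs.result}}"

couler_example = {
    "name": "couler-example",
    "steps": [
        # Step group 1: a single step.
        [make_step("flip-coin-29", "flip-coin")],
        # Step group 2: both steps share one inner list, so they run in
        # parallel; each is skipped unless its condition holds.
        [
            make_step("heads-31", "heads", f"{result} == heads"),
            make_step("tails-32", "tails", f"{result} == tails"),
        ],
    ],
}

if __name__ == "__main__":
    print(json.dumps(couler_example, indent=2))
```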


    References

    Source: "Writing and Submitting Argo Workflows with Python", SegmentFault, by Alex Collins: https://segmentfault.com/a/1190000039101534

    [1] Java, Golang and Python API clients: https://github.com/argoproj-labs/argo-client-gen

    [2] Argo Python client: https://github.com/argoproj-labs/argo-client-python

    [3] Couler: https://github.com/couler-proj/couler

  • Original post: https://blog.csdn.net/a772304419/article/details/126585061