• Server - 使用 文件锁 解决 Kubernetes 多机批量运行程序 (Python)


    欢迎关注我的CSDN:https://spike.blog.csdn.net/
    本文地址:https://blog.csdn.net/caroline_wendy/article/details/136711101

    在 Python 中,使用文件锁来解决多机批量运行程序的问题是一种常见的同步机制。这主要是为了防止多个进程同时对同一文件进行写操作,从而避免数据损坏或不一致的情况。

    File

    Kubernetes 的 AFM-MSA 的推理 YAML,即:

    • name 参数:afm-msa
    • completionsparallelism,保持一致,运行 Pods 的数量,例如 3 是使用 3 个 Pod 运行同一个程序。
    • 运行目录 (workingDir):workspace_v2/af2
    • 镜像 (image):af2:v1.04
    • commands 参数,需要执行的命令,注意的是,不要使用 nohup 方式运行,这样会导致 Pod 提前关闭。

    即:

    apiVersion: batch/v1
    kind: Job
    metadata:
      name: afm-msa-search # 你的训练名字,这里可以使用generateName: structure-predict-job-形式,k8s会在-后面自动后缀一些随机字符串,以避免同时跑同一个pipeline时资源名字重复报错
    spec:
      completions: 2    # 总pod数量
      parallelism: 2    # 并行运行的pod数量
      backoffLimit: 0   # 重试次数,这里失败后不需要重试
      template:
        metadata:
            annotations:
                sidecar.istio.io/inject: "false"
            labels:
                file-mount: "true"      # 这两个label必须加,kubeflow帮你自动配置一些基本环境
                user-mount: "true"
        spec:
          nodeSelector:
              service: "ai2-msa-only"   # device是gpu类型,比如a10,a100
          tolerations:
            - key: role
              operator:  Equal
              value: ai2-msa-only
              effect: NoSchedule
          containers:
            - name: sp
              image: "af2:v1.04"   # 统一用这个镜像,提供一个基础conda和cuda环境
              imagePullPolicy: Always
              resources:
                limits:
                  cpu: "62"
                  memory: 124G
                requests:
                  cpu: "62"
                  memory: 124G
              command: [
                "/bin/sh",
                "-cl",
                "python -u batch_msa_search.py test/fasta_test/ test/outputs_test/ multimer > nohup.20230314.out 2>&1"]     # 执行的命令
              workingDir: "workspace_v2/af2/"        # 默认的工作目录,就是你启动脚本的所在目录
              env:                     # 这是把每个pod的名字注入环境变量,以便能够在程序里区分当前是在哪一个pod中
                - name: PODNAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
          restartPolicy: Never
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    执行:

    # cd workspace/multimer-pipeline
    kubectl delete jobs afm-msa-search
    kubectl apply -f afm-msa-search.yaml
    
    # kubectl get pods
    kubectl describe pods afm-msa-search-2c5kd
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    查看 Pods 日志:

    Events:
      Type    Reason     Age   From         Message
      ----    ------     ----  ----         -------
      Normal  Scheduled  10m   cce-volcano  Successfully assigned afm-msa-search-2c5kd to 172.30.1.156
      Normal  Pulling    10m   kubelet      Pulling image "af2:v1.04"
      Normal  Pulled     10m   kubelet      Successfully pulled image "af2:v1.04" in 184.084607ms
      Normal  Created    10m   kubelet      Created container sp
      Normal  Started    10m   kubelet      Started container sp
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    batch_msa_search.py 即批量搜索 MSA 的脚本,使用文件锁的方式,进行处理

    • with open(lock_path, 'x') as wp,检查文件锁的操作
    • os.system(cmd_line),执行命令
    • 之后,删除全部的锁文件,避免错误无法执行。

    源码:

    #!/usr/bin/env python
    # -- coding: utf-8 --
    """
    Copyright (c) 2024. All rights reserved.
    Created by C. L. Wang on 2024/3/13
    """
    
    import os
    import random
    import shutil
    import sys
    import time
    
    input_file = sys.argv[1]
    output_dir = sys.argv[2]
    model_type = sys.argv[3]
    
    time.sleep(random.random())
    
    model_list = ['monomer', 'monomer_casp14', 'monomer_ptm', 'multimer']
    
    if model_type not in model_list:
        print("[Info] model not supported!!!,only monomer/multimer supported!!!")
        sys.exit(1)
    
    if not os.path.exists(output_dir):
        os.mkdir(output_dir)
    
    lock_folder = os.path.join(output_dir, "lock-msa")
    
    if not os.path.exists(lock_folder):
        os.mkdir(lock_folder)
    
    lock_file_list = []
    for root, dirs, files in os.walk(input_file):
        if root != input_file:
            break
        for i, input_file in enumerate(files):
            path = os.path.join(root, input_file)
            print(f"[Info] run path: {path}")
            lock_path = os.path.join(output_dir, "lock-msa", f"{input_file}.lock")  # 检查文件锁
            try:
                with open(lock_path, 'x') as wp:
                    time.sleep(10)
                    pass
            except Exception as e:
                print(e)
                print(f'[Info] exist! {i} {input_file}')
                continue
            print(f'working on {i} {input_file}')
            base_name = os.path.basename(path).split(".")[0]
            cmd_line = f"/bin/bash run_msa.sh {path} {output_dir} {model_type} > nohup.{base_name}.out 2>&1"
            print(f"[Info] cmd_line: {cmd_line}")
            lock_file_list.append(lock_path)
            os.system(cmd_line)  # 运行命令
    
    print(f"[Info] lock_file_list: {len(lock_file_list)}")
    for path in lock_file_list:
        if os.path.exists(path):
            shutil.rmtree(path)
            print(f"[Info] delete file: {path}")
    print(f"[Info] run {input_file} over!")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    核心运行程序是 run_msa.sh,即,执行脚本:

    #!/bin/bash
    
    set -xe
    #PROJECT_DIR=$(cd "$(dirname $0)" && pwd)
    source activate alphafold
    
    export PATH="/usr/local/cuda-11.6/bin:$PATH"
    export LD_LIBRARY_PATH="/usr/local/cuda-11.6/lib64:$LD_LIBRARY_PATH"
    
    conda info --envs
    
    fasta_path=$1
    output_dir=$2
    model_type=$3
    
    bash run_alphafold.sh \
    -f "$fasta_path" \
    -o "$output_dir" \
    -m "$model_type" \
    -g false \
    -h true
    
    echo "run over: $fasta_path"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    查看执行结果:

    kubectl exec -it afm-msa-search-2c5kd bash
    
    • 1
  • 相关阅读:
    每日OJ题_剑指offer数组篇
    使用html2canvas将html转pdf,由于table表的水平和竖直有滚动条导致显示不全(或者有空白)
    【PAT甲级 - C++题解】1045 Favorite Color Stripe
    【云原生丨Docker系列7】Docker的四种网络模式详解
    MySQL表的操作
    【oppenvino】使用docker安装openvino并进行onnx到IR中间件的转化
    2022年全球市场抹茶巧克力总体规模、主要生产商、主要地区、产品和应用细分研究报告
    Nginx使用教程
    基于javaweb的家庭理财系统(java+ssm+jsp+tomcat8+mysql)
    Springboot与RestTemplate
  • 原文地址:https://blog.csdn.net/u012515223/article/details/136711101