Follow me on CSDN: https://spike.blog.csdn.net/
Article URL: https://blog.csdn.net/caroline_wendy/article/details/136711101
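In Python, file locks are a common synchronization mechanism for running a program in batch across multiple machines. The lock prevents several processes from writing to the same file at the same time, which avoids corrupted or inconsistent data.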
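
As a minimal sketch of the idea, the snippet below claims a lock by creating a file in exclusive mode (`'x'`), which fails if the file already exists. The helper name `try_lock` and the file name `example.fasta.lock` are hypothetical and only serve as illustration; the snippet assumes the lock directory is shared by all workers.

```python
import os
import tempfile


def try_lock(lock_path: str) -> bool:
    """Return True if this caller created the lock file, False if it already existed."""
    try:
        # Mode 'x' means exclusive creation: open() raises FileExistsError
        # when the file is already there, so only one caller can succeed.
        with open(lock_path, "x"):
            pass
        return True
    except FileExistsError:
        return False


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        lock_path = os.path.join(tmp_dir, "example.fasta.lock")
        print(try_lock(lock_path))  # first caller creates the lock
        print(try_lock(lock_path))  # second caller finds it already taken
```

Whoever creates the file owns the task; everyone else skips it. No lock server is needed, only a directory that every worker can see.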

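The Kubernetes Job YAML for AFM-MSA inference uses the following parameters:
- name: the Job name, for example afm-msa (afm-msa-search in the YAML below).
- completions and parallelism: keep the two equal; they set the number of Pods to run. For example, 3 means that 3 Pods run the same program.
- workingDir: the working directory, workspace_v2/af2
- image: the image, af2:v1.04
- command: the command to execute. Do not launch it with nohup, because that makes the Pod shut down early: the Pod only lives as long as the container's main process.
The full YAML: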
apiVersion: batch/v1
kind: Job
metadata:
  name: afm-msa-search  # your training job name; generateName: structure-predict-job- also works, in which case k8s appends a random suffix after the "-" so that concurrent runs of the same pipeline do not fail on duplicate resource names
spec:
  completions: 2   # total number of Pods
  parallelism: 2   # number of Pods running in parallel
  backoffLimit: 0  # number of retries; no retry is needed after a failure
  template:
    metadata:
      annotations:
        sidecar.istio.io/inject: "false"
      labels:
        file-mount: "true"  # both labels are required; Kubeflow uses them to configure the basic environment automatically
        user-mount: "true"
    spec:
      nodeSelector:
        service: "ai2-msa-only"  # device is the GPU type, e.g. a10, a100
      tolerations:
        - key: role
          operator: Equal
          value: ai2-msa-only
          effect: NoSchedule
      containers:
        - name: sp
          image: "af2:v1.04"  # use this image throughout; it provides a base conda and CUDA environment
          imagePullPolicy: Always
          resources:
            limits:
              cpu: "62"
              memory: 124G
            requests:
              cpu: "62"
              memory: 124G
          command: [
            "/bin/sh",
            "-cl",
            "python -u batch_msa_search.py test/fasta_test/ test/outputs_test/ multimer > nohup.20230314.out 2>&1"]  # the command to execute
          workingDir: "workspace_v2/af2/"  # default working directory, i.e. the directory that contains the launch script
          env:  # inject each Pod's name as an environment variable so the program can tell which Pod it is running in
            - name: PODNAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
      restartPolicy: Never
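
The PODNAME variable injected above can be read from Python with `os.environ`. The following is a rough sketch, not part of the original script, of one possible use: writing the Pod name into the lock file so that it is easy to see which Pod claimed an input. The helpers `claim_with_owner` and `read_owner` and the fallback value `unknown-pod` are hypothetical.

```python
import os
import tempfile


def claim_with_owner(lock_path: str) -> bool:
    """Create the lock file exclusively and record the current Pod name in it."""
    # PODNAME is the environment variable set in the Job YAML;
    # "unknown-pod" is an assumed fallback for runs outside Kubernetes.
    pod_name = os.environ.get("PODNAME", "unknown-pod")
    try:
        with open(lock_path, "x") as lock_file:
            lock_file.write(pod_name + "\n")
        return True
    except FileExistsError:
        return False


def read_owner(lock_path: str) -> str:
    """Return the Pod name stored in an existing lock file."""
    with open(lock_path) as lock_file:
        return lock_file.read().strip()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        lock_path = os.path.join(tmp_dir, "example.fasta.lock")
        if claim_with_owner(lock_path):
            print(f"[Info] claimed by {read_owner(lock_path)}")
```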
Run:
# cd workspace/multimer-pipeline
kubectl delete jobs afm-msa-search
kubectl apply -f afm-msa-search.yaml
# kubectl get pods
kubectl describe pods afm-msa-search-2c5kd
Check the Pod events in the output of kubectl describe:
Events:
  Type    Reason     Age   From         Message
  ----    ------     ----  ----         -------
  Normal  Scheduled  10m   cce-volcano  Successfully assigned afm-msa-search-2c5kd to 172.30.1.156
  Normal  Pulling    10m   kubelet      Pulling image "af2:v1.04"
  Normal  Pulled     10m   kubelet      Successfully pulled image "af2:v1.04" in 184.084607ms
  Normal  Created    10m   kubelet      Created container sp
  Normal  Started    10m   kubelet      Started container sp
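batch_msa_search.py is the script that searches MSAs in batch. It uses file locks to decide which Pod handles which input:
- with open(lock_path, 'x') as wp: checks the file lock
- os.system(cmd_line): executes the command
Source code: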
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Copyright (c) 2024. All rights reserved.
Created by C. L. Wang on 2024/3/13
"""
import os
import random
import sys
import time

input_dir = sys.argv[1]
output_dir = sys.argv[2]
model_type = sys.argv[3]

# Short random delay so that Pods started together do not act in lockstep
time.sleep(random.random())

model_list = ['monomer', 'monomer_casp14', 'monomer_ptm', 'multimer']
if model_type not in model_list:
    print("[Info] model not supported!!! only monomer/multimer supported!!!")
    sys.exit(1)

# exist_ok=True because another Pod may create the same folders concurrently
lock_folder = os.path.join(output_dir, "lock-msa")
os.makedirs(lock_folder, exist_ok=True)

lock_file_list = []
for root, dirs, files in os.walk(input_dir):
    if root != input_dir:  # only process files directly under input_dir
        break
    for i, file_name in enumerate(files):
        path = os.path.join(root, file_name)
        print(f"[Info] run path: {path}")
        lock_path = os.path.join(lock_folder, f"{file_name}.lock")
        try:
            # Check the file lock: mode 'x' fails if the lock file already exists
            with open(lock_path, 'x'):
                time.sleep(10)
        except FileExistsError as e:
            print(e)
            print(f'[Info] exist! {i} {file_name}')
            continue
        print(f'working on {i} {file_name}')
        base_name = os.path.basename(path).split(".")[0]
        cmd_line = f"/bin/bash run_msa.sh {path} {output_dir} {model_type} > nohup.{base_name}.out 2>&1"
        print(f"[Info] cmd_line: {cmd_line}")
        lock_file_list.append(lock_path)
        os.system(cmd_line)  # run the command

# Remove the locks created by this Pod; they are plain files, so use os.remove
print(f"[Info] lock_file_list: {len(lock_file_list)}")
for path in lock_file_list:
    if os.path.exists(path):
        os.remove(path)
        print(f"[Info] delete file: {path}")
print(f"[Info] run {input_dir} over!")
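
The script runs each task with `os.system(cmd_line)` and does not look at the exit status. As a hedged alternative, the sketch below uses `subprocess.run` with an argument list and a log file, and returns the exit code so that the caller can react to failures. The helper name `run_and_log` is hypothetical, and the demo runs a trivial Python command in place of run_msa.sh.

```python
import os
import subprocess
import sys
import tempfile


def run_and_log(args, log_path):
    """Run a command, send stdout and stderr to log_path, and return the exit code."""
    with open(log_path, "w") as log_file:
        # Passing a list avoids shell quoting problems with unusual file names.
        completed = subprocess.run(args, stdout=log_file, stderr=subprocess.STDOUT)
    return completed.returncode


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        log_path = os.path.join(tmp_dir, "nohup.demo.out")
        # Stand-in for: ["/bin/bash", "run_msa.sh", path, output_dir, model_type]
        code = run_and_log([sys.executable, "-c", "print('demo task')"], log_path)
        print(f"[Info] exit code: {code}")
```

A non-zero return value could be used, for example, to keep the lock file in place and mark the input as failed instead of silently moving on.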
The core program is run_msa.sh, the script that performs the search:
#!/bin/bash
set -xe
#PROJECT_DIR=$(cd "$(dirname $0)" && pwd)
source activate alphafold
export PATH="/usr/local/cuda-11.6/bin:$PATH"
export LD_LIBRARY_PATH="/usr/local/cuda-11.6/lib64:$LD_LIBRARY_PATH"
conda info --envs
fasta_path=$1
output_dir=$2
model_type=$3
bash run_alphafold.sh \
-f "$fasta_path" \
-o "$output_dir" \
-m "$model_type" \
-g false \
-h true
echo "run over: $fasta_path"
To inspect the results, open a shell in the Pod:
kubectl exec -it afm-msa-search-2c5kd -- bash