国内证券行业的估值定价、风险管理和量化研究,终于开始需要高性能计算了。10年前,我就试图把用 Platform Symphony 搭建的网格计算集群拉进这样的场景,不过安装好两个星期后,还是拆掉将机器变成虚拟机用了。10年后的今天,硬件、软件、开发语言、数据量级都不一样了,但鼓捣调度资源,完成计算任务的集群还是像之前那么好玩。
Ray 是个用 C++ 写的、支持 Python 的分布式计算框架,结构简洁清晰,集群支持自动化部署于阿里云、AWS、微软云(Azure)、GCP 和本地 K8S 集群。
CUDA_PATH:C:\ProgramData\Anaconda3
CUDA_PATH_V11_7:C:\ProgramData\Anaconda3
C:\ProgramData\Anaconda3
C:\ProgramData\Anaconda3\bin
C:\ProgramData\Anaconda3\Lib\x64
pip install -U "ray[default]"
pip install cupy-cuda117
from datetime import datetime
import numpy as np
import cupy as cp
import ray
# --- Benchmark configuration -------------------------------------------------
# Cluster layout: how many Ray actors of each kind are created.
num_gpu_actors: int = 1  # number of Ray actors on the cluster that use a GPU
num_cpu_actors: int = 10  # number of Ray actors on the cluster that use a CPU
# Workload: how many rounds of tasks are submitted (each task is one full scenario).
num_task_rounds: int = 1
# How many times longer one task takes on a CPU actor than on a GPU actor;
# each GPU actor is therefore given this many tasks per round.
gpu_efficiency: int = 30
# Connect to Ray. Called with no arguments, ray.init() runs against a local Ray
# instance on this machine. If a Ray cluster is available, pass its remote
# client address instead, e.g.:
#ray.init(address="ray://##.###.##.##:10001")
ray.init()
@ray.remote(num_gpus=1)
class BSMCGPUActor(object):
    """Ray actor that runs the Monte Carlo path benchmark on a GPU with CuPy.

    Each actor instance reserves one GPU (``num_gpus=1``).
    """

    def __init__(self, num_paths=10000, num_steps=100):
        """Store the simulation size.

        Args:
            num_paths: number of random paths drawn per task (before the
                antithetic copies are appended). Defaults to 10000.
            num_steps: number of steps per path. Defaults to 100.
        """
        self.numOfPaths = num_paths
        self.numOfSteps = num_steps

    def generate_path(self):
        """Simulate paths on the GPU and return the mean of exp(cumulative sum).

        Draws a (numOfPaths, numOfSteps) matrix of standard normals, stacks its
        negation underneath (antithetic paths, 2 * numOfPaths rows in total),
        accumulates along the step axis, exponentiates, and averages over all
        entries.

        Returns:
            float: the mean, copied from device to host so that whoever calls
            ``ray.get`` on the result does not need CuPy or a GPU to
            deserialize it, and so it matches the CPU actor's return type.
        """
        z = cp.random.standard_normal(size=(self.numOfPaths, self.numOfSteps))
        paths = cp.exp(cp.cumsum(cp.concatenate([z, -z]), axis=1))
        # float() on a 0-d CuPy array performs the device-to-host transfer.
        return float(cp.mean(paths))
@ray.remote(num_cpus=1)
class BSMCCPUActor(object):
    """Ray actor that runs the Monte Carlo path benchmark on a CPU with NumPy.

    Each actor instance reserves one CPU (``num_cpus=1``).
    """

    def __init__(self, num_paths=10000, num_steps=100):
        """Store the simulation size.

        Args:
            num_paths: number of random paths drawn per task (before the
                antithetic copies are appended). Defaults to 10000.
            num_steps: number of steps per path. Defaults to 100.
        """
        self.numOfPaths = num_paths
        self.numOfSteps = num_steps

    def generate_path(self):
        """Simulate paths on the CPU and return the mean of exp(cumulative sum).

        Draws a (numOfPaths, numOfSteps) matrix of standard normals from NumPy's
        global random state, stacks its negation underneath (antithetic paths,
        2 * numOfPaths rows in total), accumulates along the step axis,
        exponentiates, and averages over all entries.

        Returns:
            float: the mean, as a plain Python float (same type as the GPU actor).
        """
        z = np.random.standard_normal(size=(self.numOfPaths, self.numOfSteps))
        paths = np.exp(np.cumsum(np.concatenate([z, -z]), axis=1))
        return float(np.mean(paths))
if __name__ == '__main__':
    # Benchmark driver: create the actors, submit `num_task_rounds` rounds of
    # Monte Carlo tasks to the CPU and GPU actors, and report progress/timing.
    start_time = datetime.now()
    bsmc_gpu_actors = [BSMCGPUActor.remote() for _ in range(num_gpu_actors)]
    bsmc_cpu_actors = [BSMCCPUActor.remote() for _ in range(num_cpu_actors)]
    # NOTE: `.remote()` on an actor class returns a handle without waiting for
    # the actor's __init__ to run, so this mostly measures handle creation.
    print(f"initiation cost time:{datetime.now() - start_time}")

    pending = []  # ObjectRefs of tasks submitted but not yet collected
    num_ready_tasks = 0
    # Total number of tasks this run will complete.
    num_tasks = (num_cpu_actors + num_gpu_actors * gpu_efficiency) * num_task_rounds
    # Allow at most one outstanding task per actor between rounds, to keep
    # the cluster's backlog bounded and avoid OOM.
    max_backlog = num_gpu_actors + num_cpu_actors

    for i in range(num_task_rounds):
        is_last_round = i == num_task_rounds - 1
        # Submit CPU and GPU tasks together so both resource types stay busy;
        # each GPU actor gets `gpu_efficiency` tasks per round because it runs
        # that many times faster than a CPU actor.
        pending.extend(actor.generate_path.remote() for actor in bsmc_cpu_actors)
        for _ in range(gpu_efficiency):
            pending.extend(actor.generate_path.remote() for actor in bsmc_gpu_actors)

        # Throttle the backlog; after the final round, drain everything.
        while len(pending) > max_backlog or (is_last_round and pending):
            done_ids, pending = ray.wait(pending)
            num_ready_tasks += len(done_ids)
            # Report every 100 tasks, and always on the final task so that
            # runs with fewer than 100 tasks still show a result.
            if num_ready_tasks % 100 == 0 or num_ready_tasks == num_tasks:
                print(f"ready tasks:{num_ready_tasks}/{num_tasks}, the {num_ready_tasks}"
                      f"th result:{ray.get(done_ids)}, "
                      f"current cost time:{datetime.now() - start_time}")
    print(f"total cost time:{datetime.now() - start_time}")