这里整理一些PyTorch单机多核训练的方法和简单原理,目的是既能在写代码时知道怎么用,又能从原理上知道大致是怎么回事儿。
就目前来说,并行训练的方法可以根据的不同的并行对象分为——模型并行和数据并行。
模型并行:是网络太大,一张卡存不了,那么拆分,然后进行模型并行训练。
数据并行:多个显卡同时采用数据训练网络的副本。在这里仅先讨论数据并行。
PyTorch单机多核训练方案有两种:一种是利用nn.DataParallel
实现,实现简单,不涉及多进程;另一种是用torch.nn.parallel.DistributedDataParallel
和torch.utils.data.distributed.DistributedSampler
结合多进程实现。第二种方式效率更高,但是实现起来稍难,第二种方式同时支持多节点分布式实现。方案二的效率要比方案一高,即使是在单运算节点上。
先介绍一下数据并行的概念。
目前流行的深度学习框架(例如Pytorch和Tensorflow)为分布式培训提供内置支持。从广义上讲,从磁盘读取输入数据开始,加载数据涉及四个步骤:
PyTorch 中的 Dataloader
方法 提供使用多个进程(通过将 num_workers> 0 设置)从磁盘加载数据以及将多页数据从可分页内存到固定内存的能力(通过设置 pin_memory = True)。
一般的,对于大批量的数据,若仅有一个线程用于加载数据,则数据加载时间占主导地位,这意味着无论我们如何加快数据处理速度,性能都会受到数据加载时间的限制。现在,设置num_workers = 4(这四个进程是四个独立的进程) 以及 pin_memory = True。这样,可以使用多个进程从磁盘读取不重叠的数据,并启动生产者-消费者线程以将这些进程读取的数据从可分页的内存转移到固定的内存。
多个进程能够更快地加载数据,并且当数据处理时间足够长时,流水线数据加载几乎可以完全隐藏数据加载延迟。这是因为在处理当前批次的同时,将从磁盘读取下一个批次的数据,并将其传输到固定内存。如果处理当前批次的时间足够长,则下一个批次的数据将立即可用。这个想法需要为num_workers 参数设置适当的值。设置此参数,以使从磁盘读取批处理数据的速度比GPU处理当前批处理的速度更快(但不能更高,因为这只会浪费多个进程使用的系统资源)。
接下来将介绍具体的实现方法,为了方便理解,这里用一个简单的CNN模型训练MNIST手写数据集,相关代码:
核心在于使用nn.DataParallel
将模型wrap一下,代码其他地方不需要做任何更改:
model = nn.DataParallel(model)
为方便说明,我们假设模型输入为(32, input_dim),这里的 32 表示batch_size,模型输出为(32, output_dim),使用 4 个GPU训练。nn.DataParallel
起到的作用是将这 32 个样本拆成 4 份,发送给 4 个GPU 分别做 forward,然后生成 4 个大小为(8, output_dim)的输出,然后再将这 4 个输出都收集到cuda:0
上并合并成(32, output_dim)。
详细流程:
forward:是将输入一个 batch 的数据均分成多份,分别送到对应的 GPU 进行计算。与 Module 相关的所有数据也都会以浅复制的方式复制多份。每个 GPU 在单独的线程上将针对各自的输入数据独立并行地进行 forward 计算。
backward:在主GPU上收集网络输出,并通过将网络输出与批次中每个元素的真实数据标签进行比较来计算损失函数值。接下来,损失值分散给各个GPU,每个GPU进行反向传播以计算梯度。最后,在主GPU上归约梯度、进行梯度下降,并更新主GPU上的模型参数。由于模型参数仅在主GPU上更新,而其他从属GPU此时并不是同步更新的,所以需要将更新后的模型参数复制到剩余的从属 GPU 中,以此来实现并行。
DataParallel会将定义的网络模型参数默认放在GPU 0上,所以dataparallel实质是可以看做把训练参数从GPU拷贝到其他的GPU同时训练,这样会导致内存和GPU使用率出现很严重的负载不均衡现象,即GPU 0的使用内存和使用率会大大超出其他显卡的使用内存,因为在这里GPU0作为master来进行梯度的汇总和模型的更新,再将计算任务下发给其他GPU,所以他的内存和使用率会比其他的高。
具体流程见下图:
可以看出,nn.DataParallel
没有改变模型的输入输出,因此其他部分的代码不需要做任何更改,非常方便。但弊端是,后续的loss计算只会在cuda:0
上进行,没法并行,因此会导致负载不均衡的问题。
如果把loss
放在模型里计算的话,则可以缓解上述负载不均衡的问题,示意代码如下:
class Net:
def __init__(self,...):
# code
def forward(self, inputs, labels=None)
# outputs = fct(inputs)
# loss_fct = ...
if labels is not None:
loss = loss_fct(outputs, labels) # 在训练模型时直接将labels传入模型,在forward过程中计算loss
return loss
else:
return outputs
按照我们上面提到的模型并行逻辑,在每个GPU上会计算出一个loss,这些loss会被收集到cuda:0
上并合并成长度为 4 的张量。这个时候在做backward的之前,必须对将这个loss张量合并成一个标量,一般直接取mean就可以。这在Pytorch官方文档nn.DataParallel函数中有提到:
When
module
returns a scalar (i.e., 0-dimensional tensor) in forward(), this wrapper will return a vector of length equal to number of devices used in data parallelism, containing the result from each device.
这部分的例子可以参考:data_parallel_train.py
Note: DataParallel 中,batch size 设置必须为单卡的 n 倍,因为一个batch的数据会被主GPU分散为minibatch给其他GPU,但是在 DistributedDataParalle(方案二) 内,batch size 设置于单卡一样即可,因为各个GPU对应的进程独立从磁盘中加载数据,训练所设置的batch size就是每一个进程的batch size。
方案二被成为分布式数据并行(distributed data parallel),是通过多进程实现(parallel.DistributedDataParallel)的,相比与方案一要复杂很多。可以从以下几个方面理解:
从一开始就会启动多个进程(进程数等于GPU数),每个进程独享一个GPU,每个进程都会独立地执行代码。这意味着每个进程都独立地初始化模型、训练,当然,在每次迭代过程中会通过进程间通信共享梯度,整合梯度,然后独立地更新参数。
每个进程都会初始化一份训练数据集,当然它们会使用数据集中的不同记录做训练,这相当于同样的模型喂进去不同的数据做训练,也就是所谓的数据并行。这是通过torch.utils.data.distributed.DistributedSampler
函数实现的,不过逻辑上也不难想到,只要做一下数据partition,不同进程拿到不同的parition就可以了,官方有一个简单的demo,感兴趣的可以看一下代码实现:Distributed Training
进程通过local_rank
变量来标识自己,local_rank
为0的为master,其他是slave。这个变量是torch.distributed
包帮我们创建的,使用方法如下:
import argparse # 必须引入 argparse 包
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type