目录
现有的 Dataset 和 DataLoader 及其存在的问题
新的数据加载方式:DataPipe 与 DataLoader2
近日,PyTorch 推出了新的版本 PyTorch 1.12 ,其中针对 PyTorch 的数据加载与处理方面有几个值得关注的更新:
而就在几个月前发布的 PyTorch 1.11 中,PyTorch 推出了新的数据加载构建方式 DataPipe,详见上一期内容。从最近的一些版本更新可以看出,PyTorch 正尝试构建新一代的机器学习数据工具链与生态,以满足 PyTorch 对更高性能,和应对更大规模数据的需求。
本文将从以下三个部分,为大家解读一下近期 PyTorch 在数据工具链方面的一些更新与探索:
本文描述的功能按发布状态可分为:
Stable:稳定版本,这些功能将长期维护,通常不会存在很大的性能限制,文档充分准确,并且会尽可能地保证向后兼容性。
Beta:Beta 版本,功能被标记为 Beta 是因为 API 可能会根据用户反馈而更改,因为性能需要改进或者并未覆盖所有的功能,不保证向后兼容性。
Prototype: 原型版本,这些功能通常不能作为 PyPI 或 Conda 等二进制发行版的一部分提供,并且处于反馈和测试的早期阶段。
PyTorch 现有的数据构建方式是以 Dataset 和 DataLoader 为核心展开的,如下示例展示了一个普遍的 Dataset 定义方式(继承 Dataset 并实现 __len__
和 __getitem__
方法):
- # refer: https://pytorch.org/tutorials/beginner/basics/data_tutorial.html
-
- import os
- import pandas as pd
- from torchvision.io import read_image
-
- class CustomImageDataset(Dataset):
- def __init__(self, annotations_file, img_dir):
- self.img_labels = pd.read_csv(annotations_file)
- self.img_dir = img_dir
-
- def __len__(self):
- return len(self.img_labels)
-
- def __getitem__(self, idx):
- img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
- image = read_image(img_path)
- label = self.img_labels.iloc[idx, 1]
- return image, label
该示例定义了一个给定索引返回样本的方式,一般称之为 map-style,这种方式要求已知所有的样本信息(如 __len__
返回样本数量),虽然简洁清晰但有诸多限制,比如一个典型的场景是加载 COCO 格式的数据标注到内存中,当数据规模较大时,这种方式会占用非常大的内存,难以进一步扩展数据规模。
针对大规模数据的训练有许多应对思路,其中一个方式是将预处理后的数据序列化,存储为支持快速流式读取的文件格式,比如 TensorFlow 中的 TFRecord ,MXNet 使用的 RecordIO,简单且有效。而上述的 map-style 显然是不支持流式读取这种方式的,因此在 PyTorch 1.2 引入了一个 iter-style 的 IterableDataset ,以支持流式的数据加载。
Dataset 实现了单个样本(当然也支持多个样本)的加载,而在实际模型训练时,我们通常希望每次使用一小批的样本来进行训练,同时要打乱样本的顺序以防止过拟合。另外数据加载通常是训练瓶颈,也希望能够采用多进程或者多线程来加快数据加载速度。这些功能在 PyTorch 中都由 DataLoader 来完成,一个普遍的 DataLoader 定义与使用流程示例:
- from torch.utils.data import DataLoader
-
- ...
- train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
- test_dataloader = DataLoader(test_data, batch_size=64, shuffle=True)
-
- for features, label in train_dataloader:
- ...
Dataset + DataLoader 组成了现有的 PyTorch 数据构建方式,这种方式接口清晰易用,但是从长远看来仍存在一些问题,PyTorch 的产品经理 Donny Greenberg 在 PyTorch’s Next generation of Data Tooling 主题分享中,提到了如下几个问题:
以上问题影响了 PyTorch 数据加载与处理工具链的可复用性和可拓展性,同时对性能也有影响。在这些背景下,PyTorch 正尝试构建新的数据加载与处理方式。
在 PyTorch 1.11 中,引入了一个新的模块化数据加载库 TorchData,仍处于 Beta 版本状态,在 TorchData 里面实现了很多通用的数据加载以及处理的 DataPipe。
基于原有 Dataset 可复用性差的问题,DataPipe 设计的出发点就是模块化设计,将数据加载与构建流程拆分为一个个更小的功能模块,将原来的一个 Dataset 所实现的功能拆分为若干个功能模块,模块功能越简单可复用性越高。
与 Dataset 对应的,DataPipe 也支持 map-style 和 iter-style 两种数据获取方式,它们需要分别实现 __getitem__
和 __iter__
方法。此外,DataPipe 支持链式组合方式,可以通过已有的 DataPipe 构造一个新的 DataPipe,在新的 DataPipe 中对原有 DataPipe 迭代出来的数据进行处理,以下是一个 DataPipe 链式组合的例子:
- # refer: https://pytorch.org/data/beta/torchdata.datapipes.iter.html
-
- >>> from torchdata.datapipes.iter import IterableWrapper, Mapper
- >>> dp = IterableWrapper(range(10))
- >>> map_dp_1 = Mapper(dp, lambda x: x + 1) # Using class constructor
- >>> map_dp_2 = dp.map(lambda x: x + 1) # Using functional form (recommended)
- >>> list(map_dp_1)
- [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- >>> list(map_dp_2)
- [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
- >>> filter_dp = map_dp_1.filter(lambda x: x % 2 == 0)
- >>> list(filter_dp)
- [2, 4, 6, 8, 10]
通过 DataPipe 的链式组合,可以构建出由多个 DataPipe 组成的 DataPipe Graph,实现一个完整的数据加载与处理流程,用以替代 Dataset,并且从 PyTorch 1.12 起, DataPipe 可以和现有的 DataLoader 完全兼容使用。
DataPipe 除了能够替代 Dataset 之外,也可以替代 DataLoader 中的一些功能,比如 Sampler、Shuffle 和 Collate Batchs,因此 PyTorch 1.12 引入了一个原型版本的 DataLoader2,一个为适配 DataPipe 而设计的更轻量版本的 DataLoader。DataLoader2 的原型设计代码在 TorchData 中,虽然目前仍是原型版本的状态,但其 API 设计基本确定,可以一窥其貌,DataLoader2 简化逻辑后的代码如下:
- class DataLoader2(Generic[T_co]):
- """为方便理解,简化了代码逻辑的版本
-
- refer: https://github.com/pytorch/data/blob/main/torchdata/dataloader2/dataloader2.py#L46-L195
- """
- def __init__(
- self,
- datapipe: IterDataPipe,
- datapipe_adapter_fn: Optional[Union[Iterable[Adapter], Adapter]] = None,
- reading_service: Optional[ReadingServiceInterface] = None,
- ) -> None:
- self.datapipe = datapipe
- self.datapipe_adapter_fn = datapipe_adapter_fn
- self.reading_service = reading_service
-
- if self.datapipe_adapter_fn is not None:
- self.datapipe = self.datapipe_adapter_fn(self.datapipe)
-
- def __iter__(self) -> Iterator[T_co]:
- if self.reading_service is None:
- self.datapipe = self.reading_service.initialize(self.datapipe)
- self._datapipe_iter = iter(self.datapipe)
- return self
-
- def __next__(self) -> T_co:
- return next(self._datapipe_iter)
DataLoader2 中主要有三个概念,DataPipe、Adapter 和 ReadingService。其中 Adapter 用来配置、修改和扩展 DataPipe Graph ,比如说实现一个 PinMemory 的 Adapter,可以在 DataPipe Graph 的末尾添加一个 DataPipe 节点,用来将最后输出的 Tensor 放入到锁页内存中。除了 PinMemory 之外,还可以实现控制 ShufflerIterDataPipe 是否开启 shuffle,添加 DataPipe 节点控制进程同步等功能。而 ReadingService 则是用来实现具体执行 DataPipe Graph 的功能,比如多进程执行 DataPipe Graph 和分布式执行 DataPipe Graph。
通过 Beta 版本的 DataPipe 和原型版本的 DataLoader2,可以大概看出:PyTorch 在尝试解决现有的 Dataset + DataLoader 方式存在的设计问题,以达到更好的可扩展性与可复用性。
在前面提到,PyTorch 目前缺乏一套针对结构化数据的格式标准,这套标准包括数据处理的 API 以及数据结构。在 PyTorch 1.12 中,引入了一个新的数据预处理库 TorchArrow,此功能仍处于 Beta 状态。TorchArrow 是一个用于机器学习数据预处理的库,它提供了一个 Pandas 风格的接口,易用且性能强大,能够加速数据预处理流程。
TorchArrow 采用了标准的 Apache Arrow 内存格式,解决了不同系统,不同工具之间由于内存格式不同而存在的兼容性问题。在数据处理接口上,TorchArrow 提供了一个 Pandas 风格的 DataFrame API,并且采用 Velox 为后端,可以方便地向量化用户定义函数,具有以下特性:
以下是来自 TorchArrow 官方教程的使用例子,可以看出 TorchArrow 在保持与 Pandas 一致的 API 风格之外,提供了比 Pandas 更精确的类型系统:
- import torcharrow as ta
- import torcharrow.dtypes as dt
- import pandas as pd
-
- print(pd.Series([1, 2, None, 4]))
- # 0 1.0
- # 1 2.0
- # 2 NaN
- # 3 4.0
- # dtype: float64
-
- s = ta.column([1, 2, None, 4])
- print(s)
- # 0 1
- # 1 2
- # 2 None
- # 3 4
- # dtype: Int64(nullable=True), length: 4, null_count: 1
-
- print(s.device)
- # 'cpu'
下面的例子展示了如何使用 DataPipe 加载 Iris 数据集,并使用 TorchArrow 对数据进行标准化:
- from torchdata.datapipes.iter import IterableWrapper, HttpReader
- import torcharrow.dtypes as dt
-
-
- CLASS2INDEX = {'Iris-setosa': 0, 'Iris-versicolor': 1, 'Iris-virginica': 2}
- FEATURE_NAMES = ["sepal length", "sepal width", "petal length", "petal width"]
-
-
- def _filter_fn(x):
- return len(x) != 0
-
-
- def _convert_to_numeric(x):
- label_idx = CLASS2INDEX[x[4]]
- return (float(x[0]), float(x[1]), float(x[2]), float(x[3]), label_idx)
-
-
- def preprocess(df):
- for feature_name in FEATURE_NAMES:
- df[feature_name] = (df[feature_name] - df[feature_name].mean()) / df[feature_name].std()
- return df
-
-
- iris_data_url = 'http://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data'
-
- url_dp = IterableWrapper([iris_data_url])
- http_reader_dp = HttpReader(url_dp)
- csv_dp = http_reader_dp.parse_csv().filter(_filter_fn).map(_convert_to_numeric)
-
- DTYPE = dt.Struct([
- dt.Field("sepal length", dt.float32),
- dt.Field("sepal width", dt.float32),
- dt.Field("petal length", dt.float32),
- dt.Field("petal width", dt.float32),
- dt.Field("label", dt.int32)
- ])
-
- df_dp = csv_dp.dataframe(dtype=DTYPE, dataframe_size=20).map(preprocess)
-
- print(next(iter(df_dp)))
TorchArrow 希望能够为结构化的机器学习数据,提供一个统一的数据内存格式以及数据处理 API,目前仍处于很早期的状态,只有 TorchRec 这个库在使用,不过 TorchData 已经为 TorchArrow 实现了一些相关的 DataPipe,期待之后 TorchArrow 在 PyTorch 生态中的发展。
本文介绍了 PyTorch 现有的数据加载与处理方式,并讨论了其存在的问题,通过 PyTorch 最近发布的几个新版本对数据工具的变动,可以看出 PyTorch 在尝试解决这些问题,一个比较明确的方向是 DataPipe + DataLoader2,在结构化数据的处理方面,则有新推出的 TorchArrow 来构建新的范式。
除了上述提到的问题之外,PyTorch 数据工具生态还存在一些其他问题,目前尚未看到 PyTorch 解决或者应对的尝试,比如: