datasets是huggingface维护的一个轻量级可扩展的数据加载库,其兼容pandas、numpy、pytorch和tensorflow,使用简便。根据其官方简介:Datasets originated from a fork of the awesome TensorFlow Datasets,datasets是源自于tf.data的,两者之间的主要区别可参考这里。
tf.data相较于pytorch的dataset/dataloader来说,(个人认为)其最强大的一点是可以处理大数据集,而不用将所有数据加载到内存中。datasets的序列化基于Apache Arrow(tf.data基于tfrecord),熟悉spark的应该对apache arrow略有了解。datasets使用的是内存地址映射的方式来直接从磁盘上来读取数据,这使得他能够处理大量的数据。用法简介可参考Quick tour。下面对datasets用法做一些简单的记录。
Datasets库使用的是 Apache Arrow 文件格式,所以你只需要加载你需要的样本到内存中就行了,不需要全量加载。
datasets.load_dataset
( path: str,
name: Optional[str] = None,
data_dir: Optional[str] = None,
data_files: Optional[Union[str, Sequence[str], Mapping[str, Union[str, Sequence[str]]]]] = None,
split: Optional[Union[str, Split]] = None,
cache_dir: Optional[str] = None,
features: Optional[Features] = None,
download_config: Optional[DownloadConfig] = None,
download_mode: Optional[Union[DownloadMode, str]] = None,
verification_mode: Optional[Union[VerificationMode, str]] = None,
ignore_verifications="deprecated",
keep_in_memory: Optional[bool] = None,
save_infos: bool = False,
revision: Optional[Union[str, Version]] = None,
use_auth_token: Optional[Union[bool, str]] = None,
task: Optional[Union[str, TaskTemplate]] = None,
streaming: bool = False,
num_proc: Optional[int] = None,
storage_options: Optional[Dict] = None,
**config_kwargs,
) -> Union[DatasetDict, Dataset, IterableDatasetDict, IterableDataset]:
path (str) — Path or name of the dataset. Depending on path, the dataset builder that is used comes from a generic dataset script (JSON, CSV, Parquet, text etc.) or from the dataset script (a python file) inside the dataset directory. local数据组织方式:load_dataset(data_args.dataset_name)。data_args.dataset_name即数据的目录名,里面的数据应该是要命名成train.csv、validation.csv、test.csv,就会被自动生成DatasetDict(3个datasets)。resolve any data file that matches '['train[-._ 0-9/]**', '**[-._ 0-9/]train[-._ 0-9/]**', 'training[-._ 0-9/]**', '**[-._ 0-9/]training[-._ 0-9/]**']' with any supported extension ['csv', 'json', 'txt', 'gif', 'h5', 'hdf', 'png', 'ico', 'jpg', 'jpeg', 'mp3', 'RAW', 'zip'等很多]
name—表示数据集中的子数据集,当一个数据集包含多个数据集时,就需要这个参数。比如"glue"数据集下就包含"sst2"、“cola”、"qqp"等多个子数据集,此时就需要指定name来表示加载哪一个子数据集。
split (Split or str) — Which split of the data to load. If None, will return a dict with all splits (typically datasets.Split.TRAIN and datasets.Split.TEST). If given, will return a single Dataset. Splits can be combined and specified like in tensorflow-datasets. split="train" 只返回1个datasets,即train的。如果只有一个train.csv,也可以通过只取前n条数据来构建train,如split="train[:5000]"表示只取前5k条数据作为train。[slice-splits]
features (Features, optional) — Set the features type to use for this dataset.
sep或者delimiter—指定数据行的分割符,用法应该是pandas的用法。
header=0——第0行是否作为列名。column_names为主,可被column_names覆盖。
column_names—指定读取数据的列名。如果不指定,就默认第一行吧。所以可以通过这个改名。column_names=['text', 'label'] 指定特征column_names[#specify-features]
usecols—指定使用哪些列或列名;a valid list-like usecols parameter would be [0, 1, 2] or ['foo', 'bar', 'baz'].
[pandas小记:pandas数据输入输出_-柚子皮-的博客]cache_dir='/path/to/cache'——这个配置后,好像会卡死!数据集将被缓存到/path/to/cache目录中。如果您再次调用load_dataset并指定相同的cache_dir,则数据将从缓存中加载,而不是从原始数据源重新复制。Note:默认是会将本地数据cp到缓存目录,downloading and preparing dataset csv/chat60d to ~/.cache/huggingface/datasets/...,所以第一次会有Downloading data files。后面则是:datasets.arrow_dataset - Loading cached processed dataset at ~\.cache\huggingface\datasets\...\cache-....arrow。
示例1:
dataset_dict = load_dataset(data_args.dataset_name, sep='\t', header=0, column_names=column_names, usecols=['text', 'label'])
或者dataset_dict= load_dataset("csv", data_dir=data_args.dataset_name, delimiter='\t')
Note: 1 即使目录中只有一个train.csv时,也是返回DatasetDict对象。
示例:从train中取第2条数据,然后转成dataset类型
demo_data = [dataset_dict['train'][1]]
data = Dataset.from_list(demo_data)
from datasets import load_from_disk
encoded_train.save_to_disk('demo_data/')
reloaded_encoded_dataset = load_from_disk('demo_data')
[huggingface-datasets —— tf.data升级版数据加载库 - 简书]
print(dataset_dict)
DatasetDict({
train: Dataset({
features: ['text', 'label'],
num_rows: 500
})
validation: Dataset({
features: ['text', 'label'],
num_rows: 400
})
test: Dataset({
features: ['text', 'label'],
num_rows: 400
})
})
print(dataset_dict['train'].features) #获取dataset中各个字段的数据类型
{'text': Value(dtype='string', id=None), 'label': Value(dtype='int64', id=None)}
print(dataset_dict.shape)
{'train': (500, 2), 'validation': (400, 2), 'test': (400, 2)}
print(dataset_dict['train'][:3]) # 也可以dataset_dict['train'].select(range(3))吧
{'text': ['你好', '处理服务', '能尽快帮我发货吗?'], 'label': [0, 0, 0]}
print(dataset_dict.num_rows)
{'train': 500, 'validation': 400, 'test': 400}
print(dataset_dict.column_names)
{'train': ['text', 'label'], 'validation': ['text', 'label'], 'test': ['text', 'label']}
print(dataset.format) #Dataset对象的方法
print(dataset_dict['train'].format)
{'type': None, 'format_kwargs': {}, 'columns': ['text'], 'output_all_columns': False}
# 挑选label列中的前10个进行查看
dataset[label_mask]['label'][:10]
[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
示例
from datasets import Value, Sequence
dataset = dataset.cast_column("input_ids", Sequence(feature=Value(dtype='int64', id=None)))
{'input_ids': Sequence(feature=Value(dtype='int32', id=None), length=-1, id=None)...} >>
{'input_ids': Sequence(feature=Value(dtype='int64', id=None), length=-1, id=None)...}
也可以通过[process#cast]改数据类型
dataset.set_format()方法指定各个字段的输出类型并限制输出的字段,这一般在load数据时用到。
通过reset_format()可以重置数据类型.
dataset.set_format(type='torch', columns=['label', 'gender_0_spot_id_mean'...])
sorted_dataset = dataset.sort("label")
small_dataset = dataset.select([0, 10, 20, 30, 40, 50])
选取dataset的前100条
small_dataset = dataset.select(range(100))
打散后再选取前100条
dataset_dict['train'].shuffle(seed=42).select(range(100))
dataset_dict = dataset_dict.filter(lambda line: line['text'] and len(line['text']) > 0 and not line['text'].isspace())
dataset = dataset[np.array(dataset['label']) == 0]
或者在tokenized时候过滤:
tokenized_datasets = dataset_dict.map(tokenize_function,batched=True)
def tokenize_function(examples):
examples["text"] = [line for line in examples["text"] if line and len(line) > 0 and not line.isspace()]
return tokenizer(examples["text"], padding=padding, truncation=True, max_length=data_args.max_seq_length)
dataset_dict = dataset.train_test_split(test_size=0.1, seed=123)
print(dataset_dict )
DatasetDict({
train: Dataset({
features: ['text'],
num_rows: 1170
})
test: Dataset({
features: ['text'],
num_rows: 130
})
})使用shard可以将数据集划分为更小的数据集,加载时可分shard加载:
dataset_trn.shard(num_shards=50, index=1)
使用shuffle来打乱数据集:
shuffle_trn = dataset_trn.shuffle(seed=123)
[process#sort-shuffle-select-split-and-shard]
合并数据集的方法concatenate_datasets,当然合并的前提是各个数据集数据类型相同。如两个数据集来自于同一份数据抽取得到的(或者是同时读取的,如在data_files中指定不同dataset名字,或通过split指定名字),则最好先通过flatten_indices()来刷新indices,否则合并时容易报错
from datasets import concatenate_datasets
encoded_train = concatenate_datasets([encoded_trn, encoded_val])
为了使我们的数据保持dataset的格式,我们需要使用 Dataset.map 方法。这能够使我们有更大的灵活度,我们直接给 map 函数传入一个函数,这个函数传入 dataset 的一个样本,然后返回映射后的结果。
def map(
self,
function: Optional[Callable] = None,
with_indices: bool = False,
with_rank: bool = False,
input_columns: Optional[Union[str, List[str]]] = None,
batched: bool = False,
batch_size: Optional[int] = 1000,
drop_last_batch: bool = False,
remove_columns: Optional[Union[str, List[str]]] = None,
keep_in_memory: bool = False,
load_from_cache_file: Optional[bool] = None,
cache_file_name: Optional[str] = None,
writer_batch_size: Optional[int] = 1000,
features: Optional[Features] = None,
disable_nullable: bool = False,
fn_kwargs: Optional[dict] = None,
num_proc: Optional[int] = None,
suffix_template: str = "_{rank:05d}_of_{num_proc:05d}",
new_fingerprint: Optional[str] = None,
desc: Optional[str] = None,
) -> "Dataset"
batched (`bool`, defaults to `False`): Provide batch of examples to `function`. 使用batched表示一次处理多行,也可将其设置为False每次仅处理一行,这在数据增强中可能更常用。因为dataset允许我们每次处理的输入行数不等于输出行数,因此数据增强如单词替换一个变多个数据时,进行将label 字段进行copy多分
batch_size (`int`, *optional*, defaults to `1000`): Number of examples per batch provided to `function` if `batched=True`. If `batch_size <= 0` or `batch_size == None`, provide the full dataset as a single batch to `function`.
def tokenize_function(examples):
return tokenizer(examples["text"], padding=False, truncation=True, return_tensors="pt")
tokenized_dataset_dict= dataset_dict.map(
tokenize_function,
batched=True,
num_proc=data_args.preprocessing_num_workers,
remove_columns=[text_column_name],
load_from_cache_file=not data_args.overwrite_cache,
)
Note:
1 examples的数据类型是
examples.keys()可能有这些 dict_keys(['input_ids', 'token_type_ids', 'attention_mask', 'special_tokens_mask'])
输入时examples数据是没有padding的:
len(examples['input_ids'][0])=len(examples['attention_mask'][0])=31
len(examples['input_ids'][1])=len(examples['attention_mask'][1])=31
2 map可以设置参数 batched=True
,这样子我们的函数可以一下子处理多条数据,而不是每次一条一条地处理。
3 可以在 Dataset.map 中使用 num_proc 参数。如果tokenizer 使用了 use_fast=True 参数,采用的是多线程的方式处理样本。如果没有使用 fast tokenizer,则可以使用num_proc 参数提高速度。
这里tokenizer没有使用padding,这是因为把所有的样本padding到最大的长度效率很低,通常我们会在构造batch的时候才进行padding,因为我们只需要padding到这个batch中的最大长度,而不是训练的最大长度。这个可以节省非常多的预处理时间。batch中padding在后面的DataCollatorWithPadding[参考LLM:预训练语言模型finetune下游任务]或者DataCollatorForLanguageModeling/DataCollatorForWholeWordMask[LLM:finetune预训练语言模型]。
DatasetDict({
train: Dataset({
features: ['input_ids', 'token_type_ids', 'attention_mask'],
num_rows: 495
})
validation: Dataset({
features: ['input_ids', 'token_type_ids', 'attention_mask'],
num_rows: 397
})
test: Dataset({
features: ['input_ids', 'token_type_ids', 'attention_mask'],
num_rows: 397
})
})
DatasetDict['validation'].features:
{'text': Value(dtype='string', id=None), 'label': Value(dtype='int64', id=None),'input_ids': Sequence(feature=Value(dtype='int32', id=None), length=-1, id=None), 'token_type_ids': Sequence(feature=Value(dtype='int8', id=None), length=-1, id=None), 'attention_mask': Sequence(feature=Value(dtype='int8', id=None), length=-1, id=None)}
常规保存的模型和量化后保存的模型,tokenizer应该都没变,所以最后的dataset读取出来经过tokenize后都是如上一样的数据类型。
from:-柚子皮-
ref:https://github.com/huggingface/datasets
Datasets官方文档[Datasets]