在数据量比较大的情况下,数据预处理有时候会非常耗费时间。
此时可以利用 joblib 中的 Parallel 和 delayed,把预处理任务分配到多个 CPU 核心上并行执行。
示例如下:
- import random
- import os
- from glob import glob
- from tqdm import tqdm
- from joblib import Parallel, delayed
- import soundfile as sf
- import pycantonese as pct
- from opencc import OpenCC
-
# Module-level OpenCC converter built from the 's2hk' configuration name.
# NOTE(review): `cc` is not referenced anywhere in the visible code; confirm
# it is used elsewhere before removing it.
- cc = OpenCC('s2hk')
-
- ######### ljspeech ##########
def process_ljspeech_one_utterance(wav_path, text, mode, save_root):
    """Export one LJSpeech utterance (transcript + 16 kHz wav) to the output tree.

    The transcript is written to ``{save_root}/{mode}/LJSpeech-1.1/<stem>.txt``
    and the audio to ``{save_root}/{mode}/LJSpeech-1.1/<wav file name>``.
    Audio whose sample rate is not 16000 Hz is resampled with the external
    ``sox`` command; audio already at 16000 Hz is copied as-is.

    Args:
        wav_path: Path of the source wav file.
        text: Transcript to store next to the wav.
        mode: Name of the split sub-directory (the callers in this file pass
            ``'train'`` or ``'valid'``).
        save_root: Root directory of the processed dataset.

    Returns:
        None. A failure on this utterance is reported on stderr and the
        utterance is skipped, so one bad file does not abort a whole batch.
    """
    # Local imports: these stdlib modules are not imported at file level.
    import shutil
    import subprocess
    import sys

    spk = 'LJSpeech-1.1'
    wname = os.path.basename(wav_path)
    # Swap only the final extension (str.replace would touch every '.wav').
    tname = os.path.splitext(wname)[0] + '.txt'
    out_dir = f'{save_root}/{mode}/{spk}'
    text_to_path = f'{out_dir}/{tname}'
    wav_to_path = f'{out_dir}/{wname}'

    try:
        os.makedirs(out_dir, exist_ok=True)
        with open(text_to_path, 'w', encoding='utf-8') as fp:
            fp.write(text)

        # Only the header is needed to learn the sample rate; no need to
        # decode the whole waveform.
        fs = sf.info(wav_path).samplerate
        if fs != 16000:
            # Argument list (no shell) so paths with spaces or shell
            # metacharacters are passed through untouched.
            subprocess.run(
                ['sox', wav_path, '-r', '16000', wav_to_path],
                check=True,
            )
        else:
            shutil.copy(wav_path, wav_to_path)
    except Exception as exc:
        # Best-effort per utterance: report and skip instead of silently
        # swallowing the error (KeyboardInterrupt/SystemExit still propagate).
        print(f'[skip] {wav_path}: {exc!r}', file=sys.stderr)
-
-
wavs_root = 'source_data/LJSpeech/LJSpeech-1.1'

# Collect [wav_path, text] pairs from metadata.csv. Each row is split on '|';
# field 0 is the utterance id and field 2 is the text that gets exported.
data = []
with open(f'{wavs_root}/metadata.csv', 'r', encoding='utf-8') as f:
    for line in f:
        fields = line.strip().split('|')
        if len(fields) < 3:
            # Blank or malformed row: nothing usable to export.
            continue
        wav_path = f'{wavs_root}/wavs/{fields[0]}.wav'
        data.append([wav_path, fields[2]])

# Hold out up to 100 random utterances for validation. Sampling indices
# avoids a quadratic list-membership test, and the min() keeps the sample
# valid when fewer than 100 rows are available.
valid_idx = set(random.sample(range(len(data)), min(100, len(data))))
valid_data = [dt for i, dt in enumerate(data) if i in valid_idx]
train_data = [dt for i, dt in enumerate(data) if i not in valid_idx]

# n_jobs=20 lets joblib run up to 20 tasks concurrently; with joblib's
# default backend the tasks are dispatched to separate worker processes.
for mode, subset in (('train', train_data), ('valid', valid_data)):
    Parallel(n_jobs=20)(
        delayed(process_ljspeech_one_utterance)(
            wav_path, text, mode=mode, save_root='wavs/LJSpeech'
        )
        for wav_path, text in tqdm(subset)
    )


all_wavs = glob('wavs/LJSpeech/*/*/*.wav')
print(f'obtain {len(all_wavs)} wavs...')