天之道,损有余以补不足。
深度学习,那是先富的游戏。穷导致硬件不行,非科班导致工程也不行,每次比赛数据量一大,我心里浮现的都是水浒传名场面
相声,讲究的是说学逗唱;而工程,讲究的是一个吹德偶夫(trade-off)。那,我们就用时间换空间。时间是每个人的朋友,跑的慢,就多等等。之前kdd cup百度比赛,我在尝试复现baseline的时候就遇到了麻烦。
215天的训练历史,产生了400万样本。batch_size选择1024,但每个step大概耗时800ms,单个epoch慢的离谱。不禁感慨生活太残忍了。
本文记录一下,我做了哪些改动。很多人喜欢强调:什么算法工程师先要是一个工程师。我虽然不懂这话的含义,但工地上,他们都叫我Yue工,赢麻了。
首先是读取文件过程,想把csv保存为pickle后加载。但发现csv读取只花2秒。可以接受,未采纳。
dataset中窗口滑动转化为时序样本,原本pandas操作变为numpy操作。单个样本处理时间从2e-4s 降低为2e-6s,训练单个step从800ms可以降为250ms。但二者切片时对左开右闭的设置不一样,坑了很久
原本我沾沾自喜的设计了,一个day到index到样本的数据提取路线,每个过程都封装的很好。实际跑起来,发现训练前啥预处理没有,要花三十分钟(1933s)。从所有index选取不在drop_list的,普通写法特别耗时,转化为集合求差。三十分钟变成3秒。
很慢
idx = [i for i in all_idx if i not in dropidx] # very slow
很快
idx = sorted(list(set(all_idx) - set(dropidx)))
样本选取idx过程中,通过分布式提取并保存为pickle,训练可以开始的快些?groupby还是很耗时。
def get_idx_from_days2(data, selected_days, day_columns='Day', mode='train', train_sequence_length=2*24*6, predict_sequence_length=2*24*6, strides=1*6, max_lags=1):
"""
sample1: 固定间隔,每个间隔选一个
sample2: 间隔1-6随机, 此时先选出每个的间隔序列,再cumsum到原始序列进行选择
"""
def func(data):
return data.tail(predict_sequence_length - 1).index.tolist()
def func2(data):
return data.head(max(train_sequence_length, max_lags) + 1).index.tolist()
cpu_count = os.cpu_count()
all_idx = data.loc[data[day_columns].isin(selected_days)].index.tolist()
data_grouped = data.groupby(['TurbID'])
dropidx = joblib.Parallel(cpu_count)(joblib.delayed(func)(group) for name, group in data_grouped)
dropidx = list(itertools.chain(*dropidx))
if mode == 'train':
dropidx2 = joblib.Parallel(cpu_count)(joblib.delayed(func2)(group) for name, group in data_grouped)
dropidx2 = list(itertools.chain(*dropidx2))
dropidx += dropidx2
idx = sorted(list(set(all_idx) - set(dropidx)))
return idx
tensorflow可以存成tf-records二进制文件加速加载,这里我没有使用。
本来我最喜欢的结构是数据原始列原封不动,作为网络输入。特征部分尽量在神经网络里使用tf实现,感觉只适合简单任务与小数据。因此,把特征工程部分采用多进程完成并保存。或者采用tf.data里的多线程map
tensorflow本身的一个优势是其自带的tf.data模块,可以高效的给模型喂子弹。官方文档里有如何
Profiling tf.profiler.experimental.Trace
原来Tensorboard 里有个profile_batch的参数,可以直接帮着分析。由于每次都要端口转发,后来用tensorboard就比较少了, 不过实际训练的时候,还是去掉TB吧,很粘时间也
预测48小时,并迭代15天。因此可以很多值预测了多遍,可以转化一下,用向量方法求出来。在尝试规则模型的时候,发现本地评分过程消耗了两个小时。单个风机消耗的是6秒,为啥到134,就到两小时了。
首先看了一下代码,看起来没有很容易优化的地方。那第一步就是把采样加上去,因为不采样自己凭一次分要2小时,太久。另一方面,我发现比赛规则我第一印象是按个滚动,现在里面也是有采样的。所以既可以减少时间,也与线上评价更吻合。
发现循环的时候,其实可以更简单点,就是逐行循环,而不是每一轮都筛选。
还是不行的感觉,如果换成多线程感觉比较麻烦。其实,之前我几乎没怎么用过多线程。我一直都有一颗仁慈的人。很多人对待电脑,就像资本家对我们一样,就不让闲着。我干完了等别人结果都不行,非要整什么异步。
最终版本:
def _process(index: int):
y_true = pd.read_pickle('../../data/user_data/y_true.pkl')
y_pred = pd.read_pickle('../../data/user_data/y_pred.pkl')
raw_data = pd.read_pickle('../../data/user_data/valid_df_raw.pkl')
predict_sequence_length = y_true.shape[1] - 2
total_rows = len(y_true)
strides = total_rows // os.cpu_count()
start = index * strides
y_true_process = y_true.iloc[start: start+strides]
y_pred_process = y_pred.iloc[start: start+strides]
scores_process = []
for true, pred in zip(y_true_process.iterrows(), y_pred_process.iterrows()):
start_time = true[1]['start_time']
turbine = true[1]['TurbID']
raw_index = raw_data.loc[(raw_data['start_time']== start_time) & (raw_data['TurbID'] == turbine)].index[0]
scores_process.append(
turbine_score(
true[1].iloc[2:].values,
pred[1].iloc[2:].values,
raw_data.loc[raw_index: raw_index + predict_sequence_length-1]
))
return scores_process
def kdd_score_parallel():
#多线程版本,每个线程从保存的pkl中截取自己处理的片段,然后分别处理,最后合成
n_process = os.cpu_count()
scores = []
with multiprocessing.Pool(n_process) as p:
process_scores = p.map(_process, range(n_process))
scores.append(process_scores)
scores = np.concatenate(scores)
scores = scores.reshape([-1, 2])
return scores
做出多线程版本。发现需要保存下来,才能方便序列化,多核之后从7000秒降到600秒了,几乎可以达到实用了。
如果再有心思的话,可以用numba和cython进一步优化速度。
实验迭代
另外关于迭代,就是可以采样部分数据进行实验,加速迭代历程。时序里,当然是选择最后的,或者同期的。
家里条件好的,可以把apex和多卡都搞上。1080这种卡收益不大,那就让老爷们先走吧。
深刻的感受到了从30分钟转化为3秒完成,都是因为自己薄弱的基础。
再比如,主办方给的tensorflow版本较低,我甚至要降cuda版本才能用。我就给主办方以时间,一个月没做比赛,他们就把版本升了。
最后,即使成绩暂时不够好,或生活不如意,问题不大。做时间的朋友,慢慢积累。也许,成绩提高了,也许期待就降低了。牢记:给文明以岁月,给自己以时间,路线对了,稳赢,无非是小赢、中赢,还是大赢的问题。
以上措施帮助我可以在两个小时左右完成训练和验证