前面我介绍了word2vec算法的两种实现算法, S k i p − g r a m Skip-gram Skip−gram 以及 C B O W CBOW CBOW 算法,我认为理解一个算法最好的方法就是复现算法的模型,理解每一行代码,下面我就带大家来实现一下效果比较好的 S k i p − g r a m Skip-gram Skip−gram 算法。
在这个算法中,我们拟准备对一本中文网络小说的txt文件进行词向量的训练,训练之前,肯定要先将训练的文章的标点符号进行消除,然后使用jieba分词,对整个文章进行分词,在这里,分词时肯定会有一些没有用的停用词,比如“啊”、“哦”之类的,从分词的结果中将这些停用词进行消除,最后将分词结果保存。之后建立一个字典,将每一个出现的词都往里面放,规定一个单边窗口大小 w i n _ s i z e win\_size win_size,寻找每个中心词周围 w i n _ s i z e win\_size win_size 个单词,将其分为一组,以中心词的 One-hot向量为输入,构建模型,进行负采样,对模型进行训练。这里因为训练的文章并不大,故这里就不使用二次采样了。
我们拿到的文本是一个方便人阅读的文本,里面有标点符号等,所以首先得去掉标点符号,并用结巴分词进行分词。
去除标点其实可以不用罗列所有标点再去除,只需要将文本转为 UTF-8 编码,然后用正则表达式将中文提取出来即可,因为中文的UFT-8的编码范围为 [\u4e00-\u9fa5]
,而且,正则表达式能够将标点符号分开的每一句话都提取出来并分别放在列表中,如句子 “去打游戏好不好,人总是向死而生”
一句话中,使用正则将中文提出后为 ["去打游戏好不好","人总是向死而生"]
,这样每一个句子都进行了分开,分词的时候就可以遍历每一个句子来进行分词,否则如果将其全部连接成字符串,就成了 "去打游戏好不好人总是向死而生"
,这样很可能将 好人
分成一个词,就对原文意思进行了曲解,使得词的表达能力变弱了。代码如下:
import re
import jieba
def make_file(file):
stopwords = {}
with open('data/Stop.txt', 'r', encoding='utf-8', errors='ingnore') as f:
for eachWord in f:
stopwords[eachWord.strip()] = eachWord.strip() # 创建停用词典
with open('data/{}'.format(file), 'r', encoding='utf-8', errors='ignore') as f:
content = f.read()
# Unicode的\u4e00-\u9fa5为中文,用正则表达式将中文抽出,达到去除标点符号的作用
content = re.findall('[\u4e00-\u9fa5]+', content)
concentrate = []
# 遍历每一个句子,对每一个句子进行分词,效果更好
for i in content:
seg_sentence = jieba.cut(i, cut_all=False)
concentrate.extend([j for j in seg_sentence])
main_content = []
# 去除停用词
for i in concentrate:
if i not in stopwords:
main_content.append(i)
# 存储词的列表,词与词用空格隔开
with open("data/test.txt", 'w', encoding='utf-8') as f:
f.write(' '.join(main_content))
make_file('douluo-utf.txt')
这里我之所以对整个文章进行读取,而不是按行进行读取,是为了提高准确率,否则如果一个词前一个字和后一个字分别在两行,这个词也就被硬生生的拆散了,当然,这样也加重了内存的压力,也是因为文章小的原因才能这样做。
分词后的文本长这样,可以看到还是比较准确的。
接下来将后面要用到的数据都进行整理一下。首先,负采样的公式如下 P ( w i ) = f ( w i ) 3 / 4 ∑ j = 0 n ( f ( w j ) 3 / 4 ) P(w_i)=\frac{f(w_i)^{3/4}}{\sum_{j=0}^n(f(w_j)^{3/4})} P(wi)=∑j=0n(f(wj)3/4)f(wi)3/4 f ( w ) f(w) f(w)代表 每个单词被赋予的一个权重,即它单词出现的词频。所以,我们要首先计算词频。还需要对窗口大小以及词的数量进行设定,这里设定词的数量为10000,窗口大小为2,窗口大小含义为左边采样两个词,右边采样两个词。将单词编号,得到编号与单词的映射,计算词频的 3 / 4 3/4 3/4 次方,代码如下。
from collections import Counter
import numpy as np
#下面为超参数
# 最大词数量
MAX_SIZE = 10000
# 训练的词向量维度
embedding_size = 30
# 单边窗口数
C = 3
# 每个词负采样个数
K = 15
batch_size = 32
lr = 1e-3
epoch = 100
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
with open("data/test.txt", 'r', encoding='utf-8') as f:
content = f.read().split(" ")
# Counter 能将列表打包为每个单词出现次数的元组,减1的1是留给Unknown的
words = dict(Counter(content).most_common(MAX_SIZE-1))
# 设定一个 ,即unknown项,将没用到的全放进去
words['' ] = len(content) - np.sum(list(words.values()))
# word:id
word2idx = {word:i for i, word in enumerate(words.keys())}
# id:word
idx2word = {i:word for i, word in enumerate(words.keys())}
# 每个单词出现次数的列表
word_counts = np.array([count for count in words.values()], dtype=np.float32)
# 词频
word_freqs = word_counts / np.sum(word_counts)
# 词频的 四分之三 次方
word_freqs = word_freqs ** (3./4.)
进行完数据的准备后,就该创建数据管道了。数据管道我们采用重写Dataset的方法来进行构建。
数据管道是为了将数据一批一批的拿进模型里面,而模型的话有两种方式进行构建,模型的示意图如下所示。
很显然,根据模型,由于隐藏层只有一层,我们可以直接使用两个线性层构建模型,这样的话传入模型的参数就是One-hot编码,然后用交叉熵作为损失函数。pytorch中还提供了一个 nn.Embedding
层,专门用于训练词向量的层,传入这个层中时,可以不用传入One-hot编码,只用指明有多少维度,要训练成多少维度即可,传入的就可以是数字。所以,根据模型的构建不同,这里数据管道处理的数据也不同。
这里我们选择使用 nn.Embedding
层来构建模型,所以,数据管道这里反而相对简单一些,
from torch.utils.data import Dataset, DataLoader
import torch
class WordEmbeddingDataset(Dataset):
def __init__(self, text, word2idx, word_freqs):
super(WordEmbeddingDataset, self).__init__() # #通过父类初始化模型,然后重写两个方法
# text转为编号,如果存在就获取ID,否则使用unknown的ID
self.text_encoded = [word2idx.get(word, word2idx['' ]) for word in text]
# nn.Embedding需要传入LongTensor类型
self.text_encoded = torch.LongTensor(self.text_encoded)
self.word2idx = word2idx
self.word_freqs = torch.Tensor(word_freqs)
def __len__(self):
return len(self.text_encoded) # 返回所有单词的总数,即item的总数
def __getitem__(self, idx):
# 取得中心词ID
center_words = self.text_encoded[idx]
# 先取得中心左右各C个词的索引
pos_indices = list(range(idx - C, idx)) + list(range(idx + 1, idx + C + 1))
# 为了避免索引越界,所以进行取余处理
pos_indices = [i % len(self.text_encoded) for i in pos_indices]
# tensor(list)背景词表索引
pos_words = self.text_encoded[pos_indices]
# multinomial表示对self.word_freqs按照概率大小抽取K * pos_words.shape[0]个样本,True表示是放回抽取
# 每采样一个正确的单词(positive word),就采样K个错误的单词(negative word),pos_words.shape[0]是正确单词数量
neg_words = torch.multinomial(self.word_freqs, K * pos_words.shape[0], True)
# while 循环是为了保证 neg_words中不能包含背景词,所以转为集合求交集
while len(set(np.array(pos_indices).tolist()) & set(np.array(neg_words).tolist())) > 0:
neg_words = torch.multinomial(self.word_freqs, K * pos_words.shape[0], True)
return center_words, pos_words, neg_words
words_dataset = WordEmbeddingDataset(content, word2idx, word_freqs)
dataloader = DataLoader(words_dataset, batch_size, shuffle=True)
对数据管道进行一次输出,可以看到输出的数据如下
for center_words, pos_words, neg_words in dataloader:
print("中心词:{}".format(center_words))
print("背景词:{}".format(pos_words))
print("负采样:{}".format(neg_words))
break
中心词:
tensor([ 37, 1251, 650, 170, 29, 67, 112, 932, 195, 9806, 8780, 1959,
18, 2551, 171, 523, 893, 69, 2068, 800, 115, 19, 9999, 584,
9999, 2164, 91, 619, 8661, 3344, 113, 3004])
背景词:
tensor([[ 201, 0, 126, 3030, 586, 3843],
[9999, 6004, 78, 331, 9999, 184],
[ 9, 84, 9999, 645, 805, 768],
…,
[9999, 7533, 145, 9676, 2993, 8],
[ 151, 9999, 28, 2297, 15, 60],
[ 527, 3000, 52, 104, 99, 398]])
负采样:
tensor([[9999, 961, 9653, …, 726, 1331, 6246],
[ 0, 5964, 895, …, 620, 100, 8784],
[ 360, 778, 2575, …, 4318, 6176, 1126],
…,
[1534, 4786, 8284, …, 376, 3057, 6588],
[7024, 2808, 3, …, 144, 8711, 9950],
[ 1, 279, 1375, …, 3, 3718, 376]])
模型的构建实际只需要两个 nn.Embedding
层就可以了,一个是中心词的权重,一个是背景词权重。
import torch.nn as nn
import torch.nn.functional as F
class EmbeddingModel(nn.Module):
def __init__(self, vocab_size, embed_size):
super(EmbeddingModel, self).__init__()
self.vocab_size = vocab_size
self.embed_size = embed_size
# 构建时指定输入的词为多少维,即MAX_SIZE大小,以及词维度,即embedding_size
self.in_embed = nn.Embedding(self.vocab_size, self.embed_size)
self.out_embed = nn.Embedding(self.vocab_size, self.embed_size)
def forward(self, input_labels, pos_labels, neg_labels):
''' input_labels: center words, [batch_size]
pos_labels: positive words, [batch_size, (window_size * 2)]
neg_labels:negative words, [batch_size, (window_size * 2 * K)]
return: loss, [batch_size]
'''
input_embedding = self.in_embed(input_labels) # [batch_size, embed_size]
pos_embedding = self.out_embed(pos_labels) # [batch_size, (window * 2), embed_size]
neg_embedding = self.out_embed(neg_labels) # [batch_size, (window * 2 * K), embed_size]
input_embedding = input_embedding.unsqueeze(2) # [batch_size, embed_size, 1]
# bmm即矩阵相乘,不过第一维不变,只乘后两维
pos_dot = torch.bmm(pos_embedding, input_embedding) # [batch_size, (window * 2), 1]
pos_dot = pos_dot.squeeze(2) # [batch_size, (window * 2)]
neg_dot = torch.bmm(neg_embedding, -input_embedding) # [batch_size, (window * 2 * K), 1]
neg_dot = neg_dot.squeeze(2) # batch_size, (window * 2 * K)]
# 计算正样本损失
log_pos = F.logsigmoid(pos_dot).sum(1) # [batch_size]
# 计算负样本损失
log_neg = F.logsigmoid(neg_dot).sum(1)
loss = log_pos + log_neg
return -loss
# 我们实际只需要训练出的中心词矩阵就可以了
def get_embedding(self):
return self.in_embed.weight.cpu().detach().numpy()
model = EmbeddingModel(MAX_SIZE, embedding_size).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
这里计算正样本损失以及负样本损失都是根据负样本损失的公式来的,详情可以看我上一篇原理详解的文章,公式有推导,推导出来公式如下:
log
P
(
w
0
∣
w
c
)
=
log
(
1
1
+
e
u
0
T
v
c
)
+
∑
i
=
1
m
log
(
1
1
+
e
u
i
T
v
c
)
logP(w0|wc)=log(11+euT0vc)+m∑i=1log(11+euTivc)
logP(w0∣wc)=log(1+eu0Tvc1)+i=1∑mlog(1+euiTvc1)所以计算出来的损失要正负样本损失相加。
最后就是模型训练了,这里因为是无监督学习,没有验证集,所以训练起来非常容易:
def train_model():
#训练模型
for e in range(epoch):
for i, (input_labels, pos_labels, neg_labels) in enumerate(dataloader):
input_labels = input_labels.long().to(device)
pos_labels = pos_labels.long().to(device)
neg_labels = neg_labels.long().to(device)
optimizer.zero_grad()
loss = model(input_labels, pos_labels, neg_labels).mean()
loss.backward()
optimizer.step()
if i % 1000 == 0:
print('epoch', e, 'iteration', i, loss.item())
# 保存模型
torch.save(model.state_dict(), "data/embedding-{}.th".format(embedding_size))
train_model()
这文本还是挺大的,因为训练也要花时间,我就只跑了5个epoch,5个epoch肯定没有到收敛,所以我也就没有保存最优模型的步骤了,实际训练可以添加保存最优损失的模型这一步。不过5个epoch已经可以大致看看效果了。
def find_word(word):
'''
计算并输出与输入词最相关的100个词
:param word: 输入词
:return:
'''
# 加载模型
model = EmbeddingModel(MAX_SIZE, embedding_size)
model.load_state_dict(torch.load("data/embedding-100.th"))
# 获取中心词矩阵
embedding_weight = model.get_embedding()
# 得到词与词向量的字典
word2embedding = {}
for i in words:
word2embedding[i] = embedding_weight[word2idx[i]]
# 得到输入词与其他词向量的余弦相似度
other = {}
for i in words:
if i == word:
continue
# 计算余弦相似度
other[i] = cosine_similarity(word2embedding[word].reshape(1,-1), word2embedding[i].reshape(1,-1))
# 对余弦相似度按从大到小排序
other = sorted(other.items(), key=lambda x: x[1], reverse=True)
count = 0
# 输出排序前100的相似度词语
for i, j in other:
print("({},{})".format(i,j))
count += 1
if count == 100:
break
find_word('大师')
模型输入如下,我就不全部粘贴了,大家看看效果就行了。
(弗兰德,[[0.81570786]])
(老师,[[0.7032538]])
(二龙,[[0.634623]])
(柳,[[0.6121346]])
(说,[[0.59906274]])
(这才,[[0.58481735]])
(秦明,[[0.5719741]])
(院长,[[0.5557458]])
(三人,[[0.5471796]])
(唐三,[[0.5343685]])
(点,[[0.5245141]])
(孩子,[[0.5202507]])
(想,[[0.51790005]])
(赵无极,[[0.514543]])
(道,[[0.48086473]])
(赶忙,[[0.48032683]])
(自然,[[0.47603428]])
(点头,[[0.47382873]])
(史莱克,[[0.47114572]])
(告诉,[[0.46480244]])
(一旁,[[0.46352047]])
(怪物,[[0.4612395]])
(父亲,[[0.46009806]])
这里仅仅是从零给大家搭建一个Word2vec模型,清楚Skip-gram算法的流程,如果真的想要训练一个Skip-gram模型,还有很多方法对其进行改进,比如权重初始化、批量归一化、调整超参数等方法,这些方法可以看看我关于网络优化的博客,当然,实现Word2vec最简单的也可以使用 gensim
库,里面内置有 Word2vec 的模型,只需要传入参数就可以训练,实现的方式更为简单快捷。
全部的代码可以在我的GitHub进行查看。