This section is mainly hands-on experiments.
I. Preparing the Training Data
1. Converting the corpus text into word IDs
- import numpy as np
-
- def preprocess(text):
-     text = text.lower()  # convert uppercase letters to lowercase
-     text = text.replace('.', ' .')  # replace '.' with ' .' so the period becomes its own token
-     words = text.split(' ')  # split the sentence on spaces
-
-     word_to_id = {}
-     id_to_word = {}
-     for word in words:
-         if word not in word_to_id:
-             new_id = len(word_to_id)
-             word_to_id[word] = new_id
-             id_to_word[new_id] = word
-     print(word_to_id)
-     corpus = np.array([word_to_id[w] for w in words])
-     print(corpus)
-
- text = "You say goodbye and I say hello."
- preprocess(text)
Output:
The word_to_id dictionary stores the word-to-ID mapping; the corpus array stores the sentence with every word replaced by its ID.
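For the example sentence, the two print statements output the following (IDs are assigned in order of first appearance):
- {'you': 0, 'say': 1, 'goodbye': 2, 'and': 3, 'i': 4, 'hello': 5, '.': 6}
- [0 1 2 3 4 1 5 6]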
2. Generating contexts and target from the word ID list corpus
The input to the neural network used in word2vec is a set of contexts, and its correct label is the word those contexts surround, i.e., the target word. What we want is this: when the contexts are fed into the network, the probability of the target word should come out high. We therefore generate the contexts and target words from the corpus; for example, with a window size of 1, the first target is 'say' and its contexts are ['you', 'goodbye'].
Code:
- import numpy as np
-
- def preprocess(text):
-     text = text.lower()  # convert uppercase letters to lowercase
-     text = text.replace('.', ' .')  # replace '.' with ' .' so the period becomes its own token
-     words = text.split(' ')  # split the sentence on spaces
-
-     word_to_id = {}
-     id_to_word = {}
-     for word in words:
-         if word not in word_to_id:
-             new_id = len(word_to_id)
-             word_to_id[word] = new_id
-             id_to_word[new_id] = word
-
-     corpus = np.array([word_to_id[w] for w in words])
-     return corpus, word_to_id, id_to_word
-
- def create_contexts_target(corpus, window_size=1):  # word ID list and context window size
-     target = corpus[window_size:-window_size]
-     contexts = []
-
-     for idx in range(window_size, len(corpus)-window_size):
-         cs = []
-         for t in range(-window_size, window_size+1):
-             if t == 0:
-                 continue
-             cs.append(corpus[idx + t])
-         contexts.append(cs)
-
-     return np.array(contexts), np.array(target)
-
-
- text = "You say goodbye and I say hello."
- corpus, word_to_id, id_to_word = preprocess(text)
- contexts, target = create_contexts_target(corpus, window_size=1)
- print(contexts)
- print(target)
Result:
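With a window size of 1, each row of contexts holds the word IDs immediately to the left and right of the corresponding entry in target:
- [[0 2]
-  [1 3]
-  [2 4]
-  [3 1]
-  [4 5]
-  [1 6]]
- [1 2 3 4 1 5]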
3. Converting to a one-hot representation
Convert the contexts and the target to one-hot vectors, in which the element at the word's ID is 1 and every other element is 0.
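For example, with this 7-word vocabulary, the target ID 1 ('say') becomes the vector [0 1 0 0 0 0 0], and a context pair such as [0 2] ('you', 'goodbye') becomes two such rows, one per context word.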
Code:
- import numpy as np
-
- def preprocess(text):  # preprocessing: build the word-ID mappings and the word ID list
-     text = text.lower()  # convert uppercase letters to lowercase
-     text = text.replace('.', ' .')  # replace '.' with ' .' so the period becomes its own token
-     words = text.split(' ')  # split the sentence on spaces
-
-     word_to_id = {}
-     id_to_word = {}
-     for word in words:
-         if word not in word_to_id:
-             new_id = len(word_to_id)
-             word_to_id[word] = new_id
-             id_to_word[new_id] = word
-
-     corpus = np.array([word_to_id[w] for w in words])
-     return corpus, word_to_id, id_to_word
-
- def create_contexts_target(corpus, window_size=1):  # generate the contexts and target words
-     target = corpus[window_size:-window_size]
-     contexts = []
-
-     for idx in range(window_size, len(corpus)-window_size):
-         cs = []
-         for t in range(-window_size, window_size+1):
-             if t == 0:
-                 continue
-             cs.append(corpus[idx + t])
-         contexts.append(cs)
-
-     return np.array(contexts), np.array(target)
-
- def convert_one_hot(corpus, vocab_size):  # convert to a one-hot representation
-     N = corpus.shape[0]  # shape gives the number of elements along each axis
-     if corpus.ndim == 1:  # ndim gives the number of array dimensions
-         one_hot = np.zeros((N, vocab_size), dtype=np.int32)  # zero matrix with N rows and vocab_size columns
-         for idx, word_id in enumerate(corpus):
-             one_hot[idx, word_id] = 1
-
-     elif corpus.ndim == 2:
-         C = corpus.shape[1]
-         one_hot = np.zeros((N, C, vocab_size), dtype=np.int32)
-         for idx_0, word_ids in enumerate(corpus):
-             for idx_1, word_id in enumerate(word_ids):
-                 one_hot[idx_0, idx_1, word_id] = 1
-
-     return one_hot
-
-
- text = "You say goodbye and I say hello."
- corpus, word_to_id, id_to_word = preprocess(text)
- contexts, target = create_contexts_target(corpus, window_size=1)
-
- vocab_size = len(word_to_id)
- target = convert_one_hot(target, vocab_size)
- contexts = convert_one_hot(contexts, vocab_size)
- print(target)
- print(contexts)
Result:
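After conversion, target has shape (6, 7) (six samples, vocabulary size 7) and contexts has shape (6, 2, 7), since every sample has two context words. The first row of target, for instance, is [0 1 0 0 0 0 0], the one-hot vector for 'say'.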
II. Implementing the CBOW Model
Structure of the network: two input MatMul layers that share the weight W_in (one layer per context word) feed an averaged hidden layer h; an output MatMul layer with W_out turns h into scores, and a SoftmaxWithLoss layer turns the scores into a loss.
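As a compact summary of the forward pass that SimpleCBOW implements below (x0, x1 and t are shorthand for the batches of one-hot context and target vectors, not variables defined in the code):
- # h     = (x0 @ W_in + x1 @ W_in) / 2   # average of the two context projections
- # score = h @ W_out                     # raw scores over the vocabulary
- # loss  = SoftmaxWithLoss(score, t)     # softmax + cross-entropy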
- import numpy as np
- import matplotlib.pyplot as plt
- import time
-
- def preprocess(text):  # preprocessing: build the word-ID mappings and the word ID list
-     text = text.lower()  # convert uppercase letters to lowercase
-     text = text.replace('.', ' .')  # replace '.' with ' .' so the period becomes its own token
-     words = text.split(' ')  # split the sentence on spaces
-
-     word_to_id = {}
-     id_to_word = {}
-     for word in words:
-         if word not in word_to_id:
-             new_id = len(word_to_id)
-             word_to_id[word] = new_id
-             id_to_word[new_id] = word
-
-     corpus = np.array([word_to_id[w] for w in words])
-     return corpus, word_to_id, id_to_word
-
- def create_contexts_target(corpus, window_size):  # generate the contexts and target words
-     target = corpus[window_size:-window_size]
-     contexts = []
-
-     for idx in range(window_size, len(corpus)-window_size):
-         cs = []
-         for t in range(-window_size, window_size+1):
-             if t == 0:
-                 continue
-             cs.append(corpus[idx + t])
-         contexts.append(cs)
-
-     return np.array(contexts), np.array(target)
-
- def convert_one_hot(corpus, vocab_size):  # convert to a one-hot representation
-     N = corpus.shape[0]  # shape gives the number of elements along each axis
-     if corpus.ndim == 1:  # ndim gives the number of array dimensions
-         one_hot = np.zeros((N, vocab_size), dtype=np.int32)  # zero matrix with N rows and vocab_size columns
-         for idx, word_id in enumerate(corpus):
-             one_hot[idx, word_id] = 1
-
-     elif corpus.ndim == 2:
-         C = corpus.shape[1]
-         one_hot = np.zeros((N, C, vocab_size), dtype=np.int32)
-         for idx_0, word_ids in enumerate(corpus):
-             for idx_1, word_id in enumerate(word_ids):
-                 one_hot[idx_0, idx_1, word_id] = 1
-
-     return one_hot
-
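- # Fully connected layer: forward computes x @ W; backward stores dW in grads and returns dx = dout @ W.T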
- class MatMul:
-     def __init__(self, W):
-         self.params = [W]
-         self.grads = [np.zeros_like(W)]
-         self.x = None
-
-     def forward(self, x):
-         W, = self.params
-         out = np.dot(x, W)
-         self.x = x
-         return out
-
-     def backward(self, dout):
-         W, = self.params
-         dx = np.dot(dout, W.T)
-         dW = np.dot(self.x.T, dout)
-         self.grads[0][...] = dW
-         return dx
-
- def softmax(x):
-     if x.ndim == 2:
-         x = x - x.max(axis=1, keepdims=True)
-         x = np.exp(x)
-         x /= x.sum(axis=1, keepdims=True)
-     elif x.ndim == 1:
-         x = x - np.max(x)
-         x = np.exp(x) / np.sum(np.exp(x))
-
-     return x
-
-
- def cross_entropy_error(y, t):
-     if y.ndim == 1:
-         t = t.reshape(1, t.size)
-         y = y.reshape(1, y.size)
-
-     # if the teacher labels are one-hot vectors, convert them to correct-label indices
-     if t.size == y.size:
-         t = t.argmax(axis=1)
-
-     batch_size = y.shape[0]
-
-     return -np.sum(np.log(y[np.arange(batch_size), t] + 1e-7)) / batch_size
-
-
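- # Softmax followed by cross-entropy loss; the combined backward simplifies to (y - t) / batch_size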
- class SoftmaxWithLoss:
-     def __init__(self):
-         self.params, self.grads = [], []
-         self.y = None  # output of softmax
-         self.t = None  # teacher labels
-
-     def forward(self, x, t):
-         self.t = t
-         self.y = softmax(x)
-
-         # if the teacher labels are one-hot vectors, convert them to correct-label indices
-         if self.t.size == self.y.size:
-             self.t = self.t.argmax(axis=1)
-
-         loss = cross_entropy_error(self.y, self.t)
-         return loss
-
-     def backward(self, dout):
-         batch_size = self.t.shape[0]
-
-         dx = self.y.copy()
-         dx[np.arange(batch_size), self.t] -= 1
-         dx *= dout
-         dx = dx / batch_size
-         return dx
-
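- # Generic training loop: shuffles the data every epoch, iterates over mini-batches, and reports the average loss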
- class Trainer:
-     def __init__(self, model, optimizer):
-         self.model = model
-         self.optimizer = optimizer
-         self.loss_list = []
-         self.eval_interval = None
-         self.current_epoch = 0
-
-     def fit(self, x, t, max_epoch, batch_size, max_grad=None, eval_interval=20):
-         data_size = len(x)
-         max_iters = data_size // batch_size
-         self.eval_interval = eval_interval
-         model, optimizer = self.model, self.optimizer
-         total_loss = 0
-         loss_count = 0
-
-         start_time = time.time()
-         for epoch in range(max_epoch):
-             # shuffle the data
-             idx = np.random.permutation(np.arange(data_size))
-             x = x[idx]
-             t = t[idx]
-
-             for iters in range(max_iters):
-                 batch_x = x[iters*batch_size:(iters+1)*batch_size]
-                 batch_t = t[iters*batch_size:(iters+1)*batch_size]
-
-                 # compute the gradients and update the parameters
-                 loss = model.forward(batch_x, batch_t)
-                 model.backward()
-                 params, grads = remove_duplicate(model.params, model.grads)  # merge shared weights into one
-                 if max_grad is not None:
-                     clip_grads(grads, max_grad)
-                 optimizer.update(params, grads)
-                 total_loss += loss
-                 loss_count += 1
-
-                 # evaluation
-                 if (eval_interval is not None) and (iters % eval_interval) == 0:
-                     avg_loss = total_loss / loss_count
-                     elapsed_time = time.time() - start_time
-                     print('| epoch %d | iter %d / %d | time %d[s] | loss %.2f'
-                           % (self.current_epoch + 1, iters + 1, max_iters, elapsed_time, avg_loss))
-                     self.loss_list.append(float(avg_loss))
-                     total_loss, loss_count = 0, 0
-
-             self.current_epoch += 1
-
-     def plot(self, ylim=None):
-         x = np.arange(len(self.loss_list))
-         if ylim is not None:
-             plt.ylim(*ylim)
-         plt.plot(x, self.loss_list, label='train')
-         plt.xlabel('iterations (x' + str(self.eval_interval) + ')')
-         plt.ylabel('loss')
-         plt.show()
-
- def remove_duplicate(params, grads):
-     '''
-     Merge duplicated weights in the parameter list into one,
-     summing the gradients that correspond to them.
-     '''
-     params, grads = params[:], grads[:]  # copy the lists
-
-     while True:
-         find_flg = False
-         L = len(params)
-
-         for i in range(0, L - 1):
-             for j in range(i + 1, L):
-                 # when the weights are shared
-                 if params[i] is params[j]:
-                     grads[i] += grads[j]  # accumulate the gradients
-                     find_flg = True
-                     params.pop(j)
-                     grads.pop(j)
-                 # when a weight is shared as a transposed matrix (weight tying)
-                 elif params[i].ndim == 2 and params[j].ndim == 2 and \
-                      params[i].T.shape == params[j].shape and np.all(params[i].T == params[j]):
-                     grads[i] += grads[j].T
-                     find_flg = True
-                     params.pop(j)
-                     grads.pop(j)
-
-                 if find_flg: break
-             if find_flg: break
-
-         if not find_flg: break
-
-     return params, grads
-
-
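- # Gradient clipping: rescales all gradients when their overall L2 norm exceeds max_norm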
- def clip_grads(grads, max_norm):
-     total_norm = 0
-     for grad in grads:
-         total_norm += np.sum(grad ** 2)
-     total_norm = np.sqrt(total_norm)
-
-     rate = max_norm / (total_norm + 1e-6)
-     if rate < 1:
-         for grad in grads:
-             grad *= rate
-
-
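- # CBOW with one hidden layer; both input layers share W_in, whose rows serve as the word vectors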
- class SimpleCBOW:
-     def __init__(self, vocab_size, hidden_size):
-         V, H = vocab_size, hidden_size
-
-         # initialize the weights
-         W_in = 0.01 * np.random.randn(V, H).astype('f')
-         W_out = 0.01 * np.random.randn(H, V).astype('f')
-
-         # create the layers
-         self.in_layer0 = MatMul(W_in)
-         self.in_layer1 = MatMul(W_in)
-         self.out_layer = MatMul(W_out)
-         self.loss_layer = SoftmaxWithLoss()
-
-         # collect all weights and gradients into lists
-         layers = [self.in_layer0, self.in_layer1, self.out_layer]
-         self.params, self.grads = [], []
-         for layer in layers:
-             self.params += layer.params
-             self.grads += layer.grads
-
-         # expose the distributed representations of the words as a member variable
-         self.word_vecs = W_in
-
-     def forward(self, contexts, target):
-         h0 = self.in_layer0.forward(contexts[:, 0])
-         h1 = self.in_layer1.forward(contexts[:, 1])
-         h = (h0 + h1) * 0.5
-         score = self.out_layer.forward(h)
-         loss = self.loss_layer.forward(score, target)
-         return loss
-
-     def backward(self, dout=1):
-         ds = self.loss_layer.backward(dout)
-         da = self.out_layer.backward(ds)
-         da *= 0.5
-         self.in_layer1.backward(da)
-         self.in_layer0.backward(da)
-         return None
-
-
- class Adam:
-     '''
-     Adam (http://arxiv.org/abs/1412.6980v8)
-     '''
-     def __init__(self, lr=0.001, beta1=0.9, beta2=0.999):
-         self.lr = lr
-         self.beta1 = beta1
-         self.beta2 = beta2
-         self.iter = 0
-         self.m = None
-         self.v = None
-
-     def update(self, params, grads):
-         if self.m is None:
-             self.m, self.v = [], []
-             for param in params:
-                 self.m.append(np.zeros_like(param))
-                 self.v.append(np.zeros_like(param))
-
-         self.iter += 1
-         lr_t = self.lr * np.sqrt(1.0 - self.beta2 ** self.iter) / (1.0 - self.beta1 ** self.iter)
-
-         for i in range(len(params)):
-             self.m[i] += (1 - self.beta1) * (grads[i] - self.m[i])
-             self.v[i] += (1 - self.beta2) * (grads[i] ** 2 - self.v[i])
-
-             params[i] -= lr_t * self.m[i] / (np.sqrt(self.v[i]) + 1e-7)
-
-
- window_size = 1
- hidden_size = 5
- batch_size = 3
- max_epoch = 1000
-
- text = "You say goodbye and I say hello."
- corpus, word_to_id, id_to_word = preprocess(text)
- vocab_size = len(word_to_id)
- contexts, target = create_contexts_target(corpus, window_size)
-
- target = convert_one_hot(target, vocab_size)
- contexts = convert_one_hot(contexts, vocab_size)
-
- model = SimpleCBOW(vocab_size, hidden_size)
- optimizer = Adam()
- trainer = Trainer(model, optimizer)
-
- trainer.fit(contexts, target, max_epoch, batch_size)
- trainer.plot()
-
- word_vecs = model.word_vecs
- for word_id, word in id_to_word.items():
-     print(word, word_vecs[word_id])
Training result:
Distributed representations of the words:
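The exact numbers differ from run to run because the weights are randomly initialized, but the loss should start near ln 7 ≈ 1.95 (a uniform guess over the 7-word vocabulary) and fall toward zero over the 1000 epochs. The final loop prints each word together with its 5-dimensional vector, i.e., the corresponding row of word_vecs (shape (7, 5)).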