这里为了方便起见,只支持float类型的数据,而且只支持标量,不支持向量:
class Tensor:
def __init__(self, data, requires_grad=False):
"""
初始化Tensor,只支持float类型的标量
"""
if not type(data) is float:
raise RuntimeError(f'invalid data type {type(data).__name__}')
self.data = data
self.requires_grad = requires_grad
# 初始化梯度
self.grad = None
# 用于计算图的操作算子
self._op = None
当清楚梯度时,直接将梯度设置为None
def zero_grad(self):
self.grad = None
为了方便调试,重写了__str__
和__repr
函数:
def __str__(self):
return f"kyTorch.Tensor({self.data:.4f}, requires_grad={self.requires_grad})"
def __repr__(self) -> str:
return f"Tensor({self.data:.4f}, requires_grad={self.requires_grad})"
测试:
a = Tensor(2., requires_grad=True)
print(a) # kyTorch.Tensor(2.0000, requires_grad=True)
b = Tensor(4)
print(b) # RuntimeError: invalid data type int
定义运算操作的基类,为了便于复用,并不直接使用forward做运算,而是使用apply。
class _Function:
def __init__(self, *tensors):
# 该操作所依赖的所有tensor
self.depends = [tensor for tensor in tensors]
# 需要在backward()中使用的Tensor的data
self.save_tensors_data = []
def save_for_backward(self, *data):
self.save_tensors_data.extend(data)
def forward(self, *args):
"""前向传播,进行真正的运算"""
raise NotImplementedError('must implement the forward')
def backward(self, grad):
"""反向传播,计算梯度"""
raise NotImplementedError('must implement the forward')
@staticmethod
def apply(cls, *tensors):
from kygrad.tensor import Tensor
op = cls(*tensors) # 实例化对象,cls是类,op是对象
res = Tensor(op.forward(*[tensor.data for tensor in tensors]),
requires_grad=any([tensor.requires_grad for tensor in tensors]))
if res.requires_grad:
res._op = op
return res
定义加法:
class Add(_Function):
def forward(self, x, y):
return x + y
def backward(self, grad):
"""
z = x + y
∂L/∂x = ∂L/∂z * ∂z/∂x
∂z/∂x等于1,这里的grad是指∂L/∂z
"""
return grad, grad
class Sub(_Function):
def forward(self, x, y):
return x - y
def backward(self, grad):
return grad, -grad
定义乘法:
class Mul(_Function):
def forward(self, x, y):
self.save_for_backward(x, y)
return x * y
def backward(self, grad):
"""
z = x * y
∂L/∂x = ∂L/∂z * ∂z/∂x
∂L/∂y = ∂L/∂z * ∂z/∂y
∂L/∂z是grad,∂z/∂x等于y,∂z/∂y等于x
"""
x, y = self.save_tensors_data
# 分别返回∂L/∂x 和 ∂L/∂y
return grad * y, grad * x
在Tensor中重写__add__
和__mul__
函数:
def __add__(self, other):
# 都转成Tensor进行运算
other = other if isinstance(other, Tensor) else Tensor(other)
return Add.apply(Add, self, other)
def __mul__(self, other):
other = other if isinstance(other, Tensor) else Tensor(other)
return Mul.apply(Mul, self, other)
测试:
a = Tensor(2., requires_grad=True)
b = Tensor(4., requires_grad=False)
print(a * b) # kyTorch.Tensor(8.0000, requires_grad=True)
print((a + b) * (b + 1.0)) # kyTorch.Tensor(30.0000, requires_grad=True)
但是1.0 + a
会报错TypeError: unsupported operand type(s) for +: 'float' and 'Tensor'
,那就还需要重写__radd__
和__rmul__
函数,此外,为了实现a += 1.0
这种原地操作,还需要重写__iadd__
和__imul__
:
def __radd__(self, other):
other = other if isinstance(other, Tensor) else Tensor(other)
# 注意这里other和self反过来传参了
return Add.apply(Add, other, self)
def __iadd__(self, other):
other = other if isinstance(other, Tensor) else Tensor(other)
return Add.apply(Add, self, other)
def __rmul__(self, other):
other = other if isinstance(other, Tensor) else Tensor(other)
return Mul.apply(Mul, other, self)
def __imul__(self, other):
other = other if isinstance(other, Tensor) else Tensor(other)
return Mul.apply(Mul, self, other)
测试:
print(1.0 + a) # kyTorch.Tensor(3.0000, requires_grad=True)
a *= 3.2
print(a) # kyTorch.Tensor(6.4000, requires_grad=True)
但是这代码一点也不优雅,__add__
、__radd__
的代码都差不多,咱们的原则就是能让程序干的就让程序干,让其自动化重写魔法函数:
import inspect
import importlib
def _register(name, cls):
def func(*tensors):
tensors = [t if isinstance(t, Tensor) else Tensor(t) for t in tensors]
return cls.apply(cls, *tensors)
setattr(Tensor, name, func)
setattr(Tensor, f"__{name}__", func)
# 原地操作
setattr(Tensor, f"__i{name}__", func)
# other在操作符前,self在操作符后
setattr(Tensor, f"__r{name}__", lambda self, x: func(x, self))
def overwriteOps():
for name, cls in inspect.getmembers(importlib.import_module('kygrad.ops'), inspect.isclass):
name = name.lower()
if name in ['add', 'mul']:
_register(name, cls)
overwriteOps()
测试,没毛病,haha
print(1.0 + a) # kyTorch.Tensor(3.0000, requires_grad=True)
a *= 3.2
print(a) # kyTorch.Tensor(6.4000, requires_grad=True)
首先使用深度优先遍历计算图,返回所有需要计算梯度的非叶子结点:
# 遍历计算图
def _rev_topo_sort(self):
# 有_op的就是需要计算梯度的非叶子结点,它是由其他tensor一起计算得来的
# 返回所有有_op的,也就是返回需要计算梯度的所有非叶子结点
# dfs,被加入tensors数组的顺序是孩子(左 右) 根
def visit(tensor, visited, tensors):
# 标记为已访问
visited.append(tensor)
if tensor._op:
[visit(depend_tensor, visited, tensors)
for depend_tensor in tensor._op.depends if depend_tensor not in visited]
tensors.append(tensor)
return tensors
# 倒过来里面就是先是根 再是孩子(右 左)
return reversed(visit(self, [], []))
测试,以e = (a + b) * (b + 1)
为例:
a, b = Tensor(2., requires_grad=True), Tensor(1., requires_grad=True)
e = (a + b) * (b + 1.)
print(list(e._rev_topo_sort())) # [Tensor(6.0000, requires_grad=True), Tensor(2.0000, requires_grad=True), Tensor(3.0000, requires_grad=True)]
需要计算梯度的非叶子结点是e
、d
、c
,基于该拓扑结构,实现Tensor的backward
:
def backward(self):
"""实现Tensor的反向传播"""
# ∂L/∂L等于1
self.grad = Tensor(1.)
for t in self._rev_topo_sort():
grads = t._op.backward(t.grad.data)
for child, grad in zip(t._op.depends, grads):
if child.requires_grad:
# 默认会累加梯度
child.grad = child.grad + Tensor(grad) if child.grad else Tensor(grad)
测试,以e = (a + b) * (b + 1)
为例:
a, b = Tensor(2., requires_grad=True), Tensor(1., requires_grad=True)
e = (a + b) * (b + 1.)
e.backward()
print(a.grad) # kyTorch.Tensor(2.0000, requires_grad=False)
print(b.grad) # kyTorch.Tensor(5.0000, requires_grad=False)
f ( a , b ) = ( a + b ) ∗ ( b + 1 ) = a b + b 2 + a + b f(a, b) = (a + b) * ( b + 1) = ab + b^2 + a + b f(a,b)=(a+b)∗(b+1)=ab+b2+a+b,那么 ∂ f ∂ a = b + 1 \frac{∂f}{∂a} = b + 1 ∂a∂f=b+1, ∂ f ∂ b = a + 2 b + 1 \frac{∂f}{∂b} = a + 2b + 1 ∂b∂f=a+2b+1,与自动算出来的相同。
计算损失的时候还用到了Tensor的减法,所以再定义个减法:
class Sub(_Function):
def forward(self, x, y):
return x - y
def backward(self, grad):
return grad, -grad
为了训练模型,定义训练器,使用梯度下降:
class Optimizer:
def __init__(self, params):
self.params = params
def zero_grad(self):
for param in self.params:
param.zero_grad()
class SGD(Optimizer):
def __init__(self, params, lr=0.001):
super().__init__(params)
self.lr = lr
def step(self):
for param in self.params:
param.data -= self.lr * param.grad.data
有这样一组数据,大致呈线性关系。
from kygrad.tensor import Tensor
from kygrad.optim import SGD
w = Tensor(1., requires_grad=True)
b = Tensor(0., requires_grad=True)
def linreg(X):
"""线性回归模型"""
# y = w * x + b
y = []
for x in X:
y.append(w * x + b)
return y
def squared_loss(y_hat, y):
loss = Tensor(0.)
for i in range(len(y)):
loss += (y[i] - y_hat[i]) * (y[i] - y_hat[i])
return loss
areas = [64.4, 68., 74.1, 74., 76.9, 78.1, 78.6]
prices = [6.1, 6.25, 7.8, 6.66, 7.82, 7.14, 8.02]
epochs = 500
lr = 0.00001
optimizer = SGD([w, b], lr=lr)
train_loss = []
for epoch in range(epochs):
y_hat = linreg(areas)
l = squared_loss(y_hat, prices)
optimizer.zero_grad()
l.backward()
optimizer.step()
train_loss.append(l.data)
if epoch % 10 == 0:
print(f'epoch {epoch + 1}: loss={l.data:.4f}')
print(f"w: {w.data:.4f}, b: {b.data:.4f}")
print(train_loss[-1])
最后得到的w
和b
分别为0.0971和-0.0129,损失值为1.2595,损失值变化曲线如下:
画出线性回归拟合出来的直线:
给它加个特征,重新训练:
from kygrad.tensor import Tensor
from kygrad.optim import SGD
w1 = Tensor(1., requires_grad=True)
w2 = Tensor(1., requires_grad=True)
b = Tensor(0., requires_grad=True)
def linreg(X):
"""线性回归模型"""
# y = w1 * x1 + w2 * x2 + b
y = []
for x1, x2 in X:
y.append(w1 * x1 + w2 * x2 + b)
return y
def squared_loss(y_hat, y):
loss = Tensor(0.)
for i in range(len(y)):
loss += (y[i] - y_hat[i]) * (y[i] - y_hat[i])
return loss
areas = [64.4, 68., 74.1, 74., 76.9, 78.1, 78.6]
ages = [31., 21., 10., 24., 17., 16., 17.]
prices = [6.1, 6.25, 7.8, 6.66, 7.82, 7.14, 8.02]
epochs = 500
lr = 0.00001
optimizer = SGD([w1, w2, b], lr=lr)
train_loss = []
for epoch in range(epochs):
y_hat = linreg(zip(areas, ages))
l = squared_loss(y_hat, prices)
optimizer.zero_grad()
l.backward()
optimizer.step()
train_loss.append(l.data)
if epoch % 10 == 0:
print(f'epoch {epoch + 1}: loss={l.data:.4f}')
print(f"w1: {w1.data:.4f}, w2:{w2.data:.4f}, b: {b.data:.4f}")
print(train_loss[-1])
得到的结果为w1: 0.0992, w2:-0.0080, b: -0.0177
,损失值为1.0956,与之前相比有所下降。
损失值变化曲线(没有画前四次的,初始的损失值太高了,画的话跟之前的一样,断崖式)如下:
通过自己实现自动求导并应用到线性回归实例中,有助于深入理解深度学习框架的底层实现,而不是仅仅把它当作黑盒。
本文的Tensor只能支持浮点类型的标量,而且只有简单的加法、减法、乘法运算,还有许多功能没有实现,比如一元运算、矩阵运算、聚合运算等等,不过总体实现流程是相似的。
参考资料:
https://github.com/karpathy/micrograd
https://github.com/geohot/tinygrad
https://github.com/nlp-greyfoss/metagrad