• Implementing Automatic Differentiation from Scratch, with a Linear Regression Example


    1. Defining Our Own Tensor

    For simplicity, only float data is supported, and only scalars, not vectors:

    class Tensor:
        def __init__(self, data, requires_grad=False):
            """
            Initialize the Tensor. Only float scalars are supported.
            """
            if type(data) is not float:
                raise RuntimeError(f'invalid data type {type(data).__name__}')
            self.data = data
            self.requires_grad = requires_grad
            # the gradient, initially empty
            self.grad = None
            # the operation that produced this tensor in the computation graph
            self._op = None
    

    When clearing the gradient, we simply set it back to None:

        def zero_grad(self):
            self.grad = None
    

    To make debugging easier, we override the __str__ and __repr__ methods:

        def __str__(self):
            return f"kyTorch.Tensor({self.data:.4f}, requires_grad={self.requires_grad})"
    
        def __repr__(self) -> str:
            return f"Tensor({self.data:.4f}, requires_grad={self.requires_grad})"
    

    Test:

    a = Tensor(2., requires_grad=True)
    print(a)	# kyTorch.Tensor(2.0000, requires_grad=True)
    b = Tensor(4)	# RuntimeError: invalid data type int
    

    2. Implementing Addition and Multiplication for Tensor

    Define a base class for operations. To make the code easier to reuse, the computation is not invoked through forward directly but through apply:

    class _Function:
        def __init__(self, *tensors):
            # all tensors this operation depends on
            self.depends = [tensor for tensor in tensors]
            # data of the Tensors that will be needed in backward()
            self.save_tensors_data = []

        def save_for_backward(self, *data):
            self.save_tensors_data.extend(data)

        def forward(self, *args):
            """Forward pass: performs the actual computation."""
            raise NotImplementedError('must implement the forward')

        def backward(self, grad):
            """Backward pass: computes the gradients."""
            raise NotImplementedError('must implement the backward')

        @staticmethod
        def apply(cls, *tensors):
            from kygrad.tensor import Tensor
            op = cls(*tensors)  # instantiate the op: cls is the class, op is the instance
            res = Tensor(op.forward(*[tensor.data for tensor in tensors]),
                         requires_grad=any([tensor.requires_grad for tensor in tensors]))

            if res.requires_grad:
                res._op = op
            return res
    

    Define addition:

    class Add(_Function):
        def forward(self, x, y):
            return x + y

        def backward(self, grad):
            """
            z = x + y
            ∂L/∂x = ∂L/∂z * ∂z/∂x
            ∂z/∂x equals 1, and grad here is ∂L/∂z
            """
            return grad, grad
    

    Define multiplication:

    class Mul(_Function):
        def forward(self, x, y):
            self.save_for_backward(x, y)
            return x * y

        def backward(self, grad):
            """
            z = x * y
            ∂L/∂x = ∂L/∂z * ∂z/∂x
            ∂L/∂y = ∂L/∂z * ∂z/∂y
            grad is ∂L/∂z, ∂z/∂x equals y, and ∂z/∂y equals x
            """
            x, y = self.save_tensors_data
            # return ∂L/∂x and ∂L/∂y respectively
            return grad * y, grad * x
    

    Override the __add__ and __mul__ methods in Tensor:

        def __add__(self, other):
            # convert both operands to Tensor before computing
            other = other if isinstance(other, Tensor) else Tensor(other)
            return Add.apply(Add, self, other)
        
        def __mul__(self, other):
            other = other if isinstance(other, Tensor) else Tensor(other)
            return Mul.apply(Mul, self, other)
    

    Test:

    a = Tensor(2., requires_grad=True)
    b = Tensor(4., requires_grad=False)
    print(a * b)	# kyTorch.Tensor(8.0000, requires_grad=True)
    print((a + b) * (b + 1.0))	# kyTorch.Tensor(30.0000, requires_grad=True)
    

    However, 1.0 + a raises TypeError: unsupported operand type(s) for +: 'float' and 'Tensor', so we also need to override __radd__ and __rmul__. In addition, to support in-place operations like a += 1.0, we override __iadd__ and __imul__:

        def __radd__(self, other):
            other = other if isinstance(other, Tensor) else Tensor(other)
            # note that other and self are passed in reversed order here
            return Add.apply(Add, other, self)
    
        def __iadd__(self, other):
            other = other if isinstance(other, Tensor) else Tensor(other)
            return Add.apply(Add, self, other)
        
        def __rmul__(self, other):
            other = other if isinstance(other, Tensor) else Tensor(other)
            return Mul.apply(Mul, other, self)
    
        def __imul__(self, other):
            other = other if isinstance(other, Tensor) else Tensor(other)
            return Mul.apply(Mul, self, other)
    

    Test:

    print(1.0 + a)	# kyTorch.Tensor(3.0000, requires_grad=True)
    a *= 3.2
    print(a)		# kyTorch.Tensor(6.4000, requires_grad=True)
    

    But this code is not elegant at all: __add__ and __radd__ are nearly identical. Our principle is to let the program do whatever the program can do for us, so let it generate these magic methods automatically:

    import inspect
    import importlib
    
    def _register(name, cls):
        def func(*tensors):
            tensors = [t if isinstance(t, Tensor) else Tensor(t) for t in tensors]
            return cls.apply(cls, *tensors)
        
        setattr(Tensor, name, func)
        setattr(Tensor, f"__{name}__", func)
        # in-place version
        setattr(Tensor, f"__i{name}__", func)
        # reflected version: other appears before the operator, self after it
        setattr(Tensor, f"__r{name}__", lambda self, x: func(x, self))
    
    
    def overwriteOps():
        for name, cls in inspect.getmembers(importlib.import_module('kygrad.ops'), inspect.isclass):
            name = name.lower()
            if name in ['add', 'mul']:
                _register(name, cls)
    
    
    overwriteOps()
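
    A side effect of this setattr-based registration is that every op also gets a plainly named method (setattr(Tensor, name, func)), and the reflected and in-place variants are generated automatically; a quick check:

    a = Tensor(2., requires_grad=True)
    print(a.add(3.))	# kyTorch.Tensor(5.0000, requires_grad=True) -- the plain-named method
    print(3. * a)		# kyTorch.Tensor(6.0000, requires_grad=True) -- __rmul__ generated by _register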
    

    Test again; no problems this time, haha:

    print(1.0 + a)	# kyTorch.Tensor(3.0000, requires_grad=True)
    a *= 3.2
    print(a)		# kyTorch.Tensor(6.4000, requires_grad=True)
    

    3. Implementing Backpropagation for Tensor

    First, traverse the computation graph depth-first and return all non-leaf nodes whose gradients need to be computed:

        # traverse the computation graph
        def _rev_topo_sort(self):
            # a node that has an _op is a non-leaf node that needs a gradient,
            # because it was computed from other tensors.
            # return all nodes that have an _op, i.e. all non-leaf nodes that need gradients.
            # DFS: nodes are appended in the order children (left, right), then root
            def visit(tensor, visited, tensors):
                # mark as visited
                visited.append(tensor)
                if tensor._op:
                    [visit(depend_tensor, visited, tensors)
                     for depend_tensor in tensor._op.depends if depend_tensor not in visited]
                    tensors.append(tensor)
                return tensors
            # reversed, the order becomes root first, then children (right, left)
            return reversed(visit(self, [], []))
    

    Test, using e = (a + b) * (b + 1) as an example:

    a, b = Tensor(2., requires_grad=True), Tensor(1., requires_grad=True)
    e = (a + b) * (b + 1.)
    print(list(e._rev_topo_sort()))	# [Tensor(6.0000, requires_grad=True), Tensor(2.0000, requires_grad=True), Tensor(3.0000, requires_grad=True)]
    

    The non-leaf nodes that need gradient computation are e, d = b + 1 = 2, and c = a + b = 3, exactly the tensors printed above. Based on this topological order, implement Tensor's backward:

        def backward(self):
            """Backpropagation for this Tensor."""
            # ∂L/∂L equals 1
            self.grad = Tensor(1.)
            for t in self._rev_topo_sort():
                grads = t._op.backward(t.grad.data)
                for child, grad in zip(t._op.depends, grads):
                    if child.requires_grad:
                        # gradients are accumulated by default
                        child.grad = child.grad + Tensor(grad) if child.grad else Tensor(grad)
    

    Test, again with e = (a + b) * (b + 1):

    a, b = Tensor(2., requires_grad=True), Tensor(1., requires_grad=True)
    e = (a + b) * (b + 1.)
    e.backward()
    print(a.grad)	# kyTorch.Tensor(2.0000, requires_grad=False)
    print(b.grad)	# kyTorch.Tensor(5.0000, requires_grad=False)
    

    Since f(a, b) = (a + b)(b + 1) = ab + b^2 + a + b, we have ∂f/∂a = b + 1 and ∂f/∂b = a + 2b + 1; with a = 2 and b = 1 this gives 2 and 5, matching the automatically computed gradients.
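
    As a sanity check, the same example can be run through PyTorch's autograd (assuming PyTorch is installed), which produces the same gradients:

    import torch

    a = torch.tensor(2., requires_grad=True)
    b = torch.tensor(1., requires_grad=True)
    e = (a + b) * (b + 1.)
    e.backward()
    print(a.grad, b.grad)	# tensor(2.) tensor(5.)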

    4. Implementing Linear Regression

    Computing the loss also requires subtraction on Tensor, so define a Sub operation as well:

    class Sub(_Function):
        def forward(self, x, y):
            return x - y
    
        def backward(self, grad):
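            # z = x - y, so ∂z/∂x = 1 and ∂z/∂y = -1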
            return grad, -grad
    

    To train the model, define an optimizer that performs gradient descent:

    class Optimizer:
        def __init__(self, params):
            self.params = params
    
        def zero_grad(self):
            for param in self.params:
                param.zero_grad()
    
    
    class SGD(Optimizer):
        def __init__(self, params, lr=0.001):
            super().__init__(params)
            self.lr = lr
    
        def step(self):
            for param in self.params:
                param.data -= self.lr * param.grad.data
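
    The zero_grad above matters because Tensor.backward accumulates gradients across calls (see the comment in backward). A quick self-contained check:

    a, b = Tensor(2., requires_grad=True), Tensor(1., requires_grad=True)
    for _ in range(2):
        e = (a + b) * (b + 1.)	# a fresh graph is built on every iteration
        e.backward()
    print(a.grad)	# kyTorch.Tensor(4.0000, requires_grad=False), i.e. 2 + 2
    print(b.grad)	# kyTorch.Tensor(10.0000, requires_grad=False), i.e. 5 + 5

    Calling zero_grad() on the parameters (which the SGD optimizer does through the Optimizer base class) resets the gradients to None before the next backward pass.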
    

    We have the following data, which is roughly linear:

    from kygrad.tensor import Tensor
    from kygrad.optim import SGD
    
    w = Tensor(1., requires_grad=True)
    b = Tensor(0., requires_grad=True)
    
    
    def linreg(X):
        """线性回归模型"""
        # y = w * x + b
        y = []
        for x in X:
            y.append(w * x + b)
        return y
    
    
    def squared_loss(y_hat, y):
        loss = Tensor(0.)
        for i in range(len(y)):
            loss += (y[i] - y_hat[i]) * (y[i] - y_hat[i])
        return loss
    
    
    areas = [64.4, 68., 74.1, 74., 76.9, 78.1, 78.6]
    prices = [6.1, 6.25, 7.8, 6.66, 7.82, 7.14, 8.02]
    
    epochs = 500
    lr = 0.00001
    optimizer = SGD([w, b], lr=lr)
    train_loss = []
    for epoch in range(epochs):
        y_hat = linreg(areas)
        l = squared_loss(y_hat, prices)
        optimizer.zero_grad()
        l.backward()
        optimizer.step()
    
        train_loss.append(l.data)
        if epoch % 10 == 0:
            print(f'epoch {epoch + 1}: loss={l.data:.4f}')
    
    print(f"w: {w.data:.4f}, b: {b.data:.4f}")
    print(train_loss[-1])
    

    The final values of w and b are 0.0971 and -0.0129, with a loss of 1.2595. The loss curve over training is shown below:

    [Figure: training loss curve]

    Plot the line fitted by the linear regression:

    [Figure: fitted regression line over the data points]
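
    Both figures can be reproduced with matplotlib (assumed to be installed); a minimal sketch using the variables from the training script above:

    import matplotlib.pyplot as plt

    # training loss curve
    plt.figure()
    plt.plot(range(1, epochs + 1), train_loss)
    plt.xlabel('epoch')
    plt.ylabel('loss')

    # data points and the fitted line y = w * x + b
    plt.figure()
    plt.scatter(areas, prices)
    xs = [min(areas), max(areas)]
    plt.plot(xs, [w.data * x + b.data for x in xs])
    plt.xlabel('area')
    plt.ylabel('price')

    plt.show()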

    Add another feature and retrain:

    from kygrad.tensor import Tensor
    from kygrad.optim import SGD
    
    w1 = Tensor(1., requires_grad=True)
    w2 = Tensor(1., requires_grad=True)
    b = Tensor(0., requires_grad=True)
    
    
    def linreg(X):
        """线性回归模型"""
        # y = w1 * x1 + w2 * x2 + b
        y = []
        for x1, x2 in X:
            y.append(w1 * x1 + w2 * x2 + b)
        return y
    
    
    def squared_loss(y_hat, y):
        loss = Tensor(0.)
        for i in range(len(y)):
            loss += (y[i] - y_hat[i]) * (y[i] - y_hat[i])
        return loss
    
    
    areas = [64.4, 68., 74.1, 74., 76.9, 78.1, 78.6]
    ages = [31., 21., 10., 24., 17., 16., 17.]
    prices = [6.1, 6.25, 7.8, 6.66, 7.82, 7.14, 8.02]
    
    epochs = 500
    lr = 0.00001
    optimizer = SGD([w1, w2, b], lr=lr)
    train_loss = []
    for epoch in range(epochs):
        y_hat = linreg(zip(areas, ages))
        l = squared_loss(y_hat, prices)
        optimizer.zero_grad()
        l.backward()
        optimizer.step()
    
        train_loss.append(l.data)
        if epoch % 10 == 0:
            print(f'epoch {epoch + 1}: loss={l.data:.4f}')
    
    print(f"w1: {w1.data:.4f}, w2:{w2.data:.4f}, b: {b.data:.4f}")
    print(train_loss[-1])
    

    The result is w1: 0.0992, w2: -0.0080, b: -0.0177, with a loss of 1.0956, which is lower than before.

    The loss curve is shown below (the first four epochs are not plotted: the initial loss is so high that including them would just reproduce the same cliff-like drop as before):

    [Figure: training loss curve, first four epochs omitted]

    5. Summary

    Implementing automatic differentiation ourselves and applying it to a linear regression example helps us understand how deep learning frameworks work under the hood, instead of treating them as black boxes.

    The Tensor built here only supports float scalars and only has simple addition, subtraction, and multiplication. Many features are still missing, such as unary operations, matrix operations, and reductions, but the overall approach for implementing them is similar.
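
    For example (this is a sketch, not part of the code above), a unary operation such as exp can follow the same _Function pattern:

    import math

    class Exp(_Function):
        def forward(self, x):
            res = math.exp(x)
            # the derivative of e^x is e^x itself, so save the output for backward
            self.save_for_backward(res)
            return res

        def backward(self, grad):
            res, = self.save_tensors_data
            # return a 1-tuple so Tensor.backward can zip it with the single dependency
            return (grad * res,)

    x = Tensor(0., requires_grad=True)
    y = Exp.apply(Exp, x)
    y.backward()
    print(x.grad)	# kyTorch.Tensor(1.0000, requires_grad=False), since d(e^x)/dx = 1 at x = 0

    To expose it as a regular Tensor method and operator, 'exp' would just be added to the name list checked in overwriteOps.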

    References:
    https://github.com/karpathy/micrograd
    https://github.com/geohot/tinygrad
    https://github.com/nlp-greyfoss/metagrad

  • Original article: https://blog.csdn.net/luxurie/article/details/126560216