• 从零实现深度学习框架——重构计算图的实现


    引言

    本着“凡我不能创造的,我就不能理解”的思想,本系列文章会基于纯Python以及NumPy从零创建自己的深度学习框架,该框架类似PyTorch能实现自动求导。

    要深入理解深度学习,从零开始创建的经验非常重要,从自己可以理解的角度出发,尽量不使用外部完备的框架前提下,实现我们想要的模型。本系列文章的宗旨就是通过这样的过程,让大家切实掌握深度学习底层实现,而不是仅做一个调包侠。

    基于之前的计算图代码,在实现类似torch.unbind时遇到了问题。因此,我们需要重写计算图的支撑代码,以支持这些运算。

    存在的问题

    在实现有单个输入产生多个输出的运算时,比如unbindsplit。我们分别对多个输出后续进行不同的运算,在反向传播时,它们的梯度应该不同。

    此时在反向传播时,应该传入一个梯度列表,每个元素代表由每个输出贡献的梯度。现有的代码只接收单个梯度,而不是列表。

    重写Tensor#backward

     def backward(self, grad: "Tensor" = None, retain_grad=False, create_graph=False) -> None:
            '''
            实现Tensor的反向传播
            Args:
                grad: 如果该Tensor不是标量,则需要传递梯度进来
                retain_grad: 是否保留梯度的中间变量
                create_graph: 是否整个计算梯度的过程也需要保留到计算图中,即double_backprop
    
            Returns:
    
            '''
            # 只能在requires_grad=True的Tensor上调用此方法
            assert self.requires_grad, "called backward on tensor do not require grad"
    
            if not Config.backprop:
                return
    
            # 如果传递过来的grad为空
            if grad is None:
                if self.shape == ():
                    # 设置梯度值为1,grad本身不需要计算梯度
                    self._grad = Tensor(1., device=self.device)
                else:
                    # 如果当前Tensor得到不是标量,那么grad必须指定
                    raise RuntimeError("grad must be specified for non scalar")
            else:
                self._grad = ensure_tensor(grad, device=self.device)
    
            funcs = []  # 候选函数堆
            seen_set = set()
    
            def add_func(f):
                if f not in seen_set:
                    # heapq是小顶堆,为了实现大顶堆的效果,需要加一个负号
                    heapq.heappush(funcs, (-f.generation, len(seen_set), f))
                    seen_set.add(f)
    
            add_func(self.creator)
            while funcs:
                _, _, f = heapq.heappop(funcs)
                # 获取输出对应的梯度,解决多个输出梯度不一致的问题
                gys = [output().grad.data for output in f.outputs]  # output 是 weakref
    
                with using_config('backprop', create_graph):
                    with OpWrapper(f.__class__.__name__, gys, backward=True):
                        gxs = f.backward(*gys)
                    if not isinstance(gxs, tuple):
                        gxs = (gxs,)
    
                    for x, gx in zip(f.inputs, gxs):
                        if x.requires_grad and gx is not None:
                            assert x.shape == gx.shape, f"grad shape must match tensor shape in {f!r}, {gx.shape!r} != {x.shape!r}"
    
                            gx = Tensor(gx, device=self.device, dtype=self.dtype)
                            if x.grad is None:
                                x._grad = gx
                            else:
                                x._grad = x.grad + gx
    
                            if x.creator is not None:
                                add_func(x.creator)
    
                if not retain_grad:
                    for y in f.outputs:
                        y()._grad = None
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65

    修改点:

    • 通过大顶堆来实现计算图的回溯算法。同时为Tensor增加一个creator,产生该输出的运算(function)。

    • 这里增加了代次(generation)的概念,当前输出的generation由creator对应的加1。

    • 增加了weakref对内存利用进行优化。

    • 通过create_graph支持double_backprop。

    重写Function

    class Function:
        def __init__(self) -> None:
            # 保存需要在backward()中使用的Tensor或其他对象(如Shape)
            self.saved_tensors = []
    
        def save_for_backward(self, *x: Any) -> None:
            self.saved_tensors.extend(x)
    
        def forward(self, *args: Any, **kwargs: Any) -> NdArray:
            '''前向传播,进行真正运算的地方'''
            raise NotImplementedError("You must implement the forward function for custom Function.")
    
        def backward(self, grad: NdArray) -> Any:
            '''实现反向传播,计算梯度'''
            raise NotImplementedError("You must implement the backward method for your custom Function "
                                      "to use it with backward mode AD.")
    
        def __call__(self, *xs: "Tensor", **kwargs) -> "Tensor":
            # [t.data for t in xs]遍历Tensor中的data(NdArray)值,参与实际计算的都是NumPy的数组。
            ys = self.forward(*[t.data for t in xs], **kwargs)
    
            requires_grad = any([t.requires_grad for t in xs])
    
            if not isinstance(ys, tuple):
                ys = (ys,)
    
            outputs = [Tensor(y, device=xs[0].device, requires_grad=requires_grad) for y in ys]
    
            if Config.backprop:
                self.generation = max([x.generation for x in xs])
                for output in outputs:  # 设定每个输出是由此函数得到的
                    output.set_creator(self)
                self.inputs = xs  # 记录输入
                self.outputs = [weakref.ref(output) for output in outputs]  # 通过弱引用记录输出
    
            # 返回多个则通过元组
            return tuple(outputs) if len(outputs) > 1 else outputs[0]
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    所有操作的基类Function进行了重写,同时尽可能兼容之前的代码,在调用方法__call__中维护了该操作的输入和输出。

    如果有多个输出,统一通过元组来保存。

    重写Function子类

    Split为例,作为stack的逆操作,我们的split实际上就是torch.unbind

    旧代码:

    class Split(Function):
        '''Stack的逆操作'''
    
        def forward(ctx, inputs: NdArray, axis: int) -> NdArray:
            xp = get_array_module(inputs)
            xs = xp.split(inputs, inputs.shape[axis], axis)
            ys = [xp.squeeze(y, axis) for y in xs]  # 去掉维度axis
            ctx.save_for_backward(len(ys), axis)
    
            return tuple(ys)
    
        def backward(ctx, grad: NdArray) -> NdArray:
            size, axis = ctx.saved_tensors
            grad /= size
            bigger_grad = [Tensor(grad)] * size
            return stack(bigger_grad, axis)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    改写成:

    class Split(Function):
        '''Stack的逆操作'''
    
        def forward(self, inputs: NdArray, axis: int) -> NdArray:
            xp = get_array_module(inputs)
            xs = xp.split(inputs, inputs.shape[axis], axis)
            ys = [xp.squeeze(y, axis) for y in xs]  # 去掉维度axis
            self.save_for_backward(xp, axis, ys[0].shape, inputs.dtype)
    
            return tuple(ys)
    
        def backward(self, *grad: List[NdArray]) -> NdArray:
            xp, axis, shape, dtype = self.saved_tensors
            grads = [Tensor(xp.zeros(shape, dtype)) if g is None else Tensor(g) for g in grad]
            return stack(grads, axis)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    改动如下:

    • ctx变成了self
    • 为了正确性,重写了其backward
    • 现在梯度支持传入列表,代表split多个输出贡献的不同梯度

    这样才能跑通下面的测试用例:

    def test_split():
        x = np.arange(6).reshape((2, 3)).astype(np.float32)
        # x = array([[0., 1., 2.],
        #           [3., 4., 5.]], dtype=float32)
    
        mx = Tensor(x, requires_grad=True)
    
        tx = torch.tensor(x, requires_grad=True)
    
        my = F.split(mx)
        ty = torch.split(tx, 1)
    
        # 这里返回的是元组
        assert isinstance(my, tuple)
    
        assert np.allclose(my[0].data, ty[0].data)
    
        (my[0]).sum().backward()
        (ty[0]).sum().backward()
    
        print(mx.grad.data)
        print(tx.grad.data)
    
        assert np.allclose(mx.grad.data, tx.grad.data)
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    test_split.py::test_split PASSED                                         [100%]
    [[1. 1. 1.]
     [0. 0. 0.]]
    tensor([[1., 1., 1.],
            [0., 0., 0.]])
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里由于只对split后的第一个元素my[0]进行了后续操作(sum),因此反向传播后,也应该只有该元素会产生梯度,如上输出。

    完整代码

    类似地,我们就可以实现chunk操作,即cat的逆操作,将Tensor沿某一维分开,其参数chunks为分割的份数,axis为分割的维度。具体实现可以参考完整代码:

    https://github.com/nlp-greyfoss/metagrad

  • 相关阅读:
    React useContext
    生物识别技术在汽车领域带来了巨大变革
    【设计模式】【第五章】【开具增值税发票】【建造者模式 + 原型模式】
    Nginx配置文件详解
    在Docker中运行PostgreSQL数据库
    精确率、准确率、召回率
    视频监控管理平台EasyCVR自动注销后,页面还存留播放窗口是什么原因?解决办法是什么?
    spring boot 使用单元测试JUnit5
    C++之内建函数对象
    k8s--基础--23.5--认证-授权-准入控制--通过token令牌登陆dashboard界面
  • 原文地址:https://blog.csdn.net/yjw123456/article/details/125368538