• Tensorflow 2.x入门教程


    前言

      至于为什么写这个教程,首先是为了自己学习做个记录,其次是因为Tensorflow的API写的很好,但是他的教程写的太乱了,不适合新手学习。tensorflow 1 和tensorflow 2 有相似之处但是不兼容,tensorflow 2将keras融合了。TensorFlow™ 是一个采用 数据流图(data flow graphs),用于数值计算的开源软件库。图中得节点(Nodes)表示数学操作,图中的线(edges)则表示在节点间相互联系的多维数据数组,即张量(tensor)。它灵活的架构让你可以在多种平台上展开计算,例如台式计算机中的一个或多个CPU(或GPU),服务器,移动设备等等。

    TensorFlow的主要优点:

    • 灵活性:支持底层数值计算,C++自定义操作符
    • 可移植性:从服务器到PC到手机,从CPU到GPU到TPU
    • 分布式计算:分布式并行计算,可指定操作符对应计算设备

    层次结构

      TensorFlow的层次结构从低到高可以分成如下五层:硬件层,内核层,低阶API,中阶API,高阶API。

    • 第一层:硬件层,TensorFlow支持CPU、GPU或TPU加入计算资源池。
    • 第二层:内核层,为C++实现的内核,kernel可以跨平台分布运行。
    • 第三层:低阶API,由Python实现的操作符,提供了封装C++内核的低级API指令。主要包括各种张量操作算子、计算图、自动微分。如tf.Variable,tf.constant,tf.function,tf.GradientTape,tf.nn.softmax... 
    • 第四层:中阶API,由Python实现的模型组件,对低级API进行了函数封装。主要包括数据管道(tf.data)、特征列(tf.feature_column)、激活函数(tf.nn)、模型层(tf.keras.layers)、损失函数(tf.keras.losses)、评估函数(tf.keras.metrics)、优化器(tf.keras.optimizers)、回调函数(tf.keras.callbacks) 等等。
    • 第五层:高阶API,由Python实现的模型成品。主要为tf.keras.models提供的模型的类接口,主要包括:模型的构建(Sequential、functional API、Model子类化)、型的训练(内置fit方法、内置train_on_batch方法、自定义训练循环、单GPU训练模型、多GPU训练模型、TPU训练模型)、模型的部署(tensorflow serving部署模型、使用spark(scala)调用tensorflow模型)。 

    概述

    创建张量

    复制代码
    tf.constant(value, dtype=tf.float32)    # 常数
    tf.range(start, limit=None, delta=1)    # 生成一个范围内间隔为delta的 张量
    tf.linspace(start, stop, num) # 在一个间隔内生成均匀间隔的值
    tf.zeros()  # 创建全0张量
    tf.ones()  # 创建全1张量
    tf.zeros_like(input)  # 创建和input一样大小的张量
    tf.fill(dims, value)    # 创建shape为dim,全为value的张量
    
    tf.random.uniform([5], minval=0, maxval=10)     # 均匀分布随机
    tf.random.normal([3, 3], mean=0.0, stddev=1.0)  # 正态分布随机
    
    tf.Variable(initial_value)  # 变量
    复制代码

    tf.Variable:

    • name:变量的名字,默认情况下,会自动获得唯一的变量名
    • trainable:设置为 False 可以关闭梯度。例如,训练计步器就是一个不需要梯度的变量

    tf.rank(a):求矩阵的秩

    变量的设备位置

    为了提高性能,TensorFlow 会尝试将张量和变量放在与其 dtype 兼容的最快设备上。这意味着如果有 GPU,那么大部分变量都会放置在 GPU 上,不过,我们可以重写变量的位置。

    复制代码
    with tf.device('CPU:0'):
      # Create some tensors
      a = tf.Variable([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
      b = tf.constant([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
      c = tf.matmul(a, b)
    
    print(c)
    复制代码

    使用assign重新分配张量

    a.assign([5, 6])  # a = [5, 6]
    a.assign_add([2, 3])  # a = a+[2,3]
    a.assign_sub([7, 9])  # a=a-[7,9]

    维度变换

    维度变换相关函数主要有 tf.reshape,tf.squeeze,tf.expand_dims,tf.transpose。

    • tf.reshape():改变张量的形状
    • tf.squeeze():减少维度为1的维度
    • tf.expand_dims(input, axis):增加维度
    • tf.transpose(a, perm=None):交换维度

    tf.reshape可以改变张量的形状,但是其本质上不会改变张量元素的存储顺序,所以,该操作实际上非常迅速,并且是可逆的。

    合并分隔

    和numpy类似,可以用tf.concattf.stack方法对多个张量进行合并,可以用tf.split方法把一个张量分割成多个张量。

    tf.concat和tf.stack有略微的区别,tf.concat是连接,不会增加维度,而tf.stack是堆叠,会增加维度。

    复制代码
    a = tf.constant([[1.0, 2.0], [3.0, 4.0]])  # (2,2)
    b = tf.constant([[5.0, 6.0], [7.0, 8.0]])  # (2,2)
    
    c = tf.concat([a, b], axis=0)  # (4, 2)
    c = tf.stack([a, b], axis=0)  # (2, 2, 2)
    d = tf.split(c, 2, axis=0)  # [(1, 2, 2),(1, 2, 2)]
    复制代码

    Tensor与Array的转换

    c = np.array(b)     # tensor 转 np
    c = b.numpy()       # tensor 转 np
    tf.convert_to_tensor(c)     # np 转 tensor

    数学运算

    tf.add(a,b)        # 加法 a+b
    tf.multiply(a,b)       # 逐元素乘法a*b
    tf.matmul(a,b)     # 矩阵乘法a@b

    类型转换

    tensorflow支持的模型有:tf.float16、tf.float64、tf.int8、tf.int16、tf.int32...

    a = tf.constant([2.2, 3.3, 4.4], dtype=tf.float64)
    b = tf.cast(a, dtype=tf.float16)    # 类型转换

    计算图

      有三种计算图的构建方式:静态计算图动态计算图,以及Autograph。在TensorFlow1.0时代,采用的是静态计算图,需要先使用TensorFlow的各种算子创建计算图,然后再开启一个会话Session,显式执行计算图。而在TensorFlow2.0时代,采用的是动态计算图,即每使用一个算子后,该算子会被动态加入到隐含的默认计算图中立即执行得到结果,而无需开启Session。使用动态计算图(Eager Excution)的好处是方便调试程序,它会让TensorFlow代码的表现和Python原生代码的表现一样,写起来就像写numpy一样,各种日志打印,控制流全部都是可以使用的。使用动态计算图的缺点是运行效率相对会低一些。因为使用动态图会有许多次Python进程和TensorFlow的C++进程之间的通信。而静态计算图构建完成之后几乎全部在TensorFlow内核上使用C++代码执行,效率更高。此外静态图会对计算步骤进行一定的优化,剪去和结果无关的计算步骤。

      如果需要在TensorFlow2.0中使用静态图,可以使用@tf.function装饰器将普通Python函数转换成对应的TensorFlow计算图构建代码。运行该函数就相当于在TensorFlow1.0中用Session执行代码。使用tf.function构建静态图的方式叫做 Autograph。当然Autograph机制能够转换的代码并不是没有任何约束的,有一些编码规范需要遵循,否则可能会转换失败或者不符合预期。

    1. 被@tf.function修饰的函数应尽可能使用TensorFlow中的函数而不是Python中的其他函数。例如使用tf.print而不是print,使用tf.range而不是range,使用tf.constant(True)而不是True.
    2. 避免在@tf.function修饰的函数内部定义tf.Variable
    3. 被@tf.function修饰的函数不可修改该函数外部的列表或字典等数据结构变量

      计算图由节点(nodes)和线(edges)组成。节点表示操作符Operator,或者称之为算子,线表示计算间的依赖。实线表示有数据传递依赖,传递的数据即张量。虚线通常可以表示控制依赖,即执行先后顺序。

    复制代码
    import tensorflow as tf
    
    # 使用autograph构建静态图
    @tf.function
    def strjoin(x,y):
        z =  tf.strings.join([x,y],separator = " ")
        tf.print(z)
        return z
    
    result = strjoin(tf.constant("hello"),tf.constant("world"))
    
    print(result)
    复制代码

    您可以像这样测量静态图和动态图性能差异:

    x = tf.random.uniform(shape=[10, 10], minval=-1, maxval=2, dtype=tf.dtypes.int32)
    
    def power(x, y):
      result = tf.eye(10, dtype=tf.dtypes.int32)
      for _ in range(y):
        result = tf.matmul(x, result)
      return result
    
    print("Eager execution:", timeit.timeit(lambda: power(x, 100), number=1000))        # 2.56378621799
    
    # 将python函数转换为图形
    power_as_graph = tf.function(power)
    print("Graph execution:", timeit.timeit(lambda: power_as_graph(x, 100), number=1000))        # 0.683253670
    View Code

    我们还可以再函数前使用装饰器 @tf.function 调用tf.function,同时也可以使用 tf.config.run_functions_eagerly(True) 关闭Function创建和运行图形的能力。

      前面在介绍Autograph的编码规范时提到构建Autograph时应该避免在@tf.function修饰的函数内部定义tf.Variable。但是如果在函数外部定义tf.Variable的话,又会显得这个函数有外部变量依赖,封装不够完美。一种简单的思路是定义一个类,并将相关的tf.Variable创建放在类的初始化方法中。而将函数的逻辑放在其他方法中。

    class DemoModule(tf.Module):
        def __init__(self, init_value=tf.constant(0.0), name=None):
            super(DemoModule, self).__init__(name=name)
            with self.name_scope:  # 相当于with tf.name_scope("demo_module")
                self.x = tf.Variable(init_value, dtype=tf.float32, trainable=True)
    
        @tf.function
        def addprint(self, a):
            with self.name_scope:
                self.x.assign_add(a)
                tf.print(self.x)
                return self.x
    View Code

    自动微分

      自动微分用于训练神经网络的反向传播非常有用,TensorFlow 会记住在前向传递过程中哪些运算以何种顺序发生。随后,在后向传递期间,以相反的顺序遍历此运算列表来计算梯度。

    Tensorflow一般使用tf.GradientTape来记录正向运算过程,然后反向传播自动计算梯度值。

    f(x)=ax^2+bx+c

    复制代码
    x = tf.Variable(0.0,name = "x",dtype = tf.float32)
    a = tf.constant(1.0)
    b = tf.constant(-2.0)
    c = tf.constant(1.0)
    
    with tf.GradientTape() as tape:
        y = a*tf.pow(x,2) + b*x + c
        
    dy_dx = tape.gradient(y,x)
    print(dy_dx)    # tf.Tensor(-2.0, shape=(), dtype=float32)
    复制代码

    对常量张量也可以求导,只不过需要增加watch

    with tf.GradientTape() as tape:
        tape.watch([a,b,c])
        y = a*tf.pow(x,2) + b*x + c
        
    dy_dx,dy_da,dy_db,dy_dc = tape.gradient(y,[x,a,b,c])
    print(dy_da)    # tf.Tensor(0.0, shape=(), dtype=float32)
    print(dy_dc)    # tf.Tensor(1.0, shape=(), dtype=float32)
    View Code

    可以求二阶导数

    with tf.GradientTape() as tape2:
        with tf.GradientTape() as tape1:   
            y = a*tf.pow(x,2) + b*x + c
        dy_dx = tape1.gradient(y,x)   
    dy2_dx2 = tape2.gradient(dy_dx,x)
    
    print(dy2_dx2)    # tf.Tensor(2.0, shape=(), dtype=float32)
    View Code

    利用梯度和优化器求最小值

    # 求f(x) = a*x**2 + b*x + c的最小值
    # 使用optimizer.apply_gradients
    
    x = tf.Variable(0.0,name = "x",dtype = tf.float32)
    a = tf.constant(1.0)
    b = tf.constant(-2.0)
    c = tf.constant(1.0)
    
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    for _ in range(1000):
        with tf.GradientTape() as tape:
            y = a*tf.pow(x,2) + b*x + c
        dy_dx = tape.gradient(y,x)  # 计算梯度
        optimizer.apply_gradients(grads_and_vars=[(dy_dx,x)])  # 根据梯度更新变量
        
    tf.print("y =",y,"; x =",x)
    View Code

    如果不想被计算梯度:

    with tf.GradientTape(watch_accessed_variables=False) as tape:
        pass

    使用TensorFlow实现神经网络模型的一般流程包括:

    1. 准备数据
    2. 定义模型
    3. 训练模型
    4. 评估模型
    5. 推理模型
    6. 保存模型

    数据输入

      tensorflow支持 Numpy 数组、Pandas DataFrame、Python 生成器、csv文件、文本文件、文件路径、TFrecords文件等方式构建数据管道。如果您的数据很小并且适合内存,我们建议您使用tf.data.Dataset.from_tensor_slices()从Numpy array构建数据管道

    • Dataset:如果您有大型数据集并且需要进行分布式训练
    • Sequence:如果您有大型数据集并且需要执行大量在 TensorFlow 中无法完成的自定义 Python 端处理(例如,如果您依赖外部库进行数据加载或预处理)
    • 通过tfrecords文件方式构建数据管道较为复杂,需要对样本构建tf.Example后压缩成字符串写到tfrecoreds文件,读取后再解析成tf.Example。但tfrecoreds文件的优点是压缩后文件较小,便于网络传播,加载速度较快。

    Numpy构建数据

    官方推荐使用 tf.data.Dataset.from_tensors() 或 tf.data.Dataset.from_tensor_slices() 创建数据集,Dataset支持一类特殊的操作Trainformation(打乱、生成epoch...等操作)

    • data.map(function):将转换函数映射到数据集每一个元素
    • data.batch(batch_size):构建batch
    • data.shuffle(buffer_size):随机打乱输入数据,从该缓冲区中随机采样元素
    • data.repeat():repeat的功能就是将整个序列重复多次,一般不带参数
    • data.prefetch(tf.data.experimental.AUTOTUNE)  :预先取 数据
    • data.take():采样,从开始位置取前几个元素
    • ...
    复制代码
    features = np.arange(0, 100, dtype=np.int32)    # # (100,)
    labels = np.zeros(100, dtype=np.int32)  # (100,)
    
    data = tf.data.Dataset.from_tensor_slices((features, labels))   # 创建数据集
    data = data.repeat()    # 无限期地补充数据
    data = data.shuffle(buffer_size=100)    # 打乱数据
    data = data.batch(batch_size=4)     # 批量数据
    data = data.prefetch(buffer_size=1)     # 预取批处理(预加载批处理,消耗更快)
    
    for batch_x, batch_y in data.take(5):
        print(batch_x.shape, batch_y.shape)     # (4,) (4,)
        break
    复制代码

    注意:如果你打算多次调用,你可以使用迭代器的方式:

    ite_data = iter(data)
    for i in range(5):
    batch_x, batch_y = next(ite_data)
    print(batch_x, batch_y)
    
    for i in range(5):
    batch_x, batch_y = next(ite_data)
    print(batch_x, batch_y)
    View Code

    提升管道性能

      训练深度学习模型常常会非常耗时。模型训练的耗时主要来自于两个部分,一部分来自数据准备,另一部分来自参数迭代。参数迭代过程的耗时通常依赖于GPU来提升。而数据准备过程的耗时则可以通过构建高效的数据管道进行提升。

    以下是一些构建高效数据管道的建议。

    1. 使用 prefetch 方法让数据准备和参数迭代两个过程相互并行。
    2. 使用 interleave 方法可以让数据读取过程多进程执行,并将不同来源数据夹在一起。
    3. 使用 map 时设置num_parallel_calls 让数据转换过程多进行执行。
    4. 使用 cache 方法让数据在第一个epoch后缓存到内存中,仅限于数据集不大情形。
    5. 使用 map转换时,先batch,然后采用向量化的转换方法对每个batch进行转换。

    生成器构建数据

    复制代码
    def generate_features():
        # 函数生成一个随机字符串
        def random_string(length):
            return ''.join(random.choice(string.ascii_letters) for m in range(length))
        # 返回一个随机字符串、一个随机向量和一个随机整数
        yield random_string(4), np.random.uniform(size=4), random.randint(0, 10)
    
    
    data = tf.data.Dataset.from_generator(generate_features, output_types=(tf.string, tf.float32, tf.int32))
    data = data.repeat()        # 无限期地补充数据
    data = data.shuffle(buffer_size=100)    # 打乱数据
    data = data.batch(batch_size=4)     # 批量数据(将记录聚合在一起)
    data = data.prefetch(buffer_size=1)     # 预取批量(预加载批量以便更快的消耗)
    
    # Display data.
    for batch_str, batch_vector, batch_int in data.take(5):
        # (4,) (4, 4) (4,)
        print(batch_str.shape, batch_vector.shape, batch_int.shape)
    复制代码

    keras.utils.Sequence

    特别是,keras.utils.Sequence该类提供了一个简单的接口来构建 Python 数据生成器,该生成器可以感知多处理并且可以洗牌。

    Sequence必须实现两种方法:

    • __getitem__:返回一个batch数据
    • __len__:整型,返回batch的数量
    复制代码
    import tensorflow as tf
    from keras.utils.data_utils import Sequence
    
    class SequenceDataset(Sequence):
        def __init__(self, batch_size):
            self.input_data = tf.random.normal((640, 8192, 1))
            self.label_data = tf.random.normal((640, 8192, 1))
            self.batch_size = batch_size
    
        def __len__(self):
            return int(tf.math.ceil(len(self.input_data) / float(self.batch_size)))
    
        # 每次输出一个batch
        def __getitem__(self, idx):
            batch_x = self.input_data[idx * self.batch_size:(idx + 1) * self.batch_size]
            batch_y = self.label_data[idx * self.batch_size:(idx + 1) * self.batch_size]
            return batch_x, batch_y
    
    
    sequence = SequenceDataset(batch_size=64)
    
    for batch_idx, (x, y) in enumerate(sequence):
        print(batch_idx, x.shape, y.shape)  # tf.float32
        # 0 (64, 8192, 1) (64, 8192, 1)
        break
    复制代码

    搭建模型

      深度学习模型一般由各种模型层组合而成,如果这些内置模型层不能够满足需求,我们也可以通过编写tf.keras.Lambda匿名模型层或继承tf.keras.layers.Layer基类构建自定义的模型层。其中tf.keras.Lambda匿名模型层只适用于构造没有学习参数的模型层。

    搭建模型有以下3种方式构建模型:

    1. Sequential顺序模型:用于简单的层堆栈, 其中每一层恰好有一个输入张量一个输出张量
    2. 函数式API模型:多输入多输出,或者模型需要共享权重,或者模型具有残差连接等非顺序结构,
    3. 继承Model基类自定义模型:如果无特定必要,尽可能避免使用Model子类化的方式构建模型,这种方式提供了极大的灵活性,但也有更大的概率出错

    顺序建模

    model = keras.Sequential([
        layers.Dense(2, activation="relu", name="layer1"),
        layers.Dense(3, activation="relu", name="layer2"),
        layers.Dense(4, name="layer3"),
    ])

    还可以通过add方法创建顺序模型

    model = keras.Sequential()
    model.add(layers.Dense(2, activation="relu"))
    model.add(layers.Dense(3, activation="relu"))
    model.add(layers.Dense(4))

    因为模型不知道输入shape,所以起初模型没有权重,因此我们需要告知模型输入shape

    复制代码
    # 在第一层添加Input
    model.add(keras.Input(shape=(4,)))
    model.add(layers.Dense(2, activation="relu"))
    
    # 或者在第一层添加input_shape
    model.add(layers.Dense(2, activation="relu", input_shape=(4,)))
    复制代码

    顺序模型可以配合add 和 model.summary() 在模型的任何位置查看该层的输入输出。

    函数式API建模

      函数式API搭建模型比Sequential更加灵活,可以处理具有非线性拓扑、共享层甚至多个输入或输出的模型。

    复制代码
    inputs = keras.Input(shape=(784,))
    x = layers.Dense(64, activation="relu")(inputs)
    x = layers.Dense(64, activation="relu")(x)
    outputs = layers.Dense(10)(x)
    model = keras.Model(inputs=inputs, outputs=outputs, name="mnist_model")
    model.summary()     # 查看模型摘要
    复制代码

    还可以将模型绘制为图形

    keras.utils.plot_model(model, "my_first_model_with_shape_info.png", show_shapes=True)

    补充:函数式模型是可以嵌套的

    自定义建模

      在 TensorFlow 中,模型类的继承关系为: 

      tf.keras.Modeltf.keras.layers.Layertf.Module

      通常使用Layer类来定义内部计算块,并使用Model类定义外部模型(训练的对象)。

    继承tf.Module栗子:

    class SequentialModule(tf.Module):
        def __init__(self, name=None):
            super().__init__(name=name)
            self.dense_1 = Dense(in_features=3, out_features=3)
            self.dense_2 = Dense(in_features=3, out_features=2)
    
        def __call__(self, x):
            x = self.dense_1(x)
            return self.dense_2(x)
    
    my_model = SequentialModule(name="the_model")
    View Code

    继承tf.keras.layers.Layer栗子:

    class ResBlock(layers.Layer):
        def __init__(self, kernel_size, **kwargs):
            super(ResBlock, self).__init__(**kwargs)
            self.kernel_size = kernel_size
    
        def build(self, input_shape):
            self.conv1 = layers.Conv1D(filters=64, kernel_size=self.kernel_size,
                                       activation="relu", padding="same")
            self.conv2 = layers.Conv1D(filters=32, kernel_size=self.kernel_size,
                                       activation="relu", padding="same")
            self.conv3 = layers.Conv1D(filters=input_shape[-1],
                                       kernel_size=self.kernel_size, activation="relu", padding="same")
            self.maxpool = layers.MaxPool1D(2)
            super(ResBlock, self).build(input_shape)  # 相当于设置self.built = True
    
        def call(self, inputs):
            x = self.conv1(inputs)
            x = self.conv2(x)
            x = self.conv3(x)
            x = layers.Add()([inputs, x])
            x = self.maxpool(x)
            return x
    
        # 如果要让自定义的Layer通过Functional API 组合成模型时可以序列化,需要自定义get_config方法。
        def get_config(self):
            config = super(ResBlock, self).get_config()
            config.update({'kernel_size': self.kernel_size})
            return config
    
    
    resblock = ResBlock(kernel_size=3)
    resblock.build(input_shape=(None, 200, 7))
    resblock.compute_output_shape(input_shape=(None, 200, 7))
    View Code

    继承tf.Module栗子:

    class ImdbModel(models.Model):
        def __init__(self):
            super(ImdbModel, self).__init__()
    
        def build(self, input_shape):
            self.embedding = layers.Embedding(MAX_WORDS, 7)
            self.block1 = ResBlock(7)
            self.block2 = ResBlock(5)
            self.dense = layers.Dense(1, activation="sigmoid")
            super(ImdbModel, self).build(input_shape)
    
        def call(self, x):
            x = self.embedding(x)
            x = self.block1(x)
            x = self.block2(x)
            x = layers.Flatten()(x)
            x = self.dense(x)
            return (x)
    
    
    model = ImdbModel()
    model.build(input_shape=(None, 200))
    model.compile(optimizer='Nadam', loss='binary_crossentropy', metrics=['accuracy', "AUC"])
    View Code

    Model类具有Layer类相同的API,但有以下区别:

    • Model类 提供了内置的训练  model.fit()  、评估 model.evaluate()  和预测  model.predict()  API
    • Model类 可以通过 model.layers 属性公开内层的列表。
    • Model类 提供了保存和序列化 API  save()、save_weights() ...

      Layer 类对应于“层”,Model 类对应于“模型,如果您想知道“我应该使用Layer类还是Model类?”,请问自己:我需要调用fit()它吗?我需要save() 吗?如果是这样,请与Model。如果不是,请使用Layer。

    把 Layer 类和 Model 类用在一起,吃个栗子:

    class Sampling(layers.Layer):
        """使用(z_mean, z_log_var)对z进行采样,z是对一个数字进行编码的向量"""
    
        def call(self, inputs):
            z_mean, z_log_var = inputs
            batch = tf.shape(z_mean)[0]
            dim = tf.shape(z_mean)[1]
            epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
            return z_mean + tf.exp(0.5 * z_log_var) * epsilon
    
    
    class Encoder(layers.Layer):
        """将MNIST数字映射为一个三元组(z_mean, z_log_var, z)"""
    
        def __init__(self, latent_dim=32, intermediate_dim=64, name="encoder", **kwargs):
            super(Encoder, self).__init__(name=name, **kwargs)
            self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
            self.dense_mean = layers.Dense(latent_dim)
            self.dense_log_var = layers.Dense(latent_dim)
            self.sampling = Sampling()
    
        def call(self, inputs):
            x = self.dense_proj(inputs)
            z_mean = self.dense_mean(x)
            z_log_var = self.dense_log_var(x)
            z = self.sampling((z_mean, z_log_var))
            return z_mean, z_log_var, z
    
    
    class Decoder(layers.Layer):
        """将已编码的数字向量z转换回可读的数字"""
    
        def __init__(self, original_dim, intermediate_dim=64, name="decoder", **kwargs):
            super(Decoder, self).__init__(name=name, **kwargs)
            self.dense_proj = layers.Dense(intermediate_dim, activation="relu")
            self.dense_output = layers.Dense(original_dim, activation="sigmoid")
    
        def call(self, inputs):
            x = self.dense_proj(inputs)
            return self.dense_output(x)
    
    
    class VariationalAutoEncoder(keras.Model):
        """将编码器和解码器组合成端到端的训练模型。"""
    
        def __init__(self, original_dim, intermediate_dim=64, latent_dim=32, name="autoencoder", **kwargs):
            super(VariationalAutoEncoder, self).__init__(name=name, **kwargs)
            self.original_dim = original_dim
            self.encoder = Encoder(latent_dim=latent_dim, intermediate_dim=intermediate_dim)
            self.decoder = Decoder(original_dim, intermediate_dim=intermediate_dim)
    
        def call(self, inputs):
            z_mean, z_log_var, z = self.encoder(inputs)
            reconstructed = self.decoder(z)
            # Add KL divergence regularization loss.
            kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
            self.add_loss(kl_loss)
            return reconstructed
    View Code

    自定义循环训练模型

    original_dim = 784
    vae = VariationalAutoEncoder(original_dim, 64, 32)
    
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    mse_loss_fn = tf.keras.losses.MeanSquaredError()
    
    loss_metric = tf.keras.metrics.Mean()
    
    (x_train, _), _ = tf.keras.datasets.mnist.load_data()
    x_train = x_train.reshape(60000, 784).astype("float32") / 255
    
    train_dataset = tf.data.Dataset.from_tensor_slices(x_train)
    train_dataset = train_dataset.shuffle(buffer_size=1024).batch(64)
    
    epochs = 2
    
    # Iterate over epochs.
    for epoch in range(epochs):
        print("Start of epoch %d" % (epoch,))
    
        # Iterate over the batches of the dataset.
        for step, x_batch_train in enumerate(train_dataset):
            with tf.GradientTape() as tape:
                reconstructed = vae(x_batch_train)
                # Compute reconstruction loss
                loss = mse_loss_fn(x_batch_train, reconstructed)
                loss += sum(vae.losses)  # Add KLD regularization loss
    
            grads = tape.gradient(loss, vae.trainable_weights)
            optimizer.apply_gradients(zip(grads, vae.trainable_weights))
    
            loss_metric(loss)
    
            if step % 100 == 0:
                print("step %d: mean loss = %.4f" % (step, loss_metric.result()))
    View Code

    内置循环方法

    vae = VariationalAutoEncoder(784, 64, 32)
    
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    
    vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
    vae.fit(x_train, x_train, epochs=2, batch_size=64)
    View Code

    函数式API和自定义Model类混搭训练

    original_dim = 784
    intermediate_dim = 64
    latent_dim = 32
    
    # Define encoder model.
    original_inputs = tf.keras.Input(shape=(original_dim,), name="encoder_input")
    x = layers.Dense(intermediate_dim, activation="relu")(original_inputs)
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()((z_mean, z_log_var))
    encoder = tf.keras.Model(inputs=original_inputs, outputs=z, name="encoder")
    
    # Define decoder model.
    latent_inputs = tf.keras.Input(shape=(latent_dim,), name="z_sampling")
    x = layers.Dense(intermediate_dim, activation="relu")(latent_inputs)
    outputs = layers.Dense(original_dim, activation="sigmoid")(x)
    decoder = tf.keras.Model(inputs=latent_inputs, outputs=outputs, name="decoder")
    
    # Define VAE model.
    outputs = decoder(z)
    vae = tf.keras.Model(inputs=original_inputs, outputs=outputs, name="vae")
    
    # Add KL divergence regularization loss.
    kl_loss = -0.5 * tf.reduce_mean(z_log_var - tf.square(z_mean) - tf.exp(z_log_var) + 1)
    vae.add_loss(kl_loss)
    
    # Train.
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
    vae.compile(optimizer, loss=tf.keras.losses.MeanSquaredError())
    vae.fit(x_train, x_train, epochs=3, batch_size=64)
    函数式API训练模型
    View Code

    补充知识:自定义层

    如果自定义模型层没有需要被训练的参数,一般推荐使用Lamda层实现。

    mypower = layers.Lambda(lambda x:tf.math.pow(x,2))
    mypower(tf.range(5))

    如果自定义模型层有需要被训练的参数,则可以通过继承Layer基类实现。Layer的子类化一般需要重新实现初始化方法,Build方法和Call方法。如果built = False,调用__call__时会先调用build方法, 再调用call方法。

    损失函数

      一般来说,损失值 由 损失函数 和 正则化项组成(Objective = Loss + Regularization)。对于keras模型,正则化项一般在各层中指定,例如使用Dense的 kernel_regularizer 和 bias_regularizer 等参数指定权重使用 L1 或者 L2 正则化项,此外还可以用kernel_constraint 和 bias_constraint等参数约束权重的取值范围,这也是一种正则化手段。

    损失函数在 model.compile 时候指定。

    • 对于回归模型,通常使用的损失函数是平方损失函数 mean_squared_error,简写为 mse,类实现形式为 MeanSquaredError 和 MSE
    • 对于二分类模型,通常使用的是二元交叉熵损失函数 binary_crossentropy。
    • 对于多分类模型,如果label是one-hot编码的,则使用交叉熵损失函数 categorical_crossentropy。如果label是序号编码的,则需要使用稀疏类别交叉熵损失函数 sparse_categorical_crossentropy。
    • 如果有需要,也可以自定义损失函数,自定义损失函数需要接收两个张量y_true,y_pred作为输入参数,并输出一个标量作为损失函数值。

    常见的Loss可以参看Tensorflow的官网:tf.keras.losses

    自定义损失

    keras提供了两种自定义损失:

    函数实现:输入 y_true 和 y_pred,返回 损失值

    def custom_mean_squared_error(y_true, y_pred):
        return tf.math.reduce_mean(tf.square(y_true - y_pred))
    model.compile(optimizer=..., loss=custom_mean_squared_error)

    类实现:继承 tf.keras.losses.Loss ,在init中初始化参数,在call中重写计算逻辑。

    复制代码
    class CustomMSE(keras.losses.Loss):
        def __init__(self, regularization_factor=0.1, name="custom_mse"):
            super().__init__(name=name)
            self.regularization_factor = regularization_factor
    
        def call(self, y_true, y_pred):
            mse = tf.math.reduce_mean(tf.square(y_true - y_pred))
            reg = tf.math.reduce_mean(tf.square(0.5 - y_pred))
            return mse + reg * self.regularization_factor
    
    model.compile(optimizer=..., loss=CustomMSE())
    复制代码

    度量函数

      人们通常会通过度量函数来从另一个方面 评估模型的好坏,可以在 model.compile 时,通过列表形式指定单/多个评估指标。

    • model.compile 时,可以
    • 也可以自定义评估指标。自定义评估指标需要接收两个张量y_true,y_pred作为输入参数,并输出一个标量作为评估值。
    • 也可以继承 tf.keras.metrics.Metric 自定义度量方法,update_state方法,result方法实现评估指标的计算逻辑,从而得到评估指标的类的实现形式。

    自定义度量

      继承 tf.keras.metrics.Metric,并且实现4个类方法:

    • __init__(self):创建状态变量
    • update_state(self, y_true, y_pred, sample_weight=None):根据 y_true 和 y_pred 来更新状态变量
    • result(self):使用状态变量来计算 最终结果
    • reset_state(self):重新初始化状态变量
    class SnrCustomMetrics(metrics.Metric):
        def __init__(self, name="snr_metrics", **kwargs):
            # 创建状态变量
            super(SnrCustomMetrics, self).__init__(name=name, **kwargs)
            self.metrics_value = self.add_weight(name="snr_value", initializer="zeros")
    
        def update_state(self, y_true, y_pred, sample_weight=None):
            """使用目标 y_true 和模型预测 y_pred 来更新状态变量
            :param y_true: target
            :param y_pred: 预测值
            :param sample_weight: 权重
            """
            labels_pow = tf.pow(y_true, 2)
            noise_pow = tf.pow(y_pred - y_true, 2)
            snr_value = 10 * tf.math.log(tf.reduce_sum(labels_pow) / tf.reduce_sum(noise_pow)) / tf.math.log(10.)
    
            if sample_weight is not None:
                sample_weight = tf.cast(sample_weight, "float32")
                snr_value = tf.multiply(snr_value, sample_weight)
            self.metrics_value.assign_add(snr_value)
    
        # 使用状态变量来计算最终结果
        def result(self):
            return self.metrics_value
    
        # metrics 的状态将在每个epoch开始时重置。
        def reset_state(self):
            self.metrics_value.assign(0.0)
    
    model.compile(optimizer=..., loss=..., metrics=[SnrCustomMetrics()])
    View Code

    优化器

      机器学习界有一群炼丹师,他们每天的日常是:拿来药材(数据),架起八卦炉(模型),点着六味真火(优化算法),就摇着蒲扇等着丹药出炉了。不过,当过厨子的都知道,同样的食材,同样的菜谱,但火候不一样了,这出来的口味可是千差万别。火小了夹生,火大了易糊,火不匀则半生半糊。机器学习也是一样,模型优化算法的选择直接关系到最终模型的性能。有时候效果不好,未必是特征的问题或者模型设计的问题,很可能就是优化算法的问题。

      深度学习优化算法大概经历了 SGD -> SGDM -> NAG ->Adagrad -> Adadelta(RMSprop) -> Adam -> Nadam 这样的发展历程。model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss=loss)

    • SGD:默认参数为纯SGD, 设置momentum参数不为0实际上变成SGDM, 考虑了一阶动量, 设置 nesterov为True后变成NAG,即 Nesterov Acceleration Gradient,在计算梯度时计算的是向前走一步所在位置的梯度
    • Adagrad:考虑了二阶动量,对于不同的参数有不同的学习率,即自适应学习率。缺点是学习率单调下降,可能后期学习速率过慢乃至提前停止学习
    • RMSprop:考虑了二阶动量,对于不同的参数有不同的学习率,即自适应学习率,对Adagrad进行了优化,通过指数平滑只考虑一定窗口内的二阶动量
    • Adadelta:考虑了二阶动量,与RMSprop类似,但是更加复杂一些,自适应性更强
    • Adam:同时考虑了一阶动量和二阶动量,可以看成RMSprop上进一步考虑了Momentum
    • Nadam:在Adam基础上进一步考虑了 Nesterov Acceleration

      对于一般新手炼丹师,优化器直接使用Adam,并使用其默认参数就OK了。一些爱写论文的炼丹师由于追求评估指标效果,可能会偏爱前期使用Adam优化器快速下降,后期使用SGD并精调优化器参数得到更好的结果。此外目前也有一些前沿的优化算法,据称效果比Adam更好,例如LazyAdam,Look-ahead,RAdam,Ranger等。

      初始化优化器时会创建一个变量optimier.iterations用于记录迭代的次数。因此优化器和tf.Variable一样,一般在@tf.function外创建。

    优化器主要使用apply_gradients方法传入变量和对应梯度从而来对给定变量进行迭代,

    复制代码
    optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
    
    with tf.GradientTape() as tape:
        ...
    grads = tape.gradient(loss, model.trainable_weights)  # 根据损失 求梯度
    optimizer.apply_gradients(zip(grads, model.trainable_weights))  # 根据梯度 优化模型
    复制代码

    或者直接使用minimize方法对目标函数进行迭代优化。

    复制代码
    @tf.function
    def train(epoch=1000):
        for _ in tf.range(epoch):
            optimizer.minimize(loss, model.trainable_weights)
        tf.print("epoch = ", optimizer.iterations)
        return loss
    复制代码

    当然,更常见的使用是在编译时将优化器传入model.fit

    model.compile(optimizer=optimizers.SGD(learning_rate=0.01), loss=loss)

    回调函数

      tf.keras的回调函数实际上是一个类,一般是在model.fit时作为参数指定,一般收集一些日志信息,改变学习率等超参数,提前终止训练过程等等

    大部分时候,keras.callbacks子模块中定义的回调函数类已经足够使用了,如果有特定的需要,我们也可以通过对keras.callbacks.Callbacks实施子类化构造自定义的回调函数

    • BaseLogger: 收集每个epoch上metrics平均值,对stateful_metrics参数中的带中间状态的指标直接拿最终值无需对各个batch平均,指标均值结果将添加到logs变量中。该回调函数被所有模型默认添加,且是第一个被添加的
    • History: 将BaseLogger计算的各个epoch的metrics结果记录到history这个dict变量中,并作为model.fit的返回值。该回调函数被所有模型默认添加,在BaseLogger之后被添加
    • EarlyStopping: 当被监控指标在设定的若干个epoch后没有提升,则提前终止训练
    • TensorBoard: 为Tensorboard可视化保存日志信息。支持评估指标,计算图,模型参数等的可视化
    • ModelCheckpoint: 在每个epoch后保存模型
    • ReduceLROnPlateau:如果监控指标在设定的若干个epoch后没有提升,则以一定的因子减少学习率
    • TerminateOnNaN:如果遇到loss为NaN,提前终止训练
    • LearningRateScheduler:学习率控制器。给定学习率lr和epoch的函数关系,根据该函数关系在每个epoch前调整学习率
    • CSVLogger:将每个epoch后的logs结果记录到CSV文件中
    • ProgbarLogger:将每个epoch后的logs结果打印到标准输出流中

    EarlyStopping

    当监控的指标停止改进时停止训练

    tf.keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=0, patience=0, verbose=0, mode='auto', baseline=None, restore_best_weights=False)
    • monitor:要监控的 对象
    • min_delta:被监测对象的最小变化 小于 min_delta 的绝对变化,将被视为没有改进
    • patience:如果经过num个epoch,监控对象没有改善,则停止训练
    • verbose:详细模式。1打印过程信息,0不打印
    • mode:{"auto", "min", "max"}之一。
      • min,当监控的数量停止减少时,培训停止
      • max,当监控的数量停止增加时,它将停止
      • auto,方向会从被监控对象的名称中自动推断出来
    • restore_best_weights:是否从监测量的最佳值的epoch恢复模型权重。如果为 False,则使用在训练的最后一步获得的模型权重

    ModelCheckpoint

    以某种频率保存 Keras 模型或模型权重

    tf.keras.callbacks.ModelCheckpoint(
        filepath, monitor='val_loss', verbose=0, save_best_only=False,
        save_weights_only=False, mode='auto', save_freq='epoch',
        options=None, initial_value_threshold=None, **kwargs
    )
    • filepath:保存模型文件的路径
    • monitor:监控指标
    • save_best_only:是否仅保存最佳模型
    • save_weights_only:是否仅保存模型权重
    • save_freq:epoch 或整数

    Tensorboard

      Tensorboard有助于追踪模型训练过程的Scalars、Graphs、Distributions等等

    • Scalars:显示损失和指标在每个时期如何变化。 还可以使用它来跟踪训练速度,学习率和其他标量值。
    • Graphs:可帮助您可视化模型。
    • DistributionsHistograms 显示张量随时间的分布。 可以 可视化权重和偏差并验证它们是否以预期的方式变化
    tf.keras.callbacks.TensorBoard(
        log_dir='logs', histogram_freq=0, write_graph=True,
        write_images=False, write_steps_per_second=False, update_freq='epoch',
        profile_batch=0, embeddings_freq=0, embeddings_metadata=None, **kwargs
    )
    • log_dir:TensorBoard的日志路径
    • histogram_freq:计算模型层的激活和权重直方图的频率(以时期为单位)。如果设置为 0,则不会计算直方图。
    • write_graph:是否在 TensorBoard 中可视化graph。
    • write_steps_per_second:是否将每step的训练步数记录到 Tensorboard 中。这支持epoch和batch频率记录。
    • update_freq:'batch'或'epoch'或整数。使用 时'batch',在每批之后将损失和指标写入 TensorBoard。

    在model.fit中使用

      当使用Model.fit() 函数进行训练时, 添加 tf.keras.callback.TensorBoard 回调函数可确保创建和存储日志

    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir="./event_file")
    model.fit(... 
              callbacks=[tensorboard_callback])

    在自定义训练框架中使用

    复制代码
    summary_writer = tf.summary.create_file_writer("./event_file")
    with summary_writer.as_default():
        tf.summary.scalar('train/loss', train_loss, step=epoch)
        tf.summary.scalar('train/accuracy', train_accuracy, step=epoch)
        tf.summary.scalar('val/loss', val_loss, step=epoch)
        tf.summary.scalar('val/accuracy', val_accuracy, step=epoch)
    复制代码

    启动Tensorboard,在当前文件夹中,cmd运行:

    tensorboard --logdir "./"

    训练模型

    训练模型通常有3种方法,

    • 内置fit方法:支持对numpy array,tf.data.Dataset以及 Python generator数据进行训练,并且可以通过设置回调函数实现对训练过程的复杂控制逻辑。
    • 内置train_on_batch方法
    • 自定义训练循环

    model.fit

    model.fit(
        x=None, y=None, batch_size=None, epochs=1, verbose='auto',
        callbacks=None, validation_split=0.0, validation_data=None, shuffle=True,
        initial_epoch=0, steps_per_epoch=None)

    参数:

    • x:输入数据,可以是:
      • numpy数组、数组列表(如果模型有多个输入)
      • Tensorflow张量或张量列表(如果模型有多个输入)
      • 如果模型具有命名输入,则将输入名称映射到相应的数组/张量的字典。
      • tf.data数据集,应该返回一个(inputs, targets)或 的元组(inputs, targets, sample_weights)
      • 生成器或keras.utils.Sequence返回(inputs, targets) 或(inputs, targets, sample_weights)。
    • y:目标数据。与输入数据一样x,它可以是 Numpy 数组或 TensorFlow 张量。它应该是一致的x(你不能有 Numpy 输入和张量目标,或者相反)。如果x是数据集、生成器或keras.utils.Sequence实例,y则不应指定(因为目标将从 获取x)。
    • batch_size:每次梯度更新的样本数。如果未指定,将默认为 32。如果您的数据是数据集、生成器或实例的形式(因为它们生成批次),请不要指定。
    • epochs:训练模型的周期数
    • verbose:'auto'、0、1 或 2。详细模式。0 =静默,1 = 进度条,2 = 每个 epoch 一行。'auto' 在大多数情况下默认为 1
    • callbacks:训练期间调用的回调列表。tf.keras.callbacks
    • validation_split:在 0 和 1 之间浮动。从训练数据集中分离一部分数据用于验证,并将在每个 epoch 结束时评估该数据的损失和指标
    • validation_data:验证数据集
    • shuffle:布尔值(是否在每个 epoch 之前对训练数据进行洗牌)
    • initial_epoch:整数。开始训练的epoch(对于恢复之前的训练运行很有用)。

    返回:history,调用 history.history 可以查看训练期间损失值和度量值的记录

    train_on_batch

      该内置方法相比较fit方法更加灵活,可以不通过回调函数而直接在batch层次上更加精细地控制训练的过程。

    复制代码
    for epoch in tf.range(1, epoches + 1):
        for x, y in ds_train:
            train_result = model.train_on_batch(x, y)
    
        for x, y in ds_valid:
            valid_result = model.test_on_batch(x, y, reset_metrics=False)
    复制代码

    自定义训练循环

     自定义训练循环无需编译模型,直接利用优化器根据损失函数反向传播迭代参数,拥有最高的灵活性。

    训练循环包括按顺序重复执行三个任务:

    • 给模型输入batch数据以生成输出
    • 通过将输出与标签进行比较来计算损失
    • 使用GradientTape计算梯度
    • 使用这些梯度优化变量
    复制代码
    optimizer = keras.optimizers.SGD(learning_rate=1e-3)  # 实例化一个优化器
    loss_fn = keras.losses.BinaryCrossentropy()  # 实例化损失函数
    
    train_loss = keras.metrics.Mean(name='train_loss')
    valid_loss = keras.metrics.Mean(name='valid_loss')
    
    train_metric = keras.metrics.BinaryAccuracy(name='train_accuracy')
    valid_metric = keras.metrics.BinaryAccuracy(name='valid_accuracy')
    
    
    @tf.function
    def train_step(features, labels):
        with tf.GradientTape() as tape:
            logits = model(features, training=True)
            loss_value = loss_fn(labels, logits)
            # loss_value += sum(model.losses)   # 添加额外的损失
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
    
        train_loss.update_state(loss_value)
        train_metric.update_state(labels, logits)
    
    
    @tf.function
    def valid_step(features, labels):
        val_logits = model(features, training=False)
    
        loss_value = loss_fn(labels, val_logits)
        valid_loss.update_state(loss_value)
        valid_metric.update_state(labels, val_logits)
    
    
    epochs = 2
    for epoch in range(epochs):
        start_time = time.time()
        for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
            loss_value = train_step(x_batch_train, y_batch_train)
        # 在每个epoch结束时运行验证循环
        for x_batch_val, y_batch_val in val_dataset:
            valid_step(x_batch_val, y_batch_val)
    
        if epoch % 5 == 0:
            print('Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'.format(epoch, train_loss.result(),
                                                                                        train_metric.result(),
                                                                                        valid_loss.result(),
                                                                                        valid_metric.result()))
        train_loss.reset_states()
        valid_loss.reset_states()
        train_metric.reset_states()
        valid_metric.reset_states()
    
        print("运行时间: %.2fs" % (time.time() - start_time))
    复制代码

    评估模型

    通过自定义训练循环训练的模型没有经过编译,无法直接使用model.evaluate(ds_valid)方法

    model.evaluate(x = x_test,y = y_test)

    推理模型

    可以使用以下方法:

    model.predict(ds_test)

    model(x_test)

    model.call(x_test)

    model.predict_on_batch(x_test)

    推荐优先使用model.predict(ds_test)方法,既可以对Dataset,也可以对Tensor使用。

    保存和加载模型

      可以使用Keras方式保存模型,也可以使用TensorFlow原生方式保存。前者仅仅适合使用Python环境恢复模型,后者则可以跨平台进行模型部署。推荐使用后一种方式进行保存。

    Keras保存和加载模型

    保存的模型包括:

    • 模型架构 / 配置
    • 模型权重值
    • 模型的编译信息(如果 compile() 被调用)
    • 优化器及其状态(如果有)(这使您可以在离开的地方重新开始训练)

    保存和加载 模型

    低级API   tf.saved_model 

    • 保存模型: tf.saved_model.save(model, path_to_dir) 
    • 加载模型: model = tf.saved_model.load(path_to_dir) 

    高级API  tf.keras.Model 

    • 保存模型:
      •  model.save(SaveModel_path) 
      •  tf.keras.models.save_model(model, SaveModel_path) 
    • 加载模型:
      •  model = tf.keras.models.load_model(SaveModel_path)  
    SaveModel_path = 'path/to/location'
    
    model.save(SaveModel_path)  
    # 或者 
    tf.keras.models.save_model(model, SaveModel_path)  

    加载模型

    model = tf.keras.models.load_model(SaveModel_path)  # 加载模型

    我们可以使用两种格式将整个模型保存到磁盘:

    • SavedModel 格式(推荐):
      • 模型架构 / 配置
      • 模型权重值
      • 模型的编译信息(如果 compile() 被调用,回调函数等)
      • 优化器及其状态(如果有)(这使您可以在离开的地方重新开始训练)
    •  Keras H5 格式(较旧的):包含模型架构、权重值和compile()信息。它是SavedModel的轻量级替代品。
      • 通过model.add_loss()和model.add_metric()添加的外部损失和度量 不会被保存
      • 自定义对象(如自定义层)的计算图不包含在保存的h5文件中。在加载时,Keras需要访问这些对象的Python类/函数来重构模型。

    我们可以通过以下方式保存 H5 格式:

    1. 传递save_format='h5'给save()
    2. 或者以.h5或.keras结尾的文件名传递给save()

    只保存模型结构

    json_str = model.to_json()  # 保存模型结构
    model_json = models.model_from_json(json_str)  # 恢复模型结构

    或者只保存模型权重

    复制代码
    # 保存模型权重
    model.save_weights(...)
    
    # 恢复模型结构
    model_json = models.model_from_json(json_str)
    model_json.compile(...)
    
    # 加载权重
    model_json.load_weights(...)
    复制代码

    TensorFlow原生方式保存和加载

    保存模型结构与模型参数到文件,该方式保存的模型具有跨平台性便于部署

    model.save(..., save_format="tf")   # 保存模型
    model_loaded = tf.keras.models.load_model(...)  # 加载模型

    也可以仅保存权重

    model.save_weights(...,save_format = "tf")

    GPU训练

    指定GPU训练

    深度学习的训练过程常常非常耗时,一个模型训练几个小时是家常便饭,训练几天也是常有的事情,有时候甚至要训练几十天。

    训练过程的耗时主要来自于两个部分,一部分来自数据准备,另一部分来自参数更新。

      数据准备过程可以使用更多进程处理数据来缩减时间。参数更新时间可以应用GPU或者Google的TPU来进行加速。

      当存在可用的GPU时,如果不特意指定device,tensorflow会自动优先选择使用GPU来创建张量和执行张量计算。但如果是在公司或者学校实验室的服务器环境,存在多个GPU和多个使用者时,为了不让单个同学的任务占用全部GPU资源导致其他同学无法使用(tensorflow默认获取全部GPU的全部内存资源权限,但实际上只使用一个GPU的部分资源),我们通常会在开头增加以下几行代码以控制每个任务使用的GPU编号和显存大小,以便其他同学也能够同时训练模型。

    复制代码
    gpus = tf.config.list_physical_devices("GPU")
    tf.print(gpus)
    # [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'),
    #  PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU'),
    #  PhysicalDevice(name='/physical_device:GPU:2', device_type='GPU'),
    #  PhysicalDevice(name='/physical_device:GPU:3', device_type='GPU')]
    
    if gpus:
        gpu0 = gpus[0]  # 如果有多个GPU,仅使用第0个GPU
        tf.config.experimental.set_memory_growth(gpu0, True)  # 设置GPU显存用量按需使用
        # 或者也可以设置GPU显存为固定使用量(例如:4G)
        # tf.config.experimental.set_virtual_device_configuration(gpu0,
        #    [tf.config.experimental.VirtualDeviceConfiguration(memory_limit=4096)])
        tf.config.set_visible_devices([gpu0], "GPU")
    复制代码

    单机多卡训练

      TensorFlow 在 tf.distribute.MirroredStrategy  中为我们提供了单机多卡训练策略,使用这种策略时,我们只需实例化一个 MirroredStrategy 策略,并将模型构建的代码放入 strategy.scope() 的上下文环境中:

    strategy = tf.distribute.MirroredStrategy()
    
    with strategy.scope():
        # 模型构建代码

    可以在参数中指定设备,如:

    # 指定只使用第 0、1 号 GPU 参与分布式策略。
    strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"])

    MirroredStrategy 的步骤如下:

    • 训练开始前,该策略在所有 N 个计算设备上均各复制一份完整的模型;
    • 每次训练传入一个batch的数据时,将数据分成 N 份,分别传入 N 个计算设备(即数据并行);
    • N 个计算设备使用本地变量(镜像变量)分别计算自己所获得的部分数据的梯度;
    • 使用分布式计算的 All-reduce 操作,在计算设备间高效交换梯度数据并进行求和,使得最终每个设备都有了所有设备的梯度之和;
    • 使用梯度求和的结果更新本地变量(镜像变量);
    • 当所有设备均更新本地变量后,进行下一轮训练(即该并行策略是同步的)。

    默认情况下,TensorFlow 中的 MirroredStrategy 策略使用 NVIDIA NCCL 进行 All-reduce 操作。

    tf.distribute.Strategy和model.fit

    tf.distribute.Strategy被集成到tf.keras,tf.keras是用于构建和训练模型的高级 API。您可以使用 Model.fit 来无缝分布式训练模型

    TensorFlow 分布策略支持所有类型的 Keras 模型——Sequential、Functional和subclassed。以下是您需要在代码中更改的内容:

    • 创建实例 tf.distribute.Strategy。
    • 在strategy.scope中创建 Keras 模型、优化器和度量
    复制代码
    num_epochs = 5
    batch_size_per_replica = 64 # 每个显卡上的batch数
    learning_rate = 0.001
    
    strategy = tf.distribute.MirroredStrategy()
    print('Number of devices: %d' % strategy.num_replicas_in_sync)  # 输出设备数量
    batch_size = batch_size_per_replica * strategy.num_replicas_in_sync     # 总batch_size
    
    dataset = ...
    
    with strategy.scope():
        model = tf.keras.applications.MobileNetV2(weights=None, classes=2)
        model.compile(
            optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
            loss=tf.keras.losses.sparse_categorical_crossentropy,
            metrics=[tf.keras.metrics.sparse_categorical_accuracy]
        )
    
    model.fit(dataset, epochs=num_epochs)
    复制代码

    分布式训练汇总

    import os
    import tensorflow as tf
    import tensorflow_datasets as tfds
    
    strategy = tf.distribute.MirroredStrategy()
    print('设备数量: {}'.format(strategy.num_replicas_in_sync))
    
    epochs = 12
    batch_size_per_replica = 64
    batch_size = batch_size_per_replica * strategy.num_replicas_in_sync
    
    
    # 数据集
    def scale(image, label):
        image = tf.cast(image, tf.float32)
        image /= 255
    
        return image, label
    
    
    datasets, info = tfds.load(name='mnist', with_info=True, as_supervised=True)
    mnist_train, mnist_test = datasets['train'], datasets['test']
    train_dataset = mnist_train.map(scale).cache().shuffle(10000).batch(batch_size)
    eval_dataset = mnist_test.map(scale).batch(batch_size)
    
    with strategy.scope():
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(28, 28, 1)),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(10)
        ])
    
        model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                      optimizer=tf.keras.optimizers.Adam(),
                      metrics=['accuracy'])
    
    checkpoint_dir = './training_checkpoints'  # 定义用于存储检查点的检查点目录
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt_{epoch}")  # 定义检查点文件的名称
    
    
    # 定义一个函数来衰减学习速率。
    def decay(epoch):
        if epoch < 3:
            return 1e-3
        elif epoch >= 3 and epoch < 7:
            return 1e-4
        else:
            return 1e-5
    
    
    # 定义一个回调函数,用于在每个epoch的末尾打印学习速率
    class PrintLR(tf.keras.callbacks.Callback):
        def on_epoch_end(self, epoch, logs=None):
            print('\nepoch的 {} 学习率是 {}'.format(epoch + 1, model.optimizer.lr.numpy()))
    
    
    # 把所有的回调放在一起
    callbacks = [tf.keras.callbacks.TensorBoard(log_dir='./logs'),
                 tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint_prefix,
                                                    save_weights_only=True),
                 tf.keras.callbacks.LearningRateScheduler(decay),
                 PrintLR()]
    
    # 训练和评估
    model.fit(train_dataset, epochs=epochs, callbacks=callbacks)
    View Code

    保存为SavedModel格式模型,您可以使用或不使用Strategy.scope.

    path = 'saved_model/'
    model.save(path, save_format='tf')
    
    # 加载模型,不Strategy.scope
    unreplicated_model = tf.keras.models.load_model(path)
    unreplicated_model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy'])
    
    eval_loss, eval_acc = unreplicated_model.evaluate(eval_dataset)
    print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
    
    
    
    # 加载模型Strategy.scope
    with strategy.scope():
        replicated_model = tf.keras.models.load_model(path)
        replicated_model.compile(loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                                 optimizer=tf.keras.optimizers.Adam(),
                                 metrics=['accuracy'])
    
        eval_loss, eval_acc = replicated_model.evaluate(eval_dataset)
        print('Eval loss: {}, Eval Accuracy: {}'.format(eval_loss, eval_acc))
    View Code

    tf.distribute.Strategy和自定义训练循环

    1、在mirrored_strategy.scope() 内创建模型和优化器

    with mirrored_strategy.scope():
      model = tf.keras.Sequential([tf.keras.layers.Dense(1, input_shape=(1,))])
      optimizer = tf.keras.optimizers.SGD()

    2、调用 tf.distribute.Strategy.experimental_distribute_dataset 创建分布式数据集

    dataset = tf.data.Dataset.from_tensors(([1.], [1.])).repeat(100).batch(global_batch_size)
    dist_dataset = mirrored_strategy.experimental_distribute_dataset(dataset)

    3、使用 tf.nn.compute_average_loss 计算损失。tf.nn.compute_average_loss对每个样本损失求和并将总和除以global_batch_size。

    def compute_loss(labels, predictions):
        per_example_loss = loss_object(labels, predictions)
        return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

    4、将train_step放入 tf.distribute.Strategy.run 中,并传入之前创建的数据集

    5、使用 tf.distribute.Strategy.reduce 来聚合 tf.distribute.Strategy.run。tf.distribute.Strategy.run返回每个GPU结果。您还可以tf.distribute.Strategy.experimental_local_results获取结果值列表。

    复制代码
    def train_step(inputs):
        features, labels = inputs
    
        with tf.GradientTape() as tape:
            predictions = model(features, training=True)
            loss = compute_loss(labels, predictions)
    
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        return loss
    
    
    @tf.function
    def distributed_train_step(dist_inputs):
        per_replica_losses = mirrored_strategy.run(train_step, args=(dist_inputs,))
        return mirrored_strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    复制代码

    6、迭代dist_dataset并循环运行训练:

    for dist_inputs in dist_dataset:
        print(distributed_train_step(dist_inputs))

    分布式训练汇总

    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    import numpy as np
    import os
    
    
    strategy = tf.distribute.MirroredStrategy()     # 实例化MirroredStrategy
    print('设备数量: {}'.format(strategy.num_replicas_in_sync))
    
    epochs = 10
    batch_size_per_replica = 64  # 每个GPU上得batch数
    global_batch_size = batch_size_per_replica * strategy.num_replicas_in_sync  # 总batch数
    
    # 数据集
    fashion_mnist = tf.keras.datasets.fashion_mnist
    (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data()
    train_images = train_images[..., None]
    test_images = test_images[..., None]
    
    # 获取[0,1]范围内的图像
    train_images = train_images / np.float32(255)
    test_images = test_images / np.float32(255)
    
    buffer_size = len(train_images)
    
    train_dataset = tf.data.Dataset.from_tensor_slices((train_images, train_labels)).shuffle(buffer_size).batch(
        global_batch_size)
    test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size)
    
    train_dist_dataset = strategy.experimental_distribute_dataset(train_dataset)    # 分布式数据集
    test_dist_dataset = strategy.experimental_distribute_dataset(test_dataset)      # 分布式数据集
    
    # 创建模型
    def create_model():
        model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(32, 3, activation='relu'),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Conv2D(64, 3, activation='relu'),
            tf.keras.layers.MaxPooling2D(),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(64, activation='relu'),
            tf.keras.layers.Dense(10)
        ])
    
        return model
    
    
    # 创建一个检查点目录来存储检查点
    checkpoint_dir = './training_checkpoints'
    checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
    
    
    with strategy.scope():
        # 创建训练损失
        # 将reduction设置为“none”,这样我们可以在之后进行reduction,并除以全局batch size
        loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
            from_logits=True,
            reduction=tf.keras.losses.Reduction.NONE)
    
        def compute_loss(labels, predictions):
            per_example_loss = loss_object(labels, predictions)
            return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)
    
        test_loss = tf.keras.metrics.Mean(name='test_loss')     # 创建测试损失
        train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')  # 训练精度
        test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')  # 测试精度
    
        # 模型、优化器和检查点必须创建在 'strategy.scope' 下。
        model = create_model()
        optimizer = tf.keras.optimizers.Adam()
        checkpoint = tf.train.Checkpoint(optimizer=optimizer, model=model)
    
    
    # 训练step
    def train_step(inputs):
        images, labels = inputs
    
        with tf.GradientTape() as tape:
            predictions = model(images, training=True)
            loss = compute_loss(labels, predictions)
    
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    
        train_accuracy.update_state(labels, predictions)
        return loss
    
    
    # 测试step
    def test_step(inputs):
        images, labels = inputs
    
        predictions = model(images, training=False)
        t_loss = loss_object(labels, predictions)
    
        test_loss.update_state(t_loss)
        test_accuracy.update_state(labels, predictions)
    
    
    # 分布式训练
    @tf.function
    def distributed_train_step(dataset_inputs):
        per_replica_losses = strategy.run(train_step, args=(dataset_inputs,))
        return strategy.reduce(tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None)
    
    
    # 分布式测试
    @tf.function
    def distributed_test_step(dataset_inputs):
        return strategy.run(test_step, args=(dataset_inputs,))
    
    
    for epoch in range(epochs):
        # 训练循环
        total_loss = 0.0
        num_batches = 0
        for x in train_dist_dataset:
            total_loss += distributed_train_step(x)
            num_batches += 1
        train_loss = total_loss / num_batches
    
        # 测试循环
        for x in test_dist_dataset:
            distributed_test_step(x)
    
        if epoch % 10 == 0:
            checkpoint.save(checkpoint_prefix)
    
        print("Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}".format(epoch + 1, train_loss,
                                                                                          train_accuracy.result() * 100,
                                                                                          test_loss.result(),
                                                                                          test_accuracy.result() * 100))
    
        test_loss.reset_states()
        train_accuracy.reset_states()
        test_accuracy.reset_states()
    View Code

    tf.distribute.Strategy 可以再没有strategy的情况下恢复最新的检查点并测试

    test_dataset = tf.data.Dataset.from_tensor_slices((test_images, test_labels)).batch(global_batch_size)
    eval_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='eval_accuracy')
    
    new_model = create_model()
    new_optimizer = tf.keras.optimizers.Adam()
    
    
    @tf.function
    def eval_step(images, labels):
        predictions = new_model(images, training=False)
        eval_accuracy(labels, predictions)
    
    
    checkpoint = tf.train.Checkpoint(optimizer=new_optimizer, model=new_model)
    checkpoint.restore(tf.train.latest_checkpoint(checkpoint_dir))
    
    for images, labels in test_dataset:
        eval_step(images, labels)
    
    print('在没有strategy的情况下恢复保存的模型后的准确性: {}'.format(eval_accuracy.result() * 100))
    View Code

    汇总

    keras训练汇总

     

    自定义训练汇总

    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    import numpy as np
    import time
    
    # 超参数
    lr = 1e-3   # 学习率
    batch_size = 64
    epochs = 2
    
    # 数据准备
    (x_train, y_train), (x_test, y_test) = keras.datasets.mnist.load_data()
    x_train = np.reshape(x_train, (-1, 784))
    x_test = np.reshape(x_test, (-1, 784))
    
    # 保留1万个样品用于验证
    x_val = x_train[-10000:]
    y_val = y_train[-10000:]
    x_train = x_train[:-10000]
    y_train = y_train[:-10000]
    
    # 准备训练数据集
    train_dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
    train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
    
    # 准备验证数据集
    val_dataset = tf.data.Dataset.from_tensor_slices((x_val, y_val))
    val_dataset = val_dataset.batch(batch_size)
    
    # 搭建模型
    inputs = keras.Input(shape=(784,), name="digits")
    x1 = layers.Dense(64, activation="relu")(inputs)
    x2 = layers.Dense(64, activation="relu")(x1)
    outputs = layers.Dense(10, name="predictions")(x2)
    model = keras.Model(inputs=inputs, outputs=outputs)
    
    optimizer = keras.optimizers.SGD(learning_rate=lr)    # 实例化一个优化器
    loss_fn = keras.losses.SparseCategoricalCrossentropy(from_logits=True)  # 实例化损失函数
    
    train_metric = keras.metrics.BinaryAccuracy(name='train_accuracy')
    valid_metric = keras.metrics.BinaryAccuracy(name='valid_accuracy')
    
    @tf.function
    def train_step(x, y):
        with tf.GradientTape() as tape:
            logits = model(x, training=True)
            loss_value = loss_fn(y, logits)
            # loss_value += sum(model.losses)   # 添加额外的损失
        grads = tape.gradient(loss_value, model.trainable_weights)
        optimizer.apply_gradients(zip(grads, model.trainable_weights))
        train_metric.update_state(y, logits)
        return loss_value
    
    @tf.function
    def test_step(x, y):
        val_logits = model(x, training=False)
        valid_metric.update_state(y, val_logits)
    
    
    for epoch in range(epochs):
        start_time = time.time()
        for step, (x_batch_train, y_batch_train) in enumerate(train_dataset):
            loss_value = train_step(x_batch_train, y_batch_train)
    
            if step % 200 == 0:
                print("训练损失( %d: %.4f" % (step, float(loss_value)))
    
        train_acc = train_metric.result()
        print("训练精度: %.4f" % (float(train_acc),))
        train_metric.reset_states()  # 在每个epoch结束时重置训练指标
    
        # 在每个epoch结束时运行验证循环
        for x_batch_val, y_batch_val in val_dataset:
            test_step(x_batch_val, y_batch_val)
    
        val_acc = valid_metric.result()
        valid_metric.reset_states()
        print("验证精度: %.4f" % (float(val_acc),))
        print("运行时间: %.2fs" % (time.time() - start_time))
    View Code

    模型部署

    tensorflow-serving

    TensorFlow Lite

      TensorFlow Lite 是 TensorFlow 在移动和 IoT 等边缘设备端的解决方案,提供了 Java、Python 和 C++ API 库,可以运行在 Android、iOS 和 Raspberry Pi 等设备上。AI技术在边缘设备上的应用,TFLite 将会是愈发重要的角色。

      目前 TFLite 只提供了推理功能,在服务器端进行训练后,经过如下简单处理即可部署到边缘设备上。

    • 模型转换:由于边缘设备计算等资源有限,使用 TensorFlow 训练好的模型,模型太大、运行效率比较低,不能直接在移动端部署,需要通过相应工具进行转换成适合边缘设备的格式。
    • 边缘设备部署:本节以 android 为例,简单介绍如何在 android 应用中部署转化后的模型,完成 Mnist 图片的识别。

    参考

    【电子书】简单粗暴 TensorFlow 2

    【知乎】最全Tensorflow2.0 入门教程持续更新

    【和鲸社区】30天吃掉那只TensorFlow2.0 | Github

    【书籍】TensorFlow 2深度学习开源书 | PDF下载 提取码:juqs

    【bilibili】tensorflow2.0入门与实战 2019年最通俗易懂的课程

    【bilibili】神经网络与深度学习——TensorFlow2.0实战【中文课程】

    【github】TensorFlow-Examples 

    【github】TensorFlow-2.x-Tutorials 

    作者:凌逆战
    欢迎任何形式的转载,但请务必注明出处。
    限于本人水平,如果文章和代码有表述不当之处,还请不吝赐教。
    本文章不做任何商业用途,仅作为自学所用,文章后面会有参考链接,我可能会复制原作者的话,如果介意,我会修改或者删除。

  • 相关阅读:
    [vue]——vue3.0+高德地图的正确打开方式
    java计算机毕业设计springboot+vue宠物服务管理系统
    做着做着感觉自媒体做不下去了?
    第二次授课内容
    如何开发一款基于 vite+vue3 的在线表格系统(下)
    uni-app : 生成三位随机数、自定义全局变量、自定义全局函数、传参、多参数返回值
    用Python蹭别人家图片接口,做一个【免费图床】吧
    【uni-app系列】uni-app之nvue使用
    C++类和对象
    罗茨气体流量计的结构设计
  • 原文地址:https://www.cnblogs.com/LXP-Never/p/15917498.html