Tensorflow 2.0 Common Modules

Saving and Loading Variables

Checkpoint 只保存模型的参数,不保存模型的计算过程,因此一般用于在具有模型源代码的时候恢复之前训练好的模型参数。

TensorFlow 提供了tf.train.Checkpoint用于变量保存与恢复的类,有save()restore()方法将TensorFlow中所有包含Checkpointable State的对象进行保存和恢复。具体而言:

  • tf.keras.optimizer
  • tf.Variable
  • f.keras.Layer
  • tf.keras.Model

上面类的实例都可以被保存。使用方法如下:

checkpoint = tf.train.Checkpoint(model=model)
# 例子如下:
checkpoint = tf.train.Checkpoint(myAwesomeModel=model, myAwesomeOptimizer=optimizer)

# 保存checkpoint
checkpoint.save(save_path_with_prefix) # save_path_with_prefix 是保存文件的目录 + 前缀

# 恢复模型参数
model_to_be_restored = MyModel()                                        # 待恢复参数的同一模型
checkpoint = tf.train.Checkpoint(myAwesomeModel=model_to_be_restored)   # 键名保持为“myAwesomeModel”
checkpoint.restore(save_path_with_prefix_and_index) # 注意需要有序号

上面的函数接受的参数是**kwargs,即是一系列的键值对,键名可以随意取,值为需要保存的对象。注意,在恢复变量的时候,我们需要对应的键名。

典型的回复与保存变量的代码框架如下:

# train.py 模型训练阶段
model = MyModel()
# 实例化Checkpoint,指定保存对象为model(如果需要保存Optimizer的参数也可加入)
checkpoint = tf.train.Checkpoint(myModel=model)
# ...(模型训练代码)
# 模型训练完毕后将参数保存到文件(也可以在模型训练过程中每隔一段时间就保存一次)
checkpoint.save('./save/model.ckpt')

# test.py 模型使用阶段
model = MyModel()
checkpoint = tf.train.Checkpoint(myModel=model)             # 实例化Checkpoint,指定恢复对象为model
checkpoint.restore(tf.train.latest_checkpoint('./save'))    # 从文件恢复模型参数,后面可以添加模型使用代码

某些情况下我们想仅仅保存最新的k个训练模型参数,可以使用tf.train.CheckpointManager, 管理Checkpoint,只保存最新的k个模型参数

manager = tf.train.CheckpointManager(checkpoint, directory='./save', checkpoint_name='model.ckpt', max_to_keep=5)

详见Checkpoint Variables

Tensorboard

默认情况下,TensorBoard 每 30 秒更新一次数据。

详见Code

  1. listen logdir

    tensorboard --logdir logs
    

    then open URL: http://localhost:6006

  2. build summary instance

    current_time = datetime.datetime.now().strftime(r"%Y%m%d-%H%M%S")
    log_dir = 'logs/' + current_time
    summary_writer = tf.summary.create_file_writer(log_dir)
    
  3. fed data into summary instance

    • fed scalar

      with summary_writer.as_default():
          tf.summary.scalar('loss', float(loss), step=epoch)
          tf.summary.scalar('accuracy'), float(train_accuracy), step=epoch)
      
    • fed single image

      # get x from (x,y)
      sample_img   next(iter(db))[0]
      # get first image instance
      sample_img = sample_img[0]
      sample_img = tf.reshape(sample_img, [1,28,28,1])
      with summary_writer.as_default():
          tf.summary.image('Training Sample:', sample_img, step=0)
      
    • fed multi-images

      val_images = x[:25]
      val_images = tf.reshape(val_images, [-1, 28, 28, 1])
      with summary_writer.as_default():
          tf.summary.scalar('test-acc', float(loss), step=step)
          tf.summary.image('val-onebyone-images:', val_images, max_outputs=25)
      
  4. 辅助函数,将多个image合并为一张png图片保存:

def plot_to_image(figure):

        """Converts the matplotlib plot specified by 'figure' to a PNG image and
        returns it. The supplied figure is closed and inaccessible after this call.
        """
        # Save the plot to a PNG in memory.
        buf = io.BytesIO()
        plt.savefig(buf, format='png')
        # Closing the figure prevents it from being displayed directly inside
        # the notebook.
        plt.close(figure)
        buf.seek(0)
        # Convert PNG buffer to TF image
        image = tf.image.decode_png(buf.getvalue(), channels=4)
        # Add the batch dimension
        image = tf.expand_dims(image, 0)
        return image


def image_grid(images):

        """
        Return a 5x5 grid of the MNIST images as a matplotlib figure.
        """

        # Create a figure to contain the plot.
        figure = plt.figure(figsize=(10, 10))
        for i in range(25):
            # Start next subplot.
            plt.subplot(5, 5, i + 1, title='name')
            plt.xticks([])
            plt.yticks([])
            plt.grid(False)
            plt.imshow(images[i], cmap=plt.cm.binary)

        return figure

Graph and Profile

在训练时使用tf.summary.trace_on开启Trace,此时TensorFlow会将训练时的大量信息(如计算图的结构,每个操作所耗费的时间等)记录下来。在训练完成后,使用tf.summary.trace_export将记录结果输出到文件。

tf.summary.trace_on(graph=True, profiler=True) # 开启Trace,可以记录图解钩和profile信息

# 进行训练

with summary_writer.as_default():
    tf.summary.trace_export(name='model_trace', step=0, profiler_outdir=log_dir) # 保存Trace信息到文件

详见Graph and Profile

Graph Execution模式

在tf2.0中使用@tf.function实现Graph Execution,从而将模型转换为易于部署且高性能的Tensorflow模型,速度会非常快。

注意:

并不是任何函数都可以被@tf.function修饰!, @tf.function使用静态编译将函数内的代码转换成计算图,因此对函数内可使用的语句有一定限制(仅支持 Python 语言的一个子集),且需要函数内的操作本身能够被构建为计算图。

建议在函数内只使用 TensorFlow 的原生操作,不要使用过于复杂的 Python 语句,函数参数只包括 TensorFlow 张量或 NumPy 数组,并最好是能够按照计算图的思想去构建函数(换言之,@tf.function 只是给了你一种更方便的写计算图的方法,而不是一颗能给任何函数加速的银子弹。

一般而言,当模型由较多小的操作组成的时候,@tf.function带来的提升效果较大。而当模型的操作数量较少,但单一操作均很耗时的时候,则@tf.function带来的性能提升不会太大。

详见tf.function内在机制

详见Code

GPU Usage and Configuration

指定当前程序使用的GPU

获取当前主机上的设备列表

import os
import tensorflow as tf

gpus = tf.config.experimental.list_physical_devices(device_type='GPU')
cpus = tf.config.experimental.list_physical_devices(device_type='CPU')
print(gpus, cpus)

# 设置当前脚本可见的设备
tf.config.experimental.set_visible_devices(device_type='GPU', devices=gpus[0:2])

# 可以通过环境变量的方式设置可见的设备
# export CUDA_VISIBLE_DEVICES = 0,1
os.environ['CUDA_VISIBLE_DEVICES'] = "0,1"

设置显存使用策略

默认情况下,TF使用几乎所有可用的显存,以避免内存碎片化所带来的性能损失。TF提供了两种定制化的显存使用策略:

  • 仅仅在需要时申请显存空间
  • 固定消耗的显存大小,程序超出会报错
# 设置显存使用策略
# 方式1:动态申请显存
for gpu in gpus:
    tf.config.experimental.set_memory_growth(device=gpu, enable=True)

# 方式2:固定显存大小,超出将会报错
# 可以理解为建立了一个显存大小为1GB的虚拟GPU
for gpu in gpus:
    tf.config.experimental.set_virtual_device_configuration(gpu, [
        tf.config.experimental.VirtualDeviceConfiguration(memory_limit=1024)])

单GPU模拟多GPU环境

# 在实体GPU2号上面建立4个显存均为2GB的虚拟GPU
tf.config.experimental.set_virtual_device_configuration(gpus[2], [
    tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048),
    tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048),
    tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048),
    tf.config.experimental.VirtualDeviceConfiguration(memory_limit=2048)])

logical_gpus = tf.config.experimental.list_logical_devices(device_type='GPU')
print(len(logical_gpus))

Tensorflow Distributed Training

单机多卡训练: MirroredStrategy

tf.distribute.MirroredStrategy是一种简单且高性能的,数据并行的同步式分布式策略,主要支持多个 GPU 在同一台主机上训练。

使用方法:

strategy = tf.distribute.MirroredStrategy(devices=["/gpu:0", "/gpu:1"]) # 指定使用gpu0,gpu1参与分布式策略

# 将模型构建的代码放入下方scope:
with strategy.scope():
    model = tf.keras.applications.MobileNetV2()
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=learning_rate),
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        metrics=[tf.keras.metrics.sparse_categorical_accuracy]
    )
# 训练模型
model.fit(dataset, num_epochs)

详见Code

MirroredStrategy过程

MirroredStrategy的步骤如下:

  • 训练开始前,该策略在所有 N 个计算设备上均各复制一份完整的模型
  • 每次训练传入一个批次的数据时,将数据分成 N 份,分别传入 N 个计算设备(即数据并行)
  • N 个计算设备使用本地变量(镜像变量)分别计算自己所获得的部分数据的梯度
  • 使用分布式计算的All-reduce操作,在计算设备间高效交换梯度数据并进行求和,使得最终每个设备都有了所有设备的梯度之和
  • 使用梯度求和的结果更新本地变量(镜像变量)
  • 当所有设备均更新本地变量后,进行下一轮训练(即该并行策略是同步的)
  • 默认情况下,TensorFlow中的MirroredStrategy策略使用NVIDIA NCCL进行All-reduce操作。

多机多卡训练: MultiWorkerMirroredStrategy

由于涉及到多台计算机之间的通讯,需要设置环境变量:

os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["localhost:20000", "localhost:20001"]
    },
    'task': {'type': 'worker', 'index': 0}
})

TF_CONFIGclustertask两部分组成:

  • cluster说明了整个多机集群的结构和每台机器的网络地址(IP + 端口号)。对于每一台机器,cluster的值都是相同的
  • task说明了当前机器的角色。例如,{'type': 'worker', 'index': 0}说明当前机器是cluster中的第0个worker(即 localhost:20000)。每一台机器的task值都需要针对当前主机进行分别的设置。

以上内容设置完成后,在所有的机器上逐个运行训练代码即可。先运行的代码在尚未与其他主机连接时会进入监听状态,待整个集群的连接建立完毕后,所有的机器即会同时开始训练。

注意:

  • 在各台机器上均需要打开防火墙的设置,尤其是需要开放与其他主机通信的端口。如: 上例的 0 号 worker 需要开放 20000 端口,1 号 worker 需要开放 20001 端口。
  • 对于Dataset来说,需要设置repeat()

详见Code

Reference

  1. TensorFlow-2.x-Tutorials
  2. 简单粗暴Tensorflow 2.0

Note: Cover Picture