转载自https://www.azraelkuan.me/archives/multiply-gpu-parallel-training-using-tensorflow
本文由 azraelkuan 创作,采用 知识共享署名4.0 国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
基本简介
深度学习框架一般都支持多GPU并行计算,主要分为数据并行和模型并行.TensorFlow支持的是数据并行.
数据并行的原理:
- CPU负责梯度平均和参数更新
- 在GPU上训练模型的副本
由图中可以看出多GPU并行计算的过程:
- 模型副本定义在GPU上
- 对于每一个GPU, 都是从CPU获得数据,前向传播进行计算,得到loss,并计算出梯度
- CPU接到GPU的梯度,取平均值,然后进行梯度更新
同时在使用GPU的时候,有几个注意点:
- 使用
tf.get_variable
定义变量,这样可以读取已经定义过的变量实现参数共享 - 分别使用
tf.name_scope
和tf.variable_scope
来定义区间
范例说明
下面结合官网上的例子进行说明: GITHUB
分为四个部分:
- 输入数据处理:
cifar10_input.py
- 模型定义,loss计算等:
cifar.py
- 多gpu训练脚本:
cifar10_multi_gpu_train.py
- 评估脚本:
cifar10_eval.py
这里针对多gpu训练脚本做一个详细的说明:
from datetime import datetime
import os.path
import re
import time
import numpy as np
from six.moves import xrange
import tensorflow as tf
import cifar10
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string('train_dir', './cifar10_train',
"""Directory where to write event logs """
"""and checkpoint.""")
tf.app.flags.DEFINE_integer('max_steps', 1000000,
"""Number of batches to run.""")
tf.app.flags.DEFINE_integer('num_gpus', 1,
"""How many GPUs to use.""")
tf.app.flags.DEFINE_boolean('log_device_placement', False,
"""Whether to log device placement.""")
def tower_loss(scope, images, labels):
"""
计算当前tower的损失
这里的损失包括最后的损失和weight的L2正则损失 具体可以
:param scope: 当前空间名
:param images: 输入的图像
:param labels: 图像的label
:return: 总的loss
"""
logits = cifar10.inference(images)
_ = cifar10.loss(logits, labels)
# 获得losses集合中的所有损失
losses = tf.get_collection('losses', scope)
# 将所有损失加和
total_loss = tf.add_n(losses, name='total_loss')
# 将loss记录到summary中
for l in losses + [total_loss]:
loss_name = re.sub('%s_[0-9]*/' % cifar10.TOWER_NAME, '', l.op.name)
tf.summary.scalar(loss_name, l)
return total_loss
def average_gradients(tower_grads):
"""
梯度平均
:param tower_grads: 所有tower的梯度
:return: 每一个变量的平均梯度
"""
average_grads = []
# 枚举所有的变量 计算变量在所有GPU下梯度的平均值
for grad_and_vars in zip(*tower_grads):
grads = []
for g, _ in grad_and_vars:
expanded_g = tf.expand_dims(g, 0)
grads.append(expanded_g)
grad = tf.concat(axis=0, values=grads)
grad = tf.reduce_mean(grad, 0)
v = grad_and_vars[0][2]
grad_and_var = (grad, v)
average_grads.append(grad_and_var)
return average_grads
def train():
# 基本运算都定义在CPU上
with tf.Graph().as_default(), tf.device('/cpu:0'):
# 当前step
global_step = tf.get_variable(
'global_step', [],
initializer=tf.constant_initializer(0), trainable=False)
# 得到训练的batch
num_batches_per_epoch = (cifar10.NUM_EXAMPLES_PER_EPOCH_FOR_TRAIN /
FLAGS.batch_size)
# 定义学习率衰减的步数
decay_steps = int(num_batches_per_epoch * cifar10.NUM_EPOCHS_PER_DECAY)
# 定义学习率 指数衰减
lr = tf.train.exponential_decay(cifar10.INITIAL_LEARNING_RATE,
global_step,
decay_steps,
cifar10.LEARNING_RATE_DECAY_FACTOR,
staircase=True)
# 优化器
opt = tf.train.GradientDescentOptimizer(lr)
# 建立数据队列
images, labels = cifar10.distorted_inputs()
batch_queue = tf.contrib.slim.prefetch_queue.prefetch_queue(
[images, labels], capacity=2 * FLAGS.num_gpus)
tower_grads = []
with tf.variable_scope(tf.get_variable_scope()):
for i in xrange(FLAGS.num_gpus):
with tf.device('/gpu:%d' % i):
with tf.name_scope('%s_%d' % (cifar10.TOWER_NAME, i)) as scope:
image_batch, label_batch = batch_queue.dequeue()
loss = tower_loss(scope, image_batch, label_batch)
# 在第一次声明变量之后,将控制变量重用的参数设置为True。这样可以
# 让不同的GPU更新同一组参数。注意tf.name_scope函数并不会影响
# tf.get_ variable的命名空间。
tf.get_variable_scope().reuse_variables()
summaries = tf.get_collection(tf.GraphKeys.SUMMARIES, scope)
# 计算梯度
grads = opt.compute_gradients(loss)
tower_grads.append(grads)
# 计算平均梯度
grads = average_gradients(tower_grads)
summaries.append(tf.summary.scalar('learning_rate', lr))
for grad, var in grads:
if grad is not None:
summaries.append(tf.summary.histogram(var.op.name + '/gradients', grad))
# 更新梯度
# 注意这里的梯度是多个batch的平均 因此下面在计算时间,速度均要考虑gpu个数
apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)
for var in tf.trainable_variables():
summaries.append(tf.summary.histogram(var.op.name, var))
# 计算变量的滑动平均值
variable_averages = tf.train.ExponentialMovingAverage(
cifar10.MOVING_AVERAGE_DECAY, global_step)
variables_averages_op = variable_averages.apply(tf.trainable_variables())
# 每一轮迭代需要更新变量的取值并更新变量的滑动平均值
train_op = tf.group(apply_gradient_op, variables_averages_op)
saver = tf.train.Saver(tf.global_variables())
summary_op = tf.summary.merge(summaries)
init = tf.global_variables_initializer()
sess = tf.Session(config=tf.ConfigProto(
allow_soft_placement=True,
gpu_options=tf.GPUOptions(allow_growth=True),
log_device_placement=FLAGS.log_device_placement))
sess.run(init)
tf.train.start_queue_runners(sess=sess)
summary_writer = tf.summary.FileWriter(FLAGS.train_dir, sess.graph)
for step in xrange(FLAGS.max_steps):
start_time = time.time()
_, loss_value = sess.run([train_op, loss])
duration = time.time() - start_time
assert not np.isnan(loss_value), 'Model diverged with loss = NaN'
if step % 10 == 0:
# 这里统计计算速度 都要考虑到gpu的个数
# batch 应该是基本*gpu_num
num_examples_per_step = FLAGS.batch_size * FLAGS.num_gpus
# 每一个epoch的耗时也应该除以gpu个数
examples_per_sec = num_examples_per_step / duration
# 单个epoch的时间应该是总时间除gpu个数
sec_per_batch = duration / FLAGS.num_gpus
format_str = ('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f '
'sec/batch)')
print(format_str % (datetime.now(), step, loss_value,
examples_per_sec, sec_per_batch))
if step % 100 == 0:
summary_str = sess.run(summary_op)
summary_writer.add_summary(summary_str, step)
if step % 1000 == 0 or (step + 1) == FLAGS.max_steps:
checkpoint_path = os.path.join(FLAGS.train_dir, 'model.ckpt')
saver.save(sess, checkpoint_path, global_step=step)
def main(argv=None):
cifar10.maybe_download_and_extract()
if tf.gfile.Exists(FLAGS.train_dir):
tf.gfile.DeleteRecursively(FLAGS.train_dir)
tf.gfile.MakeDirs(FLAGS.train_dir)
train()
if __name__ == '__main__':
tf.app.run()