horovod tensorflow 分布式多gpu

概念
rank is your index within the entire ring, local_rank is your index within your node. For example, you have 4 nodes and 4 GPUs each node, so you spawn 16 workers. Every worker will have a rank [0, 15], and every worker will have a local_rank [0, 3]. You use local_rank for GPU pinning because there’s typically one GPU available on the node per process. It wouldn’t make sense to use rank here because rank could be 10, but you only have 4 GPUs so there is no GPU 10.

# 在其他import前引入
try:
    import horovod.tensorflow as hvd
    hvd.init()
except Exception as e:
    hvd = None
    print('no horovod')

# 打印信息
if hvd:
    tf.logging.info('Total workers: {}, local workers: {}'.format(
        hvd.size(), hvd.local_size()))
    tf.logging.info('Global rank: {}, local rank: {}'.format(
        hvd.rank(), hvd.local_rank()))

# 数据集读取配置:对数据集进行分片, 不同进程读取不同子集。
d = tf.data.TFRecordDataset(input_file)
if is_training:
    if hvd is not None:
        d = d.shard(hvd.size(), hvd.rank())
    d = d.shuffle(buffer_size=100)
    d = d.repeat()

# 加载权重配置:只对第一个rank载入权重
if init_checkpoint and is_training and (hvd is None or hvd.rank()==0):
    for init_file in init_checkpoint.split(","):
        assignment_map, tmp_init_map = get_assignment_map_from_checkpoint(tvars, init_file, extra_load_var)
        tf.train.init_from_checkpoint(init_file, assignment_map)
        initialized_variable_names.update(tmp_init_map)

# 学习率调整:
if hvd:
    learning_rate = learning_rate * hvd.size()

# 分布式优化器配置:使用 ring-allreduce 平均梯度
if hvd is not None:
    # we enable compression only for fp16
    from horovod.tensorflow.compression import Compression
    if use_fp16:
        compression = Compression.fp16
    else:
        compression = Compression.none

    optimizer = hvd.DistributedOptimizer(optimizer, sparse_as_dense=True,
                                         compression=compression)

# 配置每个进程模型迭代次数
if FLAGS.do_train:
    # train_examples = processor.get_train_examples(FLAGS.data_dir, FLAGS.img_dir)
    num_train_steps = int(
        train_num / FLAGS.train_batch_size * FLAGS.num_train_epochs)
        # len(train_examples) / FLAGS.train_batch_size * FLAGS.num_train_epochs)
    num_warmup_steps = int(num_train_steps * FLAGS.warmup_proportion)

    if hvd:
        num_train_steps = num_train_steps // hvd.size()

model_fn = model_fn_builder(
    bert_config=bert_config,
    num_labels=len(label_list),
    init_checkpoint=FLAGS.init_checkpoint,
    learning_rate=FLAGS.learning_rate,
    num_train_steps=num_train_steps,
    num_warmup_steps=num_warmup_steps)

# GPU config GPU配置:使用local rank分配当前机器上当前进程可视gpu
run_config = tf.ConfigProto()
# train_params.get('gpu_allow_growth', False)
run_config.gpu_options.allow_growth = True
run_config.allow_soft_placement = True

if hvd:
    run_config.gpu_options.visible_device_list = str(hvd.local_rank())

if FLAGS.use_xla:
    run_config.graph_options.optimizer_options.global_jit_level = tf.OptimizerOptions.ON_1

# checkpoint配置:只对第一个保存模型
save_checkpoints_steps = FLAGS.save_checkpoints_steps if hvd is None or hvd.rank() == 0 else None
estimator = tf.estimator.Estimator(
    model_fn=model_fn,
    model_dir=FLAGS.output_dir,
    config=tf.estimator.RunConfig(
        save_checkpoints_steps=save_checkpoints_steps,
        save_checkpoints_secs=None,
        keep_checkpoint_every_n_hours=2,
        log_step_count_steps=400,
        session_config=run_config))


# 模型训练hook配置:将变量从第一个流程向其他流程传播,以实现一致性初始化。
if FLAGS.do_train and hvd is not None:
    training_hook = [hvd.BroadcastGlobalVariablesHook(0)]
else:
    training_hook = []
estimator.train(input_fn=train_input_fn, max_steps=num_train_steps,
                hooks=training_hook)
发布了557 篇原创文章 · 获赞 500 · 访问量 153万+

猜你喜欢

转载自blog.csdn.net/qq_16234613/article/details/96186398