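This document describes best practices for using TensorFlow with Ray.
For more examples that use TensorFlow, see A3C, ResNet, Policy Gradients, and LBFGS.
If you are training a deep network in a distributed setting, you may need to ship the network between processes (or machines). For example, you might update the model on one machine and then use that model to compute gradients on another machine. However, shipping the model is not always straightforward.
For example, a direct attempt to serialize a TensorFlow graph gives mixed results. Some examples fail, and some succeed (but produce very large strings).
Furthermore, creating a TensorFlow graph can take tens of seconds, so serializing a graph and recreating it in another process is inefficient. A better solution is to create the same TensorFlow graph on each worker once at the start and then pass only the weights between workers.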
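The idea of shipping only the weights can be sketched without TensorFlow or Ray. The snippet below is a minimal illustration under stated assumptions: the variable names "w" and "b" and their values are made up, plain Python lists stand in for numpy arrays, and the standard pickle module stands in for whatever serialization the framework actually uses.

```python
import pickle

# Hypothetical weights for a tiny linear model y = w * x + b, keyed by
# variable name. Plain lists stand in for the numpy arrays used in practice.
weights = {"w": [0.25], "b": [0.0]}

# "Ship" the weights: serialize on the sender, deserialize on the receiver.
payload = pickle.dumps(weights)
received = pickle.loads(payload)

# The receiver gets an equal dictionary, and the payload is only a few dozen
# bytes because no graph structure is included.
print(received == weights)
print(len(payload))
```

A dictionary like this round-trips reliably and stays small, which is why the rest of this document passes weights rather than graphs.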
Suppose we have a simple network definition (this one is modified from the TensorFlow documentation).
import tensorflow as tf
import numpy as np
x_data = tf.placeholder(tf.float32, shape=[100])
y_data = tf.placeholder(tf.float32, shape=[100])
w = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
b = tf.Variable(tf.zeros([1]))
y = w * x_data + b
loss = tf.reduce_mean(tf.square(y - y_data))
# API reference: https://blog.csdn.net/xierhacker/article/details/53174558
optimizer = tf.train.GradientDescentOptimizer(0.5)
grads = optimizer.compute_gradients(loss)
train = optimizer.apply_gradients(grads)
init = tf.global_variables_initializer()
sess = tf.Session()
To extract the weights and set the weights, you can use the following helper method.
import ray
variables = ray.experimental.TensorFlowVariables(loss, sess)
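The TensorFlowVariables object provides methods for getting and setting the weights, as well as for collecting all of the variables in the model.
Now we can use these methods to extract the weights and place them back in the network as follows.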
# First initialize the weights.
sess.run(init)
# Get the weights
weights = variables.get_weights() # Returns a dictionary of numpy arrays
# Set the weights
variables.set_weights(weights)
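Note: If we were to set the weights using the assign method as shown below, each call to assign would add a node to the graph, and the graph would grow unmanageably large over time.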
w.assign(np.zeros(1)) # This adds a node to the graph every time you call it.
b.assign(np.zeros(1)) # This adds a node to the graph every time you call it.
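Complete Example for Weight Averaging
Putting this all together, we first embed the graph in an actor. Within the actor, we use the get_weights and set_weights methods of the TensorFlowVariables class. We then use those methods to ship the weights (as a dictionary mapping variable names to numpy arrays) between processes without shipping the actual TensorFlow graphs, which are much more complex Python objects.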
import tensorflow as tf
import numpy as np
import ray
ray.init()
BATCH_SIZE = 100
NUM_BATCHES = 1
NUM_ITERS = 201
class Network(object):
    def __init__(self, x, y):
        # Seed TensorFlow to make the script deterministic.
        tf.set_random_seed(0)
        # Define the inputs.
        self.x_data = tf.constant(x, dtype=tf.float32)
        self.y_data = tf.constant(y, dtype=tf.float32)
        # Define the weights and computation.
        w = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
        b = tf.Variable(tf.zeros([1]))
        y = w * self.x_data + b
        # Define the loss.
        self.loss = tf.reduce_mean(tf.square(y - self.y_data))
        optimizer = tf.train.GradientDescentOptimizer(0.5)
        self.grads = optimizer.compute_gradients(self.loss)
        self.train = optimizer.apply_gradients(self.grads)
        # Define the weight initializer and session.
        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        # Additional code for setting and getting the weights
        self.variables = ray.experimental.TensorFlowVariables(self.loss, self.sess)
        # Initialize the variables.
        self.sess.run(init)

    # Define a method that trains the network for one step and returns the
    # new weights.
    def step(self, weights):
        # Set the weights in the network.
        self.variables.set_weights(weights)
        # Do one step of training.
        self.sess.run(self.train)
        # Return the new weights.
        return self.variables.get_weights()

    def get_weights(self):
        return self.variables.get_weights()

# Define a remote function for generating fake data.
@ray.remote(num_return_vals=2)
def generate_fake_x_y_data(num_data, seed=0):
    # Seed numpy to make the script deterministic.
    np.random.seed(seed)
    x = np.random.rand(num_data)
    y = x * 0.1 + 0.3
    return x, y

# Generate some training data.
batch_ids = [generate_fake_x_y_data.remote(BATCH_SIZE, seed=i) for i in range(NUM_BATCHES)]
x_ids = [x_id for x_id, y_id in batch_ids]
y_ids = [y_id for x_id, y_id in batch_ids]
# Generate some test data.
x_test, y_test = ray.get(generate_fake_x_y_data.remote(BATCH_SIZE, seed=NUM_BATCHES))
# Create actors to store the networks.
remote_network = ray.remote(Network)
actor_list = [remote_network.remote(x_ids[i], y_ids[i]) for i in range(NUM_BATCHES)]
# Get initial weights of some actor.
weights = ray.get(actor_list[0].get_weights.remote())
# Do some steps of training.
for iteration in range(NUM_ITERS):
    # Put the weights in the object store. This is optional. We could instead pass
    # the variable weights directly into step.remote, in which case it would be
    # placed in the object store under the hood. However, in that case multiple
    # copies of the weights would be put in the object store, so this approach is
    # more efficient.
    weights_id = ray.put(weights)
    # Call the remote function multiple times in parallel.
    new_weights_ids = [actor.step.remote(weights_id) for actor in actor_list]
    # Get all of the weights.
    new_weights_list = ray.get(new_weights_ids)
    # Add up all the different weights. Each element of new_weights_list is a dict
    # of weights, and we want to add up these dicts component wise using the keys
    # of the first dict.
    weights = {
        variable: sum(weight_dict[variable] for weight_dict in new_weights_list) / NUM_BATCHES
        for variable in new_weights_list[0]
    }
    # Print the current weights. They should converge roughly to the values 0.1
    # and 0.3 used in generate_fake_x_y_data.
    if iteration % 20 == 0:
        print("Iteration {}: weights are {}".format(iteration, weights))
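The averaging step at the heart of the loop above can be isolated and checked on its own. The following is a minimal sketch, not part of Ray: average_weights is a hypothetical helper, the worker dictionaries are invented, and plain lists of floats stand in for numpy arrays. Unlike the loop above, it divides by the number of dictionaries it receives rather than by a global constant.

```python
def average_weights(weight_dicts):
    """Average a list of {variable_name: list_of_floats} dicts component-wise."""
    if not weight_dicts:
        raise ValueError("need at least one weight dict")
    n = len(weight_dicts)
    averaged = {}
    # Use the keys of the first dict, as the example above does.
    for name in weight_dicts[0]:
        # Group the i-th component of this variable across all workers.
        columns = zip(*(d[name] for d in weight_dicts))
        averaged[name] = [sum(col) / n for col in columns]
    return averaged


# Hypothetical weights returned by two workers.
worker_a = {"w": [0.5], "b": [1.0]}
worker_b = {"w": [1.5], "b": [3.0]}
print(average_weights([worker_a, worker_b]))  # {'w': [1.0], 'b': [2.0]}
```

Keeping the averaging in a small function like this makes it easy to test the driver logic without starting any actors.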
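How to Train in Parallel Using Ray and Gradients
In some cases, you may want to do data-parallel training on your network. We use the network above to illustrate how to do this in Ray. The only differences are in the remote step method and the driver code.
In the step method, we run the grad operation rather than the train operation to get the gradients. Since TensorFlow pairs each gradient with its variable in a tuple, we extract only the gradients to avoid needless computation.
Extracting numerical gradients
Code like the following can be used in a remote function to compute numerical gradients.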
x_values = [1] * 100
y_values = [2] * 100
numerical_grads = sess.run([grad[0] for grad in grads], feed_dict={x_data: x_values, y_data: y_values})
Using the returned gradients to train the network
# We can feed the gradient values in using the associated symbolic gradient
# operation defined in tensorflow.
feed_dict = {grad[0]: numerical_grad for (grad, numerical_grad) in zip(grads, numerical_grads)}
sess.run(train, feed_dict=feed_dict)
You can then run variables.get_weights() to see the updated weights of the network.
For reference, the full code is below:
import tensorflow as tf
import numpy as np
import ray
ray.init()
BATCH_SIZE = 100
NUM_BATCHES = 1
NUM_ITERS = 201
class Network(object):
    def __init__(self, x, y):
        # Seed TensorFlow to make the script deterministic.
        tf.set_random_seed(0)
        # Define the inputs.
        x_data = tf.constant(x, dtype=tf.float32)
        y_data = tf.constant(y, dtype=tf.float32)
        # Define the weights and computation.
        w = tf.Variable(tf.random_uniform([1], -1.0, 1.0))
        b = tf.Variable(tf.zeros([1]))
        y = w * x_data + b
        # Define the loss.
        self.loss = tf.reduce_mean(tf.square(y - y_data))
        optimizer = tf.train.GradientDescentOptimizer(0.5)
        self.grads = optimizer.compute_gradients(self.loss)
        self.train = optimizer.apply_gradients(self.grads)
        # Define the weight initializer and session.
        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        # Additional code for setting and getting the weights
        self.variables = ray.experimental.TensorFlowVariables(self.loss, self.sess)
        # Initialize the variables.
        self.sess.run(init)

    # Define a method that computes the gradients at the given weights and
    # returns them.
    def step(self, weights):
        # Set the weights in the network.
        self.variables.set_weights(weights)
        # Compute the gradients. self.grads holds (gradient, variable) pairs,
        # so we keep only the gradient tensors.
        actual_grads = self.sess.run([grad[0] for grad in self.grads])
        return actual_grads

    def get_weights(self):
        return self.variables.get_weights()

# Define a remote function for generating fake data.
@ray.remote(num_return_vals=2)
def generate_fake_x_y_data(num_data, seed=0):
    # Seed numpy to make the script deterministic.
    np.random.seed(seed)
    x = np.random.rand(num_data)
    y = x * 0.1 + 0.3
    return x, y

# Generate some training data.
batch_ids = [generate_fake_x_y_data.remote(BATCH_SIZE, seed=i) for i in range(NUM_BATCHES)]
x_ids = [x_id for x_id, y_id in batch_ids]
y_ids = [y_id for x_id, y_id in batch_ids]
# Generate some test data.
x_test, y_test = ray.get(generate_fake_x_y_data.remote(BATCH_SIZE, seed=NUM_BATCHES))
# Create actors to store the networks.
remote_network = ray.remote(Network)
actor_list = [remote_network.remote(x_ids[i], y_ids[i]) for i in range(NUM_BATCHES)]
local_network = Network(x_test, y_test)
# Get initial weights of local network.
weights = local_network.get_weights()
# Do some steps of training.
for iteration in range(NUM_ITERS):
    # Put the weights in the object store. This is optional. We could instead pass
    # the variable weights directly into step.remote, in which case it would be
    # placed in the object store under the hood. However, in that case multiple
    # copies of the weights would be put in the object store, so this approach is
    # more efficient.
    weights_id = ray.put(weights)
    # Call the remote function multiple times in parallel.
    gradients_ids = [actor.step.remote(weights_id) for actor in actor_list]
    # Get all of the gradients.
    gradients_list = ray.get(gradients_ids)
    # Take the mean of the different gradients. Each element of gradients_list is a list
    # of gradients, and we want to take the mean of each one.
    mean_grads = [
        sum([gradients[i] for gradients in gradients_list]) / len(gradients_list)
        for i in range(len(gradients_list[0]))
    ]
    # Feed the averaged gradients into the local network and apply them.
    feed_dict = {grad[0]: mean_grad for (grad, mean_grad) in zip(local_network.grads, mean_grads)}
    local_network.sess.run(local_network.train, feed_dict=feed_dict)
    weights = local_network.get_weights()
    # Print the current weights. They should converge roughly to the values 0.1
    # and 0.3 used in generate_fake_x_y_data.
    if iteration % 20 == 0:
        print("Iteration {}: weights are {}".format(iteration, weights))
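The driver logic above (compute gradients on each shard at the same weights, average them, apply one update) can be sketched in plain Python for the same linear model. This is an illustrative sketch under stated assumptions, not the Ray or TensorFlow implementation: shard_gradients and train are hypothetical names, the gradients are written out analytically for the mean squared error loss, and the data is a deterministic grid instead of random samples.

```python
def shard_gradients(w, b, xs, ys):
    """Return (dL/dw, dL/db) for L = mean((w * x + b - y) ** 2) on one shard."""
    n = len(xs)
    residuals = [w * x + b - y for x, y in zip(xs, ys)]
    grad_w = sum(2.0 * r * x for r, x in zip(residuals, xs)) / n
    grad_b = sum(2.0 * r for r in residuals) / n
    return grad_w, grad_b


def train(num_shards=4, shard_size=25, num_iters=201, learning_rate=0.5):
    # Deterministic fake data following y = 0.1 * x + 0.3, split into
    # equally sized shards (one per hypothetical worker).
    total = num_shards * shard_size
    xs = [i / total for i in range(total)]
    ys = [0.1 * x + 0.3 for x in xs]
    shards = [(xs[k::num_shards], ys[k::num_shards]) for k in range(num_shards)]

    w, b = 0.0, 0.0
    for _ in range(num_iters):
        # Each worker computes gradients on its own shard at the same weights.
        grads = [shard_gradients(w, b, sx, sy) for sx, sy in shards]
        # The driver averages the gradients and applies a single update.
        mean_gw = sum(g[0] for g in grads) / len(grads)
        mean_gb = sum(g[1] for g in grads) / len(grads)
        w -= learning_rate * mean_gw
        b -= learning_rate * mean_gb
    return w, b


w, b = train()
print("w = {:.3f}, b = {:.3f}".format(w, b))
```

Because the shards are the same size, the mean of the per-shard gradients equals the gradient over the full data set, so the result matches single-process gradient descent and approaches the values 0.1 and 0.3 used to generate the data.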
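Troubleshooting
Note that TensorFlowVariables uses variable names to determine which variables to set when set_weights is called. A common issue arises when two networks are defined in the same TensorFlow graph. In this case, TensorFlow appends an underscore and an integer to the variable names to disambiguate them. This causes TensorFlowVariables to fail. For example, if we define a Network class with a TensorFlowVariables instance: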
import ray
import tensorflow as tf

class Network(object):
    def __init__(self):
        a = tf.Variable(1)
        b = tf.Variable(1)
        c = tf.add(a, b)
        sess = tf.Session()
        init = tf.global_variables_initializer()
        sess.run(init)
        self.variables = ray.experimental.TensorFlowVariables(c, sess)

    def set_weights(self, weights):
        self.variables.set_weights(weights)

    def get_weights(self):
        return self.variables.get_weights()

and run the following code:
a = Network()
b = Network()
b.set_weights(a.get_weights())
the code will fail. If we instead define each network in its own TensorFlow graph, it will succeed:
with tf.Graph().as_default():
    a = Network()
with tf.Graph().as_default():
    b = Network()
b.set_weights(a.get_weights())
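This issue does not occur between actors that contain a network, because each actor runs in its own process and therefore in its own graph. It also does not occur when using set_flat.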
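The naming collision described above can be reproduced with a toy model that involves no TensorFlow at all. Everything here is hypothetical and only mimics the behavior: ToyGraph imitates a graph that uniquifies names by appending an underscore and an integer, and set_weights imitates a name-based lookup. The real TensorFlowVariables class may fail in a different way.

```python
class ToyGraph(object):
    """Toy stand-in for a graph that hands out unique variable names."""

    def __init__(self):
        self._counts = {}

    def unique_name(self, base):
        # First use keeps the base name; later uses get "_1", "_2", ...
        count = self._counts.get(base, 0)
        self._counts[base] = count + 1
        return base if count == 0 else "{}_{}".format(base, count)


def make_network(graph):
    """Create two named variables in `graph`, returning {name: value}."""
    return {graph.unique_name("Variable"): 1, graph.unique_name("Variable"): 1}


def set_weights(network, weights):
    """Name-based assignment: fails if any incoming name is unknown."""
    missing = [name for name in weights if name not in network]
    if missing:
        raise KeyError("unknown variable names: {}".format(sorted(missing)))
    network.update(weights)


# Two networks in one shared graph end up with different variable names.
shared = ToyGraph()
a, b = make_network(shared), make_network(shared)
print(sorted(a), sorted(b))
try:
    set_weights(b, a)
    collided = False
except KeyError:
    collided = True
print(collided)  # True

# Two networks in separate graphs get identical names, so the lookup works.
a2, b2 = make_network(ToyGraph()), make_network(ToyGraph())
set_weights(b2, a2)
```

In the shared graph the second network's names carry higher suffixes than the first network's, so none of the incoming names match. With one graph per network both sets of names are identical.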
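Another issue to keep in mind is that TensorFlowVariables needs to add new operations to the graph. If you close the graph and make it immutable, for example by creating a MonitoredTrainingSession, the initialization will fail. To resolve this, simply create the TensorFlowVariables instance before you close the graph.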