接下来由浅入深地讲解tf2.0的自定义求导。如果你是将本文作为工具来查阅,请自行选读;如果你只想看tf2.0的自定义求导,请直接阅读第五部分。
1、梯度计算的基础
1.1 利用定义求解导数
def f(x):
    """Evaluate the sample quadratic 3*x**2 + 2*x - 1 at x."""
    return 3. * x ** 2 + 2. * x - 1
def approximate_derivative(f, x, eps=1e-3):
    """Approximate the derivative of f at x with a central difference.

    Args:
        f: Callable taking a single numeric argument.
        x: Point at which the derivative is estimated.
        eps: Half-width of the symmetric step taken around x.

    Returns:
        (f(x + eps) - f(x - eps)) / (2 * eps).
    """
    return (f(x + eps) - f(x - eps)) / (2. * eps)
# Numerically estimate the slope of f at x = 1 using the default step size.
print(approximate_derivative(f, 1.0))
1.2 求解偏导数
def approximate_derivative(f, x, eps=1e-3):
    """Approximate the derivative of f at x with a central difference.

    Args:
        f: Callable taking a single numeric argument.
        x: Point at which the derivative is estimated.
        eps: Half-width of the symmetric step taken around x.

    Returns:
        (f(x + eps) - f(x - eps)) / (2 * eps).
    """
    return (f(x + eps) - f(x - eps)) / (2. * eps)
def g(x1, x2):
    """Return (x1 + 5) * x2**2, the two-variable sample function."""
    return (x1 + 5) * (x2 ** 2)
def approximate_gradient(g, x1, x2, eps=1e-3):
    """Approximate both partial derivatives of g at the point (x1, x2).

    Args:
        g: Callable taking two numeric arguments.
        x1: First coordinate of the evaluation point.
        x2: Second coordinate of the evaluation point.
        eps: Step size forwarded to approximate_derivative.

    Returns:
        Tuple (dg/dx1, dg/dx2) of the two numerical estimates.
    """
    # Hold x2 fixed and differentiate with respect to the first argument.
    dg_x1 = approximate_derivative(lambda x: g(x, x2), x1, eps)
    # Hold x1 fixed and differentiate with respect to the second argument.
    dg_x2 = approximate_derivative(lambda x: g(x1, x), x2, eps)
    return dg_x1, dg_x2
# Numerically estimate both partial derivatives of g at (x1, x2) = (2, 3).
print(approximate_gradient(g, 2.0, 3.0))
2、 tensorflow中导数求解
x1 = tf.Variable(2.0)
x2 = tf.Variable(3.0)
# Record the forward pass so that z can be differentiated afterwards.
with tf.GradientTape() as tape:
    z = g(x1, x2)

dz_x1 = tape.gradient(z, x1)
print(dz_x1)

# A tape created without persistent=True can serve only one gradient() call;
# the second call below is made on purpose and its RuntimeError is printed.
try:
    dz_x2 = tape.gradient(z, x2)
except RuntimeError as ex:
    print(ex)
note:在tf的导数求解中,tf.GradientTape是一个很重要的类(作为上下文管理器使用)。默认情况下(persistent=False),同一个tape的GradientTape.gradient只能被调用一次;解决办法是:设置persistent=True,并在使用完毕后通过del tape手动释放资源,如下所示:
x1 = tf.Variable(2.0)
x2 = tf.Variable(3.0)
# persistent=True lets the same tape answer several gradient() calls.
with tf.GradientTape(persistent=True) as tape:
    z = g(x1, x2)

dz_x1 = tape.gradient(z, x1)
dz_x2 = tape.gradient(z, x2)
print(dz_x1, dz_x2)

# A persistent tape is not released automatically after use, so drop the
# reference explicitly once both gradients have been computed.
del tape
2.1 同时求解z对x1,x2的偏导
# Compute the partial derivatives of z with respect to x1 and x2 in one call.
x1 = tf.Variable(2.0)
x2 = tf.Variable(3.0)
with tf.GradientTape() as tape:
    z = g(x1, x2)

# Passing a list of sources yields a list of gradients in the same order.
dz_x1x2 = tape.gradient(z, [x1, x2])
print(dz_x1x2)
2.2 z1,z2同时对x求导
# Two target functions differentiated with respect to a single variable.
x = tf.Variable(5.0)
with tf.GradientTape() as tape:
    z1 = 3 * x
    z2 = x ** 2

# With a list of targets the result is the sum of their derivatives,
# here d(3x)/dx + d(x**2)/dx evaluated at x = 5.
tape.gradient([z1, z2], x)
2.3 常量求偏导
# Partial derivatives with respect to constants.
x1 = tf.constant(2.0)
x2 = tf.constant(3.0)
with tf.GradientTape() as tape:
    z = g(x1, x2)

# Neither constant was watched by the tape, so both entries come back as None.
dz_x1x2 = tape.gradient(z, [x1, x2])
print(dz_x1x2)
输出:
[None, None]
因此需要做以下修改:
x1 = tf.constant(2.0)
x2 = tf.constant(3.0)
with tf.GradientTape() as tape:
    # Explicitly ask the tape to track the constants before they are used.
    tape.watch(x1)
    tape.watch(x2)
    z = g(x1, x2)

dz_x1x2 = tape.gradient(z, [x1, x2])
print(dz_x1x2)
2.4 二阶导数求解
# Second-order derivatives via two nested tapes.
x1 = tf.Variable(2.0)
x2 = tf.Variable(3.0)
with tf.GradientTape(persistent=True) as outer_tape:
    with tf.GradientTape(persistent=True) as inner_tape:
        z = g(x1, x2)
    # First-order partials; computed inside the outer tape so that the outer
    # tape records them and can differentiate them again.
    inner_grads = inner_tape.gradient(z, [x1, x2])

# Differentiate each first-order partial with respect to x1 and x2.
outer_grads = [outer_tape.gradient(inner_grad, [x1, x2])
               for inner_grad in inner_grads]
print(outer_grads)

# Both tapes are persistent, so drop the references explicitly.
del inner_tape
del outer_tape
3、梯度下降的模拟
# Hand-written gradient descent on f, starting from x = 0.
learning_rate = 0.1
x = tf.Variable(0.0)
for _ in range(100):
    with tf.GradientTape() as tape:
        z = f(x)
    # Derivative of f at the current x.
    dz_dx = tape.gradient(z, x)
    # In-place update: x <- x - learning_rate * dz_dx.
    x.assign_sub(learning_rate * dz_dx)
print(x)
4、与optimizer结合使用
# Same descent as a manual update loop, but the update step is delegated to
# a Keras optimizer.
learning_rate = 0.1
x = tf.Variable(0.0)
# Use the `learning_rate` keyword; `lr` is only a deprecated alias.
optimizer = keras.optimizers.SGD(learning_rate=learning_rate)
for _ in range(100):
    with tf.GradientTape() as tape:
        z = f(x)
    dz_dx = tape.gradient(z, x)
    # apply_gradients takes (gradient, variable) pairs, gradient first.
    optimizer.apply_gradients([(dz_dx, x)])
print(x)
5、tensorflow2.0自定义求导
在fit函数中做的事情: 1. 以batch的形式遍历训练集,(通过自动求导)更新参数,并统计训练集上的metric; 2. 每个epoch结束后,在验证集进行验证,并统计验证集上的metric。(因此想要替换求导这部分,需要自己实现三部分:按batch遍历训练集、自动求导并更新参数、在验证集上评估)
关键代码如下:
# Settings for the hand-written training loop.
batch_size = 32
epochs = 100
# Number of mini-batch updates per epoch; floor division drops any remainder.
steps_per_epoch = len(x_train_scaled) // batch_size
metric = keras.metrics.MeanSquaredError()
optimizer = keras.optimizers.SGD()
def random_batch(x, y, batch_size=32):
    """Draw a random mini-batch of matching rows from x and y.

    Args:
        x: Indexable array of inputs; len(x) bounds the sampled indices.
        y: Array of targets indexed with the same positions as x.
        batch_size: Number of indices to draw.

    Returns:
        Tuple (x[idx], y[idx]) for the sampled index array idx.
    """
    # Indices are drawn independently from [0, len(x)), so repeats can occur.
    idx = np.random.randint(0, len(x), size=batch_size)
    # idx is an integer array; indexing with it selects the same rows of x and y.
    return x[idx], y[idx]
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=x_train.shape[1:]),
    keras.layers.Dense(1),
])

# Explicit loops standing in for model.compile() + model.fit().
for epoch in range(epochs):
    # Clear the accumulated training metric at the start of every epoch.
    metric.reset_states()
    for step in range(steps_per_epoch):
        x_batch, y_batch = random_batch(x_train_scaled, y_train,
                                        batch_size)
        with tf.GradientTape() as tape:
            # Calling the model like a function runs the forward pass.
            y_pred = model(x_batch)
            # The last layer has one unit; drop that axis to line up with
            # y_batch.
            y_pred = tf.squeeze(y_pred, 1)
            loss = keras.losses.mean_squared_error(y_batch, y_pred)
        # Accumulate the running training MSE for this epoch.
        metric(y_batch, y_pred)
        # Differentiate the loss with respect to the trainable weights only.
        grads = tape.gradient(loss, model.trainable_variables)
        # Pair each gradient with the variable it belongs to.
        grads_and_vars = zip(grads, model.trainable_variables)
        optimizer.apply_gradients(grads_and_vars)
        # "\r" together with end="" rewrites the same console line each step.
        print("\rEpoch", epoch, " train mse:",
              metric.result().numpy(), end="")
    # After each epoch, evaluate on the validation set in a single pass.
    y_valid_pred = model(x_valid_scaled)
    y_valid_pred = tf.squeeze(y_valid_pred, 1)
    # Argument order is (y_true, y_pred), matching the training loss call.
    valid_loss = keras.losses.mean_squared_error(y_valid, y_valid_pred)
    print("\t", "valid mse: ", valid_loss.numpy())
上面的代码实现核心思想就是用两层for循环替换了fit与compile函数。
附tf2.0自定义求导完整代码:
import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras
print(tf.__version__)
print(sys.version_info)
# Print the name and version of each imported library.
for module in mpl, np, pd, sklearn, tf, keras:
    print(module.__name__, module.__version__)
from sklearn.datasets import fetch_california_housing
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

# Load the California housing dataset and show its description and shapes.
housing = fetch_california_housing()
print(housing.DESCR)
print(housing.data.shape)
print(housing.target.shape)

# Split off the test set first, then split the remainder into training and
# validation sets; both splits use a fixed random_state.
x_train_all, x_test, y_train_all, y_test = train_test_split(
    housing.data,
    housing.target,
    random_state=7,
)
x_train, x_valid, y_train, y_valid = train_test_split(
    x_train_all,
    y_train_all,
    random_state=11,
)
print(x_train.shape, y_train.shape)
print(x_valid.shape, y_valid.shape)
print(x_test.shape, y_test.shape)

# Fit the scaler on the training features only, then apply that same fitted
# scaler to the validation and test features.
scaler = StandardScaler()
x_train_scaled = scaler.fit_transform(x_train)
x_valid_scaled = scaler.transform(x_valid)
x_test_scaled = scaler.transform(x_test)
# How a Keras metric object is used.
metric = keras.metrics.MeanSquaredError()
print(metric([5.0], [2.0]))
print(metric([0.0], [1.0]))
# This call's return value is discarded; the result is read on the next line.
metric([1.0], [3.0])
print(metric.result())

# reset_states() clears what the metric has accumulated; calling it at each
# epoch boundary keeps the reported value specific to one epoch.
metric.reset_states()
metric([1.0], [3.0])
print(metric.result())
#在fit函数中做的事情:
# 1. 以batch的形式遍历训练集, 统计metric
# 1.1 自动求导(想要替换这部分,需要实现这三部分)
# 2. epoch结束 在验证集进行验证 统计验证集上的metric
"""训练集上打印metric是因为我们把训练集分成了好多batch,而验证集上只有一个,
因而验证集上只要打印loss就足够了,如果验证集被分成多个batch,
那么也需要用metrics来计算。"""
# Settings for the hand-written training loop.
batch_size = 32
epochs = 100
# Number of mini-batch updates per epoch; floor division drops any remainder.
steps_per_epoch = len(x_train_scaled) // batch_size
metric = keras.metrics.MeanSquaredError()
optimizer = keras.optimizers.SGD()
def random_batch(x, y, batch_size=32):
    """Draw a random mini-batch of matching rows from x and y.

    Args:
        x: Indexable array of inputs; len(x) bounds the sampled indices.
        y: Array of targets indexed with the same positions as x.
        batch_size: Number of indices to draw.

    Returns:
        Tuple (x[idx], y[idx]) for the sampled index array idx.
    """
    # Indices are drawn independently from [0, len(x)), so repeats can occur.
    idx = np.random.randint(0, len(x), size=batch_size)
    # idx is an integer array; indexing with it selects the same rows of x and y.
    return x[idx], y[idx]
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu',
                       input_shape=x_train.shape[1:]),
    keras.layers.Dense(1),
])

# Explicit loops standing in for model.compile() + model.fit().
for epoch in range(epochs):
    # Clear the accumulated training metric at the start of every epoch.
    metric.reset_states()
    for step in range(steps_per_epoch):
        x_batch, y_batch = random_batch(x_train_scaled, y_train,
                                        batch_size)
        with tf.GradientTape() as tape:
            # Calling the model like a function runs the forward pass.
            y_pred = model(x_batch)
            # The last layer has one unit; drop that axis to line up with
            # y_batch.
            y_pred = tf.squeeze(y_pred, 1)
            loss = keras.losses.mean_squared_error(y_batch, y_pred)
        # Accumulate the running training MSE for this epoch.
        metric(y_batch, y_pred)
        # Differentiate the loss with respect to the trainable weights only.
        grads = tape.gradient(loss, model.trainable_variables)
        # Pair each gradient with the variable it belongs to.
        grads_and_vars = zip(grads, model.trainable_variables)
        optimizer.apply_gradients(grads_and_vars)
        # "\r" together with end="" rewrites the same console line each step.
        print("\rEpoch", epoch, " train mse:",
              metric.result().numpy(), end="")
    # After each epoch, evaluate on the validation set in a single pass.
    y_valid_pred = model(x_valid_scaled)
    y_valid_pred = tf.squeeze(y_valid_pred, 1)
    # Argument order is (y_true, y_pred), matching the training loss call.
    valid_loss = keras.losses.mean_squared_error(y_valid, y_valid_pred)
    print("\t", "valid mse: ", valid_loss.numpy())