import tensorflow as tf
import ops
import utils
class Generator:
    # Store the generator's configuration on the instance
    def __init__(self, name, is_training, ngf=64, norm='instance', image_size=128):
        self.name = name                # variable scope for this generator's layers
        self.reuse = False              # False: tf.variable_scope creates new variables; True: reuse existing ones
        self.ngf = ngf                  # number of filters in the first conv layer (default 64)
        self.norm = norm                # normalization: 'instance', 'batch' or None
        self.is_training = is_training  # training-mode flag
        self.image_size = image_size    # output image size
    # Encoder (3 conv layers) -> transformer (residual blocks) -> decoder (2 deconv layers + 1 output conv)
    def __call__(self, input):
        with tf.variable_scope(self.name):
            # conv 1: 7x7, ngf filters, stride 1, reflect pad 3, ReLU
            c7s1_32 = ops.c7s1_k(input, self.ngf, is_training=self.is_training, norm=self.norm,
                                 reuse=self.reuse, name='c7s1_32')  # (?, w, h, ngf)
            # conv 2: 3x3, 2*ngf filters, stride 2, ReLU
            d64 = ops.dk(c7s1_32, 2*self.ngf, is_training=self.is_training, norm=self.norm,
                         reuse=self.reuse, name='d64')  # (?, w/2, h/2, 2*ngf)
            # conv 3: 3x3, 4*ngf filters, stride 2, ReLU
            d128 = ops.dk(d64, 4*self.ngf, is_training=self.is_training, norm=self.norm,
                          reuse=self.reuse, name='d128')  # (?, w/4, h/4, 4*ngf)
            if self.image_size <= 128:
                # 6 residual blocks for images of size 128 or smaller
                res_output = ops.n_res_blocks(d128, reuse=self.reuse, n=6)  # (?, w/4, h/4, 4*ngf)
            else:
                # 9 residual blocks for larger images
                res_output = ops.n_res_blocks(d128, reuse=self.reuse, n=9)  # (?, w/4, h/4, 4*ngf)
            # fractionally-strided conv: 3x3, 2*ngf filters, stride 1/2, ReLU
            u64 = ops.uk(res_output, 2*self.ngf, is_training=self.is_training, norm=self.norm,
                         reuse=self.reuse, name='u64')  # (?, w/2, h/2, 2*ngf)
            # fractionally-strided conv: 3x3, ngf filters, stride 1/2, ReLU
            u32 = ops.uk(u64, self.ngf, is_training=self.is_training, norm=self.norm,
                         reuse=self.reuse, name='u32', output_size=self.image_size)  # (?, w, h, ngf)
            # output layer: 7x7, 3 filters, stride 1, reflect pad 3, tanh (no normalization)
            output = ops.c7s1_k(u32, 3, norm=None,
                                activation='tanh', reuse=self.reuse, name='output')  # (?, w, h, 3)
        # reuse the variables on subsequent calls
        self.reuse = True
        # collect this scope's trainable variables for the optimizer
        self.variables = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=self.name)
        return output
    def sample(self, input):
        # run the generator, then map the tanh output in [-1, 1] back to uint8 in [0, 255]
        image = utils.batch_convert2int(self.__call__(input))
        # tf.squeeze drops the size-1 batch axis; tf.image.encode_jpeg then
        # encodes the image tensor into a JPEG-compressed string
        image = tf.image.encode_jpeg(tf.squeeze(image, [0]))
        return image
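For reference, sample() can be wired into a small inference script. The sketch below is one way to do it, not the repository's official inference tool; the checkpoint directory and the input/output file names are hypothetical, and utils.convert2float is the helper already used by the Reader below.

import tensorflow as tf
import utils
from generator import Generator

with tf.Graph().as_default():
    with tf.gfile.FastGFile('input.jpg', 'rb') as f:  # hypothetical input path
        image_data = f.read()
    image = tf.image.decode_jpeg(image_data, channels=3)
    image = tf.image.resize_images(image, size=(128, 128))
    image = utils.convert2float(image)
    image.set_shape([128, 128, 3])

    G = Generator('G', is_training=False, ngf=64, norm='instance', image_size=128)
    # sample() returns a JPEG-encoded string tensor for a batch of one image
    output_jpeg = G.sample(tf.expand_dims(image, 0))

    saver = tf.train.Saver()
    with tf.Session() as sess:
        # hypothetical checkpoint directory
        saver.restore(sess, tf.train.latest_checkpoint('checkpoints/'))
        with open('output.jpg', 'wb') as f:
            f.write(sess.run(output_jpeg))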
import tensorflow as tf
import ops
class Discriminator:
    # Store the discriminator's configuration on the instance
    def __init__(self, name, is_training, norm='instance', use_sigmoid=False):
        self.name = name                # variable scope for this discriminator's layers
        self.is_training = is_training  # training-mode flag
        self.norm = norm                # normalization: 'instance', 'batch' or None
        self.reuse = False              # False: tf.variable_scope creates new variables; True: reuse existing ones
        self.use_sigmoid = use_sigmoid  # apply a sigmoid to the output (False when using LSGAN)
    def __call__(self, input):
        # four Ck convolution blocks from ops.py, then a final 1-channel conv
        with tf.variable_scope(self.name):
            C64 = ops.Ck(input, 64, reuse=self.reuse, norm=None,
                         is_training=self.is_training, name='C64')    # (?, w/2, h/2, 64)
            C128 = ops.Ck(C64, 128, reuse=self.reuse, norm=self.norm,
                          is_training=self.is_training, name='C128')  # (?, w/4, h/4, 128)
            C256 = ops.Ck(C128, 256, reuse=self.reuse, norm=self.norm,
                          is_training=self.is_training, name='C256')  # (?, w/8, h/8, 256)
            C512 = ops.Ck(C256, 512, reuse=self.reuse, norm=self.norm,
                          is_training=self.is_training, name='C512')  # (?, w/16, h/16, 512)
            # last conv layer: adds a bias and optionally applies a sigmoid
            output = ops.last_conv(C512, reuse=self.reuse, use_sigmoid=self.use_sigmoid, name='output')
        # reuse the variables on subsequent calls
        self.reuse = True
        # collect this scope's trainable variables for the optimizer
        self.variables = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=self.name)
        return output
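The four stride-2 convolutions give this discriminator a PatchGAN structure: it outputs a grid of per-patch decisions rather than a single scalar per image. A quick shape check (a sketch; the scope name is arbitrary):

import tensorflow as tf
from discriminator import Discriminator

with tf.Graph().as_default():
    D = Discriminator('D_test', is_training=False, norm='instance', use_sigmoid=False)
    logits = D(tf.zeros([1, 256, 256, 3]))
    print(logits.shape)  # (1, 16, 16, 1): one decision per overlapping image patch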
import tensorflow as tf
import ops
import utils
from reader import Reader
from discriminator import Discriminator
from generator import Generator
REAL_LABEL = 0.9  # smoothed target for "real" samples (one-sided label smoothing)
class CycleGAN:
    # Store the model's configuration and build the four sub-networks
    def __init__(self,
                 X_train_file='',
                 Y_train_file='',
                 batch_size=1,
                 image_size=256,
                 use_lsgan=True,
                 norm='instance',
                 lambda1=10,
                 lambda2=10,
                 learning_rate=2e-4,
                 beta1=0.5,
                 ngf=64
                ):
        self.lambda1 = lambda1              # weight of the forward cycle-consistency loss
        self.lambda2 = lambda2              # weight of the backward cycle-consistency loss
        self.use_lsgan = use_lsgan          # use least-squares GAN losses
        use_sigmoid = not use_lsgan         # the plain GAN loss needs a sigmoid on the discriminator output
        self.batch_size = batch_size        # batch size
        self.image_size = image_size        # image size
        self.learning_rate = learning_rate  # initial learning rate
        self.beta1 = beta1                  # Adam momentum term
        self.X_train_file = X_train_file    # tfrecords file for domain X
        self.Y_train_file = Y_train_file    # tfrecords file for domain Y
        # training-mode flag, defined as a placeholder with a default value
        self.is_training = tf.placeholder_with_default(True, shape=[], name='is_training')
        # generator G: maps X -> Y
        self.G = Generator('G', self.is_training, ngf=ngf, norm=norm, image_size=image_size)
        # discriminator D_Y: decides whether an image belongs to domain Y
        self.D_Y = Discriminator('D_Y', self.is_training, norm=norm, use_sigmoid=use_sigmoid)
        # generator F: maps Y -> X (ngf passed explicitly so both generators share the same width)
        self.F = Generator('F', self.is_training, ngf=ngf, norm=norm, image_size=image_size)
        # discriminator D_X: decides whether an image belongs to domain X
        self.D_X = Discriminator('D_X', self.is_training, norm=norm, use_sigmoid=use_sigmoid)
        # placeholders for previously generated images, fed from an image pool
        self.fake_x = tf.placeholder(tf.float32, shape=[batch_size, image_size, image_size, 3])
        self.fake_y = tf.placeholder(tf.float32, shape=[batch_size, image_size, image_size, 3])
    def model(self):
        # readers for the two training sets
        X_reader = Reader(self.X_train_file, name='X', image_size=self.image_size, batch_size=self.batch_size)
        Y_reader = Reader(self.Y_train_file, name='Y', image_size=self.image_size, batch_size=self.batch_size)
        # input batches for domain X and domain Y
        x = X_reader.feed()
        y = Y_reader.feed()
        # cycle-consistency loss (defined at the bottom of this class)
        cycle_loss = self.cycle_consistency_loss(self.G, self.F, x, y)
        # X -> Y: adversarial loss of the generated Y images
        fake_y = self.G(x)
        G_gan_loss = self.generator_loss(self.D_Y, fake_y, use_lsgan=self.use_lsgan)
        # total G loss: adversarial loss plus cycle-consistency loss
        G_loss = G_gan_loss + cycle_loss
        # D_Y loss on real y and pooled fake y
        D_Y_loss = self.discriminator_loss(self.D_Y, y, self.fake_y, use_lsgan=self.use_lsgan)
        # Y -> X: adversarial loss of the generated X images
        fake_x = self.F(y)
        F_gan_loss = self.generator_loss(self.D_X, fake_x, use_lsgan=self.use_lsgan)
        # total F loss: adversarial loss plus cycle-consistency loss
        F_loss = F_gan_loss + cycle_loss
        # D_X loss on real x and pooled fake x
        D_X_loss = self.discriminator_loss(self.D_X, x, self.fake_x, use_lsgan=self.use_lsgan)

        # TensorBoard summaries
        # histograms of the discriminator outputs
        tf.summary.histogram('D_Y/true', self.D_Y(y))
        tf.summary.histogram('D_Y/fake', self.D_Y(self.G(x)))
        tf.summary.histogram('D_X/true', self.D_X(x))
        tf.summary.histogram('D_X/fake', self.D_X(self.F(y)))
        # scalar loss curves
        tf.summary.scalar('loss/G', G_gan_loss)
        tf.summary.scalar('loss/D_Y', D_Y_loss)
        tf.summary.scalar('loss/F', F_gan_loss)
        tf.summary.scalar('loss/D_X', D_X_loss)
        tf.summary.scalar('loss/cycle', cycle_loss)
        # image summaries: generated and reconstructed samples
        tf.summary.image('X/generated', utils.batch_convert2int(self.G(x)))
        tf.summary.image('X/reconstruction', utils.batch_convert2int(self.F(self.G(x))))
        tf.summary.image('Y/generated', utils.batch_convert2int(self.F(y)))
        tf.summary.image('Y/reconstruction', utils.batch_convert2int(self.G(self.F(y))))
        return G_loss, D_Y_loss, F_loss, D_X_loss, fake_y, fake_x
    def optimize(self, G_loss, D_Y_loss, F_loss, D_X_loss):
        def make_optimizer(loss, variables, name='Adam'):
            """ Adam with the paper's schedule: keep the initial learning rate
                for the first 100k steps, then decay it linearly to zero over
                the next 100k steps.
            """
            global_step = tf.Variable(0, trainable=False)  # step counter
            starter_learning_rate = self.learning_rate     # initial learning rate
            end_learning_rate = 0.0                        # final learning rate
            start_decay_step = 100000                      # step at which decay begins
            decay_steps = 100000                           # number of steps over which to decay
            beta1 = self.beta1
            learning_rate = (
                # before start_decay_step use the constant rate, afterwards the decayed one
                tf.where(
                    tf.greater_equal(global_step, start_decay_step),
                    # linear decay (polynomial decay with power=1.0)
                    tf.train.polynomial_decay(starter_learning_rate,    # initial learning rate
                                              global_step - start_decay_step,  # steps since decay began
                                              decay_steps,              # length of the decay window
                                              end_learning_rate,        # final learning rate
                                              power=1.0),
                    # constant phase
                    starter_learning_rate
                )
            )
            # log the learning rate as a scalar summary
            tf.summary.scalar('learning_rate/{}'.format(name), learning_rate)
            learning_step = (
                # Adam optimizer minimizing the given loss
                tf.train.AdamOptimizer(learning_rate, beta1=beta1, name=name)
                    .minimize(loss,
                              global_step=global_step,  # incremented once per optimizer step
                              var_list=variables        # variables this optimizer updates
                              ))
            return learning_step

        # one optimizer per network
        G_optimizer = make_optimizer(G_loss, self.G.variables, name='Adam_G')
        D_Y_optimizer = make_optimizer(D_Y_loss, self.D_Y.variables, name='Adam_D_Y')
        F_optimizer = make_optimizer(F_loss, self.F.variables, name='Adam_F')
        D_X_optimizer = make_optimizer(D_X_loss, self.D_X.variables, name='Adam_D_X')
        # tf.control_dependencies: run the four optimizer ops first, then the no-op,
        # so a single sess.run on the returned op performs all four updates
        with tf.control_dependencies([G_optimizer, D_Y_optimizer, F_optimizer, D_X_optimizer]):
            return tf.no_op(name='optimizers')  # does nothing itself; used only as a control boundary
    def discriminator_loss(self, D, y, fake_y, use_lsgan=True):
        if use_lsgan:
            # least-squares loss: push D(y) towards REAL_LABEL ...
            error_real = tf.reduce_mean(tf.squared_difference(D(y), REAL_LABEL))
            # ... and D(fake_y) towards 0; e.g. if D outputs 0.5 everywhere,
            # loss = ((0.5 - 0.9)^2 + 0.5^2) / 2 = 0.205
            error_fake = tf.reduce_mean(tf.square(D(fake_y)))
        else:
            # standard GAN loss: -(log(D(y)) + log(1 - D(G(x)))) / 2
            error_real = -tf.reduce_mean(ops.safe_log(D(y)))           # -log(D(y))
            error_fake = -tf.reduce_mean(ops.safe_log(1 - D(fake_y)))  # -log(1 - D(G(x)))
        loss = (error_real + error_fake) / 2
        return loss
    def generator_loss(self, D, fake_y, use_lsgan=True):
        if use_lsgan:
            # least-squares loss: tf.squared_difference takes the element-wise
            # squared error, tf.reduce_mean averages it; pushes D(G(x)) towards REAL_LABEL
            loss = tf.reduce_mean(tf.squared_difference(D(fake_y), REAL_LABEL))
        else:
            # heuristic non-saturating loss -log(D(G(x))), halved here
            loss = -tf.reduce_mean(ops.safe_log(D(fake_y))) / 2
        return loss
    def cycle_consistency_loss(self, G, F, x, y):
        # forward cycle: x -> G(x) -> F(G(x)) ≈ x
        forward_loss = tf.reduce_mean(tf.abs(F(G(x)) - x))
        # backward cycle: y -> F(y) -> G(F(y)) ≈ y
        backward_loss = tf.reduce_mean(tf.abs(G(F(y)) - y))
        # weighted L1 sum of both cycles
        loss = self.lambda1*forward_loss + self.lambda2*backward_loss
        return loss
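model() and optimize() together define the whole training graph; what remains is a loop that feeds the fake_x/fake_y placeholders from a history buffer of previously generated images (the image-pool trick of Shrivastava et al., which stabilizes discriminator training). A minimal training-loop sketch, assuming the tfrecords paths used elsewhere in this post and that the companion utils module provides an ImagePool(pool_size) with a query(images) method, as the reference implementation does:

import tensorflow as tf
import utils  # assumed to provide ImagePool

def train():
    graph = tf.Graph()
    with graph.as_default():
        cycle_gan = CycleGAN(X_train_file='data/tfrecords/apple.tfrecords',
                             Y_train_file='data/tfrecords/orange.tfrecords')
        G_loss, D_Y_loss, F_loss, D_X_loss, fake_y, fake_x = cycle_gan.model()
        optimizers = cycle_gan.optimize(G_loss, D_Y_loss, F_loss, D_X_loss)
        init = tf.global_variables_initializer()

    with tf.Session(graph=graph) as sess:
        sess.run(init)
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        fake_Y_pool = utils.ImagePool(50)  # history buffers for the discriminators
        fake_X_pool = utils.ImagePool(50)
        try:
            for step in range(200000):
                # generate first, then feed pooled (possibly older) fakes to the discriminators
                fake_y_val, fake_x_val = sess.run([fake_y, fake_x])
                sess.run(optimizers,
                         feed_dict={cycle_gan.fake_y: fake_Y_pool.query(fake_y_val),
                                    cycle_gan.fake_x: fake_X_pool.query(fake_x_val)})
        finally:
            coord.request_stop()
            coord.join(threads)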
import tensorflow as tf
import utils
class Reader():
    def __init__(self, tfrecords_file, image_size=256, min_queue_examples=1000,
                 batch_size=1, num_threads=8, name=''):
        """
        Args:
          tfrecords_file: string, tfrecords file path
          min_queue_examples: integer, minimum number of samples to retain in the queue that provides batches of examples
          batch_size: integer, number of images per batch
          num_threads: integer, number of preprocessing threads
        """
        self.tfrecords_file = tfrecords_file          # tfrecords file path
        self.image_size = image_size                  # target image size
        self.min_queue_examples = min_queue_examples  # minimum number of samples kept in the queue
        self.batch_size = batch_size                  # images per batch
        self.num_threads = num_threads                # number of preprocessing threads
        self.reader = tf.TFRecordReader()             # tfrecords reader
        self.name = name
    def feed(self):
        with tf.name_scope(self.name):
            # queue of tfrecords files to read from
            filename_queue = tf.train.string_input_producer([self.tfrecords_file])
            # read one record from the queue; returns (file name, serialized example)
            _, serialized_example = self.reader.read(filename_queue)
            # parse the serialized example's features one at a time
            features = tf.parse_single_example(
                serialized_example,
                features={
                    # original file name
                    'image/file_name': tf.FixedLenFeature([], tf.string),
                    # JPEG-encoded image bytes
                    'image/encoded_image': tf.FixedLenFeature([], tf.string),
                })
            image_buffer = features['image/encoded_image']
            # decode the JPEG bytes back into an image tensor
            image = tf.image.decode_jpeg(image_buffer, channels=3)
            # resize, convert to float and fix the static shape (see _preprocess below)
            image = self._preprocess(image)
            # assemble shuffled batches
            images = tf.train.shuffle_batch(
                [image], batch_size=self.batch_size, num_threads=self.num_threads,
                capacity=self.min_queue_examples + 3*self.batch_size,
                min_after_dequeue=self.min_queue_examples
            )
            # log the input images to TensorBoard
            tf.summary.image('_input', images)
        return images
    def _preprocess(self, image):
        # resize to the target size
        image = tf.image.resize_images(image, size=(self.image_size, self.image_size))
        # convert the image from int to float
        image = utils.convert2float(image)
        # pin down the static shape (resize leaves it partially unknown)
        image.set_shape([self.image_size, self.image_size, 3])
        return image
def test_reader():
    # tfrecords file for domain X
    TRAIN_FILE_1 = 'data/tfrecords/apple.tfrecords'
    # tfrecords file for domain Y
    TRAIN_FILE_2 = 'data/tfrecords/orange.tfrecords'
    with tf.Graph().as_default():
        # read images from both domains, batch size 2
        reader1 = Reader(TRAIN_FILE_1, batch_size=2)
        reader2 = Reader(TRAIN_FILE_2, batch_size=2)
        images_op1 = reader1.feed()
        images_op2 = reader2.feed()
        sess = tf.Session()
        # initialize variables
        init = tf.global_variables_initializer()
        sess.run(init)
        # coordinator to manage the queue-runner threads
        coord = tf.train.Coordinator()
        # start the enqueue threads; returns the list of threads
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        try:
            step = 0
            while not coord.should_stop():
                # fetch one batch from each reader
                batch_images1, batch_images2 = sess.run([images_op1, images_op2])
                print("image shape: {}".format(batch_images1.shape))
                print("image shape: {}".format(batch_images2.shape))
                print("="*10)
                step += 1
        except KeyboardInterrupt:
            # interrupted by the user: ask the enqueue threads to stop
            print('Interrupted')
            coord.request_stop()
        except Exception as e:
            # any other error: stop the threads and report it via the coordinator
            coord.request_stop(e)
        finally:
            # ask the enqueue threads to stop ...
            coord.request_stop()
            # ... and wait for all of them to finish before exiting
            coord.join(threads)

if __name__ == '__main__':
    test_reader()
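For completeness, a record that this Reader can parse could be written as follows. This is only a sketch: the image file name is hypothetical, and in the reference implementation this job is done by a separate data-building script.

import tensorflow as tf

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

writer = tf.python_io.TFRecordWriter('data/tfrecords/apple.tfrecords')
with tf.gfile.FastGFile('apple_001.jpg', 'rb') as f:  # hypothetical image file
    encoded = f.read()
example = tf.train.Example(features=tf.train.Features(feature={
    'image/file_name': _bytes_feature(b'apple_001.jpg'),
    'image/encoded_image': _bytes_feature(encoded),
}))
writer.write(example.SerializeToString())
writer.close()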
9. ops.py
import tensorflow as tf
# c7s1_k:
# Reflect-pad the input by 3 pixels on each side, convolve with a 7x7 filter
# at stride 1, normalize, then apply the activation (ReLU or tanh).
# The output depth is k.
def c7s1_k(input, k, reuse=False, norm='instance', activation='relu', is_training=True, name='c7s1_k'):
    """ A 7x7 Convolution-BatchNorm-ReLU layer with k filters and stride 1
    Args:
      input: 4D tensor
      k: integer, number of filters (output depth)
      norm: 'instance' or 'batch' or None
      activation: 'relu' or 'tanh'
      is_training: boolean or BoolTensor
      reuse: boolean
      name: string, e.g. 'c7s1_32'
    Returns:
      4D tensor
    """
    # tf.variable_scope manages variable naming; reuse=False creates new variables
    with tf.variable_scope(name, reuse=reuse):
        # convolution kernel
        weights = _weights("weights", shape=[7, 7, input.get_shape()[3], k])
        # pad 3 pixels on each side of the height and width dimensions;
        # 'REFLECT' mirrors the border without repeating the edge pixel
        # ('SYMMETRIC' would repeat it)
        padded = tf.pad(input, [[0,0],[3,3],[3,3],[0,0]], 'REFLECT')
        # convolution
        conv = tf.nn.conv2d(padded, weights, strides=[1, 1, 1, 1], padding='VALID')
        # normalization
        normalized = _norm(conv, is_training, norm)
        if activation == 'relu':
            # ReLU activation
            output = tf.nn.relu(normalized)
        if activation == 'tanh':
            # tanh activation
            output = tf.nn.tanh(normalized)
        return output
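The reflect padding and the VALID convolution cancel out: with 3 pixels added per side, a 7x7 stride-1 kernel gives out = (w + 2*3 - 7) + 1 = w, so the spatial size is preserved. A quick check (a sketch; the scope name is arbitrary):

x = tf.zeros([1, 128, 128, 3])
y = c7s1_k(x, 32, name='c7s1_check')
print(y.shape)  # (1, 128, 128, 32)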
# dk:
# Same parameter conventions as c7s1_k. Convolve the 4D input with a
# 3x3x(input depth) filter at stride 2, normalize, then apply ReLU.
# The output depth is k.
def dk(input, k, reuse=False, norm='instance', is_training=True, name=None):
    with tf.variable_scope(name, reuse=reuse):
        # convolution kernel
        weights = _weights("weights", shape=[3, 3, input.get_shape()[3], k])
        # strided convolution (downsamples by 2)
        conv = tf.nn.conv2d(input, weights, strides=[1, 2, 2, 1], padding='SAME')
        # normalization
        normalized = _norm(conv, is_training, norm)
        # ReLU activation
        output = tf.nn.relu(normalized)
        return output
# Rk:
# A two-layer residual block. The first layer reflect-pads the input by one
# pixel per side, convolves with a 3x3x(input depth) filter at stride 1,
# normalizes and applies ReLU; the second layer does the same padding,
# convolution and normalization, then adds the block's input to the result.
def Rk(input, k, reuse=False, norm='instance', is_training=True, name=None):
    with tf.variable_scope(name, reuse=reuse):
        # first conv layer
        with tf.variable_scope('layer1', reuse=reuse):
            # convolution kernel
            weights1 = _weights("weights1", shape=[3, 3, input.get_shape()[3], k])
            # reflect-pad height and width by one pixel
            padded1 = tf.pad(input, [[0,0],[1,1],[1,1],[0,0]], 'REFLECT')
            conv1 = tf.nn.conv2d(padded1, weights1, strides=[1, 1, 1, 1], padding='VALID')
            # normalization
            normalized1 = _norm(conv1, is_training, norm)
            # ReLU activation
            relu1 = tf.nn.relu(normalized1)
        # second conv layer
        with tf.variable_scope('layer2', reuse=reuse):
            # convolution kernel
            weights2 = _weights("weights2", shape=[3, 3, relu1.get_shape()[3], k])
            # reflect-pad height and width by one pixel
            padded2 = tf.pad(relu1, [[0,0],[1,1],[1,1],[0,0]], 'REFLECT')
            conv2 = tf.nn.conv2d(padded2, weights2, strides=[1, 1, 1, 1], padding='VALID')
            # normalization (no activation before the addition)
            normalized2 = _norm(conv2, is_training, norm)
        # skip connection: add the block's input to the result
        output = input + normalized2
        return output
# n_res_blocks:
# Chain n Rk blocks: each block's output becomes the next block's input.
# Rk implements the basic ResNet operation; this function stacks it n times.
def n_res_blocks(input, reuse, norm='instance', is_training=True, n=6):
    # keep the input depth
    depth = input.get_shape()[3]
    for i in range(1, n+1):
        output = Rk(input, depth, reuse, norm, is_training, 'R{}_{}'.format(depth, i))
        # feed the output into the next block
        input = output
    # return the output of the last block
    return output
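Because each Rk block preserves its input shape (one pixel of padding exactly offsets the 3x3 VALID convolution), any number of blocks can be stacked. A quick check (a sketch):

x = tf.zeros([1, 64, 64, 128])
y = n_res_blocks(x, reuse=False, n=9)
print(y.shape)  # (1, 64, 64, 128)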
# uk:
# Same parameter conventions as c7s1_k. The core operation is the transposed
# convolution tf.nn.conv2d_transpose, which upsamples the input to the given
# output_shape; the result is normalized and passed through ReLU.
def uk(input, k, reuse=False, norm='instance', is_training=True, name=None, output_size=None):
    """ A 3x3 fractional-strided-Convolution-BatchNorm-ReLU layer
        with k filters, stride 1/2
    Args:
      input: 4D tensor
      k: integer, number of filters (output depth)
      norm: 'instance' or 'batch' or None
      is_training: boolean or BoolTensor
      reuse: boolean
      name: string, e.g. 'u64'
      output_size: integer, desired output size of layer
    Returns:
      4D tensor
    """
    with tf.variable_scope(name, reuse=reuse):
        # static input shape as a Python list
        input_shape = input.get_shape().as_list()
        # transposed-convolution kernel: the output depth k comes before the input depth
        weights = _weights("weights", shape=[3, 3, k, input_shape[3]])
        if not output_size:
            # default: double the spatial size
            output_size = input_shape[1]*2
        # shape of the upsampled output
        output_shape = [input_shape[0], output_size, output_size, k]
        # transposed convolution (upsamples by 2)
        fsconv = tf.nn.conv2d_transpose(input, weights, output_shape=output_shape,
                                        strides=[1, 2, 2, 1], padding='SAME')
        # normalization
        normalized = _norm(fsconv, is_training, norm)
        # ReLU activation
        output = tf.nn.relu(normalized)
        return output
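With stride 2 and SAME padding the transposed convolution doubles the spatial size by default, mirroring the stride-2 dk layers in the encoder. A quick check (a sketch; the scope name is arbitrary):

x = tf.zeros([1, 32, 32, 128])  # e.g. the residual-block output for 128x128 inputs
y = uk(x, 64, name='uk_check')
print(y.shape)  # (1, 64, 64, 64)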
# Ck:
# Same parameter conventions as c7s1_k. Convolve the 4D input with a
# 4x4x(input depth) filter at a configurable stride (default 2), normalize,
# then apply leaky ReLU. The output depth is k.
def Ck(input, k, slope=0.2, stride=2, reuse=False, norm='instance', is_training=True, name=None):
    """ A 4x4 Convolution-BatchNorm-LeakyReLU layer with k filters and stride 2
    Args:
      input: 4D tensor
      k: integer, number of filters (output depth)
      slope: LeakyReLU's slope
      stride: integer
      norm: 'instance' or 'batch' or None
      is_training: boolean or BoolTensor
      reuse: boolean
      name: string, e.g. 'C64'
    Returns:
      4D tensor
    """
    with tf.variable_scope(name, reuse=reuse):
        # convolution kernel
        weights = _weights("weights", shape=[4, 4, input.get_shape()[3], k])
        # strided convolution
        conv = tf.nn.conv2d(input, weights, strides=[1, stride, stride, 1], padding='SAME')
        # normalization
        normalized = _norm(conv, is_training, norm)
        # leaky ReLU activation
        output = _leaky_relu(normalized, slope)
        return output
# last_conv:
# The discriminator's final layer: a 4x4x(input depth) filter at stride 1
# producing a single output channel, plus a bias; optionally a sigmoid is
# applied (skipped when using LSGAN).
def last_conv(input, reuse=False, use_sigmoid=False, name=None):
    """ Last convolutional layer of discriminator network
        (1 filter with size 4x4, stride 1)
    Args:
      input: 4D tensor
      reuse: boolean
      use_sigmoid: boolean (False if use lsgan)
      name: string, e.g. 'output'
    """
    with tf.variable_scope(name, reuse=reuse):
        # convolution kernel with a single output channel
        weights = _weights("weights", shape=[4, 4, input.get_shape()[3], 1])
        # bias term
        biases = _biases("biases", [1])
        # convolution
        conv = tf.nn.conv2d(input, weights, strides=[1, 1, 1, 1], padding='SAME')
        # add the bias
        output = conv + biases
        if use_sigmoid:
            # squash the logits to (0, 1)
            output = tf.sigmoid(output)
        return output
# convolution-kernel variable, initialized from a normal distribution
def _weights(name, shape, mean=0.0, stddev=0.02):
    var = tf.get_variable(name, shape,
                          initializer=tf.random_normal_initializer(mean=mean, stddev=stddev, dtype=tf.float32))
    return var

# bias variable, initialized to a constant
def _biases(name, shape, constant=0.0):
    return tf.get_variable(name, shape, initializer=tf.constant_initializer(constant))

# element-wise maximum of slope*x and x
def _leaky_relu(input, slope):
    return tf.maximum(slope*input, input)
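For 0 < slope < 1, max(slope*x, x) equals x for non-negative x and slope*x otherwise, i.e. exactly leaky ReLU. A quick numeric check (a sketch):

with tf.Session() as sess:
    print(sess.run(_leaky_relu(tf.constant([-2.0, 3.0]), 0.2)))  # [-0.4  3.]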
# dispatch to instance norm, batch norm, or no normalization
def _norm(input, is_training, norm='instance'):
    if norm == 'instance':
        return _instance_norm(input)
    elif norm == 'batch':
        return _batch_norm(input, is_training)
    else:
        return input
# batch normalization
def _batch_norm(input, is_training):
    with tf.variable_scope("batch_norm"):
        return tf.contrib.layers.batch_norm(input, decay=0.9, scale=True,
                                            updates_collections=None, is_training=is_training)
# instance normalization: normalize each sample and channel over its own
# spatial dimensions, then apply a learned scale and offset
def _instance_norm(input):
    with tf.variable_scope("instance_norm"):
        depth = input.get_shape()[3]
        scale = _weights("scale", [depth], mean=1.0)
        offset = _biases("offset", [depth])
        # per-sample, per-channel mean and variance over height and width
        mean, variance = tf.nn.moments(input, axes=[1, 2], keep_dims=True)
        epsilon = 1e-5
        inv = tf.rsqrt(variance + epsilon)
        normalized = (input - mean)*inv
        return scale*normalized + offset
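The same computation written out in NumPy, as a cross-check of the formula (a sketch; with scale 1 and offset 0 it matches _instance_norm up to float tolerance):

import numpy as np

x = np.random.randn(2, 8, 8, 3).astype(np.float32)
# per-sample, per-channel statistics over the spatial axes only
mean = x.mean(axis=(1, 2), keepdims=True)
var = x.var(axis=(1, 2), keepdims=True)
normalized = (x - mean) / np.sqrt(var + 1e-5)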
def safe_log(x, eps=1e-12):
    return tf.log(x + eps)
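The epsilon matters because the non-LSGAN losses above take log(D(...)), and a saturated discriminator output of exactly 0 would otherwise produce -inf and NaN gradients. A quick check (a sketch):

with tf.Session() as sess:
    print(sess.run(safe_log(tf.constant(0.0))))  # ≈ -27.63 (= log(1e-12)) instead of -inf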