AE/VAE实战

在这里插入图片描述

import os

# TensorFlow's native (C++) logger picks up TF_CPP_MIN_LOG_LEVEL from the
# environment, so the variable has to be in place before `tensorflow` is
# imported; assigning it afterwards can leave import-time messages
# unfiltered.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = '2'

import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import Sequential, layers
from PIL import Image


# Fix both random number generators so repeated runs are comparable.
tf.random.set_seed(22)
np.random.seed(22)


def save_images(imgs, name):
    """Tile up to 100 images into a 280x280 grayscale sheet and save it.

    The sheet is a 10x10 grid of 28x28 cells filled column by column
    (top to bottom, then left to right). Cells without a matching image
    stay black, so a batch with fewer than 100 images no longer raises
    IndexError. Images beyond the first 100 are ignored.

    Args:
        imgs: indexable sequence of 2-D arrays accepted by
            ``Image.fromarray(..., mode='L')``; the callers in this script
            pass uint8 arrays of shape [b, 28, 28].
        name: output file path. Its parent directory is created if missing.
    """
    cell = 28
    cells_per_side = 10
    side = cell * cells_per_side

    # New 'L' (8-bit grayscale) images start out black.
    new_im = Image.new('L', (side, side))

    # Only as many cells as there are images get filled.
    count = min(len(imgs), cells_per_side * cells_per_side)
    for index in range(count):
        # Same fill order as the nested loops this replaces: the outer
        # loop walked x (columns), the inner loop walked y (rows).
        col, row = divmod(index, cells_per_side)
        # [28, 28] array => PIL image
        im = Image.fromarray(imgs[index], mode='L')
        # The second argument is the top-left corner (x, y) of the cell.
        new_im.paste(im, (col * cell, row * cell))

    # Image.save does not create missing directories.
    out_dir = os.path.dirname(name)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    new_im.save(name)

# Hyperparameters: latent width, mini-batch size and optimizer step size.
h_dim, batchsz, lr = 20, 128, 1e-3

# Fashion-MNIST images; labels are loaded but only their shapes are printed.
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()

# Cast to float32 and divide by 255 (presumably 8-bit pixels => [0, 1]).
x_train, x_test = (
    split.astype(np.float32) / 255. for split in (x_train, x_test)
)

# Training batches are shuffled with a buffer of five batches;
# test batches keep the dataset order.
train_db = (
    tf.data.Dataset.from_tensor_slices(x_train)
    .shuffle(batchsz * 5)
    .batch(batchsz)
)
test_db = tf.data.Dataset.from_tensor_slices(x_test).batch(batchsz)

print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

class AE(keras.Model):
    """Fully connected autoencoder.

    The encoder compresses a flattened input to an ``h_dim``-wide code
    through 256 and 128 unit ReLU layers; the decoder mirrors it back to
    784 values. The last decoder layer has no activation, so the output is
    raw logits (the training script applies the sigmoid itself).
    """

    def __init__(self):
        super().__init__()

        relu = tf.nn.relu

        # Encoder stack: 256 -> 128 -> h_dim
        encoder_layers = [
            layers.Dense(256, activation=relu),
            layers.Dense(128, activation=relu),
            layers.Dense(h_dim),
        ]
        self.encoder = Sequential(encoder_layers)

        # Decoder stack: 128 -> 256 -> 784
        decoder_layers = [
            layers.Dense(128, activation=relu),
            layers.Dense(256, activation=relu),
            layers.Dense(784),
        ]
        self.decoder = Sequential(decoder_layers)

    def call(self, inputs, training=None):
        """Encode then decode: [b, 784] => [b, h_dim] => [b, 784] logits.

        ``training`` is accepted for Keras compatibility and is not used.
        """
        return self.decoder(self.encoder(inputs))

model = AE()
model.build(input_shape=(None, 784))
model.summary()

# `learning_rate` is the supported keyword; the old `lr` alias is deprecated
# and rejected by newer Keras releases.
optimizer = tf.optimizers.Adam(learning_rate=lr)

# Make sure the output directory exists before the first image is written.
os.makedirs("images", exist_ok=True)

# One evaluation batch is consumed per epoch. A single pass over `test_db`
# would run out (StopIteration) if the test split has fewer batches than
# there are epochs, and a partial final batch would hold fewer images than
# the 10x10 grid in `save_images` reads. Full batches only, repeated
# forever, avoids both failures.
eval_db = tf.data.Dataset.from_tensor_slices(x_test)
eval_db = eval_db.batch(batchsz, drop_remainder=True).repeat()
test_iter = iter(eval_db)

for epoch in range(100):
    for step, x in enumerate(train_db):
        # [b, 28, 28] => [b, 784]
        x = tf.reshape(x, [-1, 784])

        with tf.GradientTape() as tape:
            x_rec_logits = model(x)
            # The model outputs logits; from_logits=True lets the loss
            # apply the sigmoid itself.
            rec_loss = tf.losses.binary_crossentropy(x, x_rec_logits, from_logits=True)
            rec_loss = tf.reduce_mean(rec_loss)

        grads = tape.gradient(rec_loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if step % 100 == 0:
            print(epoch, step, float(rec_loss))

    # Evaluation: reconstruct one test batch and save input and output.
    x = next(test_iter)
    logits = model(tf.reshape(x, [-1, 784]))
    # Squash logits into (0, 1).
    x_hat = tf.sigmoid(logits)

    # [b, 784] => [b, 28, 28]
    x_hat = tf.reshape(x_hat, [-1, 28, 28])

    # Scale back to 0~255 and cast to uint8 for the image writer.
    x = (x.numpy() * 255.).astype(np.uint8)
    x_hat = (x_hat.numpy() * 255.).astype(np.uint8)
    save_images(x, "images/%d_epochs_x.png" % epoch)
    save_images(x_hat, "images/%d_epochs_rec.png" % epoch)

0_epochs_x.png 原图

在这里插入图片描述

0_epochs_rec.png 重构后的图

在这里插入图片描述

在这里插入图片描述
随着训练的进行,Mean 和 std 层的输出将趋向于标准正态分布 N(0, 1)。
训练完成后,可以直接从标准正态分布中随机采样(random sample)一些数据,维度与 Mean/std 层的输出相同,然后直接送入 decoder,即可生成一些训练集中没有的新样本。

在这里插入图片描述

在这里插入图片描述

import os

# TensorFlow's native (C++) logger picks up TF_CPP_MIN_LOG_LEVEL from the
# environment, so the variable has to be in place before `tensorflow` is
# imported; assigning it afterwards can leave import-time messages
# unfiltered.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = '2'

import tensorflow as tf
import numpy as np
from tensorflow import keras
from tensorflow.keras import Sequential, layers
from PIL import Image


# Fix both random number generators so repeated runs are comparable.
tf.random.set_seed(22)
np.random.seed(22)


def save_images(imgs, name):
    """Tile up to 100 images into a 280x280 grayscale sheet and save it.

    The sheet is a 10x10 grid of 28x28 cells filled column by column
    (top to bottom, then left to right). Cells without a matching image
    stay black, so a batch with fewer than 100 images no longer raises
    IndexError. Images beyond the first 100 are ignored.

    Args:
        imgs: indexable sequence of 2-D arrays accepted by
            ``Image.fromarray(..., mode='L')``; the callers in this script
            pass uint8 arrays of shape [b, 28, 28].
        name: output file path. Its parent directory is created if missing.
    """
    cell = 28
    cells_per_side = 10
    side = cell * cells_per_side

    # New 'L' (8-bit grayscale) images start out black.
    new_im = Image.new('L', (side, side))

    # Only as many cells as there are images get filled.
    count = min(len(imgs), cells_per_side * cells_per_side)
    for index in range(count):
        # Same fill order as the nested loops this replaces: the outer
        # loop walked x (columns), the inner loop walked y (rows).
        col, row = divmod(index, cells_per_side)
        # [28, 28] array => PIL image
        im = Image.fromarray(imgs[index], mode='L')
        # The second argument is the top-left corner (x, y) of the cell.
        new_im.paste(im, (col * cell, row * cell))

    # Image.save does not create missing directories.
    out_dir = os.path.dirname(name)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    new_im.save(name)

# Hyperparameters: hidden width, mini-batch size and optimizer step size.
h_dim, batchsz, lr = 20, 128, 1e-3

# Fashion-MNIST images; labels are loaded but only their shapes are printed.
(x_train, y_train), (x_test, y_test) = keras.datasets.fashion_mnist.load_data()

# Cast to float32 and divide by 255 (presumably 8-bit pixels => [0, 1]).
x_train, x_test = (
    split.astype(np.float32) / 255. for split in (x_train, x_test)
)

# Training batches are shuffled with a buffer of five batches;
# test batches keep the dataset order.
train_db = (
    tf.data.Dataset.from_tensor_slices(x_train)
    .shuffle(batchsz * 5)
    .batch(batchsz)
)
test_db = tf.data.Dataset.from_tensor_slices(x_test).batch(batchsz)

print(x_train.shape, y_train.shape)
print(x_test.shape, y_test.shape)

# Width of the VAE latent vector.
z_dim = 10

class VAE(keras.Model):
    """Fully connected variational autoencoder.

    The encoder maps a flattened input to the mean and log-variance of a
    ``z_dim``-dimensional Gaussian; the decoder maps a latent sample back
    to 784 logits (no output activation is applied here).
    """

    def __init__(self):
        super(VAE, self).__init__()

        # Encoder
        self.fc1 = layers.Dense(128)
        self.fc2 = layers.Dense(z_dim)  # mean head: [b, z_dim]
        self.fc3 = layers.Dense(z_dim)  # log-variance head: [b, z_dim]

        # Decoder
        self.fc4 = layers.Dense(128)
        self.fc5 = layers.Dense(784)

    def encoder(self, x):
        """Return ``(mu, log_var)`` for input ``x``; both are [b, z_dim]."""
        h = tf.nn.relu(self.fc1(x))
        # mean of the approximate posterior
        mu = self.fc2(h)
        # log of its variance
        log_var = self.fc3(h)

        return mu, log_var

    def decoder(self, z):
        """Map latent codes ``z`` ([b, z_dim]) to reconstruction logits ([b, 784])."""
        out = tf.nn.relu(self.fc4(z))
        out = self.fc5(out)

        return out

    def reparameter(self, mu, log_var):
        """Draw ``z = mu + std * eps`` with ``eps ~ N(0, 1)``.

        Sampling through ``eps`` keeps ``z`` differentiable with respect to
        ``mu`` and ``log_var``.
        """
        # Use the dynamic shape: the static `log_var.shape` contains None
        # when the batch size is unknown, which tf.random.normal rejects.
        eps = tf.random.normal(tf.shape(log_var), dtype=log_var.dtype)
        # std = exp(0.5 * log(var))
        std = tf.exp(log_var * 0.5)
        z = mu + std * eps

        return z

    def call(self, inputs, training=None):
        """Run encoder, sampling and decoder.

        Returns ``(x_hat, mu, log_var)`` where ``x_hat`` holds logits.
        ``training`` is accepted for Keras compatibility and is not used.
        """
        # [b, 784] => [b, z_dim], [b, z_dim]
        mu, log_var = self.encoder(inputs)
        # reparameterization trick
        z = self.reparameter(mu, log_var)

        x_hat = self.decoder(z)

        return x_hat, mu, log_var

model = VAE()
model.build(input_shape=(4, 784))
optimizer = tf.optimizers.Adam(lr)

# Make sure the output directory exists before the first image is written;
# otherwise the first save fails only after a full epoch of training.
os.makedirs("images2", exist_ok=True)

# The reconstruction check always uses the first test batch, so fetch it
# once instead of building a new iterator every epoch.
x_eval = tf.reshape(next(iter(test_db)), [-1, 784])

for epoch in range(1000):

    for step, x in enumerate(train_db):

        # [b, 28, 28] => [b, 784]
        x = tf.reshape(x, [-1, 784])
        with tf.GradientTape() as tape:
            x_rec_logits, mu, log_var = model(x)

            # Reconstruction term: per-pixel sigmoid cross-entropy, summed
            # over pixels and averaged over the batch.
            rec_loss = tf.nn.sigmoid_cross_entropy_with_logits(labels=x, logits=x_rec_logits)
            rec_loss = tf.reduce_sum(rec_loss) / x.shape[0]

            # Closed-form KL divergence between N(mu, var) and N(0, 1)
            # https://stats.stackexchange.com/questions/7440/kl-divergence-between-two-univariate-gaussians
            kl_div = -0.5 * (log_var + 1 - mu ** 2 - tf.exp(log_var))
            kl_div = tf.reduce_sum(kl_div) / x.shape[0]

            # The factor 1. is the weight of the KL term.
            loss = rec_loss + 1. * kl_div

        grads = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(grads, model.trainable_variables))

        if step % 100 == 0:
            print(epoch, step, 'kl div:', float(kl_div), 'rec loss:', float(rec_loss))

    # Generation: decode latent codes drawn from N(0, 1), [b, z_dim].
    z = tf.random.normal((batchsz, z_dim))
    logits = model.decoder(z)
    x_hat = tf.sigmoid(logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, "images2/sample_epoch%d.png" % epoch)

    # Reconstruction of the fixed test batch.
    x_hat_logits, _, _ = model(x_eval)
    x_hat = tf.sigmoid(x_hat_logits)
    x_hat = tf.reshape(x_hat, [-1, 28, 28]).numpy() * 255.
    x_hat = x_hat.astype(np.uint8)
    save_images(x_hat, "images2/rec_epoch%d.png" % epoch)

rec_epoch0.png 重构后的
在这里插入图片描述
sample_epoch0.png sample出来的
在这里插入图片描述

猜你喜欢

转载自blog.csdn.net/qq_46456049/article/details/113463673
VAE
AE