Tensorflow2.0之图像说明文字生成

项目介绍

在此项目中,我们希望能在输入一张图像之后得到一句话来描述该图像,比如输入下面这张图像后会输出 “a man in an all excited to a podium” 之类的说明性文字。
在这里插入图片描述
注意:这里我们使用 MS-COCO 数据集来练习,这个数据集比较大,大概有13G左右。

代码实现

1、导入需要的库

import tensorflow as tf

# You'll generate plots of attention in order to see which parts of an image
# our model focuses on during captioning
import matplotlib.pyplot as plt

# Scikit-learn includes many helpful utilities
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle

2、下载数据集

# Download image files
image_folder = '/train2014/'
if not os.path.exists(os.path.abspath('.') + image_folder):
    image_zip = tf.keras.utils.get_file('train2014.zip',
                                      cache_subdir=os.path.abspath('.'),
                                      origin = 'http://images.cocodataset.org/zips/train2014.zip',
                                      extract = True)
    PATH = os.path.dirname(image_zip) + image_folder
    os.remove(image_zip)
else:
    PATH = os.path.abspath('.') + image_folder
# Download caption annotation files
annotation_folder = '/annotations/'
if not os.path.exists(os.path.abspath('.') + annotation_folder):
    annotation_zip = tf.keras.utils.get_file('captions.zip',
                                          cache_subdir=os.path.abspath('.'),
                                          origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
                                          extract = True)
    annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'
    os.remove(annotation_zip)

其中 image_zip 是下载的图片压缩包;annotation_zip 是下载的说明性文字压缩包,其中 captions_train2014.json 文件中包含训练集的说明性文字以及对应的图片名称。

3、读取 json 文件

with open(annotation_file, 'r') as f:
    annotations = json.load(f)
annotations.keys()
dict_keys(['info', 'images', 'licenses', 'annotations'])

通过打印 annotations 类型可知它是一个字典,它的键值包括 ‘info’,‘images’,‘licenses’ 和 ‘annotations’,其中我们只需要用到 ‘annotations’。

annotations[‘annotations’] 返回一个列表,这个列表中的每个元素都是含有三个键值的字典,这三个键值分别是 ‘image_id’,‘id’ 和 ‘caption’,其中我们需要 ‘caption’ 来返回说明性文字和 ‘image_id’ 来返回这个说明性文字对应的图片代号。

为节约训练时间,我们只取其中30000个样本训练。

all_captions = []
all_img_name_vector = []

for annot in annotations['annotations']:
    caption = '<start> ' + annot['caption'] + ' <end>'
    image_id = annot['image_id']
    full_coco_image_path = PATH + 'COCO_train2014_' + '%012d.jpg' % (image_id)

    all_img_name_vector.append(full_coco_image_path)
    all_captions.append(caption)

train_captions, img_name_vector = shuffle(all_captions,
                                          all_img_name_vector,
                                          random_state=1)

num_examples = 30000
train_captions = train_captions[:num_examples]
img_name_vector = img_name_vector[:num_examples]

关于 sklearn.utils.shuffle() 的用法请参考:用 sklearn.utils.shuffle 来打乱样本顺序

4、载入图片

def load_image(image_path):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.resize(img, (299, 299))
    img = tf.keras.applications.inception_v3.preprocess_input(img)
    return img, image_path

其中 tf.io.read_file(image_path) 输出的是图片信息,我们在将其进行解码( tf.image.decode_jpeg(img, channels=3) )后可以得到图片像素。

因为我们用 Inception_v3 网络来提取图片特征,所以我们在这里要将图片像素范围转化为 Inception_v3 网络需要的范围,所以要使用 tf.keras.applications.inception_v3.preprocess_input(img)。

5、载入模型

在这里我们使用 Inception_v3 网络来提取图片特征,输出为该网络中最后一个卷积层的输出。

image_model = tf.keras.applications.InceptionV3(include_top=False,
                                                weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output

image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

6、获取图片特征

6.1 删除重复的图片

因为一张图片可能对应不同的说明性文字,所以图片数据集中存在重复的问题。

encode_train = sorted(set(img_name_vector))

此时,encode_train 列表中的图片名称是不重复的。

6.2 切片、分批

首先将列表 encode_train 转化为 dataset 类型的数据。

image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)

然后我们把它映射到 load_image() 函数实现从图片名称到图片的转换。

image_dataset = image_dataset.map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(16)

6.3 将图片输入网络以获取特征

for img, path in image_dataset:
    batch_features = image_features_extract_model(img)
    batch_features = tf.reshape(batch_features,
                                (batch_features.shape[0], -1, batch_features.shape[3]))

    for bf, p in zip(batch_features, path):
        path_of_feature = p.numpy().decode("utf-8")
        np.save(path_of_feature, bf.numpy())

这里的 np.save(path_of_feature, bf.numpy()) 的目的是:在调用 np.load(path_of_feature+’.npy’) 的时候能输出这个路径下的图片对应的特征 bf.numpy()。

最终得到的图片特征的维度为:(batch_size, 64, 2048)。

7、文本 \rightarrow 数字向量

7.1 构建分词器

为了节省内存,我们把词汇表大小限制在前5000个单词,其他的单词用 “<unk>” 代替。

top_k = 5000
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(train_captions)

7.2 构建数字向量

train_seqs = tokenizer.texts_to_sequences(train_captions)

7.3 将数字向量填充到同一长度

cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')

8、划分训练集和验证集

img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector,
                                                                    cap_vector,
                                                                    test_size=0.2,
                                                                    random_state=0)

9、建立 tf.data 数据集

在 6.2 部分中建立的数据集是为了将图片输入 Inception_v3 网络得到特征,而在这里建立的数据集对应的样本是图片的(解码前的)名称,标签是这张图片对应的说明性文字的数字向量。

dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

接着,我们要将数据集中的图片名称转换为这张图片对应的特征,所以我们要使用 np.load() 函数。

def map_func(img_name, cap):
    img_tensor = np.load(img_name.decode('utf-8')+'.npy')
    return img_tensor, cap

dataset = dataset.map(lambda item1, item2: tf.numpy_function(
          map_func, [item1, item2], [tf.float32, tf.int32]),
          num_parallel_calls=tf.data.experimental.AUTOTUNE)

接着,对数据集进行打乱、分批:

dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

10、编码器

因为在之前我们已经用卷积神经网络提取了特征(batch_size, 64, 2048),所以在这个编码器中我们只需要定义全连接层(其神经元个数是词嵌入维度)即可。

class CNN_Encoder(tf.keras.Model):
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x

经过编码器后,图像特征的形状变为(batch_size, 64, embedding_dim)。

11、Bahdanau 注意力

相关论文参考:BahdanauAttention

class BahdanauAttention(tf.keras.Model):
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        hidden_with_time_axis = tf.expand_dims(hidden, 1)

        score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis))

        attention_weights = tf.nn.softmax(self.V(score), axis=1)

        context_vector = attention_weights * features
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights

这里的 features 其实就是编码器中输出的结果,经过含 units 个神经元的 Dense 层之后,其形状从 (batch_size, 64, embedding_dim) 变成了 (batch_size, 64, units)。

这里的 hidden 其实就是解码器中输出的隐层向量,我们需要将其维度从 (batch_size, embedding_dim) 变成 (batch_size, 1, embedding_dim) 来执行之后的加法以计算分数,将增加维度后的向量经过含 units 个神经元的 Dense 层之后,其形状从 (batch_size, 1, embedding_dim) 变成了 (batch_size, 1, units)。

将以上两个输出相加得到的形状为 (batch_size, 64, units),经过含1个神经元的 Dense 层之后得到 score,其形状变成 (batch_size, 64, 1)。

Softmax 默认被应用于最后一个轴,但是这里我们想将它应用于第二个轴(即 axis=1),因为分数 (score) 的形状是 (batch_size, 64, 1)。我们想为每个输入的特征 (batch_size, 64, embedding_dim) 分配一个权重,所以 softmax 应该用在 64 这个轴上。经过 Softmax 层之后,得到的注意力权重形状和 score 的形状相同,都是 (batch_size, 64, 1)。
【注】Softmax 的不同的轴的计算规则参考:tf.nn.softmax(x, axis)里axis起什么作用?

将注意力权重和 features 相乘,得到上下文向量,其形状为 (batch_size, 16, embedding_dim)。此向量也就是加了权重的编码向量。将上下文向量基于第二个轴求和(原因与之前相同),得到最终的上下文向量,其形状为 (batch_size, embedding_dim)。

12、解码器

class RNN_Decoder(tf.keras.Model):
    def __init__(self, embedding_dim, units, vocab_size):
        super(RNN_Decoder, self).__init__()
        self.units = units

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        context_vector, attention_weights = self.attention(features, hidden)
        x = self.embedding(x)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        output, state = self.gru(x)
        x = self.fc1(output)
        x = tf.reshape(x, (-1, x.shape[2]))
        x = self.fc2(x)
        return x, state, attention_weights

    def reset_state(self, batch_size):
        return tf.zeros((batch_size, self.units))

在解码器中,我们首先通过 Bahdanau 注意力得到上下文向量和注意力权重,他们的形状分别为 (batch_size, embedding_dim) 和 (batch_size, 64, 1)。

这里的输入 x 是一个词对应的数字(第一个输入模型的数字一定是 “<start>” 对应的数字),其形状为(batch_size, 1),经过词嵌入层之后,其形状变为(batch_size, embedding_dim),将其与增加了第二维度的上下文向量合并后得到形状为 (batch_size, 64, 2*embedding_dim)。

将其输入 GRU,得到输出为 (batch_size, 1, units),隐藏状态为 (batch_size, units)。

然后,经过全连接层后,得到 (batch_size, vocab_size)。

13、设置超参数建立模型

BATCH_SIZE = 64
BUFFER_SIZE = 1000
embedding_dim = 256
units = 512
vocab_size = top_k
num_steps = len(img_name_train) // BATCH_SIZE
features_shape = 2048
attention_features_shape = 64

encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)

14、初始化优化器

optimizer = tf.keras.optimizers.Adam()

15、损失函数

当输入的向量中出现0元素,说明这个元素所在的文本已经结束了,这个文本不再参与损失的计算,所以在计算损失的时候,要使用掩膜处理,将已结束文本的损失置零。

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_mean(loss_)

16、配置检查点

checkpoint_path = "./checkpoints/train"
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

# 如果检查点存在,则恢复最新的检查点。
start_epoch = 0
if ckpt_manager.latest_checkpoint:
    start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
    # restoring the latest checkpoint in checkpoint_path
    ckpt.restore(ckpt_manager.latest_checkpoint)

17、梯度下降

@tf.function
def train_step(img_tensor, target):
    loss = 0

    # initializing the hidden state for each batch
    # because the captions are not related from image to image
    hidden = decoder.reset_state(batch_size=target.shape[0])

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * target.shape[0], 1)

    with tf.GradientTape() as tape:
        features = encoder(img_tensor)

        for i in range(1, target.shape[1]):
            # passing the features through the decoder
            predictions, hidden, _ = decoder(dec_input, features, hidden)

            loss += loss_function(target[:, i], predictions)

            # using teacher forcing
            dec_input = tf.expand_dims(target[:, i], 1)

    total_loss = (loss / int(target.shape[1]))

    trainable_variables = encoder.trainable_variables + decoder.trainable_variables

    gradients = tape.gradient(loss, trainable_variables)

    optimizer.apply_gradients(zip(gradients, trainable_variables))

    return loss, total_loss

18、训练

loss_plot = []

EPOCHS = 20

# 训练从 start_epoch 训练到 EPOCHS
for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    total_loss = 0

    for (batch, (img_tensor, target)) in enumerate(dataset):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss

        if batch % 100 == 0:
            print ('Epoch {} Batch {} Loss {:.4f}'.format(
              epoch + 1, batch, batch_loss.numpy() / int(target.shape[1])))
    # storing the epoch end loss value to plot later
    loss_plot.append(total_loss / num_steps)

    if epoch % 5 == 0:
        ckpt_manager.save()

    print ('Epoch {} Loss {:.6f}'.format(epoch + 1,
                                         total_loss/num_steps))
    print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

19、验证

19.1 验证函数

  • 1、初始化解码器的隐藏状态;
  • 2、为图像添加维度;
  • 3、提取图像特征并将所得形状转换成编码器需要的形状;
  • 4、将图像特征输入编码器;
  • 5、初始化解码器输入为 ‘<start>’;
  • 6、经解码器得到的数据形状为 (batch_size, vocab_size),要用 tf.random.categorical() 函数找出每个样本的概率最大的下一个单词。
  • 7、将预测的单词放到一个列表中。
def evaluate(image):
    attention_plot = np.zeros((max_length, attention_features_shape))

    hidden = decoder.reset_state(batch_size=1)

    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input, features, hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        result.append(tokenizer.index_word[predicted_id])

        if tokenizer.index_word[predicted_id] == '<end>':
            return result, attention_plot

        dec_input = tf.expand_dims([predicted_id], 0)

    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot

19.2 画注意力图

def plot_attention(image, result, attention_plot):
    temp_image = np.array(Image.open(image))

    fig = plt.figure(figsize=(10, 10))

    len_result = len(result)
    for l in range(len_result):
        temp_att = np.resize(attention_plot[l], (8, 8))
        ax = fig.add_subplot(len_result//2, len_result//2, l+1)
        ax.set_title(result[l])
        img = ax.imshow(temp_image)
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())

    plt.tight_layout()
    plt.show()

19.3 随机测试验证集图片

# captions on the validation set
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]
real_caption = ' '.join([tokenizer.index_word[i] for i in cap_val[rid] if i not in [0]])
result, attention_plot = evaluate(image)

print ('Real Caption:', real_caption)
print ('Prediction Caption:', ' '.join(result))
plot_attention(image, result, attention_plot)

19.4 测试自己的图片

image_url = 'https://tensorflow.org/images/surf.jpg'
image_extension = image_url[-4:]
image_path = tf.keras.utils.get_file('image'+image_extension,
                                     origin=image_url)

result, attention_plot = evaluate(image_path)
print ('Prediction Caption:', ' '.join(result))
plot_attention(image_path, result, attention_plot)
# opening the image
Image.open(image_path)
原创文章 151 获赞 56 访问量 15万+

猜你喜欢

转载自blog.csdn.net/qq_36758914/article/details/105563550