语义分割的UNET网络结构
Unet是2015年诞生的模型,它几乎是当前segmentation项目中应用最广的模型。
Unet能从更少的训练图像中进行学习,当它在少于40张图的生物医学数据集上训练时,IOU值仍能达到92%。
Unet网络非常简单,前半部分作用是特征提取,后半部分是上采样。在一些文献中也把这样的结构叫做编码器-解码器结构。由于此网络整体结构类似于大写的英文字母U,故得名U-net。
U-Net中跳跃连接(skip connection)的特征融合使用的是tf.concat()(按通道维拼接),而FCN中使用的是tf.add()(逐元素相加)。
Unet实例,城市街景数据集
Cityscapes城市景观数据集
数据集主页:https://www.cityscapes-dataset.com
主要包含在欧洲城市驾驶的车辆上拍摄的带标签视频。Cityscapes数据集专注于对城市街道场景的语义理解。
城市景观数据集主要是对城市街道场景进行语义理解的数据集,该大型数据集包含来自50个不同城市的街道场景中记录的多种立体视频序列,除了20000个弱注释帧以外,还包括5000帧高质量像素级注释。
Cityscapes数据集有两套评测标准:
1、5000张精细标注的图像
2、5000张精细标注外加20000张粗糙标注的图像。
精细注释数据集包含2975张训练图片和500张验证图片。包含了街景图片和对应的标签,标签共34类。
数据集包含两部分
images和gtFine
images是图像,gtFine包含图像对应的分割图
gtFine中有多个目标分割图
其中文件名包含_gtFine_color的是彩色语义分割图,包含_gtFine_instanceIds的是实例分割图,包含_gtFine_labelIds的是语义分割的目标图,也就是标签。
代码实现
环境:Windows10+Pycharm+Python3.6.8+TensorFlow2.2.0
# -*- coding: UTF-8 -*-
"""
Author: LGD
FileName: semantic_segmentation_UNET
DateTime: 2021/1/1 19:41
SoftWare: PyCharm
"""
import tensorflow as tf
import numpy as np
import os
import matplotlib.pyplot as plt
import glob
"""
加载图片路径
"""
# Collect the training image paths and the matching label-id masks.
# glob.glob() does not guarantee any particular ordering, and images are paired
# with labels purely by list position below, so both lists are sorted. An image
# and its label share the same leading part of the file name, so sorting keeps
# the i-th image aligned with the i-th label.
img = sorted(glob.glob('dataset/images/train/*/*.png'))
print('the image number: ', len(img))
print('the first three image path: ', img[:3])
print('the last three image path: ', img[-3:])
label = sorted(glob.glob('dataset/gtFine/train/*/*_gtFine_labelIds.png'))
print('the label number: ', len(label))
print('the first three label path: ', label[:3])
print('the last three label path: ', label[-3:])
# Positional pairing is meaningless if the two lists differ in length.
if len(img) != len(label):
    raise ValueError(
        'training images ({}) and labels ({}) do not match in number'.format(len(img), len(label))
    )
# Shuffle the paths so that a batch is not filled from only one or two cities.
# The same permutation is applied to both arrays to keep the pairs intact.
index = np.random.permutation(len(img))
img = np.array(img)[index]
label = np.array(label)[index]
train_count = len(img)

# Validation paths are sorted for the same pairing reason; they are not shuffled.
img_val = sorted(glob.glob('dataset/images/val/*/*.png'))
label_val = sorted(glob.glob('dataset/gtFine/val/*/*_gtFine_labelIds.png'))
print('the val img and label number: ', len(img_val), len(label_val))
if len(img_val) != len(label_val):
    raise ValueError(
        'validation images ({}) and labels ({}) do not match in number'.format(
            len(img_val), len(label_val))
    )
val_count = len(img_val)
"""
创建dataset
"""
# Build one dataset of (image path, label path) pairs for training and one
# for validation.
dataset_train, dataset_val = (
    tf.data.Dataset.from_tensor_slices(path_pair)
    for path_pair in ((img, label), (img_val, label_val))
)
def read_png(path):
    """Read the PNG file at ``path`` and decode it as a 3-channel image tensor."""
    return tf.image.decode_png(tf.io.read_file(path), channels=3)
def read_png_label(path):
    """Read the label PNG at ``path`` and decode it as a 1-channel tensor."""
    return tf.image.decode_png(tf.io.read_file(path), channels=1)
# img_1 = read_png(img[0])# label_1 = read_png_label(label[0])# print(img_1.shape, label_1.shape)
"""
图片增强
1、随机翻转 tf.image.flip_left_right()
2、随机裁剪 tf.concat() 先将两张图片叠加在一起,再随机裁剪
"""
def crop_img(img, mask):
    """Resize an image/mask pair to 280x280 and take one shared random 256x256 crop.

    The two tensors are concatenated along the last axis before cropping so
    that a single random crop window is applied to both of them.

    Returns:
        A tuple ``(image, mask)``: the first three channels of the crop and
        the remaining channel(s).
    """
    stacked = tf.concat([img, mask], axis=-1)
    # Nearest-neighbour resizing copies existing values instead of blending them.
    stacked = tf.image.resize(
        stacked, (280, 280), method=tf.image.ResizeMethod.NEAREST_NEIGHBOR
    )
    cropped = tf.image.random_crop(stacked, [256, 256, 4])
    image_part = cropped[..., :3]
    mask_part = cropped[..., 3:]
    return image_part, mask_part
# # 测试裁剪函数# img_1, label_1 = crop_img(img_1, label_1)# plt.subplot(1, 2, 1)# plt.imshow(img_1.numpy())# plt.subplot(1, 2, 2)# plt.imshow(np.squeeze(label_1.numpy()))# plt.show()
def normal(img, mask):
    """Normalize an image and cast its mask.

    The image is cast to float32 and mapped with ``x / 127.5 - 1`` (so values
    in 0..255 land in [-1, 1]); the mask is cast to int32.

    Returns:
        A tuple ``(image, mask)``.
    """
    scaled = tf.cast(img, tf.float32) / 127.5 - 1
    return scaled, tf.cast(mask, tf.int32)
def load_image_train(img_path, mask_path):
    """Load one training sample from an image path and a label path.

    The pair is decoded, randomly cropped via ``crop_img``, flipped
    left-right when a uniform random draw exceeds 0.5, and finally passed
    through ``normal``.

    Returns:
        A tuple ``(image, mask)`` as produced by ``normal``.
    """
    image, target = crop_img(read_png(img_path), read_png_label(mask_path))
    # Flip image and mask together so they stay pixel-aligned.
    if tf.random.uniform(()) > 0.5:
        image = tf.image.flip_left_right(image)
        target = tf.image.flip_left_right(target)
    return normal(image, target)
def load_image_val(img_path, mask_path):
    """Load one validation sample from an image path and a label path.

    Both tensors are resized to 256x256 (no random augmentation) and then
    passed through ``normal``.

    Returns:
        A tuple ``(image, mask)`` as produced by ``normal``.
    """
    img = read_png(img_path)
    mask = read_png_label(mask_path)
    img = tf.image.resize(img, (256, 256))
    # The mask holds class ids, not intensities. The default bilinear method
    # would blend neighbouring ids into values that belong to neither class,
    # so the mask must be resized with nearest-neighbour sampling.
    mask = tf.image.resize(
        mask, (256, 256), method=tf.image.ResizeMethod.NEAREST_NEIGHBOR
    )
    img, mask = normal(img, mask)
    return img, mask
# Batch size and shuffle-buffer size used by the input pipeline.
BATCH_SIZE, BUFFER_SIZE = 32, 300
# Number of whole batches in the training and validation sets.
step_per_epoch, val_step = train_count // BATCH_SIZE, val_count // BATCH_SIZE
# Let tf.data pick the degree of parallelism for the map calls.
auto = tf.data.experimental.AUTOTUNE
# Turn (image path, label path) pairs into preprocessed (image, mask) pairs.
dataset_train = dataset_train.map(map_func=load_image_train, num_parallel_calls=auto)
dataset_val = dataset_val.map(map_func=load_image_val, num_parallel_calls=auto)
"""
创建模型阶段
"""
# Only the training stream is shuffled; both streams are batched.
dataset_train = dataset_train.shuffle(buffer_size=BUFFER_SIZE).batch(batch_size=BATCH_SIZE)
dataset_val = dataset_val.batch(batch_size=BATCH_SIZE)
# Forward pass, down-sampling stage
class DownSample(tf.keras.layers.Layer):
    """Optional max-pooling followed by two 3x3 'same' convolutions with ReLU."""

    def __init__(self, units):
        """Create the two convolutions with ``units`` filters each, plus the pool."""
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(units, kernel_size=3, padding='same')
        self.conv2 = tf.keras.layers.Conv2D(units, kernel_size=3, padding='same')
        self.pool = tf.keras.layers.MaxPool2D()

    def call(self, x, is_pool=True):
        """Apply the stage to ``x``; the pooling step is skipped when ``is_pool`` is false."""
        out = self.pool(x) if is_pool else x
        for conv in (self.conv1, self.conv2):
            out = tf.nn.relu(conv(out))
        return out
# Up-sampling stage
class UpSample(tf.keras.layers.Layer):
    """Two 3x3 'same' convolutions followed by a stride-2 transposed convolution.

    Every one of the three layers is followed by ReLU.
    """

    def __init__(self, units):
        """Create the convolutions with ``units`` filters and the deconv with ``units // 2``."""
        super().__init__()
        self.conv1 = tf.keras.layers.Conv2D(units, kernel_size=3, padding='same')
        self.conv2 = tf.keras.layers.Conv2D(units, kernel_size=3, padding='same')
        self.deconv = tf.keras.layers.Conv2DTranspose(
            units // 2, kernel_size=2, strides=2, padding='same'
        )

    def call(self, x):
        """Run ``x`` through conv1, conv2 and deconv, applying ReLU after each."""
        for layer in (self.conv1, self.conv2, self.deconv):
            x = tf.nn.relu(layer(x))
        return x
# U-Net model definition
class Unet_model(tf.keras.Model):
    """U-Net: five encoder stages, four up-sampling steps with skip connections.

    Each up-sampled feature map is concatenated along the channel axis with
    the encoder output of the same spatial resolution. The final 1x1
    convolution has no activation, so the model returns per-pixel logits.
    """

    def __init__(self, num_classes=34):
        """Build the layers.

        Args:
            num_classes: number of output channels (classes) of the final
                1x1 convolution. Defaults to 34.
        """
        super().__init__()
        # Encoder (down-sampling path)
        self.down1 = DownSample(64)
        self.down2 = DownSample(128)
        self.down3 = DownSample(256)
        self.down4 = DownSample(512)
        self.down5 = DownSample(1024)
        # Decoder (up-sampling path)
        self.up = tf.keras.layers.Conv2DTranspose(
            filters=512, kernel_size=2, strides=2, padding='same'
        )
        self.up1 = UpSample(512)
        self.up2 = UpSample(256)
        self.up3 = UpSample(128)
        # Reused without pooling as the last double-convolution stage.
        self.conv_last = DownSample(64)
        self.last = tf.keras.layers.Conv2D(filters=num_classes, kernel_size=1, padding='same')

    def call(self, x):
        """Return per-pixel class logits for the input batch ``x``."""
        # Encoder: the first stage keeps the input resolution, later ones pool.
        x1 = self.down1(x, is_pool=False)
        x2 = self.down2(x1)
        x3 = self.down3(x2)
        x4 = self.down4(x3)
        x5 = self.down5(x4)
        # Decoder. tf.concat requires an explicit axis; the skip connections
        # are merged along the channel (last) axis.
        x5 = self.up(x5)
        x5 = tf.concat([x4, x5], axis=-1)
        x6 = self.up1(x5)
        x6 = tf.concat([x3, x6], axis=-1)
        x7 = self.up2(x6)
        x7 = tf.concat([x2, x7], axis=-1)
        x8 = self.up3(x7)
        x8 = tf.concat([x1, x8], axis=-1)
        x9 = self.conv_last(x8, is_pool=False)
        x10 = self.last(x9)
        return x10
model = Unet_model()
# Optimizer and loss function.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0001)
# Sparse variant: targets are integer class ids; from_logits=True because the
# loss is fed un-normalized scores.
loss_func = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
class MeanIOU(tf.keras.metrics.MeanIoU):
    """MeanIoU that accepts per-class scores instead of predicted class ids."""

    def __call__(self, y_true, y_pred):
        """Convert scores to class ids, then update and return the metric.

        Args:
            y_true: integer class-id targets.
            y_pred: per-class scores with the class dimension last.
        """
        # tf.argmax defaults to axis=0, which would reduce over the batch
        # dimension; the class scores live on the last axis.
        y_pred = tf.argmax(y_pred, axis=-1)
        return super(MeanIOU, self).__call__(y_true, y_pred)
# Running means of the loss values fed to them.
train_loss, test_loss = (
    tf.keras.metrics.Mean(name=metric_name)
    for metric_name in ('train_loss', 'test_loss')
)
# The prediction holds one score per class while the target is a single class
# id, so SparseCategoricalAccuracy is used to compare the two.
train_acc, test_acc = (
    tf.keras.metrics.SparseCategoricalAccuracy(name=metric_name)
    for metric_name in ('train_acc', 'test_acc')
)
# Mean intersection-over-union over 34 classes.
train_iou, test_iou = (
    MeanIOU(num_classes=34, name=metric_name)
    for metric_name in ('train_iou', 'test_iou')
)
@tf.function  # compile into a graph to speed up execution
def train_step(images, labels):
    """Run one optimization step on a batch and update the training metrics."""
    with tf.GradientTape() as tape:
        logits = model(images)
        batch_loss = loss_func(labels, logits)
    trainable = model.trainable_variables
    grads = tape.gradient(batch_loss, trainable)
    optimizer.apply_gradients(zip(grads, trainable))
    train_loss(batch_loss)
    train_acc(labels, logits)
    train_iou(labels, logits)
@tf.function
def test_step(images, labels):
    """Evaluate one batch (no weight update) and update the test metrics."""
    logits = model(images)
    test_loss(loss_func(labels, logits))
    test_acc(labels, logits)
    test_iou(labels, logits)
EPOCH = 60


def train():
    """Train for ``EPOCH`` epochs, evaluating and printing metrics after each one."""
    tracked_metrics = (train_loss, train_acc, train_iou, test_loss, test_acc, test_iou)
    template = ('Epoch {}, loss: {:.3f}, acc: {:.3f}. IOU: {:.3f}.'
                'test loss: {:.3f}, test acc: {:.3f}, test IOU: {:.3f}.')
    for epoch in range(EPOCH):
        # Start every epoch with fresh metric accumulators.
        for metric in tracked_metrics:
            metric.reset_states()

        for images, labels in dataset_train:
            train_step(images, labels)

        for test_images, test_labels in dataset_val:
            test_step(test_images, test_labels)

        # Accuracies are reported as percentages.
        print(template.format(
            epoch + 1,
            train_loss.result(), train_acc.result() * 100, train_iou.result(),
            test_loss.result(), test_acc.result() * 100, test_iou.result(),
        ))