一、检测任务
1、从硬盘中读取(fit_generator 版,逐批从磁盘加载)
#-*- coding: utf-8 -*-
import keras
import tensorflow as tf
print('TensorFlow version:', tf.__version__)
print('Keras version:', keras.__version__)
from keras.layers import GlobalAveragePooling2D
#from numpy.random import seed
#seed(1)
#from tensorflow import set_random_seed
#set_random_seed(2)
#from densenet_fast2 import create_dense_net
from keras.applications.nasnet import NASNetMobile
import os
from os.path import join
import json
# import random
# import itertools
# import re
# import datetime
# import cairocffi as cairo
#import editdistance
import numpy as np
from scipy import ndimage
#import pylab
#import matplotlib.pyplot as plt
#import matplotlib.gridspec as gridspec
from keras import backend as K
from keras import regularizers
from keras.layers.convolutional import Conv2D, MaxPooling2D, ZeroPadding2D
from keras.layers.wrappers import TimeDistributed, Bidirectional
from keras.layers import Input, Dense, Activation, Dropout, Permute, Flatten
from keras.layers import Reshape, Lambda
from keras.layers.normalization import BatchNormalization
from keras.layers.merge import add, concatenate
from keras.models import Model, load_model
from keras.layers.recurrent import GRU, LSTM
from keras.optimizers import SGD, Adam, Adadelta
from keras.utils.data_utils import get_file
from keras.preprocessing import image
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.utils import multi_gpu_model
#from keras.applications.vgg16 import VGG16
from keras.applications.densenet import DenseNet121
import cv2
import logging
from collections import Counter
import codecs
from densenet_fast import create_dense_net
from keras.regularizers import l2
# -------------------------config section-------------------------
# Four-point (corner) regression for ID cards.
imagepath = "/workdir/data/"
gpu_count = 1
gpu_list = "0" # GPU ids visible to TensorFlow
image_size = 224 # image_size = image_h = image_w, side length of the model input
output_model_path = "/workdir/model_chenyu_v1/"
load_weight_filename = "weights.02-2599.09.hdf5"
load_weight_flag = True # True -> load pre-trained weights before training
model_filename = "idcard_corner.h5"
load_model_flag = False # False -> training mode, True -> test mode
# -------------------------config section-------------------------
logging.basicConfig(filename=join(output_model_path, "result.log"), level=logging.INFO) # basic logging configuration
config = tf.ConfigProto()
config.allow_soft_placement = True # fall back to another device if the requested one does not exist
config.gpu_options.allow_growth = True # grow GPU memory on demand instead of grabbing it all
config.gpu_options.visible_device_list = gpu_list # restrict TensorFlow to the listed GPUs
sess = tf.Session(config=config)
K.set_session(sess) # register this session with the Keras backend
# ----------------------------------------------------------------------------
# imagenames = []
# with open(imagenamelist, 'r') as f: #将txt中的label加入列表中
# for line in f:
# line = line.strip()
# imagenames.append(line)
# ---- 数据生成器 ----
class ImageGenerator:
    """Yields (inputs, outputs) batches for ID-card corner regression.

    Sample names are read from train.txt / test.txt under ``dirpath``;
    each image is loaded from ``dirpath/img/<name>.jpg`` and its corner
    annotation from ``dirpath/label/<name>.json``.  Images are resized to
    ``img_size`` x ``img_size`` and the corner coordinates are rescaled
    to match.
    """
    def __init__(self, dirpath, img_size, batch_size, is_train=True):
        self.img_size = img_size
        self.batch_size = batch_size
        self.dirpath = dirpath
        # Pick the sample list: train.txt for training, test.txt otherwise.
        list_name = 'train.txt' if is_train else 'test.txt'
        with open(os.path.join(self.dirpath, list_name), 'r') as f_list:
            self.f_list = f_list.readlines()
        # Steps per epoch (x3 oversampling, as in the original code).
        self.n = (len(self.f_list) // self.batch_size) * 3

    def next_sample(self):
        """Load one random (filepath, image, corner-label) sample."""
        random_img = np.random.choice(self.f_list)
        img_filename = str(random_img).strip()
        img_filepath = join(self.dirpath, 'img', img_filename + '.jpg')
        img = cv2.imread(img_filepath)
        h, w, channel = img.shape
        img = cv2.resize(img, (self.img_size, self.img_size),
                         interpolation=cv2.INTER_CUBIC)
        img = img.astype(np.float32)
        img /= 255  # normalize pixels to [0, 1]
        json_filepath = join(self.dirpath, 'label', img_filename + '.json')
        ann = json.load(codecs.open(json_filepath, 'r', encoding='utf-8'))
        # BUGFIX: the original mixed up positions and scale factors (it
        # stored lt.y scaled by the *width* ratio at index 2, rb.x scaled
        # by the *height* ratio at index 1, ...).  The evaluation/drawing
        # code treats even indices as x (width-scaled) and odd indices as
        # y (height-scaled), so store the corners accordingly, clockwise:
        # lt, rt, rb, lb.
        sx = np.float32(self.img_size) / np.float32(w)
        sy = np.float32(self.img_size) / np.float32(h)
        corners = [
            sx * np.float32(ann['lt']['x']), sy * np.float32(ann['lt']['y']),
            sx * np.float32(ann['rt']['x']), sy * np.float32(ann['rt']['y']),
            sx * np.float32(ann['rb']['x']), sy * np.float32(ann['rb']['y']),
            sx * np.float32(ann['lb']['x']), sy * np.float32(ann['lb']['y']),
        ]
        # Return a fresh list per call (the original reused one shared
        # list, so every returned reference aliased the same buffer).
        return img_filepath, img, corners

    def next_batch(self):
        """Infinite generator of ({inputs}, {outputs}) dicts for fit_generator."""
        while True:
            X_data = np.ones([self.batch_size, self.img_size, self.img_size, 3])
            Y_data = np.ones([self.batch_size, 8])  # corner labels
            filenames = []
            loss_out = np.zeros((self.batch_size, 1))  # dummy target for the in-graph loss layer
            for i in range(self.batch_size):
                filename, img, corner = self.next_sample()
                X_data[i] = img
                Y_data[i] = corner
                filenames.append(filename)
            inputs = {
                'the_input': X_data,
                'y_true': Y_data,
                'filenames': filenames
            }
            outputs = {'loss_out': loss_out}
            yield (inputs, outputs)
# --------------------- iterate one batch and print basic info ---------------------
tiger = ImageGenerator(imagepath, image_size, 1)
for inp, out in tiger.next_batch():  # show the shape and label of one sample
    # BUGFIX: "neutral network" -> "neural network" in the message.
    print('Text generator output (data which will be fed into the neural network):')
    print('1) the_input (image)', inp['the_input'][0].shape)
    print('2) the_labels is {0}'.format(inp['y_true'][0]))
    break
#--------------------------------------------------------------------------
# def lambda_loss_func(args): #定义损失函数
# y_pred, y_true = args
# return K.mean(K.square(y_pred - y_true))
def lambda_loss_func(args):
    """Smooth-L1 (Huber, delta=1) loss, summed over the coordinate axis."""
    y_pred, y_true = args
    diff = y_true - y_pred
    abs_diff = tf.abs(diff)
    quadratic = 0.5 * diff ** 2   # used where |diff| < 1
    linear = abs_diff - 0.5       # used elsewhere
    per_coord = tf.where(tf.less(abs_diff, 1.0), quadratic, linear)
    return tf.reduce_sum(per_coord, -1)
def train(resume=False):
    """Build the NASNetMobile corner-regression model and train it.

    Args:
        resume: if True, load weights from ``load_weight_filename``
            before training starts.

    Returns:
        The trained (single-GPU) Keras Model.
    """
    saved_checkpoint_path = join(output_model_path, "weights.{epoch:02d}-{val_loss:.2f}.hdf5")
    load_checkpoint_path = join(output_model_path, load_weight_filename)
    # Input layout depends on the backend's image data format.
    if K.image_data_format() == 'channels_first':
        input_shape = (3, image_size, image_size)
    else:
        input_shape = (image_size, image_size, 3)
    batch_size = 64
    tiger_train = ImageGenerator(imagepath, image_size, batch_size)
    tiger_val = ImageGenerator(imagepath, image_size, batch_size, is_train=False)
    # NASNetMobile backbone + GAP + small regression head
    # (8 outputs = 4 corners x 2 coordinates).
    input_data = Input(name='the_input', shape=input_shape, dtype='float32')
    nasnetmobile = NASNetMobile(input_tensor=input_data, weights='imagenet', include_top=False)
    inner = GlobalAveragePooling2D()(nasnetmobile.output)
    inner = Dense(128)(inner)
    y_pred = Dense(8, name='y_pred')(inner)
    Model(inputs=input_data, outputs=y_pred).summary()  # print the architecture
    # The loss is computed inside the graph by a Lambda layer, so the
    # ground truth is fed as a second model input.
    y_true = Input(name='y_true', shape=[8], dtype='float32')
    loss_out = Lambda(lambda_loss_func, output_shape=(1,), name='loss_out')([y_pred, y_true])
    model = Model(inputs=[input_data, y_true], outputs=loss_out)
    if gpu_count > 1:
        parallel_model = multi_gpu_model(model, gpus=gpu_count)
    else:
        # BUGFIX: the original only handled gpu_count == 1 here, leaving
        # parallel_model undefined for gpu_count <= 0.
        parallel_model = model
    if resume:
        parallel_model.load_weights(load_checkpoint_path)
    # NOTE(review): lr=10.0 is extremely high for Adam (default 1e-3);
    # kept as-is, but worth confirming it is intentional.
    optimizer = Adam(lr=10.0, decay=0.1)
    # The Lambda layer already outputs the loss value, so compile with a
    # pass-through loss.
    parallel_model.compile(loss={'loss_out': lambda y_true, y_pred: y_pred}, optimizer=optimizer)
    # Stop after 20 epochs without val_loss improvement; checkpoint the best weights.
    early_stopping = EarlyStopping(monitor='val_loss', patience=20, verbose=1, mode='min')
    checkpoint = ModelCheckpoint(saved_checkpoint_path, monitor='val_loss',
                                 save_best_only=True, mode='min', save_weights_only=True)
    # BUGFIX: early_stopping was created but never registered as a callback.
    parallel_model.fit_generator(generator=tiger_train.next_batch(),
                                 steps_per_epoch=tiger_train.n,
                                 epochs=90,
                                 callbacks=[early_stopping, checkpoint],
                                 validation_data=tiger_val.next_batch(),
                                 validation_steps=tiger_val.n)
    return model
if not load_model_flag: # choose training vs. test mode: load_model_flag=False means training
    # Train (optionally resuming from pre-trained weights) and save the
    # full model (architecture + weights).
    model = train(resume=load_weight_flag) # load_weight_flag=True -> start from pre-trained weights
    model.save(join(output_model_path, model_filename)) # save model and weights
else:
    # -------- test --------
    # load from a pre-trained model
    model = load_model(join(output_model_path, model_filename), compile=False)
    tiger_test = ImageGenerator(imagepath, image_size, 1)  # batch size 1 for evaluation
    #tiger_test.build_data()
    valid_samples_count = tiger_test.n
    sample_count = 0
    print("valid samples: ", valid_samples_count)
    net_inp = model.get_layer(name='the_input').input # look up layers by name
    # NOTE(review): taking the output of 'dense_1' assumes the first Dense
    # layer is the 8-way corner head -- confirm against the saved model
    # (the commented alternative uses the 'y_pred' layer instead).
    net_out = model.get_layer(name='dense_1').output
    #net_out = model.get_layer(name='y_pred').output
    for inp_value, _ in tiger_test.next_batch():
        bs = inp_value['the_input'].shape[0]
        X_data = inp_value['the_input']
        pred_corners = sess.run(net_out, feed_dict={net_inp:X_data}) # predicted corner coordinates
        labels = inp_value['y_true'] # ground-truth corner coordinates
        filenames = inp_value['filenames']
        for i in range(1):
            sample_count += 1
            print("filename:", filenames[i])
            print("predict :", pred_corners[i])
            print("true :", labels[i])
            img = cv2.imread(filenames[i])
            h, w, channel = img.shape
            # Map coordinates from the resized (image_size) frame back to
            # the original image: even indices are x (scaled by width),
            # odd indices are y (scaled by height).
            pred_corners[i][0] *= np.float32(w) / np.float32(image_size)
            pred_corners[i][2] *= np.float32(w) / np.float32(image_size)
            pred_corners[i][4] *= np.float32(w) / np.float32(image_size)
            pred_corners[i][6] *= np.float32(w) / np.float32(image_size)
            pred_corners[i][1] *= np.float32(h) / np.float32(image_size)
            pred_corners[i][3] *= np.float32(h) / np.float32(image_size)
            pred_corners[i][5] *= np.float32(h) / np.float32(image_size)
            pred_corners[i][7] *= np.float32(h) / np.float32(image_size)
            labels[i][0] *= np.float32(w) / np.float32(image_size)
            labels[i][2] *= np.float32(w) / np.float32(image_size)
            labels[i][4] *= np.float32(w) / np.float32(image_size)
            labels[i][6] *= np.float32(w) / np.float32(image_size)
            labels[i][1] *= np.float32(h) / np.float32(image_size)
            labels[i][3] *= np.float32(h) / np.float32(image_size)
            labels[i][5] *= np.float32(h) / np.float32(image_size)
            labels[i][7] *= np.float32(h) / np.float32(image_size)
            # Draw ground-truth (cyan) and predicted (blue) corners, then
            # save the annotated image next to the original.
            for idx in range(4):
                cv2.circle(img, (int(labels[i][2*idx]),int(labels[i][2*idx+1])), 3, (255,255,0), -1)
                cv2.circle(img, (int(pred_corners[i][2*idx]), int(pred_corners[i][2*idx+1])), 3, (255,0,0), -1)
            cv2.imwrite(os.path.join(os.path.split(filenames[i])[0], "predict_"+os.path.split(filenames[i])[1]), img)
        if sample_count == valid_samples_count:
            break
本质:
1、建立一个空的 [batch, img] 数组,然后对每一张图片及其标签做预处理(resize、/255、astype)等
2、将处理完成的图片按循环(batch)放入空数组中
3、生成迭代器,满足 fit_generator API
fit_generator API:此方法节省内存,但会增加训练耗时
2、从内存中读取(fit_generator版)
#-*- coding: utf-8 -*-
import keras
import tensorflow as tf
print('TensorFlow version:', tf.__version__)
print('Keras version:', keras.__version__)
from keras.layers import GlobalAveragePooling2D
#from numpy.random import seed
#seed(1)
#from tensorflow import set_random_seed
#set_random_seed(2)
#from densenet_fast2 import create_dense_net
from keras.applications.nasnet import NASNetMobile
import os
from os.path import join
import json
# import random
# import itertools
# import re
# import datetime
# import cairocffi as cairo
#import editdistance
import numpy as np
from scipy import ndimage
#import pylab
#import matplotlib.pyplot as plt
#import matplotlib.gridspec as gridspec
from keras import backend as K
from keras import regularizers
from keras.layers.convolutional import Conv2D, MaxPooling2D, ZeroPadding2D
from keras.layers.wrappers import TimeDistributed, Bidirectional
from keras.layers import Input, Dense, Activation, Dropout, Permute, Flatten
from keras.layers import Reshape, Lambda
from keras.layers.normalization import BatchNormalization
from keras.layers.merge import add, concatenate
from keras.models import Model, load_model
from keras.layers.recurrent import GRU, LSTM
from keras.optimizers import SGD, Adam, Adadelta
from keras.utils.data_utils import get_file
from keras.preprocessing import image
from keras.callbacks import EarlyStopping, ModelCheckpoint
from keras.utils import multi_gpu_model
#from keras.applications.vgg16 import VGG16
from keras.applications.densenet import DenseNet121
import cv2
import logging
from collections import Counter
import codecs
from densenet_fast import create_dense_net
from keras.regularizers import l2
# -------------------------config section-------------------------
# Four-point (corner) regression for ID cards.
imagepath = "/workdir/data/"
gpu_count = 1
gpu_list = "1"  # GPU ids visible to TensorFlow
image_size = 224  # image_size = image_h = image_w, side length of the model input
output_model_path = "/workdir/model_chenyu/"
# BUGFIX: the filename contained a leading space, so
# join(output_model_path, load_weight_filename) pointed at a
# non-existent path.
load_weight_filename = "weights.47-18358.13.hdf5"
load_weight_flag = False  # True -> load pre-trained weights before training
model_filename = "idcard_corner.h5"
load_model_flag = False  # False -> training mode, True -> test mode
# -------------------------config section-------------------------
logging.basicConfig(filename=join(output_model_path, "result.log"), level=logging.INFO)  # basic logging configuration
config = tf.ConfigProto()
config.allow_soft_placement = True  # fall back to another device if the requested one does not exist
config.gpu_options.allow_growth = True  # grow GPU memory on demand
config.gpu_options.visible_device_list = gpu_list  # restrict TensorFlow to the listed GPUs
sess = tf.Session(config=config)
K.set_session(sess)  # register this session with the Keras backend
# ----------------------------------------------------------------------------
# imagenames = []
# with open(imagenamelist, 'r') as f: #将txt中的label加入列表中
# for line in f:
# line = line.strip()
# imagenames.append(line)
# ---- 数据生成器 ----
class ImageGenerator:
    """Yields (inputs, outputs) batches for ID-card corner regression.

    Sample names are read from train.txt / test.txt under ``dirpath``;
    each image is loaded from ``dirpath/img/<name>.jpg`` and its corner
    annotation from ``dirpath/label/<name>.json``.  Images are resized to
    ``img_size`` x ``img_size`` and the corner coordinates are rescaled
    to match.
    """
    def __init__(self, dirpath, img_size, batch_size, is_train=True):
        self.img_size = img_size
        self.batch_size = batch_size
        self.dirpath = dirpath
        # Pick the sample list: train.txt for training, test.txt otherwise.
        list_name = 'train.txt' if is_train else 'test.txt'
        with open(os.path.join(self.dirpath, list_name), 'r') as f_list:
            self.f_list = f_list.readlines()
        # Steps per epoch.
        self.n = len(self.f_list) // self.batch_size

    def next_sample(self):
        """Load one random (filepath, image, corner-label) sample."""
        random_img = np.random.choice(self.f_list)
        img_filename = str(random_img).strip()
        img_filepath = join(self.dirpath, 'img', img_filename + '.jpg')
        img = cv2.imread(img_filepath)
        h, w, channel = img.shape
        img = cv2.resize(img, (self.img_size, self.img_size),
                         interpolation=cv2.INTER_CUBIC)
        img = img.astype(np.float32)
        img /= 255  # normalize pixels to [0, 1]
        json_filepath = join(self.dirpath, 'label', img_filename + '.json')
        ann = json.load(codecs.open(json_filepath, 'r', encoding='utf-8'))
        # BUGFIX: the original mixed up positions and scale factors (it
        # stored lt.y scaled by the *width* ratio at index 2, rb.x scaled
        # by the *height* ratio at index 1, ...).  The evaluation/drawing
        # code treats even indices as x (width-scaled) and odd indices as
        # y (height-scaled), so store the corners accordingly, clockwise:
        # lt, rt, rb, lb.
        sx = np.float32(self.img_size) / np.float32(w)
        sy = np.float32(self.img_size) / np.float32(h)
        corners = [
            sx * np.float32(ann['lt']['x']), sy * np.float32(ann['lt']['y']),
            sx * np.float32(ann['rt']['x']), sy * np.float32(ann['rt']['y']),
            sx * np.float32(ann['rb']['x']), sy * np.float32(ann['rb']['y']),
            sx * np.float32(ann['lb']['x']), sy * np.float32(ann['lb']['y']),
        ]
        # Return a fresh list per call (the original reused one shared
        # list, so every returned reference aliased the same buffer).
        return img_filepath, img, corners

    def next_batch(self):
        """Infinite generator of ({inputs}, {outputs}) dicts for fit_generator."""
        while True:
            X_data = np.ones([self.batch_size, self.img_size, self.img_size, 3])
            Y_data = np.ones([self.batch_size, 8])  # corner labels
            filenames = []
            loss_out = np.zeros((self.batch_size, 1))  # dummy target for the in-graph loss layer
            for i in range(self.batch_size):
                filename, img, corner = self.next_sample()
                X_data[i] = img
                Y_data[i] = corner
                filenames.append(filename)
            inputs = {
                'the_input': X_data,
                'y_true': Y_data,
                'filenames': filenames
            }
            outputs = {'loss_out': loss_out}
            yield (inputs, outputs)
# --------------------- iterate one batch and print basic info ---------------------
tiger = ImageGenerator(imagepath, image_size, 1)
for inp, out in tiger.next_batch():  # show the shape and label of one sample
    # BUGFIX: "neutral network" -> "neural network" in the message.
    print('Text generator output (data which will be fed into the neural network):')
    print('1) the_input (image)', inp['the_input'][0].shape)
    print('2) the_labels is {0}'.format(inp['y_true'][0]))
    break
#--------------------------------------------------------------------------
# def lambda_loss_func(args): #定义损失函数
# y_pred, y_true = args
# return K.mean(K.square(y_pred - y_true))
def lambda_loss_func(args):
    """Smooth-L1 (Huber, delta=1) loss, summed over the coordinate axis."""
    y_pred, y_true = args
    residual = y_true - y_pred
    magnitude = tf.abs(residual)
    small_branch = 0.5 * residual ** 2     # |residual| < 1
    large_branch = magnitude - 0.5         # otherwise
    elementwise = tf.where(tf.less(magnitude, 1.0), small_branch, large_branch)
    return tf.reduce_sum(elementwise, -1)
def train(resume=False):
    """Build the custom DenseNet corner-regression model and train it.

    Args:
        resume: if True, load weights from ``load_weight_filename``
            before training starts.

    Returns:
        The trained (single-GPU) Keras Model.
    """
    saved_checkpoint_path = join(output_model_path, "weights.{epoch:02d}-{val_loss:.2f}.hdf5")
    load_checkpoint_path = join(output_model_path, load_weight_filename)
    # Input layout depends on the backend's image data format.
    if K.image_data_format() == 'channels_first':
        input_shape = (3, image_size, image_size)
    else:
        input_shape = (image_size, image_size, 3)
    batch_size = 64
    tiger_train = ImageGenerator(imagepath, image_size, batch_size)
    tiger_val = ImageGenerator(imagepath, image_size, batch_size, is_train=False)
    # Hand-written DenseNet backbone (see densenet_fast.create_dense_net);
    # it returns the input tensor and the 8-way corner prediction head.
    input_data, y_pred = create_dense_net(input_shape)
    Model(inputs=input_data, outputs=y_pred).summary()  # print the architecture
    # The loss is computed inside the graph by a Lambda layer, so the
    # ground truth is fed as a second model input.
    y_true = Input(name='y_true', shape=[8], dtype='float32')
    loss_out = Lambda(lambda_loss_func, output_shape=(1,), name='loss_out')([y_pred, y_true])
    model = Model(inputs=[input_data, y_true], outputs=loss_out)
    if gpu_count > 1:
        parallel_model = multi_gpu_model(model, gpus=gpu_count)
    else:
        # BUGFIX: the original only covered gpu_count == 1, leaving
        # parallel_model undefined for gpu_count <= 0.
        parallel_model = model
    if resume:
        parallel_model.load_weights(load_checkpoint_path)
    # NOTE(review): lr=1.0 is very high for Adam (default 1e-3); kept
    # as-is, but worth confirming it is intentional.
    optimizer = Adam(lr=1.0)
    # The Lambda layer already outputs the loss value, so compile with a
    # pass-through loss.
    parallel_model.compile(loss={'loss_out': lambda y_true, y_pred: y_pred}, optimizer=optimizer)
    # Stop after 20 epochs without val_loss improvement; checkpoint the best weights.
    early_stopping = EarlyStopping(monitor='val_loss', patience=20, verbose=1, mode='min')
    checkpoint = ModelCheckpoint(saved_checkpoint_path, monitor='val_loss',
                                 save_best_only=True, mode='min', save_weights_only=True)
    parallel_model.fit_generator(generator=tiger_train.next_batch(),
                                 steps_per_epoch=tiger_train.n,
                                 epochs=60,
                                 callbacks=[early_stopping, checkpoint],
                                 validation_data=tiger_val.next_batch(),
                                 validation_steps=tiger_val.n,
                                 verbose=1)
    return model
if not load_model_flag: # choose training vs. test mode: load_model_flag=False means training
    # Train (optionally resuming from pre-trained weights) and save the
    # full model (architecture + weights).
    model = train(resume=load_weight_flag) # load_weight_flag=True -> start from pre-trained weights
    model.save(join(output_model_path, model_filename)) # save model and weights
else:
    # -------- test --------
    # load from a pre-trained model
    model = load_model(join(output_model_path, model_filename), compile=False)
    tiger_test = ImageGenerator(imagepath, image_size, 1)  # batch size 1 for evaluation
    #tiger_test.build_data()
    valid_samples_count = tiger_test.n
    sample_count = 0
    print("valid samples: ", valid_samples_count)
    net_inp = model.get_layer(name='the_input').input # look up layers by name
    # NOTE(review): taking the output of 'dense_1' assumes the first Dense
    # layer is the 8-way corner head -- confirm against the saved model
    # (the commented alternative uses the 'y_pred' layer instead).
    net_out = model.get_layer(name='dense_1').output
    #net_out = model.get_layer(name='y_pred').output
    for inp_value, _ in tiger_test.next_batch():
        bs = inp_value['the_input'].shape[0]
        X_data = inp_value['the_input']
        pred_corners = sess.run(net_out, feed_dict={net_inp:X_data}) # predicted corner coordinates
        labels = inp_value['y_true'] # ground-truth corner coordinates
        filenames = inp_value['filenames']
        for i in range(1):
            sample_count += 1
            print("filename:", filenames[i])
            print("predict :", pred_corners[i])
            print("true :", labels[i])
            img = cv2.imread(filenames[i])
            h, w, channel = img.shape
            # Map coordinates from the resized (image_size) frame back to
            # the original image: even indices are x (scaled by width),
            # odd indices are y (scaled by height).
            pred_corners[i][0] *= np.float32(w) / np.float32(image_size)
            pred_corners[i][2] *= np.float32(w) / np.float32(image_size)
            pred_corners[i][4] *= np.float32(w) / np.float32(image_size)
            pred_corners[i][6] *= np.float32(w) / np.float32(image_size)
            pred_corners[i][1] *= np.float32(h) / np.float32(image_size)
            pred_corners[i][3] *= np.float32(h) / np.float32(image_size)
            pred_corners[i][5] *= np.float32(h) / np.float32(image_size)
            pred_corners[i][7] *= np.float32(h) / np.float32(image_size)
            labels[i][0] *= np.float32(w) / np.float32(image_size)
            labels[i][2] *= np.float32(w) / np.float32(image_size)
            labels[i][4] *= np.float32(w) / np.float32(image_size)
            labels[i][6] *= np.float32(w) / np.float32(image_size)
            labels[i][1] *= np.float32(h) / np.float32(image_size)
            labels[i][3] *= np.float32(h) / np.float32(image_size)
            labels[i][5] *= np.float32(h) / np.float32(image_size)
            labels[i][7] *= np.float32(h) / np.float32(image_size)
            # Draw ground-truth (cyan) and predicted (blue) corners, then
            # save the annotated image next to the original.
            for idx in range(4):
                cv2.circle(img, (int(labels[i][2*idx]),int(labels[i][2*idx+1])), 3, (255,255,0), -1)
                cv2.circle(img, (int(pred_corners[i][2*idx]), int(pred_corners[i][2*idx+1])), 3, (255,0,0), -1)
            cv2.imwrite(os.path.join(os.path.split(filenames[i])[0], "predict_"+os.path.split(filenames[i])[1]), img)
        if sample_count == valid_samples_count:
            break
本质:
1、利用列表将数据一次性加载
2、每次训练从列表中直接加载数据
二、图片增强
import numpy as np
from random import shuffle
from .preprocessor import preprocess_input
from .preprocessor import _imread as imread
from .preprocessor import _imresize as imresize
from .preprocessor import to_categorical
import scipy.ndimage as ndi
import cv2
class ImageGenerator(object):
    """Image generator with saturation, brightness, lighting, contrast,
    horizontal flip and vertical flip transformations. It supports
    bounding boxes coordinates.

    TODO:
        - Finish support for not using bounding_boxes
        - Random crop
        - Test other transformations
    """
    def __init__(self, ground_truth_data, batch_size, image_size,
                 train_keys, validation_keys,
                 ground_truth_transformer=None,
                 path_prefix=None,
                 saturation_var=0.5,
                 brightness_var=0.5,
                 contrast_var=0.5,
                 lighting_std=0.5,
                 horizontal_flip_probability=0.5,
                 vertical_flip_probability=0.5,
                 do_random_crop=False,
                 grayscale=False,
                 zoom_range=[0.75, 1.25],
                 translation_factor=.3):
        self.ground_truth_data = ground_truth_data
        self.ground_truth_transformer = ground_truth_transformer
        self.batch_size = batch_size
        self.path_prefix = path_prefix
        self.train_keys = train_keys
        self.validation_keys = validation_keys
        self.image_size = image_size
        self.grayscale = grayscale
        # Color-jitter ops are applied in random order (see transform()).
        # A jitter is only enabled (and its *_var attribute only set)
        # when its variance parameter is truthy.
        self.color_jitter = []
        if saturation_var:
            self.saturation_var = saturation_var
            self.color_jitter.append(self.saturation)
        if brightness_var:
            self.brightness_var = brightness_var
            self.color_jitter.append(self.brightness)
        if contrast_var:
            self.contrast_var = contrast_var
            self.color_jitter.append(self.contrast)
        self.lighting_std = lighting_std
        self.horizontal_flip_probability = horizontal_flip_probability
        self.vertical_flip_probability = vertical_flip_probability
        self.do_random_crop = do_random_crop
        self.zoom_range = zoom_range
        self.translation_factor = translation_factor

    def _do_random_crop(self, image_array):
        """Randomly zoom and translate the image via an affine transform.

        IMPORTANT: random crop only works for classification since the
        current implementation does not transform bounding boxes.
        """
        height = image_array.shape[0]
        width = image_array.shape[1]
        x_offset = np.random.uniform(0, self.translation_factor * width)
        y_offset = np.random.uniform(0, self.translation_factor * height)
        offset = np.array([x_offset, y_offset])
        scale_factor = np.random.uniform(self.zoom_range[0],
                                         self.zoom_range[1])
        crop_matrix = np.array([[scale_factor, 0],
                                [0, scale_factor]])
        # Move channels first, transform each channel, then restore HWC.
        image_array = np.rollaxis(image_array, axis=-1, start=0)
        image_channel = [ndi.interpolation.affine_transform(image_channel,
                         crop_matrix, offset=offset, order=0, mode='nearest',
                         cval=0.0) for image_channel in image_array]
        image_array = np.stack(image_channel, axis=0)
        image_array = np.rollaxis(image_array, 0, 3)
        return image_array

    def do_random_rotation(self, image_array):
        """IMPORTANT: random rotation only works for classification since the
        current implementation does not transform bounding boxes.

        NOTE(review): this body is a verbatim copy of _do_random_crop and
        performs a zoom/translation, not a rotation.  Behavior is kept
        as-is; confirm whether a real rotation matrix was intended.
        """
        height = image_array.shape[0]
        width = image_array.shape[1]
        x_offset = np.random.uniform(0, self.translation_factor * width)
        y_offset = np.random.uniform(0, self.translation_factor * height)
        offset = np.array([x_offset, y_offset])
        scale_factor = np.random.uniform(self.zoom_range[0],
                                         self.zoom_range[1])
        crop_matrix = np.array([[scale_factor, 0],
                                [0, scale_factor]])
        image_array = np.rollaxis(image_array, axis=-1, start=0)
        image_channel = [ndi.interpolation.affine_transform(image_channel,
                         crop_matrix, offset=offset, order=0, mode='nearest',
                         cval=0.0) for image_channel in image_array]
        image_array = np.stack(image_channel, axis=0)
        image_array = np.rollaxis(image_array, 0, 3)
        return image_array

    def _gray_scale(self, image_array):
        """Collapse the channel axis with ITU-R 601 luma weights."""
        return image_array.dot([0.299, 0.587, 0.114])

    def saturation(self, image_array):
        """Blend the image with its grayscale version by a random factor."""
        gray_scale = self._gray_scale(image_array)
        # BUGFIX: the random amplitude used self.brightness_var instead of
        # self.saturation_var (and raised AttributeError whenever
        # brightness_var=0 disabled the brightness jitter).
        alpha = 2.0 * np.random.random() * self.saturation_var
        alpha = alpha + 1 - self.saturation_var
        image_array = (alpha * image_array + (1 - alpha) *
                       gray_scale[:, :, None])
        return np.clip(image_array, 0, 255)

    def brightness(self, image_array):
        """Scale the pixel intensities by a random factor."""
        alpha = 2 * np.random.random() * self.brightness_var
        # BUGFIX: the offset used self.saturation_var instead of
        # self.brightness_var.
        alpha = alpha + 1 - self.brightness_var
        image_array = alpha * image_array
        return np.clip(image_array, 0, 255)

    def contrast(self, image_array):
        """Blend the image with its mean gray level by a random factor."""
        gray_scale = (self._gray_scale(image_array).mean() *
                      np.ones_like(image_array))
        alpha = 2 * np.random.random() * self.contrast_var
        alpha = alpha + 1 - self.contrast_var
        image_array = image_array * alpha + (1 - alpha) * gray_scale
        return np.clip(image_array, 0, 255)

    def lighting(self, image_array):
        """Add PCA-based color noise (AlexNet-style lighting jitter)."""
        covariance_matrix = np.cov(image_array.reshape(-1, 3) /
                                   255.0, rowvar=False)
        eigen_values, eigen_vectors = np.linalg.eigh(covariance_matrix)
        noise = np.random.randn(3) * self.lighting_std
        noise = eigen_vectors.dot(eigen_values * noise) * 255
        image_array = image_array + noise
        return np.clip(image_array, 0, 255)

    def horizontal_flip(self, image_array, box_corners=None):
        """Randomly mirror the image (and boxes, given in [0,1] coords)."""
        if np.random.random() < self.horizontal_flip_probability:
            image_array = image_array[:, ::-1]
            if box_corners is not None:
                box_corners[:, [0, 2]] = 1 - box_corners[:, [2, 0]]
        return image_array, box_corners

    def vertical_flip(self, image_array, box_corners=None):
        """Randomly flip the image upside down (and boxes, [0,1] coords)."""
        if (np.random.random() < self.vertical_flip_probability):
            image_array = image_array[::-1]
            if box_corners is not None:
                box_corners[:, [1, 3]] = 1 - box_corners[:, [3, 1]]
        return image_array, box_corners

    def transform(self, image_array, box_corners=None):
        """Apply color jitter (random order), lighting and flips."""
        shuffle(self.color_jitter)
        for jitter in self.color_jitter:
            image_array = jitter(image_array)
        if self.lighting_std:
            image_array = self.lighting(image_array)
        if self.horizontal_flip_probability > 0:
            image_array, box_corners = self.horizontal_flip(image_array,
                                                            box_corners)
        if self.vertical_flip_probability > 0:
            image_array, box_corners = self.vertical_flip(image_array,
                                                          box_corners)
        return image_array, box_corners

    def preprocess_images(self, image_array):
        """Delegate to the project-level preprocess_input."""
        return preprocess_input(image_array)

    def flow(self, mode='train'):
        """Infinite batch generator for 'train', 'val' or 'demo' mode."""
        while True:
            if mode == 'train':
                shuffle(self.train_keys)
                keys = self.train_keys
            elif mode == 'val' or mode == 'demo':
                shuffle(self.validation_keys)
                keys = self.validation_keys
            else:
                raise Exception('invalid mode: %s' % mode)
            inputs = []
            targets = []
            for key in keys:
                image_path = self.path_prefix + key
                image_array = imread(image_path)
                image_array = imresize(image_array, self.image_size)
                # Skip non-RGB images (e.g. grayscale files).
                num_image_channels = len(image_array.shape)
                if num_image_channels != 3:
                    continue
                ground_truth = self.ground_truth_data[key]
                if self.do_random_crop:
                    image_array = self._do_random_crop(image_array)
                image_array = image_array.astype('float32')
                if mode == 'train' or mode == 'demo':
                    if self.ground_truth_transformer is not None:
                        image_array, ground_truth = self.transform(
                                                        image_array,
                                                        ground_truth)
                        ground_truth = (
                            self.ground_truth_transformer.assign_boxes(
                                ground_truth))
                    else:
                        image_array = self.transform(image_array)[0]
                if self.grayscale:
                    image_array = cv2.cvtColor(
                            image_array.astype('uint8'),
                            cv2.COLOR_RGB2GRAY).astype('float32')
                    image_array = np.expand_dims(image_array, -1)
                inputs.append(image_array)
                targets.append(ground_truth)
                if len(targets) == self.batch_size:
                    inputs = np.asarray(inputs)
                    targets = np.asarray(targets)
                    # this will not work for boxes
                    targets = to_categorical(targets)
                    if mode == 'train' or mode == 'val':
                        inputs = self.preprocess_images(inputs)
                        yield self._wrap_in_dictionary(inputs, targets)
                    if mode == 'demo':
                        yield self._wrap_in_dictionary(inputs, targets)
                    inputs = []
                    targets = []

    def _wrap_in_dictionary(self, image_array, targets):
        """Package a batch in the {input_1}/{predictions} format Keras expects."""
        return [{'input_1': image_array},
                {'predictions': targets}]
三、分类任务
1、fit 函数 版本:其API从内存中读取数据
要求:
1).npy格式加载数据(图片+标签)生成
import cv2
import os
import numpy as np

# Build a dataset where each row is one flattened image from the current
# directory, then persist it as my_dataset.npy.
image_names = os.listdir()
row_lists = []
for name in image_names:
    picture = cv2.imread(name)
    row_lists.append(list(picture.flatten()))
np.save("my_dataset.npy", np.array(row_lists))
2)数据送入模型
def load_data():
    """Load a dataset and its targets from .npy files in the working dir.

    The last sample is split off as the validation/test set.

    Returns:
        ((x_train, y_train), (x_test, y_test)) where the y arrays are
        reshaped to (num_samples, 1).
    """
    # load the dataset (a plain numpy array, samples x features)
    dataset = np.load("dataset.npy")
    # load the targets (they must be one hot encoded; np.array([1,0]) rather than 1)
    targets = np.load("targets.npy")
    # BUGFIX: the original sliced `src_dataset`, a name that does not
    # exist here (NameError); the array is loaded as `dataset`.
    x_train = dataset[:-1, :, :]
    y_train = targets[:-1, :, :]
    x_test = dataset[-1:, :, :]
    y_test = targets[-1:, :, :]
    # NOTE(review): the reshape below only works when each target row
    # holds a single value (targets shaped (n, 1, 1)) -- confirm against
    # the saved files.
    y_train = np.reshape(y_train, (len(y_train), 1))
    y_test = np.reshape(y_test, (len(y_test), 1))
    return (x_train, y_train), (x_test, y_test)
3) 汇总1)和2):
def load_data(img_rows, img_cols):
    """Build a tiny 3-image dataset: two training images and one
    validation image, all labelled class 0."""
    num_classes = 1
    img1=cv2.resize(cv2.imread('images/vehicle/image0451.png'), (img_rows, img_cols)).astype(np.float32)
    img2=cv2.resize(cv2.imread('images/vehicle/image0452.png'), (img_rows, img_cols)).astype(np.float32)
    img3=cv2.resize(cv2.imread('images/vehicle/image0453.png'), (img_rows, img_cols)).astype(np.float32)
    # Subtract the VGG/ImageNet per-channel BGR means in place.
    for x in (img1,img2,img3):
        x[:, :, 0] -= 103.939
        x[:, :, 1] -= 116.779
        x[:, :, 2] -= 123.68
    X_train = np.array([img1,img2])
    X_valid = np.array([img3])
    Y_train = np.array([[0],[0]])
    Y_valid = np.array([[0]])
    # Transform targets to keras compatible format
    # NOTE(review): np_utils is not imported in this snippet (it is
    # keras.utils.np_utils) -- confirm the import in the real file.
    # With num_classes=1 every one-hot target collapses to [1.].
    Y_train = np_utils.to_categorical(Y_train, num_classes)
    Y_valid = np_utils.to_categorical(Y_valid, num_classes)
    return X_train, Y_train, X_valid, Y_valid
2、fit_generator 版本:
#coding=utf-8
'''
Created on 2018-7-10
'''
import keras
import math
import os
import cv2
import numpy as np
from keras.models import Sequential
from keras.layers import Dense


class DataGenerator(keras.utils.Sequence):
    """Keras Sequence that reads image paths and yields (X, y) batches.

    Labels are derived from the parent directory name of each path
    ("dog" -> [0, 1], anything else -> [1, 0]).
    """
    def __init__(self, datas, batch_size=1, shuffle=True):
        self.batch_size = batch_size
        self.datas = datas
        self.indexes = np.arange(len(self.datas))
        self.shuffle = shuffle

    def __len__(self):
        # Number of batches per epoch.
        return math.ceil(len(self.datas) / float(self.batch_size))

    def __getitem__(self, index):
        # Select batch_size indices, fetch their paths, build the batch.
        batch_indexs = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        batch_datas = [self.datas[k] for k in batch_indexs]
        X, y = self.data_generation(batch_datas)
        return X, y

    def on_epoch_end(self):
        # Re-shuffle the sample order between epochs when requested.
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def data_generation(self, batch_datas):
        """Load the images for one batch and derive their one-hot labels."""
        images = []
        labels = []
        for i, data in enumerate(batch_datas):
            # x: the image itself
            image = cv2.imread(data)
            image = list(image)
            images.append(image)
            # y: class name is the last directory component of the
            # (Windows, backslash-separated) path.
            right = data.rfind("\\", 0)
            left = data.rfind("\\", 0, right) + 1
            class_name = data[left:right]
            if class_name == "dog":
                labels.append([0, 1])
            else:
                labels.append([1, 0])
        # For multi-output models, y must instead be a list of arrays:
        # [numpy_out1, numpy_out2, numpy_out3].
        return np.array(images), np.array(labels)


# Collect sample paths: one subdirectory per class under D:/xxx.
class_num = 0
train_datas = []
for file in os.listdir("D:/xxx"):
    file_path = os.path.join("D:/xxx", file)
    if os.path.isdir(file_path):
        class_num = class_num + 1
        for sub_file in os.listdir(file_path):
            train_datas.append(os.path.join(file_path, sub_file))

# Data generator
training_generator = DataGenerator(train_datas)

# Build the network.
model = Sequential()
model.add(Dense(units=64, activation='relu', input_dim=784))
model.add(Dense(units=2, activation='softmax'))
# BUGFIX: the original called model.compile twice with the same settings;
# the first call was redundant and has been removed.
model.compile(optimizer='sgd', loss='categorical_crossentropy',
              metrics=['accuracy'])
model.fit_generator(training_generator, epochs=50, max_queue_size=10, workers=1)