# -- encoding:utf-8 --
import config
from data_utils import check_directory
import os
import pickle
import numpy as np
from scipy import io
import tensorflow as tf
from tensorflow.contrib import slim
from tensorflow.python.ops import init_ops
from tensorflow.python.ops import nn_ops
from tensorflow.contrib.layers.python.layers import initializers
from sklearn.externals import joblib
from sklearn.svm import SVC
from sklearn import metrics
class AlexNet(object):
    """AlexNet backbone for the R-CNN pipeline.

    Depending on the constructor flags the network exposes:
      * the softmax classification head (fine-tuning mode, default),
      * the FC7 features (``is_svm=True`` — input for the per-class SVMs),
      * the flattened conv features (``is_regression=True`` — input for the
        bounding-box regression net),
      * or both FC7 and flattened conv features at once
        (``is_svm and is_regression and not is_training``).
    """

    def __init__(self, alexnet_mat_file_path=None, is_training=True, is_svm=False, is_regression=False):
        """
        :param alexnet_mat_file_path: optional path to a pre-trained AlexNet
            ``.mat`` file (MatConvNet layout). When given and readable, its
            weights are used as layer initializers and the conv layers are
            frozen (``trainable=False``).
        :param is_training: dropout switch — True while training, False at
            prediction time. Also controls whether the loss/accuracy ops are
            built.
        :param is_svm: when True the network returns the FC7 features instead
            of the softmax output.
        :param is_regression: when True the network returns the flattened
            conv features (bbox-regression input).
        """
        self.image_height = config.IMAGE_HEIGHT
        self.image_width = config.IMAGE_WIDTH
        self.image_channel = config.IMAGE_CHANNEL
        self.class_number = config.CLASS_NUMBER
        if alexnet_mat_file_path is not None and os.path.exists(alexnet_mat_file_path):
            self.load_train_weight = self.__reload_weights_and_biases_by_mat(alexnet_mat_file_path)
            print("加载AlexNet的预训练网络!!")
        else:
            self.load_train_weight = False
        # Whether the backbone weights are trainable: True when no pre-trained
        # AlexNet weights were loaded; False when they were (layers frozen).
        self.train_weight = not self.load_train_weight
        self.input_data = tf.placeholder(tf.float32, [None, self.image_height, self.image_width, self.image_channel],
                                         name='input')
        if is_regression and is_svm and not is_training:
            # Prediction mode needing both heads: FC7 features for the SVMs
            # and flattened conv features for the bbox regression.
            self.svm_logits, self.regression_logits = self.__build_network(input=self.input_data,
                                                                           output_dims=self.class_number,
                                                                           is_training=is_training,
                                                                           is_svm=is_svm,
                                                                           is_regression=is_regression)
        else:
            self.logits = self.__build_network(input=self.input_data,
                                               output_dims=self.class_number,
                                               is_training=is_training,
                                               is_svm=is_svm,
                                               is_regression=is_regression)
        if is_training:
            # One-hot ground-truth labels.
            self.label = tf.placeholder(tf.float32, [None, self.class_number], name='label')
            # Register the cross-entropy loss with tf.losses.
            self.__loss_layer(y_pred=self.logits, y_true=self.label)
            # Total loss = cross-entropy + any registered regularization losses.
            self.total_loss = tf.losses.get_total_loss()
            tf.summary.scalar('total_loss', self.total_loss)
            # Training accuracy op.
            self.accuracy = self.__get_accuracy(y_pred=self.logits, y_true=self.label)

    def __build_network(self, input, output_dims, is_svm=False, scope='R-CNN',
                        is_training=True, keep_prob=0.5, is_regression=False):
        """Build the AlexNet graph and return the requested head(s).

        :param input: image batch tensor [N, H, W, C].
        :param output_dims: number of classes for the softmax head.
        :param is_svm: return FC7 features instead of the softmax output.
        :param scope: top-level variable scope name.
        :param is_training: dropout switch — True for training, False for inference.
        :param keep_prob: dropout keep probability.
        :param is_regression: return the flattened conv features.
        :return: softmax logits, FC7 features, flattened conv features, or
            the tuple (FC7 features, flattened conv features) depending on
            the flag combination.
        """
        with tf.variable_scope(scope):
            # slim.arg_scope sets default kwargs for the listed ops inside
            # this `with` block.
            # Conv regularization is only wanted when the conv weights are
            # actually being trained (i.e. no pre-trained weights loaded).
            conv_weights_regularizer = slim.l2_regularizer(0.005) if self.train_weight else None
            with slim.arg_scope([slim.conv2d, slim.fully_connected],
                                activation_fn=nn_ops.relu,
                                weights_initializer=tf.truncated_normal_initializer(0.0, 0.001),
                                weights_regularizer=slim.l2_regularizer(0.005)):
                with slim.arg_scope([slim.conv2d], weights_regularizer=conv_weights_regularizer):
                    with tf.variable_scope("Layer1"):
                        # Args (left to right): input tensor, output channels
                        # (number of kernels), kernel size, stride, padding
                        # (conv default is SAME).
                        net = slim.conv2d(input, 96, 11, 4, 'SAME', scope='conv',
                                          trainable=self.train_weight,
                                          weights_initializer=self.__get_weights_initializer("conv1"),
                                          biases_initializer=self.__get_biases_initializer("conv1"))
                        net = tf.nn.lrn(net, depth_radius=5, bias=1.0, alpha=0.0001, beta=0.75, name='lrn')
                        # Args (left to right): input tensor, window size,
                        # stride, padding (pooling default is VALID).
                        net = slim.max_pool2d(net, 3, 2, 'VALID', scope='pool')
                    with tf.variable_scope("Layer2"):
                        # Split the 96-channel tensor into two 48-channel
                        # branches (original two-GPU AlexNet layout).
                        net1, net2 = tf.split(net, num_or_size_splits=2, axis=3, name='split')
                        # Each branch: conv + LRN + max-pool.
                        with tf.variable_scope("branch1"):
                            net1 = slim.conv2d(net1, 128, 5, 1, scope='conv',
                                               trainable=self.train_weight,
                                               weights_initializer=self.__get_weights_initializer("conv2_1"),
                                               biases_initializer=self.__get_biases_initializer("conv2_1"))
                            net1 = tf.nn.lrn(net1, depth_radius=5, bias=1.0, alpha=0.0001, beta=0.75, name='lrn')
                            net1 = slim.max_pool2d(net1, 3, 2, 'VALID', scope='pool')
                        with tf.variable_scope("branch2"):
                            net2 = slim.conv2d(net2, 128, 5, 1, scope='conv',
                                               trainable=self.train_weight,
                                               weights_initializer=self.__get_weights_initializer("conv2_2"),
                                               biases_initializer=self.__get_biases_initializer("conv2_2"))
                            net2 = tf.nn.lrn(net2, depth_radius=5, bias=1.0, alpha=0.0001, beta=0.75, name='lrn')
                            net2 = slim.max_pool2d(net2, 3, 2, 'VALID', scope='pool')
                        # Merge the two branch outputs on the channel axis.
                        net = tf.concat([net1, net2], axis=3, name='concat')
                    with tf.variable_scope("Layer3"):
                        net = slim.conv2d(net, 384, 3, 1, scope='conv',
                                          trainable=self.train_weight,
                                          weights_initializer=self.__get_weights_initializer("conv3"),
                                          biases_initializer=self.__get_biases_initializer("conv3"))
                    with tf.variable_scope("Layer4"):
                        # Split the 384-channel tensor into two 192-channel branches.
                        net1, net2 = tf.split(net, num_or_size_splits=2, axis=3, name='split')
                        with tf.variable_scope("branch1"):
                            net1 = slim.conv2d(net1, 192, 3, 1, scope='conv',
                                               trainable=self.train_weight,
                                               weights_initializer=self.__get_weights_initializer("conv4_1"),
                                               biases_initializer=self.__get_biases_initializer("conv4_1"))
                        with tf.variable_scope("branch2"):
                            net2 = slim.conv2d(net2, 192, 3, 1, scope='conv',
                                               trainable=self.train_weight,
                                               weights_initializer=self.__get_weights_initializer("conv4_2"),
                                               biases_initializer=self.__get_biases_initializer("conv4_2"))
                    with tf.variable_scope("Layer5"):
                        # One more conv per branch, then merge and pool.
                        with tf.variable_scope("branch1"):
                            net1 = slim.conv2d(net1, 128, 3, 1, scope='conv',
                                               trainable=self.train_weight,
                                               weights_initializer=self.__get_weights_initializer("conv5_1"),
                                               biases_initializer=self.__get_biases_initializer("conv5_1"))
                        with tf.variable_scope("branch2"):
                            net2 = slim.conv2d(net2, 128, 3, 1, scope='conv',
                                               trainable=self.train_weight,
                                               weights_initializer=self.__get_weights_initializer("conv5_2"),
                                               biases_initializer=self.__get_biases_initializer("conv5_2"))
                        net = tf.concat([net1, net2], axis=3, name='concat')
                        net = slim.max_pool2d(net, 3, 2, 'VALID', scope='pool')
                    with tf.variable_scope("Flatten_Layer"):
                        net = slim.flatten(net, scope='flatten')
                    if is_regression:
                        # Flattened conv features are the bbox-regression input.
                        regression_net = net
                        if is_svm:
                            # Both flags set: return (FC7 features, conv features).
                            with tf.variable_scope("Layer6"):
                                net = slim.fully_connected(net, 4096, activation_fn=tf.tanh, scope='fc',
                                                           weights_initializer=self.__get_weights_initializer("FC1"),
                                                           biases_initializer=self.__get_biases_initializer("FC1"))
                                net = slim.dropout(net, keep_prob=keep_prob, is_training=is_training, scope='dropout')
                            with tf.variable_scope("Layer7"):
                                svm_net = slim.fully_connected(net, 4096, activation_fn=tf.tanh, scope='fc',
                                                               weights_initializer=self.__get_weights_initializer("FC2"),
                                                               biases_initializer=self.__get_biases_initializer("FC2"))
                            return svm_net, regression_net
                        else:
                            # Only the regression high-level features.
                            return regression_net
                    else:
                        with tf.variable_scope("Layer6"):
                            net = slim.fully_connected(net, 4096, activation_fn=tf.tanh, scope='fc',
                                                       weights_initializer=self.__get_weights_initializer("FC1"),
                                                       biases_initializer=self.__get_biases_initializer("FC1"))
                            net = slim.dropout(net, keep_prob=keep_prob, is_training=is_training, scope='dropout')
                        with tf.variable_scope("Layer7"):
                            net = slim.fully_connected(net, 4096, activation_fn=tf.tanh, scope='fc',
                                                       weights_initializer=self.__get_weights_initializer("FC2"),
                                                       biases_initializer=self.__get_biases_initializer("FC2"))
                        if is_svm:
                            # The SVM consumes the 7th fully-connected layer.
                            return net
                        else:
                            # Fine-tuning head: dropout + softmax over the classes.
                            with tf.variable_scope("Layer8"):
                                net = slim.dropout(net, keep_prob=keep_prob, is_training=is_training, scope='dropout')
                                net = slim.fully_connected(net, output_dims, activation_fn=tf.nn.softmax, scope='fc')
                            return net

    def __loss_layer(self, y_pred, y_true):
        """Register the mean cross-entropy of softmax predictions with tf.losses."""
        with tf.name_scope("CrossEntropy"):
            # Clip predictions away from 0 and 1 so log() stays finite.
            y_pred = tf.clip_by_value(y_pred,
                                      clip_value_min=tf.cast(1e-10, dtype=tf.float32),
                                      clip_value_max=tf.cast(1 - 1e-10, dtype=tf.float32))
            # Per-sample cross-entropy.
            cross_entropy = -tf.reduce_sum(y_true * tf.log(y_pred), axis=1)
            # Batch mean.
            loss = tf.reduce_mean(cross_entropy)
            # Register with the tf.losses collection so get_total_loss()
            # picks it up together with the regularization losses.
            tf.losses.add_loss(loss)
            tf.summary.scalar('loss', loss)

    def __get_accuracy(self, y_pred, y_true):
        """Return an accuracy op comparing argmax of predictions and one-hot labels."""
        y_pred_maxs = tf.argmax(y_pred, 1)
        y_true_maxs = tf.argmax(y_true, 1)
        accuracy = tf.reduce_mean(tf.cast(tf.equal(y_pred_maxs, y_true_maxs), tf.float32))
        tf.summary.scalar('accuracy', accuracy)
        return accuracy

    def __get_weights_initializer(self, key):
        """Return a constant initializer from the pre-trained weights when
        available for *key*; otherwise a Xavier initializer (train from scratch).
        """
        if self.load_train_weight and key in self.weights_dict:
            value = self.weights_dict[key]
            return tf.constant_initializer(value=value)
        else:
            return initializers.xavier_initializer()

    def __get_biases_initializer(self, key):
        """Return a constant initializer from the pre-trained biases when
        available for *key*; otherwise a zeros initializer.
        """
        if self.load_train_weight and key in self.bias_dict:
            value = self.bias_dict[key]
            return tf.constant_initializer(value=value)
        else:
            return init_ops.zeros_initializer()

    def __reload_weights_and_biases_by_mat(self, alexnet_mat_file_path):
        """Load pre-trained parameters from a MatConvNet ``.mat`` file into
        ``self.weights_dict`` / ``self.bias_dict`` keyed by layer name.

        Returns True on success, False on any failure (the caller then falls
        back to random initialization).
        """
        try:
            weights_dict = {}
            bias_dict = {}
            # 1. Load the .mat file.
            mdict = io.loadmat(alexnet_mat_file_path)
            # 2. Extract the layer structure.
            layers = mdict['layers'][0]
            # 3. Conv layer parameters.
            weights_dict['conv1'] = layers[0][0][0][2][0][0]
            bias_dict['conv1'] = layers[0][0][0][2][0][1]
            # The mat stores [5,5,48,256] weights and [256,1] biases; split
            # into two [5,5,48,128] / [128,1] halves for the two branches.
            w1, w2 = np.split(layers[4][0][0][2][0][0], indices_or_sections=2, axis=-1)
            b1, b2 = np.split(layers[4][0][0][2][0][1], 2, 0)
            weights_dict['conv2_1'] = w1
            weights_dict['conv2_2'] = w2
            bias_dict['conv2_1'] = b1
            bias_dict['conv2_2'] = b2
            weights_dict['conv3'] = layers[8][0][0][2][0][0]
            bias_dict['conv3'] = layers[8][0][0][2][0][1]
            # [3,3,192,384] / [384,1] split into two [3,3,192,192] / [192,1] halves.
            w1, w2 = np.split(layers[10][0][0][2][0][0], 2, -1)
            b1, b2 = np.split(layers[10][0][0][2][0][1], 2, 0)
            weights_dict['conv4_1'] = w1
            weights_dict['conv4_2'] = w2
            bias_dict['conv4_1'] = b1
            bias_dict['conv4_2'] = b2
            # [3,3,192,384] / [384,1] split into two [3,3,192,192] / [192,1] halves.
            w1, w2 = np.split(layers[12][0][0][2][0][0], 2, -1)
            b1, b2 = np.split(layers[12][0][0][2][0][1], 2, 0)
            weights_dict['conv5_1'] = w1
            weights_dict['conv5_2'] = w2
            bias_dict['conv5_1'] = b1
            bias_dict['conv5_2'] = b2
            # Fully-connected layer parameters, reshaped to 2-D/1-D.
            weights_dict['FC1'] = np.reshape(layers[15][0][0][2][0][0], (-1, 4096))
            bias_dict['FC1'] = np.reshape(layers[15][0][0][2][0][1], -1)
            weights_dict['FC2'] = np.reshape(layers[17][0][0][2][0][0], (-1, 4096))
            bias_dict['FC2'] = np.reshape(layers[17][0][0][2][0][1], -1)
            # 4. Publish as attributes.
            self.weights_dict = weights_dict
            self.bias_dict = bias_dict
            return True
        except Exception:
            # Fix: narrowed from a bare `except:` (which also swallowed
            # KeyboardInterrupt/SystemExit). Best-effort semantics kept:
            # any load failure falls back to random initialization.
            return False
class SVMModel(object):
    """
    Per-class SVM classifiers. Each takes the CNN's FC7 features as input
    and is trained as a classifier for a single object class.
    """

    def __init__(self, is_training=True):
        """
        :param is_training: True -> verify the per-class feature files exist
            and prepare the model output directory; False -> load every
            previously persisted per-class model for prediction.
        """
        # Class-name -> index mapping produced during data preparation.
        check_directory(config.TRAIN_LABEL_DICT_FILE_PATH, created=False, error=True)
        # Fix: use a context manager so the label file is closed
        # deterministically (original leaked the handle from a bare open()).
        with open(config.TRAIN_LABEL_DICT_FILE_PATH, 'rb') as f:
            class_name_2_index_dict = pickle.load(f)
        self.labels = class_name_2_index_dict.values()
        self.svm_model_dump_save_path = config.SVM_CHECKPOINT_FILE_PATH
        self.label_2_models = {}
        if is_training:
            # Verify training inputs exist for every class.
            self.svm_higher_features_save_path = config.TRAIN_SVM_HIGHER_FEATURES_DATA_FILE_PATH
            for label in self.labels:
                check_directory(self.svm_higher_features_save_path.format(label),
                                created=False, error=True)
            # Create the model output directory if it does not exist.
            check_directory(os.path.dirname(self.svm_model_dump_save_path))
        else:
            # Prediction: every per-class model must already be on disk.
            for label in self.labels:
                filename = self.svm_model_dump_save_path.format(label)
                check_directory(filename, created=False, error=True)
                self.label_2_models[label] = joblib.load(filename)

    def fetch_labels(self):
        """
        Return the collection of class labels known to this model.
        :return:
        """
        return self.labels

    def train(self):
        """
        Train one SVM model per class label.
        :return:
        """
        for label in self.labels:
            print("Training type '{}' svm model .....".format(label))
            # 1. Load the saved FC7 features; last column is the target.
            data = np.load(self.svm_higher_features_save_path.format(label))
            x, y = np.split(data, indices_or_sections=(np.shape(data)[1] - 1,), axis=1)
            y = np.reshape(y, -1)
            print(np.shape(x), np.shape(y))
            # 2. Build the model (probability=True enables predict_proba).
            algo = SVC(C=1.0, kernel='linear', random_state=28, max_iter=1000, probability=True)
            # 3. Fit.
            algo.fit(x, y)
            # 4. Evaluate on the training data.
            pred = algo.predict(x)
            print("SVM accuracy on training data:{}".format(metrics.accuracy_score(y, pred)))
            print("SVM confusion matrix on training data:\n{}".format(metrics.confusion_matrix(y, pred)))
            # 5. Persist and cache.
            joblib.dump(algo, self.svm_model_dump_save_path.format(label))
            self.label_2_models[label] = algo

    def predict(self, x, label):
        """
        Predict with the model for the given class (returns predicted classes),
        or None when no model exists for *label*.
        :param x:
        :param label:
        :return:
        """
        if label in self.label_2_models:
            # 1. Look up the cached model.
            algo = self.label_2_models[label]
            # 2. Predict.
            return algo.predict(x)
        else:
            return None

    def predict_proba(self, x, label):
        """
        Predict with the model for the given class, returning the probability
        that each sample belongs to that class, or None when no model exists.
        :param x:
        :param label:
        :return:
        """
        if label in self.label_2_models:
            # 1. Look up the cached model.
            algo = self.label_2_models[label]
            # 2. Probability of the positive class (column 1).
            return algo.predict_proba(x)[:, 1]
        else:
            return None
class RegressionNet(object):
    """
    Bounding-box regression model (a small fully-connected network),
    trained on the Conv5 features as input.
    """

    def __init__(self, is_training=True):
        # Input dimension: size of the flattened Conv5 feature vector.
        self.input_dimension = config.REGRESSION_INPUT_DIMENSION
        # Output dimension: the offset vector
        # [tx_offset, ty_offset, tw_offset, th_offset].
        self.output_dimension = config.REGRESSION_OUTPUT_DIMENSION
        self.input_data = tf.placeholder(tf.float32, [None, self.input_dimension], name='input')
        self.logits = self.__build_network(input=self.input_data,
                                           output_dims=self.output_dimension,
                                           is_training=is_training)
        if is_training:
            # Ground-truth offsets.
            self.label = tf.placeholder(tf.float32, [None, self.output_dimension], name='label')
            # Register the MSE loss with tf.losses.
            self.__loss_layer(y_pred=self.logits, y_true=self.label)
            # Total loss = MSE + registered regularization losses.
            self.total_loss = tf.losses.get_total_loss()
            tf.summary.scalar('total_loss', self.total_loss)

    def __build_network(self, input, output_dims, is_training=True, keep_prob=0.5, scope='Regression_BOX'):
        """Build the FC -> dropout -> FC regression head and return its output."""
        with tf.variable_scope(scope):
            # slim.arg_scope sets default kwargs for the listed ops inside
            # this `with` block.
            with slim.arg_scope([slim.fully_connected],
                                weights_initializer=tf.truncated_normal_initializer(0.0, 0.001),
                                weights_regularizer=slim.l2_regularizer(0.005)):
                net = slim.fully_connected(input, 4096, activation_fn=nn_ops.relu, scope='fc1')
                net = slim.dropout(net, keep_prob=keep_prob, is_training=is_training, scope='dropout2')
                # NOTE(review): ReLU on the output clamps the predicted
                # offsets to be non-negative, while bbox offsets are usually
                # unconstrained — confirm this is intended.
                regression_out = slim.fully_connected(net, output_dims, activation_fn=nn_ops.relu, scope='fc2')
                return regression_out

    def __loss_layer(self, y_pred, y_true):
        """Register the mean squared error between predictions and targets."""
        with tf.name_scope("MSE"):
            # Mean squared error over the batch and output dimensions.
            loss = tf.reduce_mean(tf.square(y_pred - y_true))
            # Register with the tf.losses collection.
            tf.losses.add_loss(loss)
            # TensorBoard summary.
            tf.summary.scalar('loss', loss)
if __name__ == '__main__':
    # Build the graph once and dump it so it can be inspected in TensorBoard.
    # NOTE(review): the .mat path below is machine-specific — adjust locally.
    alexnet = AlexNet(alexnet_mat_file_path='D:\\迅雷下载\\imagenet-caffe-alex.mat')
    writer = tf.summary.FileWriter(logdir='./model/graph', graph=tf.get_default_graph())
    writer.close()
# Source article: CV-1-目标检测-03-RCNN-04-network
# https://blog.csdn.net/HJZ11/article/details/104734256