【深度学习】CAM (Class Activation Mapping) MNIST 基于 tensorflow
前言
在很多时候,我们可能想知道分类器是根据什么位置进行的分类,分类的重点有没有在正确的位置上。
这时候我们就要用到 CAM。
CAM
CAM(Class Activation Mapping) 的具体内容读者自行百度,这里负责给出部分理解以及在 MNIST 上应用的代码。CAM 的操作就是把全连接替换成 GAP(Global Average Pooling) 。
下图是原本的 LeNet 用于手写数字识别。
接下来我们要把全连接层去掉,然后把全连接层前一层的通道数改成类别数,也就是一个通道对应一个类别,再进行全局平均池化,得到 [1, 1, num] 大小的输出,就可以把它当作 OUTPUT 层。
然后我们把在改好后的网络下训练好的模型保存下来。接着将池化层的输出(每个类别对应一个值)与最后一个卷积层的对应通道相乘,再把各通道的结果相加。
就是下面这个样子,对应相乘然后相加。
对应相乘相加后,得到一个单通道的矩阵,然后我们把它 resize 成原图大小,再用 plt 的 jet 色图,把图片调成半透明,叠加在原图上面就是一张热图啦。
但是后面这个过程放在训练代码里实现起来比较麻烦,所以我们把它单独写成另一份代码:每次只加载一张图片(即令 Batch = 1),然后输出图片。
CAM 的缺点在于需要更改网络结构,因此后来有了 Grad-CAM。
训练代码
#!/usr/bin/env python
# coding: utf-8
# In[ ]:
import numpy as np
import os
import shutil
# Restrict the process to GPU 0; this is set ahead of the TensorFlow import.
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# Start every run with an empty TensorBoard log directory. shutil.rmtree is
# used instead of shelling out to `rm -r` so it works on any platform and
# stays silent when the directory does not exist yet.
shutil.rmtree("logs", ignore_errors = True)
import tensorflow as tf
# get_ipython().run_line_magic('matplotlib', 'inline')
import matplotlib.pyplot as plt
from PIL import Image
import multiprocessing
# In[ ]:
# TFRecord files read by read_tfrecord() for the training and validation queues.
TrainPath = '/home/winsoul/disk/MNIST/data/tfrecord/train.tfrecords'
ValPath = '/home/winsoul/disk/MNIST/data/tfrecord/test.tfrecords'
# BatchSize = 64
# Directory prefix under which Network() writes its checkpoints.
model_path = '/home/winsoul/disk/MNIST/CAM/model/'
# In[ ]:
def read_tfrecord(TFRecordPath):
    """Add ops to the default graph that decode one example from a TFRecord file.

    Each record is parsed as a raw-bytes 'image' feature and an int64 'label'
    feature. The image bytes are decoded as float32 and reshaped to
    [28, 28, 1]; the label is cast to int32.

    Args:
        TFRecordPath: path of the TFRecord file to read.

    Returns:
        (image, label): two tensors for a single example. They are fed by a
        queue, so queue runners must be started before they are evaluated.
    """
    # The function only builds graph ops, so no Session is opened here; the
    # original created (and immediately discarded) one per call.
    feature = {
        'image': tf.FixedLenFeature([], tf.string),
        'label': tf.FixedLenFeature([], tf.int64)
    }
    # No num_epochs is passed, so the producer is not limited to a fixed
    # number of passes over the file.
    filename_queue = tf.train.string_input_producer([TFRecordPath])
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)
    features = tf.parse_single_example(serialized_example, features = feature)
    image = tf.decode_raw(features['image'], tf.float32)
    image = tf.reshape(image, [28, 28, 1])
    label = tf.cast(features['label'], tf.int32)
    return image, label
# In[ ]:
def conv_layer(X, k, s, channels_in, channels_out, name = 'CONV'):
    """Apply a k x k convolution (stride s, SAME padding) followed by ReLU.

    Args:
        X: input tensor passed to tf.nn.conv2d.
        k: spatial size of the square kernel.
        s: stride used along both spatial dimensions.
        channels_in: number of input channels.
        channels_out: number of output channels (filters).
        name: name scope that wraps the layer's ops and variables.

    Returns:
        The ReLU-activated output tensor.
    """
    kernel_shape = [k, k, channels_in, channels_out]
    stride_spec = [1, s, s, 1]
    with tf.name_scope(name):
        # Kernel is created before the bias: the variables are unnamed, so
        # their creation order should be left as it is.
        kernel = tf.Variable(tf.truncated_normal(kernel_shape, stddev = 0.1))
        bias = tf.Variable(tf.constant(0.1, shape = [channels_out]))
        pre_activation = tf.nn.conv2d(X, kernel, strides = stride_spec, padding = 'SAME') + bias
        activated = tf.nn.relu(pre_activation)
        # Histograms for TensorBoard.
        for tag, tensor in (('weights', kernel), ('biases', bias), ('activations', activated)):
            tf.summary.histogram(tag, tensor)
        return activated
# In[ ]:
def pool_layer(X, k, s, strr = 'SAME', pool_type = 'MAX', name = 'pool'):
    """Pool X with a k x k window and stride s.

    Args:
        X: input tensor.
        k: window size along both spatial dimensions.
        s: stride along both spatial dimensions.
        strr: padding mode forwarded to the pooling op.
        pool_type: 'MAX' selects max pooling; any other value selects
            average pooling.
        name: name given to the pooling op.

    Returns:
        The pooled tensor.
    """
    pool_op = tf.nn.max_pool if pool_type == 'MAX' else tf.nn.avg_pool
    window = [1, k, k, 1]
    step = [1, s, s, 1]
    return pool_op(X, ksize = window, strides = step, padding = strr, name = name)
# In[ ]:
def fc_layer(X, neurons_in, neurons_out, last = False, name = 'FC'):
    """Fully connected layer: X @ W + b followed by an activation.

    Args:
        X: 2-D input tensor passed to tf.matmul.
        neurons_in: size of the input feature dimension.
        neurons_out: number of output units.
        last: when equal to False the activation is ReLU; otherwise softmax.
        name: name scope that wraps the layer's ops and variables.

    Returns:
        The activated output tensor.
    """
    with tf.name_scope(name):
        weights = tf.Variable(tf.truncated_normal([neurons_in, neurons_out], stddev = 0.1))
        bias = tf.Variable(tf.constant(0.1, shape = [neurons_out]))
        tf.summary.histogram('weights', weights)
        tf.summary.histogram('biases', bias)
        # The equality test (rather than `not last`) is kept on purpose so
        # that non-bool arguments select the same branch as before.
        activation = tf.nn.relu if last == False else tf.nn.softmax
        output = activation(tf.matmul(X, weights) + bias)
        tf.summary.histogram('activations', output)
        return output
# In[ ]:
def Network(BatchSize, learning_rate):
    """Build the GAP-based CNN and train it from the TFRecord input queues.

    The network is five conv layers with two 2x2 max-pools; the last conv
    layer has 10 output channels (one per class) and is reduced by a 7x7
    average pool to a [-1, 10] tensor that is used directly as the logits.

    Every 10th step the train and validation summaries are written to
    ./logs/train and ./logs/test and accuracy/loss are printed; every 200th
    step a checkpoint is written under model_path.

    Args:
        BatchSize: number of examples per training / validation batch.
        learning_rate: learning rate passed to the Adam optimizer.

    Returns:
        None. The loop runs until the coordinator requests a stop or an
        input queue raises OutOfRangeError.
    """
    tf.reset_default_graph()
    with tf.Session() as sess:
        in_training = tf.placeholder(tf.bool, name = 'in_training')
        # keep_prob is fed on every run, but no op in this graph consumes it
        # (the network has no dropout layer).
        keep_prob = tf.placeholder('float32', name = 'keep_prob')

        image_train, label_train = read_tfrecord(TrainPath)
        image_val, label_val = read_tfrecord(ValPath)
        image_train_Batch, label_train_Batch = tf.train.shuffle_batch([image_train, label_train],
                                                                      batch_size = BatchSize,
                                                                      capacity = BatchSize*3 + 200,
                                                                      min_after_dequeue = BatchSize)
        image_val_Batch, label_val_Batch = tf.train.shuffle_batch([image_val, label_val],
                                                                  batch_size = BatchSize,
                                                                  capacity = BatchSize*3 + 200,
                                                                  min_after_dequeue = BatchSize)
        # NOTE(review): both batch tensors are created outside the cond
        # branches, so both queues are probably dequeued on every step
        # regardless of in_training -- confirm against the tf.cond docs.
        image_Batch = tf.cond(in_training, lambda: image_train_Batch, lambda: image_val_Batch)
        label_Batch = tf.cond(in_training, lambda: label_train_Batch, lambda: label_val_Batch)
        label_Batch = tf.one_hot(label_Batch, depth = 10)

        X = tf.identity(image_Batch)
        y = tf.identity(label_Batch)

        with tf.name_scope('input_reshape'):
            tf.summary.image('input', X, 32)

        conv1_1 = conv_layer(X, 3, 1, 1, 32, name = "conv1_1")
        conv1_2 = conv_layer(conv1_1, 3, 1, 32, 32, name = "conv1_2")
        pool1 = pool_layer(conv1_2, 2, 2, "SAME", "MAX", name = "pool1")
        conv2_1 = conv_layer(pool1, 3, 1, 32, 64, name = 'conv2_1')
        conv2_2 = conv_layer(conv2_1, 3, 1, 64, 64, name = 'conv2_2')
        pool2 = pool_layer(conv2_2, 2, 2, "SAME", "MAX", name = "pool2")
        print(pool2.shape)
        # One output channel per class; the 7x7 average pool over this layer
        # replaces the fully connected classifier.
        conv3 = conv_layer(pool2, 3, 1, 64, 10, name = 'conv3')
        pool3 = pool_layer(conv3, 7, 7, "SAME", "MEAN", name = "pool3")
        y_result = tf.reshape(pool3, [-1, 10])

        with tf.name_scope('summaries'):
            cross_entropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits = y_result, labels = y))
            train_step = tf.train.AdamOptimizer(learning_rate).minimize(cross_entropy)
            corrent_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_result, 1))
            accuracy = tf.reduce_mean(tf.cast(corrent_prediction, 'float', name = 'accuracy'))
            tf.summary.scalar("loss", cross_entropy)
            tf.summary.scalar("accuracy", accuracy)

        init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer())
        sess.run(init_op)
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(coord = coord)
        merge_summary = tf.summary.merge_all()
        summary_writer = tf.summary.FileWriter("./logs/train" , sess.graph)
        summary_writer_test = tf.summary.FileWriter("./logs/test")
        saver = tf.train.Saver()

        train_feed = {keep_prob: 0.5, in_training: True}
        val_feed = {keep_prob: 1.0, in_training: False}
        try:
            batch_index = 1
            while not coord.should_stop():
                if batch_index % 10 == 0:
                    # Logging step: the optimizer update and the training
                    # metrics come from the same run, so each batch_index
                    # corresponds to exactly one optimizer step.
                    summary, _, acc, loss = sess.run([merge_summary, train_step, accuracy, cross_entropy],
                                                     feed_dict = train_feed)
                    summary_writer.add_summary(summary, batch_index)
                    print(str(batch_index) + ' : ' + str(acc) + ' ' + str(loss))
                    # Validation is evaluation only: train_step must not be
                    # run here, otherwise the model is fitted to the
                    # validation data.
                    summary, acc, loss = sess.run([merge_summary, accuracy, cross_entropy],
                                                  feed_dict = val_feed)
                    summary_writer_test.add_summary(summary, batch_index)
                    print(str(batch_index) + ' : ' + str(acc) + ' ' + str(loss))
                else:
                    sess.run(train_step, feed_dict = train_feed)
                if batch_index % 200 == 0:
                    saver.save(sess, model_path + 'Model__Step_{:08d}'.format(batch_index))
                batch_index += 1
        except tf.errors.OutOfRangeError:
            print("OutofRangeError!")
        finally:
            print("Finish")
            coord.request_stop()
            coord.join(threads)
            # Flush pending summaries to disk; the session itself is closed
            # by the enclosing `with` block.
            summary_writer.close()
            summary_writer_test.close()
# In[ ]:
def main():
    """Entry point: train the network with batch size 512 and learning rate 0.0001."""
    batch_size = 512
    learning_rate = 0.0001
    Network(batch_size, learning_rate)
# In[ ]:
if __name__ == '__main__':
    main()
CAM
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import numpy as np
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
# os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
# os.system("rm -r logs")
import tensorflow as tf
get_ipython().run_line_magic('matplotlib', 'inline')
import matplotlib.pyplot as plt
from PIL import Image
# import multiprocessing
from multiprocessing import Process
import threading
import time
import random
import cv2
from skimage import transform
# In[2]:
# Commented-out settings (inactive).
# TrainPath = '/home/winsoul/disk/MyML/data/tfrecord/train.tfrecords'
# ValPath = '/home/winsoul/disk/MyML/data/tfrecord/val.tfrecords'
# BatchSize = 64
# Presumably the dataset root; not referenced by the code shown in this script.
dataPath = '/home/winsoul/disk/MNIST/data/'
# Directory whose files get_infoList() lists and main() reads as images.
trainPath = '/home/winsoul/disk/MNIST/data/train/'
# epoch, DisplayStep and SaveModelStep are not referenced by the code shown in this script.
epoch = 10
DisplayStep = 20
SaveModelStep = 1000
# Directory prefix from which Network() restores the saved checkpoint.
model_path = '/home/winsoul/disk/MNIST/CAM/model/'
# In[3]:
def conv_layer(X, k, s, channels_in, channels_out, is_training = False, name = 'CONV'):
    """Apply a k x k convolution (stride s, SAME padding), add a bias, then ReLU.

    Args:
        X: input tensor passed to tf.nn.conv2d.
        k: spatial size of the square kernel.
        s: stride used along both spatial dimensions.
        channels_in: number of input channels.
        channels_out: number of output channels (filters).
        is_training: accepted but not used by the active code (batch
            normalization is disabled).
        name: name scope that wraps the layer's ops and variables.

    Returns:
        The ReLU-activated output tensor.
    """
    with tf.name_scope(name):
        # Kernel is created before the bias: the variables are unnamed, so
        # their creation order should be left as it is.
        kernel = tf.Variable(tf.truncated_normal([k, k, channels_in, channels_out], stddev = 0.1))
        bias = tf.Variable(tf.constant(0.1, shape = [channels_out]))
        biased = tf.nn.bias_add(
            tf.nn.conv2d(X, kernel, strides = [1, s, s, 1], padding = 'SAME'),
            bias)
        activated = tf.nn.relu(biased)
        # Histograms for TensorBoard.
        for tag, tensor in (('weights', kernel), ('biases', bias), ('activations', activated)):
            tf.summary.histogram(tag, tensor)
        return activated
def pool_layer(X, k, s, strr = 'SAME', pool_type = 'MAX', name = 'pool'):
    """Pool X with a k x k window and stride s.

    Args:
        X: input tensor.
        k: window size along both spatial dimensions.
        s: stride along both spatial dimensions.
        strr: padding mode forwarded to the pooling op.
        pool_type: 'MAX' selects max pooling; any other value selects
            average pooling.
        name: name given to the pooling op.

    Returns:
        The pooled tensor.
    """
    pool_kwargs = dict(ksize = [1, k, k, 1],
                       strides = [1, s, s, 1],
                       padding = strr,
                       name = name)
    if pool_type != 'MAX':
        return tf.nn.avg_pool(X, **pool_kwargs)
    return tf.nn.max_pool(X, **pool_kwargs)
def fc_layer(X, neurons_in, neurons_out, last = False, name = 'FC'):
    """Fully connected layer: X @ W + b, with a sigmoid unless it is the last layer.

    Args:
        X: 2-D input tensor passed to tf.matmul.
        neurons_in: size of the input feature dimension.
        neurons_out: number of output units.
        last: when equal to False a sigmoid is applied; otherwise the raw
            affine output is returned.
        name: name scope that wraps the layer's ops and variables.

    Returns:
        The layer's output tensor.
    """
    with tf.name_scope(name):
        weights = tf.Variable(tf.truncated_normal([neurons_in, neurons_out], stddev = 0.1))
        bias = tf.Variable(tf.constant(0.1, shape = [neurons_out]))
        tf.summary.histogram('weights', weights)
        tf.summary.histogram('biases', bias)
        output = tf.matmul(X, weights) + bias
        # The equality test (rather than `not last`) is kept on purpose so
        # that non-bool arguments select the same branch as before.
        if last == False:
            output = tf.nn.sigmoid(output)
        tf.summary.histogram('activations', output)
        return output
# In[4]:
def Network(BatchSize, learning_rate, image, label, checkpoint_name = 'Model__Step_00002000'):
    """Rebuild the trained network, restore a checkpoint and run one forward pass.

    Only the forward graph is built. The loss, optimizer, accuracy and
    initializer ops of the training script are not needed for inference and
    are omitted, so the Saver restores just the convolution weights and
    biases and every variable gets its value from the checkpoint.

    Args:
        BatchSize: unused; kept so existing callers keep working.
        learning_rate: unused; kept so existing callers keep working.
        image: array fed to the [None, 28, 28, 1] float32 input placeholder.
        label: unused; kept so existing callers keep working.
        checkpoint_name: checkpoint file prefix, appended to model_path.

    Returns:
        (T, w, predic):
            T      -- output of the last conv layer resized to 28 x 28, with
                      10 channels per image.
            w      -- the average-pooled value of each of those 10 channels,
                      shape [-1, 10]; used by the caller as per-class weights.
            predic -- argmax of w along axis 1 (the predicted class).
    """
    tf.reset_default_graph()
    with tf.Session() as sess:
        X = tf.placeholder('float32', shape = ([None, 28, 28, 1]))

        #########################################################################################################
        ## Network (layer order and names match the training script so that
        ## the checkpoint's variables line up)
        conv1_1 = conv_layer(X, 3, 1, 1, 32, name = "conv1_1")
        conv1_2 = conv_layer(conv1_1, 3, 1, 32, 32, name = "conv1_2")
        pool1 = pool_layer(conv1_2, 2, 2, "SAME", "MAX", name = "pool1")
        conv2_1 = conv_layer(pool1, 3, 1, 32, 64, name = 'conv2_1')
        conv2_2 = conv_layer(conv2_1, 3, 1, 64, 64, name = 'conv2_2')
        pool2 = pool_layer(conv2_2, 2, 2, "SAME", "MAX", name = "pool2")
        print(pool2.shape)
        conv3 = conv_layer(pool2, 3, 1, 64, 10, name = 'conv3')
        pool3 = pool_layer(conv3, 7, 7, "SAME", "MEAN", name = "pool3")
        y_result = tf.reshape(pool3, [-1, 10])
        #########################################################################################################

        # Upsample the class feature maps to the input resolution
        # (method 0 is the value the original code passed).
        resized_maps = tf.image.resize_images(conv3, [28, 28], method = 0)
        print(resized_maps.shape)
        print(y_result.shape)
        prediction = tf.argmax(y_result, 1)

        saver = tf.train.Saver()
        saver.restore(sess, model_path + checkpoint_name)
        T, w, predic = sess.run([resized_maps, y_result, prediction], feed_dict = {X: image})
        return T, w, predic
# In[5]:
def get_heatmap(image, label):
    """Preprocess one image and run it through Network.

    Args:
        image: array with 28 * 28 elements; it is cast to float32, divided
            by 255, shifted by -0.5 and reshaped to (1, 28, 28, 1).
        label: class index (anything int() accepts), turned into a
            (1, 10) one-hot row.

    Returns:
        The (T, w, predic) triple returned by Network.
    """
    learning_rate = 0.0001
    pixels = image.astype(np.float32)
    pixels = (pixels / 255 - 0.5).reshape(1, 28, 28, 1)
    one_hot = np.zeros([1, 10])
    one_hot[0][int(label)] = 1
    feature_maps, class_weights, predicted = Network(1, learning_rate, pixels, one_hot)
    return feature_maps, class_weights, predicted
# In[6]:
def get_infoList():
    """Return the names of all entries in the trainPath directory."""
    return os.listdir(trainPath)
# In[7]:
def main():
    """Show a class-activation heat map for every correctly classified image.

    Files in trainPath are visited in random order. Each file name is
    expected to look like '<label>_<rest>'; the label is the ground truth.
    For a correct prediction the 10 resized feature maps are combined into a
    single 28 x 28 map, weighted by the pooled value of each channel, and
    drawn half-transparent over the input digit.
    """
    infoList = get_infoList()
    random.shuffle(infoList)
    print(len(infoList))
    for filename in infoList:
        # Skip entries that do not follow the '<label>_<rest>' pattern
        # instead of aborting the whole run on the first stray file.
        label, separator, _ = filename.partition('_')
        try:
            true_class = int(label)
        except ValueError:
            true_class = None
        if not separator or true_class is None:
            print('skip (unexpected file name): ' + filename)
            continue

        image = cv2.imread(trainPath + filename, cv2.IMREAD_GRAYSCALE)
        if image is None:
            # cv2.imread signals an unreadable file by returning None.
            print('skip (cannot read image): ' + filename)
            continue

        T, w, predic = get_heatmap(image, label)
        # predic holds one entry per image in the batch; the batch size is 1.
        predicted_class = int(np.ravel(predic)[0])
        if true_class == predicted_class:
            # (1, 10) @ (10, 784) -> (1, 784): weighted sum over the channels.
            channel_maps = T.reshape([28 * 28, 10]).transpose()
            heatmap = np.matmul(w, channel_maps).reshape([28, 28])
            # Draw the digit in grayscale so only the overlay carries colour.
            plt.imshow(image, cmap = 'gray')
            plt.imshow(heatmap, cmap = plt.cm.jet, alpha = 0.5, interpolation='bilinear')
            plt.show()
        print(label, predic)
# In[8]:
if __name__ == '__main__':
    main()