【问题尚未完全解决】
首先是I/O模块
读取数据并做简单的处理
# io_data.py
import os
import numpy as np
import random
from keras.utils.np_utils import to_categorical
import pandas as pd
import csv
# The csv data is expected three directory levels above this file, under data/
# (presumably <workspace>/data — TODO confirm against the actual layout).
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
data_dir = os.path.join(base_dir,'data')
def read_dataset(mode='train', isFeat=True):
    """Read `<data_dir>/<mode>.csv` and return normalized 48x48x1 images.

    Keyword arguments:
    mode: csv basename to read ('train', 'valid' or 'test')
    isFeat: True when the file carries labels in its first column

    Return:
    isFeat=True  -> (feats, labels, line_ids) where feats is a float array of
                    shape (N, 48, 48, 1), labels is one-hot via
                    to_categorical, line_ids are the original row indices
    isFeat=False -> feats only, in file order
    """
    datas = []
    with open(os.path.join(data_dir, '{}.csv'.format(mode))) as file:
        for line_id, line in enumerate(file.readlines()[1:]):  # skip header
            if isFeat:
                label, feat = line.split(',')
            else:
                _, feat = line.split(',')
            feat = np.fromstring(feat, dtype=int, sep=' ')
            # normalize pixel values from [0, 255] to [0, 1]
            feat = feat / 255
            feat = np.reshape(feat, (48, 48, 1))
            if isFeat:
                datas.append((feat, int(label), line_id))
            else:
                datas.append(feat)
    if isFeat:
        # Shuffle only labelled data.  Bug fix: the original shuffled test
        # data as well, so row i of the predictions no longer corresponded
        # to test example i when predict.py wrote the Answer file.
        random.shuffle(datas)
        feats, labels, line_ids = zip(*datas)
        feats = np.asarray(feats)
        labels = to_categorical(np.asarray(labels, dtype=np.int32))
        return feats, labels, line_ids
    return np.asarray(datas)
def dump_history(store_path, logs):
    """Append per-epoch training history to plain-text files in `store_path`.

    One value per line is appended to four files: train_loss,
    train_accuracy, valid_loss and valid_accuracy.

    Keyword arguments:
    store_path: existing directory the log files are written into
    logs: History callback carrying tr_losses/tr_accs/val_losses/val_accs
    """
    # (filename, values) table replaces four copy-pasted with-blocks
    records = (
        ('train_loss', logs.tr_losses),
        ('train_accuracy', logs.tr_accs),
        ('valid_loss', logs.val_losses),
        ('valid_accuracy', logs.val_accs),
    )
    for fname, values in records:
        with open(os.path.join(store_path, fname), 'a') as f:
            for value in values:
                f.write('{}\n'.format(value))
if __name__ == "__main__":
    # Quick manual check: parse the training csv when run directly.
    read_dataset()
然后,设计model,原代码给出了一个easy model,training之后的正确率卡在40%左右
我在网上找了一个据说是不错的model(simple model)
# model.py
import os
from keras.models import Sequential
from keras.layers import Input,Dense,Dropout,Flatten,Activation
from keras.layers import Convolution2D,MaxPooling2D
from keras.optimizers import SGD
import tensorflow as tf
from record import *
nb_class = 7
def build_model(mode):
    """Return a compiled Keras model for training.

    Keyword arguments:
    mode: model name specified in training and predicting script
          ('easy' or 'simple')

    Raises:
    ValueError: for any other mode ('strong' is accepted by the CLI but has
                no architecture defined here; the original silently compiled
                an empty Sequential in that case)
    """
    model = Sequential()
    if mode == 'easy':
        # CNN part (you can repeat this part several times)
        model.add(Convolution2D(8, 3, 3, border_mode='valid', input_shape=(48, 48, 1)))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Dropout(0.8))
        # Fully connected part
        model.add(Flatten())
        model.add(Dense(16))
        model.add(Activation('relu'))
        model.add(Dense(nb_class))
        model.add(Activation('softmax'))
    elif mode == 'simple':
        # three conv/pool stages followed by two large dense layers
        model.add(Convolution2D(32, (1, 1), strides=1, padding='same', input_shape=(48, 48, 1)))
        model.add(Activation('relu'))
        model.add(Convolution2D(32, (5, 5), padding='same'))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Convolution2D(32, (3, 3), padding='same'))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Convolution2D(64, (5, 5), padding='same'))
        model.add(Activation('relu'))
        model.add(MaxPooling2D(pool_size=(2, 2)))
        model.add(Flatten())
        model.add(Dense(2048))
        model.add(Activation('relu'))
        model.add(Dropout(0.5))
        model.add(Dense(1024))
        model.add(Activation('relu'))
        model.add(Dropout(0.5))
        model.add(Dense(nb_class))
        model.add(Activation('softmax'))
    else:
        raise ValueError('no architecture defined for mode {!r}'.format(mode))
    # NOTE: both modes train with Adam; the SGD optimizer object the original
    # created for 'easy' was never passed to compile() and has been removed.
    model.compile(loss='categorical_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    model.summary()  # show the whole model in terminal (once, not twice)
    return model
然后开始训练数据
# train.py
#!/usr/bin/env python
# -- coding: utf-8 --
import argparse
import os
import model
from utils import *
from io_data import *
from record import *
import os
import tensorflow as tf
import keras.backend.tensorflow_backend as KTF
# GPU / TensorFlow session configuration
os.environ["CUDA_VISIBLE_DEVICES"]="0,1" # expose GPUs 0 and 1 to TensorFlow
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.8 # cap per-process GPU memory at 80%
session = tf.Session(config=config)
# register this session as the one Keras uses
KTF.set_session(session )
base_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
exp_dir = os.path.join(base_dir,'exp')
def main():
    """Train an emotion classifier and save the model plus its history.

    CLI flags pick the architecture (--model), epoch count (--epoch) and
    batch size (--batch).  Output goes to a fresh
    exp/<model>_epoch<epoch>_<n> directory.
    """
    parser = argparse.ArgumentParser(prog='train.py',
                                     description='ML-Assignment training script.')
    parser.add_argument('--model', type=str, default='easy',
                        choices=['easy', 'simple', 'strong'], metavar='<model>')
    parser.add_argument('--epoch', type=int, metavar='<#epoch>', default=40)
    parser.add_argument('--batch', type=int, metavar='<batch_size>', default=64)
    args = parser.parse_args()
    print(args)

    # pick the first unused run directory exp/<model>_epoch<epoch>_<n>, n < 30
    log_path = "{}_epoch{}".format(args.model, str(args.epoch))
    log_path += '_'
    dir_cnt = 0
    store_path = os.path.join(exp_dir, log_path + str(dir_cnt))
    while dir_cnt < 30:
        if not os.path.isdir(store_path):
            os.mkdir(store_path)
            break
        dir_cnt += 1
        store_path = os.path.join(exp_dir, log_path + str(dir_cnt))
    else:
        # the original fell through with a non-existent store_path and only
        # crashed much later when saving results; fail fast instead
        raise RuntimeError('no free run directory under {}'.format(exp_dir))

    emotion_classifier = model.build_model(args.model)
    tr_feats, tr_labels, _ = read_dataset('train')
    dev_feats, dev_labels, _ = read_dataset('valid')  # 10% of original training data
    history = History()
    # Bug fix: validate on the dev split that was just loaded.  The original
    # passed validation_split=0.1, which ignored dev_feats/dev_labels and
    # carved another 10% off the training set instead.
    emotion_classifier.fit(x=tr_feats, y=tr_labels,
                           batch_size=args.batch, epochs=args.epoch,
                           validation_data=(dev_feats, dev_labels),
                           callbacks=[history])

    # report final accuracy on the full training set
    train_result = emotion_classifier.evaluate(tr_feats, tr_labels)
    print('\n train Acc: ', train_result[1])
    dump_history(store_path, history)
    emotion_classifier.save(os.path.join(store_path, 'model.h5'))
if __name__ == "__main__":
    main()
经过漫长的等待后，得到的在 training set 上的正确率大概为 95%，但是在 valid data 上仅为 55%，这说明模型存在明显的过拟合问题。
训练过程中,将 acc 和 loss 记录下来
# record.py
from keras.callbacks import Callback
class History(Callback):
    """Keras callback that records per-epoch metrics for dump_history().

    After training, tr_losses/tr_accs hold the training loss/accuracy and
    val_losses/val_accs the validation loss/accuracy, one entry per epoch.
    """

    def on_train_begin(self, logs=None):
        # reset the history at the start of every fit() call
        self.tr_losses = []
        self.val_losses = []
        self.tr_accs = []
        self.val_accs = []

    def on_epoch_end(self, epoch, logs=None):
        # logs=None replaces the original mutable default argument logs={}
        logs = logs or {}
        # 'acc'/'val_acc' are the metric keys this Keras version reports
        self.tr_losses.append(logs.get('loss'))
        self.val_losses.append(logs.get('val_loss'))
        self.tr_accs.append(logs.get('acc'))
        self.val_accs.append(logs.get('val_acc'))
对 testing data 进行预测
# predict.py
#!/usr/bin/env python
# -- coding: utf-8 --
import argparse
from keras.models import load_model
from termcolor import colored,cprint
from io_data import *
import os
import tensorflow as tf
import keras.backend.tensorflow_backend as KTF
# GPU / TensorFlow session configuration
os.environ["CUDA_VISIBLE_DEVICES"]="0,1" # expose GPUs 0 and 1 to TensorFlow
config = tf.ConfigProto()
config.gpu_options.per_process_gpu_memory_fraction = 0.6 # cap per-process GPU memory at 60%
session = tf.Session(config=config)
# register this session as the one Keras uses
KTF.set_session(session)
base_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
exp_dir = os.path.join(base_dir,'exp')
def main():
    """Load a trained model from exp/<model>_epoch<epoch>_<idx> and write
    test-set predictions to the 'Answer' file as id,label rows."""
    parser = argparse.ArgumentParser(prog='predict.py',
                                     description='ML-Assignment3 testing script.')
    parser.add_argument('--model', type=str, default='simple',
                        choices=['easy', 'simple', 'strong'], metavar='<model>')
    parser.add_argument('--epoch', type=int, metavar='<#epoch>', default=40)
    parser.add_argument('--batch', type=int, metavar='<batch_size>', default=64)
    parser.add_argument('--idx', type=int, metavar='<suffix>', default=0)
    args = parser.parse_args()

    # locate the run directory the training script stored the model in
    store_path = "{}_epoch{}_{}".format(args.model, args.epoch, args.idx)
    print(colored("\nLoading model from {}".format(store_path), 'yellow', attrs=['bold']))
    emotion_classifier = load_model(os.path.join(exp_dir, store_path, 'model.h5'))
    emotion_classifier.summary()

    # test features only, no labels
    te_feats = read_dataset('test', False)
    ans = emotion_classifier.predict_classes(te_feats, batch_size=args.batch)

    # one "id,label" row per test example, in file order
    with open('Answer', 'w') as f:
        f.write('id,label\n')
        for idx, a in enumerate(ans):
            f.write('{},{}\n'.format(idx, a))
if __name__ == "__main__":
    main()
结果保存在 Answer 文件中
画出model
# plot_model.py
#!/usr/bin/env python
# -- coding: utf-8 --
import os
from termcolor import colored, cprint
import argparse
from keras.models import load_model
from keras.utils.vis_utils import plot_model
import matplotlib.pyplot as plt
from PIL import Image
# saved experiment runs (model.h5 files) live under <project-root>/exp
base_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
exp_dir = os.path.join(base_dir,'exp')
def main():
    """Load a stored model and render its architecture to ml_plot.png."""
    parser = argparse.ArgumentParser(prog='plot_model.py',
                                     description='Plot the model.')
    parser.add_argument('--model', type=str, default='simple',
                        choices=['simple', 'easy', 'strong'], metavar='<model>')
    parser.add_argument('--epoch', type=int, metavar='<#epoch>', default=40)
    parser.add_argument('--batch', type=int, metavar='<batch_size>', default=64)
    parser.add_argument('--idx', type=int, metavar='<suffix>', default=0)
    args = parser.parse_args()

    # same run-directory naming scheme as train.py / predict.py
    store_path = "{}_epoch{}_{}".format(args.model, args.epoch, args.idx)
    print(colored("Loading model from {}".format(store_path), 'yellow', attrs=['bold']))
    emotion_classifier = load_model(os.path.join(exp_dir, store_path, 'model.h5'))
    emotion_classifier.summary()
    plot_model(emotion_classifier, to_file='ml_plot.png')
if __name__ == "__main__":
    main()
最后把分好类的图片加载出来看一看
# arr_pic.py
from PIL import Image
import numpy as np
import scipy.misc
import os
# The csv data is expected three directory levels above this file, under data/
# (same layout io_data.py assumes — TODO confirm).
base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.realpath(__file__))))
data_dir = os.path.join(base_dir,'data')
def output_pic(idx, num, out_dir='C:/Users/liky/Desktop/pic_3/'):
    """Find test sample `idx` in test.csv and save it as test<num>.jpg.

    Keyword arguments:
    idx: id of the test sample (first csv column) to export
    num: running number used in the output filename
    out_dir: directory the jpg is written to (defaults to the original
             hard-coded path; now a parameter so other machines can use it)
    """
    with open(os.path.join(data_dir, 'test.csv')) as file:
        # NOTE(review): like the original, only the first 199 data rows are
        # scanned; raise the slice bound if higher ids are needed.
        for line in file.readlines()[1:200]:
            _idx, feat = line.split(',')
            if int(_idx) != idx:
                continue
            print(idx)
            feat = np.fromstring(feat, dtype=int, sep=' ')
            # pixels are 0-255, so cast to uint8: Image.fromarray cannot
            # handle the platform-dependent default int dtype
            pic = Image.fromarray(np.reshape(feat, (48, 48)).astype(np.uint8))
            # scipy.misc.imsave was removed in SciPy 1.2; save via PIL instead
            pic.save(os.path.join(out_dir, 'test{}.jpg'.format(num)))
            print('save succ')
            break
if __name__ == '__main__':
    # Walk the first 199 prediction rows and export every image that was
    # predicted as class 3 via output_pic().
    with open('F:/pycharmFile/CNN_1/Answer') as f:
        num = 0
        for line in f.readlines()[1:200] :
            if int(line.split(',')[1]) == 3: # images predicted as "happy" (label 3)
                output_pic(int(line.split(',')[0]), num)
                num += 1
                print(line , ' ', num)