This post summarizes a fairly intuitive PyTorch training template, from argument definition, to network definition, through the training, validation, and test steps.
1. Import packages and set the random seed
import random

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split

# Fix the random seeds so that results are reproducible
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
Note: if you are unsure what random.seed does, see my earlier post: random.seed()的用法_云隐雾匿的博客-CSDN博客.
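As a quick illustration of what the seeding buys you (this snippet is my own sketch, not part of the template), resetting the three seeds makes the subsequent random draws come out identical every time:

# Sketch: re-seeding reproduces the same random draws from random, numpy and torch
import random
import numpy as np
import torch

def draw(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    return random.random(), np.random.rand(), torch.rand(1).item()

print(draw())
print(draw())  # prints exactly the same three numbers, because the seeds were reset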
2. Define hyperparameters as a class
class argparse():
    pass

args = argparse()
args.epochs, args.learning_rate, args.patience = [30, 0.001, 4]
args.hidden_size, args.input_size = [40, 30]
args.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
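The empty argparse class above is only a convenient attribute container (the name has nothing to do with the standard argparse module). If you prefer something more explicit, a sketch using the standard library's types.SimpleNamespace holds the same fields:

# Sketch: the same hyperparameters held in a SimpleNamespace instead of a dummy class
from types import SimpleNamespace

args = SimpleNamespace(
    epochs=30,
    learning_rate=0.001,
    patience=4,
    hidden_size=40,
    input_size=30,
    device=torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
)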
3. Define your model
class Your_model(nn.Module):
    def __init__(self):
        super(Your_model, self).__init__()
        pass  # define your layers here

    def forward(self, x):
        pass  # implement the forward pass here
        return x
4. Define an early-stopping class (this step can be skipped)
class EarlyStopping():
    def __init__(self, patience=7, verbose=False, delta=0):
        self.patience = patience
        self.verbose = verbose
        self.counter = 0
        self.best_score = None
        self.early_stop = False
        self.val_loss_min = np.inf
        self.delta = delta

    def __call__(self, val_loss, model, path):
        print("val_loss={}".format(val_loss))
        score = -val_loss
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
        elif score < self.best_score + self.delta:
            self.counter += 1
            print(f'EarlyStopping counter: {self.counter} out of {self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model, path)
            self.counter = 0

    def save_checkpoint(self, val_loss, model, path):
        if self.verbose:
            print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}). Saving model ...')
        torch.save(model.state_dict(), path + '/' + 'model_checkpoint.pth')
        self.val_loss_min = val_loss
5. Define your Dataset and DataLoader
class Dataset_name(Dataset):
    def __init__(self, flag='train'):
        assert flag in ['train', 'test', 'valid']
        self.flag = flag
        self.__load_data__()

    def __getitem__(self, index):
        pass

    def __len__(self):
        pass

    def __load_data__(self):
        # load and split your data here (e.g. from csv files)
        pass
        print("train_X.shape:{}\ntrain_Y.shape:{}\nvalid_X.shape:{}\nvalid_Y.shape:{}\n"
              .format(self.train_X.shape, self.train_Y.shape, self.valid_X.shape, self.valid_Y.shape))

train_dataset = Dataset_name(flag='train')
train_dataloader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)
valid_dataset = Dataset_name(flag='valid')
valid_dataloader = DataLoader(dataset=valid_dataset, batch_size=64, shuffle=True)
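As a concrete reference for filling in the skeleton, here is one possible implementation for tabular data. This is only a sketch; the file name data.csv and the target column are hypothetical placeholders, not from the original template:

# Sketch of a filled-in Dataset for tabular data (file and column names are hypothetical)
class CSV_Dataset(Dataset):
    def __init__(self, flag='train'):
        assert flag in ['train', 'valid']
        self.flag = flag
        self.__load_data__()

    def __getitem__(self, index):
        return self.X[index], self.Y[index]

    def __len__(self):
        return len(self.X)

    def __load_data__(self):
        df = pd.read_csv('data.csv')  # hypothetical file
        X = df.drop(columns=['target']).values.astype('float32')
        Y = df['target'].values.astype('float32')
        train_X, valid_X, train_Y, valid_Y = train_test_split(X, Y, test_size=0.2, random_state=seed)
        self.X, self.Y = (train_X, train_Y) if self.flag == 'train' else (valid_X, valid_Y)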
6. Instantiate the model and set up the loss and optimizer
model = Your_model().to(args.device)
criterion = torch.nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=args.learning_rate)

train_loss = []
valid_loss = []
train_epochs_loss = []
valid_epochs_loss = []

early_stopping = EarlyStopping(patience=args.patience, verbose=True)
7. Training loop and learning-rate adjustment
for epoch in range(args.epochs):
    model.train()
    train_epoch_loss = []
    for idx, (data_x, data_y) in enumerate(train_dataloader, 0):
        data_x = data_x.to(torch.float32).to(args.device)
        data_y = data_y.to(torch.float32).to(args.device)
        outputs = model(data_x)
        optimizer.zero_grad()
        loss = criterion(outputs, data_y)
        loss.backward()
        optimizer.step()
        train_epoch_loss.append(loss.item())
        train_loss.append(loss.item())
        if idx % (len(train_dataloader) // 2) == 0:
            print("epoch={}/{}, {}/{} of train, loss={}".format(
                epoch, args.epochs, idx, len(train_dataloader), loss.item()))
    train_epochs_loss.append(np.average(train_epoch_loss))

    # =====================valid============================
    model.eval()
    valid_epoch_loss = []
    with torch.no_grad():  # no gradients are needed during validation
        for idx, (data_x, data_y) in enumerate(valid_dataloader, 0):
            data_x = data_x.to(torch.float32).to(args.device)
            data_y = data_y.to(torch.float32).to(args.device)
            outputs = model(data_x)
            loss = criterion(outputs, data_y)
            valid_epoch_loss.append(loss.item())
            valid_loss.append(loss.item())
    valid_epochs_loss.append(np.average(valid_epoch_loss))

    # ==================early stopping======================
    early_stopping(valid_epochs_loss[-1], model=model, path=r'c:\\your_model_to_save')
    if early_stopping.early_stop:
        print("Early stopping")
        break

    # ====================adjust lr========================
    lr_adjust = {
        2: 5e-5, 4: 1e-5, 6: 5e-6, 8: 1e-6,
        10: 5e-7, 15: 1e-7, 20: 5e-8
    }
    if epoch in lr_adjust.keys():
        lr = lr_adjust[epoch]
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr
        print('Updating learning rate to {}'.format(lr))
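The dictionary above is a hand-written schedule. Alternatively, PyTorch's built-in schedulers can manage the decay for you; a sketch with torch.optim.lr_scheduler.StepLR (the step size and gamma below are chosen arbitrarily) looks like this:

# Sketch: decay the learning rate with a built-in scheduler instead of editing param_groups by hand
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)  # arbitrary values

for epoch in range(args.epochs):
    # ... run the training and validation loops shown above ...
    scheduler.step()  # call once per epoch
    print('Current learning rate: {}'.format(scheduler.get_last_lr()[0]))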
8. Plotting
plt.figure(figsize=(12,4))
plt.subplot(121)
plt.plot(train_loss[:])
plt.title("train_loss")
plt.subplot(122)
plt.plot(train_epochs_loss[1:],'-o',label="train_loss")
plt.plot(valid_epochs_loss[1:],'-o',label="valid_loss")
plt.title("epochs_loss")
plt.legend()
plt.show()
9. Prediction
# Here you can either build a DataLoader for your prediction set, or simply reshape
# your prediction data so that it carries a batch dimension (batch_size=1); see the sketch below.
model.eval()
predict = model(data)
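For instance, a single flat feature vector can be given the batch dimension with unsqueeze. The sketch below assumes the model takes args.input_size features and uses random placeholder data:

# Sketch: predict on one sample by adding a batch dimension (placeholder data, assumes flat input)
sample = np.random.rand(args.input_size).astype('float32')
data = torch.from_numpy(sample).unsqueeze(0).to(args.device)  # shape: (1, input_size)
model.eval()
with torch.no_grad():
    predict = model(data)
print(predict.shape)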
10. A complete runnable example (read it alongside the template above)
Below, a classification network is trained on 18 numbers to decide whether a number is greater than 8 (the labels are created inside the dataset); the example contains the complete training and prediction flow.
The network is a minimal fully connected net with input size 1 and output size 2 (binary classification).
import random
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
# Fix the random seeds so that the results are reproducible
seed = 42
torch.manual_seed(seed)
np.random.seed(seed)
random.seed(seed)
torch.cuda.manual_seed_all(seed)
# Define the arguments as a class; there are many other options, e.g. a config file
class Args:
    def __init__(self) -> None:
        self.batch_size = 1
        self.lr = 0.001
        self.epochs = 10
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.data_train = np.array([-2, -1, 0, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 18, 20])
        self.data_val = np.array([15, 16, 17, 0.1, -3, -4])

args = Args()
# Define a simple fully connected network
class Net(nn.Module):
    def __init__(self, in_dim, n_hidden_1, n_hidden_2, out_dim):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Linear(in_dim, n_hidden_1), nn.ReLU(True))
        self.layer2 = nn.Sequential(
            nn.Linear(n_hidden_1, n_hidden_2), nn.ReLU(True))
        self.layer3 = nn.Sequential(nn.Linear(n_hidden_2, out_dim))

    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        return x
# Define the dataset: the label says whether a number is greater than 8
class Dataset_num(Dataset):
    def __init__(self, flag='train') -> None:
        self.flag = flag
        assert self.flag in ['train', 'val'], 'not implement!'
        if self.flag == 'train':
            self.data = args.data_train
        else:
            self.data = args.data_val

    def __getitem__(self, index: int):
        val = self.data[index]
        if val > 8:
            label = 1
        else:
            label = 0
        return torch.tensor(label, dtype=torch.long), torch.tensor([val], dtype=torch.float32)

    def __len__(self) -> int:
        return len(self.data)
def train():
    train_dataset = Dataset_num(flag='train')
    train_dataloader = DataLoader(dataset=train_dataset, batch_size=args.batch_size, shuffle=True)
    val_dataset = Dataset_num(flag='val')
    val_dataloader = DataLoader(dataset=val_dataset, batch_size=args.batch_size, shuffle=True)

    model = Net(1, 32, 16, 2).to(args.device)  # network setup: input size 1, output size 2, i.e. "is this number greater than 8?"
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=args.lr)  # , eps=1e-8)

    train_epochs_loss = []
    valid_epochs_loss = []
    train_acc = []
    val_acc = []

    for epoch in range(args.epochs):
        model.train()
        train_epoch_loss = []
        acc, nums = 0, 0
        # =========================train=======================
        for idx, (label, inputs) in enumerate(tqdm(train_dataloader)):
            inputs = inputs.to(args.device)
            label = label.to(args.device)
            outputs = model(inputs)
            optimizer.zero_grad()
            loss = criterion(outputs, label)
            loss.backward()
            # torch.nn.utils.clip_grad_norm_(model.parameters(), 2.0)  # optional gradient clipping
            optimizer.step()
            train_epoch_loss.append(loss.item())
            acc += sum(outputs.max(axis=1)[1] == label).cpu()
            nums += label.size()[0]
        train_epochs_loss.append(np.average(train_epoch_loss))
        train_acc.append(100 * acc / nums)
        print("train acc = {:.3f}%, loss = {}".format(100 * acc / nums, np.average(train_epoch_loss)))
        # =========================val=========================
        with torch.no_grad():
            model.eval()
            val_epoch_loss = []
            acc, nums = 0, 0
            for idx, (label, inputs) in enumerate(tqdm(val_dataloader)):
                inputs = inputs.to(args.device)  # .to(torch.float)
                label = label.to(args.device)
                outputs = model(inputs)
                loss = criterion(outputs, label)
                val_epoch_loss.append(loss.item())
                acc += sum(outputs.max(axis=1)[1] == label).cpu()
                nums += label.size()[0]
            valid_epochs_loss.append(np.average(val_epoch_loss))
            val_acc.append(100 * acc / nums)
            print("epoch = {}, valid acc = {:.2f}%, loss = {}".format(epoch, 100 * acc / nums, np.average(val_epoch_loss)))

    # =========================plot==========================
    plt.figure(figsize=(12, 4))
    plt.subplot(121)
    plt.plot(train_epochs_loss[:])
    plt.title("train_loss")
    plt.subplot(122)
    plt.plot(train_epochs_loss, '-o', label="train_loss")
    plt.plot(valid_epochs_loss, '-o', label="valid_loss")
    plt.title("epochs_loss")
    plt.legend()
    plt.show()

    # =========================save model=====================
    torch.save(model.state_dict(), 'model.pth')
def pred(val):
    model = Net(1, 32, 16, 2)
    model.load_state_dict(torch.load('model.pth'))
    model.eval()
    val = torch.tensor(val).reshape(1, -1).float()
    # The input must be reshaped to the expected shape, including a batch dimension,
    # so a single number becomes a tensor of shape (1, 1).
    res = model(val)
    # Example output: tensor([[-5.2095, -0.9326]], grad_fn=<AddmmBackward0>);
    # the column holding the larger value is the predicted label.
    res = res.max(axis=1)[1].item()
    print("predicted label is {}, {} {} 8".format(res, val.item(), ('>' if res == 1 else '<')))
if __name__ == '__main__':
    train()
    pred(24)
    pred(3.14)
    pred(7.8)  # this one is predicted incorrectly, which shows how much the amount of data matters in deep learning