嘿~全流程带你基于Pytorch手撸图片分类"框架"--HuClassify

一起养成写作习惯!这是我参与「掘金日新计划 · 4 月更文挑战」的第14天,点击查看活动详情

前言

鸽了两天,从星期二晚上就开始说要发布这篇文章,昨天还发布了一下关于这玩意的文章,今天早上果断删除,因为完整版来喽。好吧,其实看标题有点那啥了,其实我这次做的还是一个非常简单的小dome,一个非常小的分类dome,如果一定要说是框架的话,那么只能叫V0.1版本。使用的神经网络模型也算是非常简单的LeNet。 不过这个玩意我是仿造YoloV5的项目结构来写的,做一个快速了解嘛。

由于整个项目不复杂,而且项目也不是很完善,所以暂时不会考虑上传gayHub。如果要的话,评论区或者私信@我即可。

OK,那么咱们开始吧。

使用

先说说咱们的这个玩意长啥样,怎么式样吧。然后咱们再来说说具体的功能模块是如何实现的。

项目结构

在这里插入图片描述

和那个YOLO的项目长得是有点像哈。

那么作为用户,想要愉快玩耍这个项目,那么当然要做的就只有三个地方。 在这里插入图片描述

我们先一步一步说。

训练过程

准备数据集与配置

首先第一步当然是准备咱们的数据集啦。

这个数据集很简单,人家YOLO 有 YOLO 数据集,那么我们这里就有HU数据集。 这个数据集准备相当简单。 在这里插入图片描述 之后打开咱们的data的配置文件 在这里插入图片描述

进入训练

接下来移步到我们的训练文件。train.py 在这里插入图片描述 我们直接先找到咱们的超参数设置,这个乍一看和yolo还是有几分相像的。 在这里插入图片描述

我这里准备了一个数据集,就是分类数字,1 和 100。 你也可以准备多一点,这个无所谓。

训练显示

在你的参数都设置好了之后,那么你就可以开始你的训练了。 在这里插入图片描述 最后,你的结果会在这里 在这里插入图片描述 在这里插入图片描述 我们可以看到 log.txt 在这里插入图片描述

然后我这里设置了要tensorboard,这里咱们看看效果

tensorboard --logdir=runs/train/epx2/logs

复制代码

在这里插入图片描述 这样一个完整的训练过程走完了

使用模型

之后到了使用阶段。 在这里插入图片描述

这里有几个参数,设置都挺简单的,我就不在复述 了。 重点是这个。 在这里插入图片描述 例如这里,我叫它验证一张图片 在这里插入图片描述 在这里插入图片描述

编码实现

整个实现过程其实还是很简单的,就是前面调了一会儿。

配置文件

首先由于是V0.1版本嘛,所以读取的文件就是一个字典。 在这里插入图片描述


"""
参数设置
"""
import os

# 设置你的类别,训练集与验证集的类别要保持一致
Classes = 2
Classfiy = ["1","100"]

#数据集根目录 注意使用 /
Data_Root = r'F:\projects\PythonProject\MyClassfication\mydata'
#划分的训练集
Train = "train"
#划分的测试集
Valid = 'valid'

SaveWeights = "runs"


复制代码

读取配置文件

这里的话,就是工具类里面。 在这里插入图片描述

"""
第一个小dome,先从读取python字典开始走起。
"""

from data.ModelConfig import *

class ReadDict(object):

    @staticmethod
    def ReadModelClasses():

        label_dict=dict()
        index = 0
        #读取分类标签
        for label in Classfiy:
            label_dict[label]=index
            index+=1
        return label_dict

    @staticmethod
    def ReadDataRoot():

        return Data_Root
复制代码

HU数据集解析器

这个是咱们的第一个重点。 在这里插入图片描述 这里有两个,一个就是读取文件的嘛,读取数据集。


from  data.ModelConfig import *
import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import transforms
from utils.ReaderProcess.ReadDict import ReadDict

class MyDataSet(Dataset):
    def __init__(self, data_dir, transform=None):

        self.label_name = ReadDict.ReadModelClasses()

        self.data_info = self.get_img_info(data_dir)
        self.transform = transform

    def __getitem__(self, index):
        path_img, label = self.data_info[index]
        img = Image.open(path_img).convert('RGB')

        if self.transform is not None:
            img = self.transform(img)
        return img, label

    def __len__(self):
        return len(self.data_info)

    @staticmethod
    def get_img_info(data_dir):
        data_info = list()
        label_dict=ReadDict.ReadModelClasses()

        for root, dirs, _ in os.walk(data_dir): #
            # 遍历类别

            for sub_dir in dirs:
                img_names = os.listdir(os.path.join(root, sub_dir))
                img_names = list(filter(lambda x: x.endswith('.jpg'), img_names))

                # 遍历图片
                for i in range(len(img_names)):
                    img_name = img_names[i]
                    path_img = os.path.join(root, sub_dir, img_name)
                    label = label_dict[sub_dir]
                    data_info.append((path_img, int(label)))

        return data_info


if __name__ == '__main__':


    train_dir=Data_Root+"\\"+Train
    train_transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),

    ])

    train_data = MyDataSet(data_dir=train_dir, transform=train_transform)
    # 构建DataLoder
    train_loader = DataLoader(dataset=train_data, batch_size=16)

    for data in train_data:
        img , label = data
        print(img.shape)




复制代码

然后,还有一个,tensor转换器



import torchvision.transforms as transforms

#传说之中的转换器!这里主要有两个,一个是送入神经网络训练的,还有一个是负责验证的。

class TransFormAtions(object):

    norm_mean = [0.485, 0.456, 0.406]
    norm_std = [0.229, 0.224, 0.225]
    train_transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.RandomCrop(32, padding=4),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std),
    ])

    valid_transform = transforms.Compose([
        transforms.Resize((32, 32)),
        transforms.ToTensor(),
        transforms.Normalize(norm_mean, norm_std),
    ])

复制代码

上面的参数,是嫖的(norm_mean,norm_std)。

其他

这个都是在utils里面的,那就一块说了吧。 Log.py日志输出

"""
这个主要负责日志,日志输出,后面其实可以在这里实现,模型终短后恢复权重训练
"""

import os

def PrintLog(EXP_Path):
    # 保存打印日志
    # Open a file
    if(os.path.exists(EXP_Path)):
        Save_Log_print = EXP_Path+"\\log.txt"
        return open(file=Save_Log_print,mode='w',encoding='utf-8')
    else:
        raise Exception("文件异常")



复制代码

一些模型训练的复杂方法 ModelUtils.py


import random
import numpy as np
import torch


# 设置一个随机种子,这样保证两次随机生成的数是一样的,主要是为了初始化

def set_seed(seed=1):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)

复制代码

模型保存,这个是哪个文件就不用我说了吧。

"""
负责保存模型
"""
from data.ModelConfig import *
import os
import torch

def CreatRun(idx):
    Save_Path_Root = SaveWeights + "\\train" + "\\epx"+str(idx)
    if(not os.path.exists(Save_Path_Root)):
        os.makedirs(Save_Path_Root)

        return Save_Path_Root
    else:
        return CreatRun(idx+1)


def Save_Model(EXP_Path,weight_best,weight_last):

    if(os.path.exists(EXP_Path)):

        save_path_root = EXP_Path+"\\weights"
        if(not os.path.exists(save_path_root)):
            os.makedirs(save_path_root)

        save_path_best = save_path_root+"\\best.pth"
        save_path_last = save_path_root+"\\last.pth"
        torch.save(weight_best,save_path_best)
        torch.save(weight_last,save_path_last)
        print()
        print("best:",save_path_best)
        print("last:",save_path_last)

    else:
        raise Exception("保存地址异常")


if __name__ == '__main__':
    Save_Model(0,'1',"best")
复制代码

在这里插入图片描述

使用模型 LeNet

前面说过,深度学习呢,要具备两个条件,一个是天赋,一个是时间。 天赋就是咱们的神经网络结构,这里咱们选择一个简单的。

这里我们选择的网络为LeNet。 结构如下: 在这里插入图片描述

这块咱们搞一个2分类的来看看(数据集不好搞)。

图片输入假设是 3 x 32 x 32 batch size 为 4


import torch
from torch import nn
from torch.nn import Conv2d, MaxPool2d, Flatten, Linear, Sequential, CrossEntropyLoss
import torch.nn.functional as F


class LeNet(nn.Module):
    def __init__(self,classes):
        super().__init__()

        self.feature = Sequential(
            nn.Conv2d(3,6,kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2),
            nn.Conv2d(6,16,5), # A
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2)
        )

        self.classifiar = nn.Sequential(
            nn.Linear(16*5*5,120), # B
            nn.ReLU(),
            nn.Linear(120,84),
            nn.ReLU(),
            nn.Linear(84,classes)
        )
    def forward(self,x):
        x = self.feature(x)
        x = x.view(x.size()[0],-1)
        x = self.classifiar(x)
        return x

if __name__ == '__main__':
    net = LeNet(3)
    data = torch.randn(4,3,32,32)
    out = net(data)


复制代码

这个在咱们这个项目里面是这个 在这里插入图片描述


import torch
from torch import nn
from torch.nn import Conv2d, MaxPool2d, Flatten, Linear, Sequential, CrossEntropyLoss
import torch.nn.functional as F


class LeNet(nn.Module):
    def __init__(self,classes):
        super().__init__()

        self.feature = Sequential(
            nn.Conv2d(3,6,kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2),
            nn.Conv2d(6,16,5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2)
        )

        self.classifiar = nn.Sequential(
            nn.Linear(16*5*5,120), # B
            nn.ReLU(),
            nn.Linear(120,84),
            nn.ReLU(),
            nn.Linear(84,classes)
        )
    def forward(self,x):
        x = self.feature(x)
        x = x.view(x.size()[0],-1)
        x = self.classifiar(x)
        return x

    def initialize_weights(self):
        #参数初始化,随便给点权重,这样的话会加快一点速度(训练)
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.xavier_normal_(m.weight.data)
                if m.bias is not None:
                    m.bias.data.zero_()
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight.data, 0, 0.1)
                m.bias.data.zero_()


if __name__ == '__main__':
    net = LeNet(3)
    data = torch.randn(4,3,32,32)
    out = net(data)
复制代码

训练实现

看到红色框哈 在这里插入图片描述

之后是咱们的重点。


import argparse
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from utils.DataSet import MyDataSet
from models.LeNet import LeNet
import torch.optim as optim
from utils import ModelUtils
from data.ModelConfig import *
from utils.DataSet.MyDataSet import MyDataSet
from utils.DataSet.TransformAtions import TransFormAtions
from utils.ReaderProcess.ReadDict import ReadDict
import os
from utils import SaveModel
from utils import Log
from torch.utils.tensorboard import SummaryWriter

def train():


    # 固定一个随机种子,
    # 固定住深度模型训练的过程,使得每次从头开始训练模型初始化方式和数据读取方式保持一致
    ModelUtils.set_seed()
    #初始化驱动
    device=None
    if (torch.cuda.is_available()):
        if(not opt.device=='cpu'):
            div = "cuda:"+opt.device
            # 这边后面还得做一个检测,看看有没有坑货,乱输入
            device = torch.device(div)
        print("\033[0;31;0m使用GPU训练中:{}\033[0m".format(torch.cuda.get_device_name()))
    else:
        device = torch.device("cpu")
        print("\033[0;31;40m使用CPU训练\033[0m")

    #创建 runs exp 文件
    EPX_Path = SaveModel.CreatRun(0)
    #日志相关的准备工作
    wirter = None
    openTensorboard=opt.tensorboardopen
    path_board=None
    if(openTensorboard):
        path_board = EPX_Path+"\\logs"
        wirter = SummaryWriter(path_board)

    fo = Log.PrintLog(EPX_Path)


    #准备数据集
    transformations = TransFormAtions()
    train_data_dir=opt.train_dir
    if(not train_data_dir):
        
        train_data_dir = ReadDict.ReadDataRoot()+"\\"+Train
        if(not os.path.exists(train_data_dir)):
            raise Exception("训练集路径错误")        

    train_data = MyDataSet(data_dir=train_data_dir, transform=transformations.train_transform)
    valid_data_dir = opt.valid_dir
    if(not valid_data_dir):
        valid_data_dir = ReadDict.ReadDataRoot()+"\\"+Valid
        if(not os.path.exists(valid_data_dir)):
            raise Exception("测试集路径错误")
    
    valid_data = MyDataSet(data_dir=valid_data_dir, transform=transformations.valid_transform)

    # 构建DataLoder
    train_loader = DataLoader(dataset=train_data, batch_size=opt.batch_size,num_workers=opt.works, shuffle=True)
    valid_loader = DataLoader(dataset=valid_data, batch_size=opt.batch_size)
    
    #开始进入网络训练
    #1 开始初始化网络,设置参数啥的
    #1.1 初始化网络
    net = LeNet(Classes)
    net.initialize_weights()
    net = net.to(device)

    #1.2选择交叉熵损失函数,做分类问题一般是选择这个损失函数的
    criterion = nn.CrossEntropyLoss() 

    #1.3设置优化器
    optimizer = optim.SGD(net.parameters(), lr=opt.lr, momentum=0.09)  # 选择优化器
    # 设置学习率下降策略,默认的也可以,那就不设置嘛,主要是不断去自动调整学习的那个速度
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.01)
    
    #2 开始进入训练步骤
    #2.1 进入网络训练

    Best_weight=None
    Best_Acc=0.0


    for epoch in range(opt.epochs):


        loss_mean = 0.0
        correct = 0.0
        total = 0.0
        current_Acc=0.0
        net.train()
        print("正在进行第{}轮训练".format(epoch + 1))
        for i, data in enumerate(train_loader):

            # forward
            inputs, labels = data
            inputs,labels = inputs.to(device),labels.to(device)

            outputs = net(inputs)

            # backward
            optimizer.zero_grad()
            loss = criterion(outputs, labels)
            loss.backward()

            # update weights
            optimizer.step()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).squeeze().sum()

            # 打印训练信息
            loss_mean += loss.item()
            current_Acc = correct / total
            if (i + 1) % opt.log_interval == 0:
                loss_mean = loss_mean / opt.log_interval

                info = "训练:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}"\
                .format\
                (
                    epoch, opt.epochs, i + 1, len(train_loader), loss_mean, current_Acc
                )
                print(info,file=fo)



                if(opt.show_log_console):
                    info_print = "\033[0;33;0m"+info+"\033[0m"
                    print(info_print)

                loss_mean = 0.0

        #tensorboard 绘图
        if (wirter):
            wirter.add_scalar("训练准确率",current_Acc, (epoch))
            wirter.add_scalar("训练损失均值",loss_mean,(epoch))

        # 保存效果最好的玩意
        if (current_Acc > Best_Acc):
            Best_weight = net.state_dict()

        scheduler.step()  # 更新学习率

        #2.2 进入训练对比阶段

        if (epoch + 1) % opt.val_interval == 0:
            correct_val = 0.0
            total_val = 0.0
            loss_val = 0.0
            current_Acc_val=0.0
            net.eval()
            with torch.no_grad():
                for j, data in enumerate(valid_loader):
                    inputs, labels = data
                    inputs, labels = inputs.to(device), labels.to(device)
                    outputs = net(inputs)
                    loss = criterion(outputs, labels)
                    loss_val += loss.item()
                    _, predicted = torch.max(outputs.data, 1)
                    total_val += labels.size(0)
                    correct_val += (predicted == labels).squeeze().sum()
                    current_Acc_val = correct_val / total_val


                info_val = "测试:\tEpoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format\
                        (
                        epoch, opt.epochs, j + 1, len(valid_loader), loss_val, current_Acc_val
                        )
                print(info_val,file=fo)
                if(opt.show_log_console):
                    info_print_val = "\033[0;31;0m"+info_val+"\033[0m"
                    print(info_print_val)

                if (wirter):
                    wirter.add_scalar("测试准确率", current_Acc_val, (epoch))
                    wirter.add_scalar("测试损失总值", loss_val, (epoch))

    # 最后一次的权重
    Last_weight = net.state_dict()

    #保存模型
    SaveModel.Save_Model(EPX_Path,Best_weight,Last_weight)

    fo.close()
    if(wirter):
        print("tensorboard dir is:",path_board)
    wirter.close()



if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--batch-size', type=int, default=8)
    parser.add_argument('--lr', type=float, default=0.01)
    parser.add_argument('--log_interval', type=int, default=10)
    # 训练几轮测试一次
    parser.add_argument('--val_interval', type=int, default=1)
    parser.add_argument('--train_dir', type=str, default='')
    parser.add_argument('--valid_dir', type=str, default='')
    #如果是Mac系注意这个参数可能需要设置为1,本地训练,不推荐MAC
    parser.add_argument('--works',type=int,default=2)
    parser.add_argument('--show_log_console',type=bool,default=True)
    parser.add_argument('--device',type=str,default="0",help="默认使用显卡加速训练参数选择:0,1,2...or cpu")
    parser.add_argument('--tensorboardopen',type=bool,default=True)
    opt = parser.parse_args()
    train()
复制代码

识别使用实现

import argparse
from PIL import Image
from utils.DataSet.MyDataSet import MyDataSet
from utils.DataSet.TransformAtions import TransFormAtions

"""
这里不想写那么多东西,就是简单地去做一个测试就ok了。
其实做法就是在那个train里面的训练
"""

import argparse
import torch
from torch.utils.data import DataLoader
from models.LeNet import LeNet
from data.ModelConfig import *
import outProcess
def detect():


    ways = opt.valid_imgs
    transformations = TransFormAtions()

    net = LeNet(classes=Classes)
    state_dict_load = torch.load(opt.path_state_dict)
    net.load_state_dict(state_dict_load)

    if(ways):

        test_data = MyDataSet(data_dir=opt.valid_dir, transform=transformations.valid_transform)
        valid_loader = DataLoader(dataset=test_data, batch_size=1)

        net.eval()
        with torch.no_grad():
            for i, data in enumerate(valid_loader):
                # forward
                inputs, labels = data
                outputs = net(inputs)
                _, predicted = torch.max(outputs.data, 1)
                # 输出处理器

                outProcess.Function(predicted.numpy()[0])
    else:
        #指定的是单张图片,少给我来奇奇怪怪的输入,这个版本容错很差滴!!!
        path_img = opt.valid_dir
        if(".jpg" not in path_img):
            raise Exception("小爷打不开这图片")
        image = Image.open(path_img)
        image = transformations.valid_transform(image)
        image = torch.reshape(image, (1, 3, 32, 32))

        net.eval()
        with torch.no_grad():
            out = net(image)

            outProcess.Function(out.argmax(1).item())


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # False表示识别单张图片,True表示多张图片,此时指定路径即可。
    parser.add_argument('--valid_imgs',type=bool,default=False)
    parser.add_argument('--valid_dir', type=str, default=r'F:\projects\PythonProject\MyClassfication\mydata\train\100\1.jpg')
    parser.add_argument('--path_state_dict', type=str, default='runs/train/epx2/weights/best.pth')
    opt = parser.parse_args()
    detect()
复制代码

之后是咱们的处理器

from data.ModelConfig import *



def Function(out):
    print("类别为:", Classfiy[out])
复制代码

这个的其实就是个后置处理器,没啥

总结

到这里完整的代码都给出来,仔细看博客其实都可以直接复现出来。 下次咱们基于这格玩意做一个小游戏玩玩。结合 mediapipe做个玩玩~

猜你喜欢

转载自juejin.im/post/7086351389219094541