Implementing a BP Neural Network in NumPy (with Dropout, Batch Normalization, and Other Training Tricks)

BP Neural Network

Introduction

This article trains a fully connected neural network on the MNIST dataset and compares runs with and without input normalization, Dropout, and Batch Normalization, using visualizations to analyze why these common deep-network training tricks work and what effect they have.

Network Architecture

The network architecture resembles the figure below: the input layer has 784 units (the number of input pixel features), hidden layer 1 has 512 neurons (tanh activation), hidden layer 2 has 512 neurons (tanh activation), and the output layer has 10 neurons (softmax activation, yielding a probability distribution over the 10 classes).

[Figure: network architecture diagram]

Training Pipeline

Input Data

For each batch, the inputs x are reshaped to [b, 784] and the labels y are one-hot encoded to [b, 10], as sketched below.
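For reference, here is a minimal sketch of this preprocessing step. The project's onehot helper lives in utils and is not shown in this post, so the helper below is an illustrative stand-in rather than the repo's implementation:

import numpy as np

def onehot_sketch(y, num_classes=10):
    """Map integer labels of shape [b] or [b, 1] to a one-hot matrix of shape [b, num_classes]."""
    y = np.asarray(y).reshape(-1)
    out = np.zeros((y.shape[0], num_classes), dtype=np.float32)
    out[np.arange(y.shape[0]), y] = 1.0
    return out

# dummy batch standing in for one mini-batch from the data loader
x = np.random.rand(32, 28, 28)                  # [b, 28, 28]
y = np.random.randint(0, 10, size=(32, 1))      # [b, 1]
x = x.reshape(-1, 28 * 28)                      # [b, 784]
y = onehot_sketch(y, 10)                        # [b, 10]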

Forward Pass

Run the forward computation to obtain each layer's input and output; the corresponding equations are written out below.
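In equations, with z^(i) the pre-activation input of layer i and a^(i) its output (the listed code uses weight matrices only, without bias terms):

\[ z^{(i)} = a^{(i-1)} W^{(i)\top}, \qquad a^{(i)} = \tanh\big(z^{(i)}\big), \quad i = 1, \dots, L-1 \]

\[ \hat{y} = \operatorname{softmax}\big(a^{(L-1)} W^{(L)\top}\big) \]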

Backward Pass

Following the backpropagation rule, compute the gradient of every parameter; the update rules are written out below.
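Written out, the gradients computed in backward() below follow the chain rule; with softmax outputs and a cross-entropy loss, the output-layer term simplifies to the first line:

\[ \delta^{(L)} = \hat{y} - y, \qquad \nabla_{W^{(L)}} = \delta^{(L)\top} a^{(L-1)} \]

\[ \delta^{(i)} = \big(\delta^{(i+1)} W^{(i+1)}\big) \odot \tanh'\big(z^{(i)}\big), \qquad \nabla_{W^{(i)}} = \delta^{(i)\top} a^{(i-1)} \]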

Parameter Update

Update the parameters with the Adam algorithm; a generic sketch of one Adam step follows.
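The optimizers module is referenced by the model code but not listed in this post, so the following is only a generic sketch of one Adam step for a single weight matrix; the hyperparameters (lr, beta1, beta2, eps) are the usual defaults, not values read from the repo:

import numpy as np

def adam_step(w, grad, m, v, t, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
    """One Adam update for a parameter array; m, v are running moment estimates, t the step count starting at 1."""
    m = beta1 * m + (1 - beta1) * grad          # first moment: exponential average of gradients
    v = beta2 * v + (1 - beta2) * grad ** 2     # second moment: exponential average of squared gradients
    m_hat = m / (1 - beta1 ** t)                # bias correction for the zero-initialized moments
    v_hat = v / (1 - beta2 ** t)
    w = w - lr * m_hat / (np.sqrt(v_hat) + eps)
    return w, m, v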

Source Code

Since the project has quite a few modules, only the core model source is listed here; the remaining modules can be found in the Github repository linked at the end.

"""
Author: Zhou Chen
Date: 2019/12/4
Desc: Build the model
"""
import numpy as np
from initializers import xavier, zero
from utils import onehot
from activations import tanh, softmax, softmax_gradient, tanh_gradient
from losses import cross_entropy
from optimizers import SGD, Adam


def dropout(x, p):
    """
    Drop neuron connections with probability p. For convenience, inverted dropout is used, so the test-time network needs no modification.
    """
    keep_prob = 1 - p
    # generate one dropout mask per feature (shape x.shape[1:]) and share it across the batch, rather than sampling an independent mask per sample
    d_temp = np.random.binomial(1, keep_prob, size=x.shape[1:]) / keep_prob
    d_temp = d_temp.reshape(-1)
    x_dropout = x * d_temp
    return x_dropout, d_temp


class Model(object):

    def __init__(self, num_layers, units_list=None, initializer=None, optimizer='adam'):
        self.weight_num = num_layers - 1
        # initialize the parameters with the chosen method; this experiment only implements Xavier and all-zero initialization
        self.params = xavier(num_layers, units_list) if initializer == 'xavier' else zero(num_layers, units_list)
        self.optimizer = Adam(weights=self.params, weight_num=self.weight_num) if optimizer == 'adam' else SGD()
        self.bn_param = {}

    def forward(self, x, dropout_prob=None):
        """
        Forward pass over one mini-batch
        """
        net_inputs = []  # pre-activation input of each layer
        net_outputs = []  # post-activation output of each layer
        net_d = []
        # append the input layer directly so that indices line up with layer numbers
        net_inputs.append(x)
        net_outputs.append(x)
        net_d.append(np.ones(x.shape[1:]))  # no dropout on the input layer
        for i in range(1, self.weight_num):  # there is one fewer weight matrix than layers
            x = x @ self.params['w'+str(i)].T
            net_inputs.append(x)
            x = tanh(x)
            if dropout_prob:
                # apply dropout during training
                x, d_temp = dropout(x, dropout_prob)
                net_d.append(d_temp)
            net_outputs.append(x)
        out = x @ self.params['w'+str(self.weight_num)].T
        net_inputs.append(out)
        out = softmax(out)
        net_outputs.append(out)
        return {'net_inputs': net_inputs, 'net_outputs': net_outputs, 'd': net_d}, out

    def backward(self, nets, y, pred, dropout_prob=None):
        """
        BP update rules (matching the code below):
        dz[out] = pred - y
        dw[out] = dz[out].T @ outputs[out-1]
        db[out] = dz[out]
        dz[i] = (dz[i+1] @ W[i+1]) * tanh'(z[i])
        dw[i] = dz[i].T @ outputs[i-1]
        db[i] = dz[i]

        """
        grads = dict()
        grads['dz'+str(self.weight_num)] = (pred - y)  # [b, 10]
        grads['dw'+str(self.weight_num)] = grads['dz'+str(self.weight_num)].T @ nets['net_outputs'][self.weight_num-1]  #[10, 512]

        for i in reversed(range(1, self.weight_num)):
            temp = grads['dz' + str(i + 1)] @ self.params['w' + str(i + 1)] * tanh_gradient(nets['net_inputs'][i])
            if dropout_prob:
                # the stored mask already carries the 1/keep_prob rescaling from inverted dropout
                temp = temp * nets['d'][i]
            grads['dz'+str(i)] = temp   # [b, 512]
            grads['dw'+str(i)] = grads['dz'+str(i)].T @ nets['net_outputs'][i-1]
        return grads

    def train(self, data_loader, valid_loader, epochs, learning_rate, dropout_prob=None):
        losses_train = []
        losses_valid = []
        for epoch in range(epochs):
            print("epoch", epoch)
            # training phase
            epoch_loss_train = 0
            for step, (x, y) in enumerate(data_loader):
                # x:[b, 28, 28] -> [b, 784] , y:[b, 1] -> [b, 10]
                x = x.reshape(-1, 28 * 28)
                y = onehot(y, 10)
                nets, pred = self.forward(x, dropout_prob)
                loss = cross_entropy(y, pred)
                epoch_loss_train += loss
                grads = self.backward(nets, y, pred, dropout_prob)
                # update the parameters with the optimizer (Adam by default)
                self.params = self.optimizer.optimize(self.weight_num, self.params, grads, y.shape[0])

                if step % 100 == 0:
                    print("epoch {} training step {} loss {:.4f}".format(epoch, step, loss))
            losses_train.append(epoch_loss_train)
            print(epoch_loss_train)
            data_loader.restart()
            # validation phase: forward pass only
            epoch_loss_valid = 0
            for step, (x, y) in enumerate(valid_loader):
                x = x.reshape(-1, 28 * 28)
                y = onehot(y, 10)
                nets, pred = self.forward(x, dropout_prob)
                loss = cross_entropy(y, pred)
                epoch_loss_valid += loss

                if step % 100 == 0:
                    print("epoch {} validation step {} loss {:.4f}".format(epoch, step, loss))
            losses_valid.append(epoch_loss_valid)
            valid_loader.restart()
        his = {'train_loss': losses_train, 'valid_loss': losses_valid}
        return his

    def batch_norm(self, x, layer_index, mode):
        epsilon = 1e-6
        momentum = 0.9
        N, D = x.shape
        global_mean = self.bn_param.get('global_mean' + str(layer_index), np.zeros(D, dtype=x.dtype))
        global_var = self.bn_param.get('global_var' + str(layer_index), np.zeros(D, dtype=x.dtype))
        cache = None
        if mode == 'train':
            # compute the mean and variance of the current batch
            sample_mean = np.mean(x, axis=0)
            sample_var = np.var(x, axis=0)
            x_hat = (x - sample_mean) / np.sqrt(sample_var + epsilon)
            out = self.params['gamma' + str(layer_index)] * x_hat + self.params['beta' + str(layer_index)]  # scale and shift
            global_mean = momentum * global_mean + (1 - momentum) * sample_mean
            global_var = momentum * global_var + (1 - momentum) * sample_var
            cache = {'x': x, 'x_hat': x_hat, 'sample_mean': sample_mean, 'sample_var': sample_var}
        else:
            # test mode: normalize with the running (global) mean and variance
            x_hat = (x - global_mean) / np.sqrt(global_var + epsilon)
            out = self.params['gamma' + str(layer_index)] * x_hat + self.params['beta' + str(layer_index)]

        self.bn_param['global_mean' + str(layer_index)] = global_mean
        self.bn_param['global_var' + str(layer_index)] = global_var
        return out, cache

    def forward_bn(self, x, bn_mode='train'):
        """
        Forward pass with BN layers
        """

        net_inputs = []
        net_outputs = []
        caches = []
        net_inputs.append(x)
        net_outputs.append(x)
        caches.append(x)

        for i in range(1, self.weight_num):
            # BN is applied to every hidden layer's linear input; the input and output layers are not batch-normalized
            x = x @ self.params['w'+str(i)].T
            net_inputs.append(x)
            x, cache = self.batch_norm(x, i, bn_mode)  # BN can be viewed as a trainable layer inserted between a hidden layer's linear input and its activation
            caches.append(cache)
            x = tanh(x)
            net_outputs.append(x)
        out = x @ self.params['w' + str(self.weight_num)].T
        net_inputs.append(out)
        out = softmax(out)
        net_outputs.append(out)

        return {'net_inputs': net_inputs, 'net_outputs': net_outputs, 'cache': caches}, out

    def backward_bn(self, nets, y, pred):
        """
        Backward pass with BN layers
        """
        epsilon = 1e-6
        grads = dict()
        # output layer gradient via the chain rule; the output layer has no BN
        grads['dz' + str(self.weight_num)] = (pred - y)
        grads['dw' + str(self.weight_num)] = grads['dz' + str(self.weight_num)].T @ nets['net_outputs'][self.weight_num - 1]
        for i in reversed(range(1, self.weight_num)):
            cache = nets['cache'][i]
            N = cache['x'].shape[0]
            # gradient w.r.t. the tanh output of layer i
            da = grads['dz' + str(i + 1)] @ self.params['w' + str(i + 1)]
            # backprop through tanh, whose input was the BN output gamma * x_hat + beta
            dbn = da * tanh_gradient(self.params['gamma' + str(i)] * cache['x_hat'] + self.params['beta' + str(i)])
            # gradients of the BN scale and shift parameters
            grads['dgamma' + str(i)] = np.sum(dbn * cache['x_hat'], axis=0)
            grads['dbeta' + str(i)] = np.sum(dbn, axis=0)
            # backprop through the normalization itself
            dx_hat = dbn * self.params['gamma' + str(i)]
            dvar = -0.5 * np.sum(dx_hat * (cache['x'] - cache['sample_mean']), axis=0) * np.power(cache['sample_var'] + epsilon, -1.5)
            dmu = -np.sum(dx_hat / np.sqrt(cache['sample_var'] + epsilon), axis=0) - 2 * dvar * np.sum(cache['x'] - cache['sample_mean'], axis=0) / N
            dz = dx_hat / np.sqrt(cache['sample_var'] + epsilon) + 2.0 * dvar * (cache['x'] - cache['sample_mean']) / N + dmu / N
            grads['dz' + str(i)] = dz
            grads['dw' + str(i)] = dz.T @ nets['net_outputs'][i - 1]
        return grads

    def train_bn(self, data_loader, valid_loader, epochs, learning_rate):
        losses_train = []
        losses_valid = []
        for epoch in range(epochs):
            print("epoch", epoch)
            epoch_loss_train = 0
            # mini-batch training
            for step, (x, y) in enumerate(data_loader):
                # x:[b, 28, 28] -> [b, 784] , y:[b, 1] -> [b, 10]
                x = x.reshape(-1, 28 * 28)
                y = onehot(y, 10)
                nets, pred = self.forward_bn(x, bn_mode='train')
                grads = self.backward_bn(nets, y, pred)
                self.params = self.optimizer.optimize(self.weight_num, self.params, grads, y.shape[0])
                loss = cross_entropy(y, pred)
                epoch_loss_train += loss
                if step % 100 == 0:
                    print("epoch {} step {} loss {:.4f}".format(epoch, step, loss))
            losses_train.append(epoch_loss_train)
            data_loader.restart()
            print(epoch_loss_train)
            # evaluate on the validation set
            epoch_loss_valid = 0
            for step, (x, y) in enumerate(valid_loader):
                x = x.reshape(-1, 28 * 28)
                y = onehot(y, 10)
                nets, pred = self.forward_bn(x, bn_mode='test')
                loss = cross_entropy(y, pred)
                epoch_loss_valid += loss
                if step % 100 == 0:
                    print("epoch {} step {} loss {:.4f}".format(epoch, step, loss))
            losses_valid.append(epoch_loss_valid)
            valid_loader.restart()
        his = {'train_loss': losses_train, 'valid_loss': losses_valid}

        return his

    def predict(self, data_loader, bn=False):
        labels = []
        pred = []
        losses = 0
        for (x, y) in data_loader:
            x = x.reshape(-1, 28 * 28)
            y = onehot(y, 10)
            if bn:
                _, out = self.forward_bn(x, 'test')
            else:
                _, out = self.forward(x)
            loss = cross_entropy(y, out)
            losses += loss
            out = list(np.argmax(out, axis=-1).flatten())
            y = list(np.argmax(y, axis=1).flatten())
            labels += y
            pred += out

        return np.array(pred).astype('int'), np.array(labels).astype('int')
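Putting the pieces together, here is a minimal end-to-end sketch of how the class above is driven. The repo's data module is not listed in this post, so _FakeLoader below is only a stand-in that mimics the loader interface the training loop relies on (an iterable of (x, y) batches with a restart() method); the units_list layout is likewise an assumption based on the constructor signature:

import numpy as np

class _FakeLoader(list):
    """Stand-in for the repo's data loader: an iterable of (x, y) batches with a restart() method."""
    def restart(self):
        pass  # the real loader presumably resets its cursor / reshuffles here

def dummy_loader(num_batches=5, b=32):
    return _FakeLoader((np.random.rand(b, 28, 28), np.random.randint(0, 10, size=(b, 1)))
                       for _ in range(num_batches))

model = Model(num_layers=4, units_list=[784, 512, 512, 10], initializer='xavier', optimizer='adam')
history = model.train(dummy_loader(), dummy_loader(), epochs=2, learning_rate=1e-3, dropout_prob=0.5)
pred, labels = model.predict(dummy_loader())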

Training Results

With vs. Without Input Normalization

Main Purpose

Rescales the input features so that a single learning rate works for all layers when optimizing with gradient descent (otherwise the input layer would require a much smaller learning rate). Two typical ways to normalize the inputs are sketched below.
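The data-loading code is not part of this post, so purely as an illustration, two common ways to normalize MNIST inputs (min-max scaling to [0, 1], or standardization using statistics computed on the training set):

import numpy as np

def normalize_minmax(x):
    """Scale raw pixel values from [0, 255] down to [0, 1]."""
    return x.astype(np.float32) / 255.0

def standardize(x, mean, std):
    """Zero-mean, unit-variance scaling; mean and std must come from the training set only."""
    return (x - mean) / (std + 1e-8)

x_raw = np.random.randint(0, 256, size=(32, 784))           # dummy batch of raw pixels
x_scaled = normalize_minmax(x_raw)
x_std = standardize(x_scaled, x_scaled.mean(axis=0), x_scaled.std(axis=0))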

Effect

Faster convergence, i.e., the loss decreases more quickly.

Training set

[Figure: training loss with vs. without normalization]

Validation set

[Figure: validation loss with vs. without normalization]

Test set

[Figures: test-set results with vs. without normalization]

With vs. Without Dropout

Main Purpose

Randomly disables neurons to reduce co-adaptation between them, which forces the network to learn more robust representations and effectively suppresses overfitting. A usage sketch for this project's API follows.
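In the code above, dropout is controlled entirely by the dropout_prob argument of forward() and backward(); because inverted dropout rescales the surviving activations during training, evaluation simply omits the argument. A usage sketch (the units_list layout is an assumption based on the constructor signature, and the dummy batch stands in for preprocessed data):

import numpy as np

x = np.random.rand(32, 784).astype(np.float32)                        # dummy preprocessed batch
y = np.eye(10, dtype=np.float32)[np.random.randint(0, 10, size=32)]   # dummy one-hot labels

model = Model(num_layers=4, units_list=[784, 512, 512, 10], initializer='xavier', optimizer='adam')

# training step: drop hidden activations with probability 0.5
nets, pred = model.forward(x, dropout_prob=0.5)
grads = model.backward(nets, y, pred, dropout_prob=0.5)

# evaluation: no dropout_prob, and no extra rescaling is needed at test time
_, pred = model.forward(x)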

Effect

Training converges more slowly, but test performance improves.

Training set

[Figure: training loss with vs. without Dropout]

Validation set

[Figure: validation loss with vs. without Dropout]

Test set

[Figures: test-set results with vs. without Dropout]

With vs. Without Batch Normalization

Main Purpose

Normalizes each hidden layer's inputs toward a standard normal distribution (before a learned scale and shift), which speeds up learning and mitigates optimization problems caused by shifting layer-input distributions. The transform implemented in batch_norm above is summarized below.
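Concretely, the batch_norm method above applies the standard per-feature BN transform over each mini-batch (with epsilon = 1e-6 and momentum = 0.9 in the code):

\[ \mu_B = \frac{1}{N}\sum_{n=1}^{N} x_n, \qquad \sigma_B^2 = \frac{1}{N}\sum_{n=1}^{N}\big(x_n - \mu_B\big)^2 \]

\[ \hat{x}_n = \frac{x_n - \mu_B}{\sqrt{\sigma_B^2 + \varepsilon}}, \qquad y_n = \gamma\,\hat{x}_n + \beta \]

\[ \mu_{\text{global}} \leftarrow 0.9\,\mu_{\text{global}} + 0.1\,\mu_B, \qquad \sigma^2_{\text{global}} \leftarrow 0.9\,\sigma^2_{\text{global}} + 0.1\,\sigma_B^2 \]

At test time the running statistics replace the batch statistics, which is why forward_bn is called with bn_mode='test' during validation and prediction.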

Effect

Training is accelerated.

Training set

[Figure: training loss with vs. without Batch Normalization]

Validation set

[Figure: validation loss with vs. without Batch Normalization]

Test set

[Figures: test-set results with vs. without Batch Normalization]

Additional Notes

This project implements and trains the neural network entirely in hand-written NumPy; corrections for any oversights are welcome. The source code is open-sourced on my Github; stars and forks are welcome.


Reprinted from blog.csdn.net/zhouchen1998/article/details/103601138