A Summary of Common NLP Tricks and Their Code


I. Background

When working on projects or competitions, we often rely on a handful of tricks to boost model performance. Without further ado, here are the goods.

II. Common Tricks

1. Adversarial Training

The idea of adversarial training is to add a perturbation to the original input, obtain an adversarial example, and train on it. Common strategies include FGM and PGD, and it is generally a fairly reliable way to gain a few points. In NLP, the adversarial perturbation is added to the embeddings: after the model's first forward and backward pass, the gradient with respect to the embedding layer is available, and the embeddings are moved a small step in the gradient-ascent direction to approximate the worst-case perturbation. Some good references:

A Brief Discussion of Adversarial Training: Meaning, Methods and Reflections (with a Keras implementation) - Scientific Spaces

https://kexue.fm/archives/7234

Nicolas: [Training Tricks] Adversarial Training in NLP, with a PyTorch Implementation

https://zhuanlan.zhihu.com/p/91269728

Understanding Adversarial Training, with Detailed Introductions to FGM, PGD and FreeLB - CSDN blog

https://blog.csdn.net/weixin_41712499/article/details/110878322

bert4torch implementation:

https://github.com/Tongjilibo/bert4torch/blob/master/bert4torch/snippets.py#L887

import torch

class FGM():
    '''Adversarial training (FGM)'''
    def __init__(self, model):
        self.model = model
        self.backup = {}

    def attack(self, epsilon=1., emb_name='word_embeddings', **kwargs):
        # emb_name must match the parameter name of the embedding in your model,
        # e.g. self.emb = nn.Embedding(5000, 100)
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                self.backup[name] = param.data.clone()
                norm = torch.norm(param.grad)  # L2 norm by default
                if norm != 0 and not torch.isnan(norm):  # nan check is for apex mixed precision
                    r_at = epsilon * param.grad / norm
                    param.data.add_(r_at)

    def restore(self, emb_name='word_embeddings', **kwargs):
        # must match the emb_name passed to attack(), otherwise the backup lookup
        # fails (the original snippet's default here was 'emb')
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                assert name in self.backup
                param.data = self.backup[name]
        self.backup = {}

class PGD():
    '''Adversarial training (PGD)'''
    def __init__(self, model):
        self.model = model
        self.emb_backup = {}
        self.grad_backup = {}

    def attack(self, epsilon=1., alpha=0.3, emb_name='word_embeddings', is_first_attack=False, **kwargs):
        # emb_name must match the parameter name of the embedding in your model
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                if is_first_attack:
                    self.emb_backup[name] = param.data.clone()
                norm = torch.norm(param.grad)
                if norm != 0 and not torch.isnan(norm):  # nan check is for apex mixed precision
                    r_at = alpha * param.grad / norm
                    param.data.add_(r_at)
                    param.data = self.project(name, param.data, epsilon)

    def restore(self, emb_name='word_embeddings', **kwargs):
        # must match the emb_name passed to attack() (the original default was 'emb')
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                assert name in self.emb_backup
                param.data = self.emb_backup[name]
        self.emb_backup = {}

    def project(self, param_name, param_data, epsilon):
        r = param_data - self.emb_backup[param_name]
        if torch.norm(r) > epsilon:
            r = epsilon * r / torch.norm(r)
        return self.emb_backup[param_name] + r

    def backup_grad(self):
        for name, param in self.model.named_parameters():
            # guard against layers (e.g. pooling) that join the forward pass
            # but not the backward pass, where grad is None
            if param.requires_grad and (param.grad is not None):
                self.grad_backup[name] = param.grad.clone()

    def restore_grad(self):
        for name, param in self.model.named_parameters():
            if param.requires_grad and (param.grad is not None):
                param.grad = self.grad_backup[name]
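To show how these classes are wired into training, here is a minimal, hypothetical FGM training step: the extra backward pass on the perturbed embeddings accumulates adversarial gradients before the optimizer step. The toy model, parameter names and hyperparameters are assumptions, and the FGM class is condensed from the snippet above.

```python
import torch
import torch.nn as nn

class FGM():
    def __init__(self, model):
        self.model, self.backup = model, {}
    def attack(self, epsilon=1., emb_name='word_embeddings'):
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                self.backup[name] = param.data.clone()
                norm = torch.norm(param.grad)
                if norm != 0 and not torch.isnan(norm):
                    param.data.add_(epsilon * param.grad / norm)
    def restore(self, emb_name='word_embeddings'):
        for name, param in self.model.named_parameters():
            if param.requires_grad and emb_name in name:
                param.data = self.backup[name]
        self.backup = {}

class TinyClassifier(nn.Module):  # hypothetical stand-in model
    def __init__(self):
        super().__init__()
        self.word_embeddings = nn.Embedding(100, 16)
        self.fc = nn.Linear(16, 2)
    def forward(self, x):
        return self.fc(self.word_embeddings(x).mean(dim=1))

torch.manual_seed(0)
model = TinyClassifier()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
criterion = nn.CrossEntropyLoss()
fgm = FGM(model)
x = torch.randint(0, 100, (4, 8))
y = torch.randint(0, 2, (4,))

optimizer.zero_grad()
loss = criterion(model(x), y)
loss.backward()                                  # gradients on the clean input
w_clean = model.word_embeddings.weight.data.clone()
fgm.attack()                                     # perturb the embedding weights
perturbed = not torch.equal(model.word_embeddings.weight.data, w_clean)
loss_adv = criterion(model(x), y)
loss_adv.backward()                              # adversarial gradients accumulate
fgm.restore()                                    # put the original embeddings back
restored = torch.equal(model.word_embeddings.weight.data, w_clean)
optimizer.step()                                 # update with combined gradients
```

For PGD the pattern is similar, except `attack`/`restore` are wrapped in a small inner loop of K steps, with `backup_grad`/`restore_grad` preserving the clean gradients across the intermediate backward passes.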

2. Virtual Adversarial Training (VAT)

Draw a random standard-normal perturbation, add it to the embeddings, use a KL divergence to compute the gradient with respect to that perturbation, and then use this gradient to build the adversarial perturbation for adversarial training; the mechanics are much like FGM. One point worth highlighting: because the method adds an extra consistency loss, it can also be used for semi-supervised learning by computing the consistency loss on unlabeled data. Reference:

Musings on Generalization: From Random Noise and Gradient Penalty to Virtual Adversarial Training - Scientific Spaces

https://kexue.fm/archives/7466

bert4torch implementation:

https://github.com/Tongjilibo/bert4torch/blob/master/bert4torch/snippets.py#L960

import torch
import torch.nn.functional as F

class VAT():
    '''Virtual adversarial training
    https://github.com/namisan/mt-dnn/blob/v0.2/alum/adv_masked_lm.py
    '''
    def __init__(self, model, emb_name='word_embeddings', noise_var=1e-5, noise_gamma=1e-6,
                 adv_step_size=1e-3, adv_alpha=1, norm_type='l2', **kwargs):
        self.model = model
        self.noise_var = noise_var          # variance of the initial noise
        self.noise_gamma = noise_gamma      # eps
        self.adv_step_size = adv_step_size  # step size for the perturbation
        self.adv_alpha = adv_alpha          # weight of the adversarial loss
        self.norm_type = norm_type          # normalization type
        self.embed = None
        for (name, module) in self.model.named_modules():
            if emb_name in name:
                module.register_forward_hook(hook=self.hook)

    def hook(self, module, fea_in, fea_out):
        self.embed = fea_out
        return None

    def forward_(self, train_X, new_embed):
        # replace the token_ids in the original train_X with their embeddings
        if isinstance(train_X, (tuple, list)):
            new_train_X = [new_embed] + train_X[1:]
            adv_output = self.model.forward(*new_train_X) if self.model.forward.__code__.co_argcount >= 3 else self.model.forward(new_train_X)
        elif isinstance(train_X, torch.Tensor):
            adv_output = self.model.forward(new_embed)
        return adv_output

    def virtual_adversarial_training(self, train_X, logits):
        # initial perturbation r
        noise = self.embed.data.new(self.embed.size()).normal_(0, 1) * self.noise_var
        noise.requires_grad_()
        # x + r
        new_embed = self.embed.data.detach() + noise
        adv_output = self.forward_(train_X, new_embed)  # first extra forward
        adv_logits = adv_output[0] if isinstance(adv_output, (list, tuple)) else adv_output
        adv_loss = self.kl(adv_logits, logits.detach(), reduction="batchmean")
        delta_grad, = torch.autograd.grad(adv_loss, noise, only_inputs=True)
        norm = delta_grad.norm()
        # bail out on a vanishing or exploding gradient
        if torch.isnan(norm) or torch.isinf(norm):
            return None
        # inner ascent step
        noise = noise + delta_grad * self.adv_step_size
        # projection
        noise = self.adv_project(noise, norm_type=self.norm_type, eps=self.noise_gamma)
        new_embed = self.embed.data.detach() + noise
        new_embed = new_embed.detach()
        # one more pass with the projected perturbation
        adv_output = self.forward_(train_X, new_embed)  # second extra forward
        adv_logits = adv_output[0] if isinstance(adv_output, (list, tuple)) else adv_output
        adv_loss_f = self.kl(adv_logits, logits.detach())
        adv_loss_b = self.kl(logits, adv_logits.detach())
        # set adv_alpha to 10 for pre-training, 1 for downstream tasks
        adv_loss = (adv_loss_f + adv_loss_b) * self.adv_alpha
        return adv_loss

    @staticmethod
    def kl(inputs, targets, reduction="sum"):
        """KL divergence
        inputs: tensor, logits
        targets: tensor, logits
        """
        loss = F.kl_div(F.log_softmax(inputs, dim=-1), F.softmax(targets, dim=-1), reduction=reduction)
        return loss

    @staticmethod
    def adv_project(grad, norm_type='inf', eps=1e-6):
        """Normalize the perturbation (l1 / l2 / inf)"""
        if norm_type == 'l2':
            direction = grad / (torch.norm(grad, dim=-1, keepdim=True) + eps)
        elif norm_type == 'l1':
            direction = grad.sign()
        else:
            direction = grad / (grad.abs().max(-1, keepdim=True)[0] + eps)
        return direction
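Stripped of the hooks and model plumbing, the core of the VAT procedure boils down to a few tensor operations. The sketch below uses random stand-in tensors and a bare linear head (all names and values here are assumptions, not part of the library) to show the two extra forward passes:

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
head = nn.Linear(16, 3)        # stand-in for everything after the embedding
emb = torch.randn(4, 16)       # stand-in for the hooked embedding output
logits = head(emb)             # clean predictions

# 1) tiny random perturbation, tracked by autograd (noise_var = 1e-5)
noise = (torch.randn_like(emb) * 1e-5).requires_grad_()
adv_logits = head(emb.detach() + noise)
adv_loss = F.kl_div(F.log_softmax(adv_logits, dim=-1),
                    F.softmax(logits.detach(), dim=-1), reduction='batchmean')

# 2) gradient of the KL w.r.t. the noise gives the adversarial direction
delta_grad, = torch.autograd.grad(adv_loss, noise)

# 3) l2-normalize and take one ascent step (adv_step_size = 1e-3)
r_adv = 1e-3 * delta_grad / (torch.norm(delta_grad, dim=-1, keepdim=True) + 1e-6)

# 4) consistency loss between clean and perturbed predictions
adv_logits = head(emb.detach() + r_adv)
consistency = F.kl_div(F.log_softmax(adv_logits, dim=-1),
                       F.softmax(logits.detach(), dim=-1), reduction='batchmean')
```

Because `logits.detach()` serves as the target, the same `consistency` term can be computed on unlabeled batches, which is what makes the method usable for semi-supervised learning.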

3. Gradient Penalty

The idea again stems from adversarial training: it can be shown that applying an adversarial perturbation to the input is, to some extent, equivalent to adding a "gradient penalty" to the loss. In practice, you simply add the squared norm of the gradient with respect to some layer (usually the embedding layer) to the loss.

A Brief Discussion of Adversarial Training: Meaning, Methods and Reflections (with a Keras implementation) - Scientific Spaces

https://kexue.fm/archives/7234
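A minimal PyTorch sketch of this idea (the toy model and the penalty weight are assumptions): compute the gradient of the supervised loss with respect to the embedding output using `create_graph=True`, so the penalty term is itself differentiable, and add its squared norm to the loss.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)

class TinyClassifier(nn.Module):  # hypothetical stand-in model
    def __init__(self):
        super().__init__()
        self.word_embeddings = nn.Embedding(100, 16)
        self.fc = nn.Linear(16, 2)
    def forward(self, x):
        emb = self.word_embeddings(x)          # (btz, seq_len, hidden)
        return self.fc(emb.mean(dim=1)), emb   # also return the embedding output

model = TinyClassifier()
x = torch.randint(0, 100, (4, 8))
y = torch.randint(0, 2, (4,))

logits, emb = model(x)
ce = F.cross_entropy(logits, y)
# gradient of the supervised loss w.r.t. the embedding output;
# create_graph=True keeps the penalty differentiable w.r.t. the parameters
grad, = torch.autograd.grad(ce, emb, create_graph=True)
penalty = 0.5 * (grad ** 2).sum()
loss = ce + 1.0 * penalty   # the weight 1.0 is an assumed hyperparameter
loss.backward()
```

Compared with FGM, this variant needs no extra forward pass on perturbed inputs, at the cost of a second-order backward through `create_graph=True`.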

4. R-drop

The idea is to add a consistency loss (usually a KL divergence) on top of the supervised loss. The paired samples are built rather bluntly: run the same input through dropout twice (similar to unsupervised SimCSE). For the same reason, the method can also be used for semi-supervised training.

bert4torch implementation:

https://github.com/Tongjilibo/bert4torch/blob/master/bert4torch/losses.py#L134

import torch
import torch.nn as nn
import torch.nn.functional as F

class RDropLoss(nn.Module):
    '''R-Drop loss. Official repo: https://github.com/dropreg/R-Drop'''
    def __init__(self, alpha=4, rank='adjacent'):
        super().__init__()
        self.alpha = alpha
        # two batch layouts are supported: interleaved (odd/even rows) or stacked (top/bottom halves)
        assert rank in {'adjacent', 'updown'}, "rank kwarg only support 'adjacent' and 'updown' "
        self.rank = rank
        self.loss_sup = nn.CrossEntropyLoss()
        self.loss_rdrop = nn.KLDivLoss(reduction='none')

    def forward(self, *args):
        '''Two calling conventions: (y_pred, y_true) or (y_pred1, y_pred2, y_true)'''
        assert len(args) in {2, 3}, 'RDropLoss only support 2 or 3 input args'
        # y_pred is a single tensor containing both copies
        if len(args) == 2:
            y_pred, y_true = args
            loss_sup = self.loss_sup(y_pred, y_true)  # supervised loss over both copies
            if self.rank == 'adjacent':
                y_pred1 = y_pred[1::2]
                y_pred2 = y_pred[::2]
            elif self.rank == 'updown':
                half_btz = y_true.shape[0] // 2
                y_pred1 = y_pred[:half_btz]
                y_pred2 = y_pred[half_btz:]
        # y_pred comes as two tensors
        else:
            y_pred1, y_pred2, y_true = args
            loss_sup = self.loss_sup(y_pred1, y_true)

        loss_rdrop1 = self.loss_rdrop(F.log_softmax(y_pred1, dim=-1), F.softmax(y_pred2, dim=-1))
        loss_rdrop2 = self.loss_rdrop(F.log_softmax(y_pred2, dim=-1), F.softmax(y_pred1, dim=-1))
        return loss_sup + torch.mean(loss_rdrop1 + loss_rdrop2) / 4 * self.alpha
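Stripped of the class machinery, one R-Drop step is just two stochastic forward passes plus a symmetric KL term. A toy sketch (the model and `alpha` value are assumptions):

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

torch.manual_seed(0)
model = nn.Sequential(nn.Linear(16, 32), nn.Dropout(0.3), nn.Linear(32, 2))
model.train()                            # keep dropout active
x = torch.randn(8, 16)
y = torch.randint(0, 2, (8,))

logits1, logits2 = model(x), model(x)    # two forwards = two dropout masks
loss_sup = F.cross_entropy(logits1, y) + F.cross_entropy(logits2, y)
# symmetric KL between the two predicted distributions
kl12 = F.kl_div(F.log_softmax(logits1, dim=-1), F.softmax(logits2, dim=-1),
                reduction='batchmean')
kl21 = F.kl_div(F.log_softmax(logits2, dim=-1), F.softmax(logits1, dim=-1),
                reduction='batchmean')
alpha = 4                                # assumed consistency weight
loss = loss_sup + alpha * (kl12 + kl21) / 2
```

In practice the two copies are usually produced by duplicating each sample in the batch (the `adjacent`/`updown` layouts above) rather than by calling the model twice.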

5. mix-up

A Meituan article on few-shot learning mentioned mix-up as a strong score booster. Mix-up originated in CV: sample two training examples and build a mixed sample with a mixed label as augmented data. Intuitively, we want the model's output for a linear combination of two inputs to be the corresponding linear combination of their outputs; that is, we nudge the model toward behaving approximately like a linear system, which helps prevent overfitting. Reference:

Few-shot Learning and Its Applications at Meituan

https://tech.meituan.com/2021/08/19/low-resource-learning.html
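The core mixup recipe itself fits in a few lines. A toy sketch (the tensors, the linear model and `alpha=1.0` are assumptions), mixing the inputs and mixing the loss instead of one-hot labels:

```python
import numpy as np
import torch
import torch.nn as nn

torch.manual_seed(0)
np.random.seed(0)
x = torch.randn(8, 16)                   # a batch of already-embedded inputs
y = torch.randint(0, 3, (8,))
model = nn.Linear(16, 3)                 # stand-in classifier
criterion = nn.CrossEntropyLoss()

lam = np.random.beta(1.0, 1.0)           # mixing coefficient ~ Beta(alpha, alpha)
perm = torch.randperm(x.size(0))         # pair each sample with a shuffled partner
x_mix = lam * x + (1 - lam) * x[perm]    # mixed inputs
y_pred = model(x_mix)
# mixing the losses is equivalent to mixing one-hot targets
loss = lam * criterion(y_pred, y) + (1 - lam) * criterion(y_pred, y[perm])
```

The MixUp class below generalizes exactly this: `lam` and `perm_index` play the roles of `lam` and `perm` here, and `method` chooses at which layer the interpolation happens.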

https://github.com/Tongjilibo/bert4torch/blob/master/bert4torch/layers.py#L1297


import random
import warnings
import numpy as np
import torch
import torch.nn as nn

class MixUp(nn.Module):
    '''Implementation of mixup
       method: 'embed'/'encoder' mix at the embedding/encoder output level,
               None means mixing is handled downstream, 'hidden' mixes a hidden layer
    '''
    def __init__(self, method='encoder', alpha=1.0, layer_mix=None):
        super().__init__()
        assert method in {'embed', 'encoder', 'hidden', None}
        self.method = method
        self.alpha = alpha
        self.perm_index = None
        self.lam = 0
        self.layer_mix = layer_mix  # index of the hidden layer to mix

    def get_perm(self, inputs):
        if isinstance(inputs, torch.Tensor):
            return inputs[self.perm_index]
        elif isinstance(inputs, (list, tuple)):
            return [inp[self.perm_index] if isinstance(inp, torch.Tensor) else inp for inp in inputs]

    def mix_up(self, output, output1):
        if isinstance(output, torch.Tensor):
            return self.lam * output + (1.0 - self.lam) * output1
        elif isinstance(output, (list, tuple)):
            output_final = []
            for i in range(len(output)):
                if output[i] is None:  # conditional_emb=None
                    output_final.append(output[i])
                elif (not output[i].requires_grad) and (output[i].dtype in {torch.long, torch.int}):
                    # not in embedding form (e.g. token ids / masks)
                    output_final.append(torch.max(output[i], output1[i]))
                else:
                    output_final.append(self.lam * output[i] + (1.0 - self.lam) * output1[i])
            return output_final
        else:
            raise ValueError('Illegal model output')

    def encode(self, model, inputs):
        batch_size = inputs[0].shape[0]
        device = inputs[0].device
        self.lam = np.random.beta(self.alpha, self.alpha)
        self.perm_index = torch.randperm(batch_size).to(device)

        if self.method is None:
            output = model(inputs)
            output1 = self.get_perm(output)
            return [output, output1]
        elif self.method == 'encoder':
            output = model(inputs)
            output1 = self.get_perm(output)
            output_final = self.mix_up(output, output1)
        elif self.method == 'embed':
            output = model.apply_embeddings(inputs)
            output1 = self.get_perm(output)
            output_final = self.mix_up(output, output1)
            # Main
            output_final = model.apply_main_layers(output_final)
            # Final
            output_final = model.apply_final_layers(output_final)
        elif self.method == 'hidden':
            if self.layer_mix is None:
                # only encoderLayer is considered here, not decoderLayer or seq2seq structures
                try:
                    layer_mix = random.randint(0, len(model.encoderLayer))
                except:
                    warnings.warn('LayerMix random failed')
                    layer_mix = 0
            else:
                layer_mix = self.layer_mix

            def apply_on_layer_end(l_i, output):
                if l_i == layer_mix:
                    output1 = self.get_perm(output)
                    return self.mix_up(output, output1)
                else:
                    return output
            model.apply_on_layer_end = apply_on_layer_end
            output_final = model(inputs)
        return output_final

    def forward(self, criterion, y_pred, y_true):
        '''Compute the mixed loss'''
        y_true1 = y_true[self.perm_index]
        return self.lam * criterion(y_pred, y_true) + (1 - self.lam) * criterion(y_pred, y_true1)

6. UDA

In the author's view, UDA, VAT and R-drop all share the same idea: add a consistency loss on top of the ordinary supervised loss. They differ only in how the paired samples for that loss are built: VAT perturbs the embeddings, R-drop uses two dropout passes, and UDA works directly at the text level, constructing similar samples with strategies such as EDA and back-translation. All three can be applied to semi-supervised learning.

bert4torch implementation:

https://github.com/Tongjilibo/bert4torch/blob/master/bert4torch/losses.py#L172


import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class UDALoss(nn.Module):
    '''UDALoss. Subclass it for use, since forward needs global_step and total_steps.
    https://arxiv.org/abs/1904.12848
    '''
    def __init__(self, tsa_schedule=None, total_steps=None, start_p=0, end_p=1, return_all_loss=True):
        super().__init__()
        self.loss_sup = nn.CrossEntropyLoss()
        self.loss_unsup = nn.KLDivLoss(reduction='batchmean')
        self.tsa_schedule = tsa_schedule
        self.start = start_p
        self.end = end_p
        if self.tsa_schedule:
            assert self.tsa_schedule in {'linear_schedule', 'exp_schedule', 'log_schedule'}, 'tsa_schedule config illegal'
        self.return_all_loss = return_all_loss

    def forward(self, y_pred, y_true_sup, global_step, total_steps):
        sup_size = y_true_sup.size(0)
        unsup_size = (y_pred.size(0) - sup_size) // 2

        # supervised part: cross entropy
        y_pred_sup = y_pred[:sup_size]
        if self.tsa_schedule is None:
            loss_sup = self.loss_sup(y_pred_sup, y_true_sup)
        else:  # use TSA to drop supervised samples the model is already confident on
            threshold = self.get_tsa_threshold(self.tsa_schedule, global_step, total_steps, self.start, self.end)
            true_prob = torch.gather(F.softmax(y_pred_sup, dim=-1), dim=1, index=y_true_sup[:, None])
            sel_rows = true_prob.lt(threshold).sum(dim=-1).gt(0)  # keep only samples below the threshold
            loss_sup = self.loss_sup(y_pred_sup[sel_rows], y_true_sup[sel_rows]) if sel_rows.sum() > 0 else 0

        # unsupervised part: KL divergence here (cross entropy also works)
        y_true_unsup = y_pred[sup_size:sup_size + unsup_size]
        y_true_unsup = F.softmax(y_true_unsup.detach(), dim=-1)
        y_pred_unsup = F.log_softmax(y_pred[sup_size + unsup_size:], dim=-1)
        loss_unsup = self.loss_unsup(y_pred_unsup, y_true_unsup)
        if self.return_all_loss:
            return loss_sup + loss_unsup, loss_sup, loss_unsup
        else:
            return loss_sup + loss_unsup

    @staticmethod
    def get_tsa_threshold(schedule, global_step, num_train_steps, start, end):
        training_progress = global_step / num_train_steps
        if schedule == "linear_schedule":
            threshold = training_progress
        elif schedule == "exp_schedule":
            scale = 5
            threshold = math.exp((training_progress - 1) * scale)
        elif schedule == "log_schedule":
            scale = 5
            threshold = 1 - math.exp((-training_progress) * scale)
        return threshold * (end - start) + start
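The batch layout this loss expects, [labeled | unlabeled | augmented unlabeled], can be illustrated directly with stand-in logits (all tensors below are random placeholders, not real model outputs):

```python
import torch
import torch.nn.functional as F

torch.manual_seed(0)
sup_size, unsup_size = 4, 6
# assumed batch layout: [labeled | unlabeled | augmented unlabeled]
y_pred = torch.randn(sup_size + 2 * unsup_size, 3)   # stand-in logits
y_true_sup = torch.randint(0, 3, (sup_size,))

# supervised part: plain cross entropy on the labeled slice
loss_sup = F.cross_entropy(y_pred[:sup_size], y_true_sup)

# consistency part: the original unlabeled rows act as a stop-gradient target
p_orig = F.softmax(y_pred[sup_size:sup_size + unsup_size].detach(), dim=-1)
logp_aug = F.log_softmax(y_pred[sup_size + unsup_size:], dim=-1)
loss_unsup = F.kl_div(logp_aug, p_orig, reduction='batchmean')

loss = loss_sup + loss_unsup
```

This mirrors the slicing in `UDALoss.forward`; the TSA schedule only changes which labeled rows contribute to `loss_sup` as training progresses.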

III. Experimental Comparison

Below are CLS-classification results on a sentiment-classification task, taking the epoch with the best validation score. Judging from the results, most of the strategies bring some gain.

IV. Code

Talk is cheap, show me the code. All the experiments are based on the bert4torch framework, a PyTorch training framework that initially aimed to mirror the main features of bert4keras. It strives to be simple and lightweight and ships with rich examples. Interested readers are welcome to try it out, and stars are appreciated.

bert4torch

https://github.com/Tongjilibo/bert4torch/tree/master/examples/training_trick


Reposted from blog.csdn.net/qq_38735017/article/details/126228081