提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
前言
提示:这里可以添加本文要记录的大概内容:
例如:随着人工智能的不断发展,机器学习这门技术也越来越重要,很多人都开启了学习机器学习,本文就介绍了机器学习的基础内容。
提示:以下是本篇文章正文内容,下面案例可供参考
一、闲聊机器人
常见的闲聊模型都是一种seq2seq的结构实现。
1.1 Seq2Seq模型
Sequence to sequence (seq2seq)
是由encoder(编码器)
和decoder(解码器)
两个RNN的组成的。其中encoder负责对输入句子的理解,转化为context vector
,decoder负责对理解后的句子的向量进行处理,解码,获得输出。上述的过程和我们大脑理解东西的过程很相似,听到一句话,理解之后,尝试组装答案,进行回答
decoder:语言生成模型
总之:Seq2seq模型中的encoder接受一个长度为M的序列,得到1个 context vector,之后decoder把这一个context vector转化为长度为N的序列作为输出,从而构成一个M to N
的模型,能够处理很多不定长输入输出的问题,比如:文本翻译,问答,文章摘要,关键字写诗等等
1.2 Seq2Seq模型的实现
- 文本转化为序列(数字序列,
torch.LongTensor
) - 使用序列,准备数据集,准备
Dataloader
- 完成编码器
- 完成解码器
- 完成seq2seq模型
- 完成模型训练的逻辑,进行训练
- 完成模型评估的逻辑,进行模型评估
1.2.1 数据处理
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import numpy as np
import torch
from typing import List
class NumSequence:
PAD_TAG = "PAD"
PAD = 0
UNK_TAG = "UNK"
UNK = 1
def __init__(self,use_padding:bool=True,max_len:int=20):
self.use_padding=use_padding
self.max_len=max_len
if use_padding:
self.dict = {
self.PAD_TAG: self.PAD, self.UNK_TAG: self.UNK}
else:
self.dict={
}
for i in range(10):
self.dict[str(i)] = len(self.dict)
self.inverse_dict = dict(zip(self.dict.values(), self.dict.keys()))
def transform(self, sentence: List):
"""
字符串转为序列
:param sentence:
:param max_len:
:return:
"""
if self.use_padding:
if len(sentence) > self.max_len:
sentence = sentence[:self.max_len]
else:
sentence = sentence + [self.PAD_TAG] * (self.max_len - len(sentence))
return [self.dict.get(i, self.UNK) for i in sentence]
def inverse_transform(self, indices):
"""
序列转为字符串
:param indices:
:return:
"""
return [self.inverse_dict.get(i, self.UNK_TAG) for i in indices]
class NumDataset(Dataset):
def __init__(self):
self.data = np.random.randint(0, 1e8, size=[500000])
def __getitem__(self, index):
input = list(str(self.data[index]))
label = input + ['0']
return label, input
def __len__(self):
return len(self.data)
num_sequence = NumSequence()
def collate_fn(batch):
batch = list(zip(*batch))
texts = [num_sequence.transform(i) for i in batch[1]]
labels = [num_sequence.transform(i) for i in batch[0]]
labels=torch.tensor(labels,dtype=torch.int32)
del batch
return labels, texts
train_data_loader = DataLoader(NumDataset(), batch_size=10, shuffle=True, collate_fn=collate_fn)
if __name__ == "__main__":
print(NumDataset().data[0])
for label, data in train_data_loader:
print(data)
print(label)
break
1.2.2 Encoder实现
import torch
import torch.nn as nn
import dataset
from torch import Tensor
from torch.utils.data import DataLoader
from dataset import NumDataset,NumSequence
from torch.nn.utils.rnn import pack_padded_sequence,pad_packed_sequence
num_sequence=NumSequence()
class Encoder(nn.Module):
def __init__(self):
super(Encoder,self).__init__()
self.embedding=nn.Embedding(len(num_sequence),100,padding_idx=num_sequence.PAD)
self.gru=nn.GRU(100,hidden_size=64,num_layers=1,bidirectional=False)
def forward(self,input:Tensor,input_length):
"""
:param input: batch_size,seq_len
:return:
"""
embedded=self.embedding(input) ## [batch_size,seq_len,embedding_dim]
embedded = torch.transpose(embedded, 0, 1)
embedded=pack_padded_sequence(embedded,lengths=input_length)
out,hidden=self.gru(embedded)
out,output_length=pad_packed_sequence(out,padding_value=num_sequence.PAD)
return out,hidden,output_length
def collate_fn(batch):
batch=sorted(batch,key=lambda x:x[3],reverse=True)
batch = list(zip(*batch))
texts = [num_sequence.transform(i) for i in batch[1]]
labels = [num_sequence.transform(i) for i in batch[0]]
labels = torch.tensor(labels, dtype=torch.int32)
texts=torch.tensor(texts,dtype=torch.int32)
input_length=batch[3]
del batch
return labels, texts,input_length
train_data_loader = DataLoader(NumDataset(), batch_size=128, shuffle=True, collate_fn=collate_fn)
if __name__=="__main__":
encode = Encoder()
print(encode)
for labels,input,input_length in train_data_loader:
out,hidden,output_length=encode(input,input_length)
print(out.size())
print(hidden.shape)
print(output_length)
break
补充:
- 在LSTM和GRU中,每个
time step
的输入会进行计算,得到结果,整个过程是一个和句子长度相关的一个循环,手动实现速度较慢
- pytorch中实现了
nn.utils.rnn.pack_padded_sequence
对padding后的句子进行打包的操作能够更快获得LSTM or GRU的结果- 同时实现了
nn.utils.rnn.pad_packed_sequence
对打包的内容进行解包的操作nn.utils.rnn.pack_padded_sequence
使用过程中需要对batch中的内容按照句子的长度降序排序
1.2.3 Decoder 解码器
加码器主要负责实现对编码之后结果的处理,得到预测值,为后续计算损失做准备。
此时需要考虑以下问题:
-
使用什么样的损失函数,预测值需要是什么格式的
- 结合之前的经验,我们可以理解为当前的问题是一个分类的问题,即每次的输出其实对选择一个概率最大的词
- 真实值的形状是
[batch_size,max_len]
,从而我们知道输出的结果需要是一个[batch_size,max_len,vocab_size]
的形状 - 即预测值的最后一个维度进行计算log_softmax,然后和真实值进行相乘,从而得到损失
-
如何把编码结果
[1,batch_size,hidden_size]
进行操作,得到预测值。解码器也是一个RNN,即也可以使用LSTM or GRU的结构,所以在解码器中:-
通过循环,每次计算的一个time step的内容
-
编码器的结果作为初始的隐层状态,定义一个
[batch_size,1]
的全为SOS
的数据作为最开始的输入,告诉解码器,要开始工作了 -
通过解码器预测一个输出
[batch_size,hidden_size]
(会进行形状的调整为[batch_size,vocab_size]
),把这个输出作为输入再使用解码器进行解码 -
上述是一个循环,循环次数就是句子的最大长度,那么就可以得到
max_len
个输出 -
把所有输出的结果进行concate,得到
[batch_size,max_len,vocab_size]
-
-
在RNN的训练过程中,使用前一个预测的结果作为下一个step的输入,可能会导致
一步错,步步错的结果
,如果提高模型的收敛速度?- 可以考虑在训练的过程中,把真实值作为下一步的输入,这样可以避免
步步错的局面
- 同时在使用真实值的过程中,仍然使用预测值作为下一步的输入,两种输入随机使用
- 上述这种机制我们把它称为
Teacher forcing
,就像是一个指导老师,在每一步都会对我们的行为进行纠偏,从而达到在多次训练之后能够需要其中的规律
- 可以考虑在训练的过程中,把真实值作为下一步的输入,这样可以避免
1.2.4 Decoder 解码器实现
import torch
import torch.nn as nn
from torch import Tensor
from dataset import NumSequence
import torch.nn.functional as F
num_sequece=NumSequence()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
class Decoder(nn.Module):
def __init__(self):
super(Decoder, self).__init__()
self.embedding=nn.Embedding(len(num_sequece.dict),100,padding_idx=num_sequece.PAD)
self.gru=nn.GRU(100,64,num_layers=1,bidirectional=False)
self.fc=nn.Linear(64,len(num_sequece.dict))
def forward(self, encoder_hidden: Tensor,target):
decoder_hidden=encoder_hidden
batch_size=target.size(0)
decoder_hidden.to(device)
decoder_input=torch.ones(size=(batch_size,1),dtype=torch.int32)*num_sequece.SOS
decoder_input.to(device)
decoder_outputs=torch.zeros(size=(num_sequece.max_len+1,batch_size,len(num_sequece)))
decoder_outputs.to(device)
for t in range(num_sequece.max_len+1):
decoder_hidden.to(device)
decoder_output_t,decoder_hidden=self.forward_step(decoder_input,decoder_hidden)
decoder_output_t.to(device)
decoder_outputs[t,:,:]=decoder_output_t
value,index=torch.topk(decoder_output_t,1,dim=-1)
decoder_input=index
return decoder_outputs,decoder_hidden
def forward_step(self,decoder_input,decoder_hidden):
decoder_hidden.to(device)
decoder_input.to(device)
embedded=self.embedding(decoder_input)
embedded.to(device)
## [seq_len,batch_size,embedding_dim]
embedded=torch.transpose(embedded,0,1).to(device)
embedded.to(device)
# pack_padded_sequence(embedded,lengths=input_length)
out,decoder_hidden=self.gru(embedded,decoder_hidden)
out.to(device)
decoder_hidden.to(device)
## [seq_len,batch_size,hidden_size]
out=out.squeeze(0) ## [batch_size,hidden_size]
out.to(device)
out=self.fc(out)
output=F.log_softmax(out,-1) ## [batch_size,vob_size]
return output,decoder_hidden
1.2.5 模型与训练的实现
from dataset import train_data_loader
import torch.nn as nn
from encoder import Encoder
from decoder import Decoder
class Seq2Seq(nn.Module):
def __init__(self):
super(Seq2Seq,self).__init__()
self.encoder=Encoder()
self.decoder=Decoder()
def forward(self,input,target,input_length):
encoder_outputs,encoder_hidden,output_lengths=self.encoder(input,input_length)
decoder_outputs,decoder_hidden=self.decoder(encoder_hidden,target)
return decoder_outputs,decoder_hidden
import torch
from dataset import train_data_loader
from encoder import Encoder
from decoder import Decoder
from seq2seq import Seq2Seq
from torch import Tensor
import torch.optim as optim
from tqdm import tqdm
from dataset import NumSequence
numsequence=NumSequence()
import torch.nn.functional as F
def train(epoches:int=10):
seq=Seq2Seq()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
seq.to(device)
optimizer=optim.Adam(seq.parameters(),lr=1e-3)
for epoch in range(epoches):
train_bar = tqdm(train_data_loader)
for data in train_bar:
label,input,input_length=data
optimizer.zero_grad()
decoder_outputs,_=seq(input.to(device),label.to(device),input_length)
decoder_outputs=torch.transpose(decoder_outputs.to(device),0,1)
decoder_outputs=decoder_outputs.contiguous().to(device)
decoder_outputs.to(device)
decoder_outputs=decoder_outputs.view(decoder_outputs.size(0)*decoder_outputs.size(1),-1)
label=label.view(-1).to(device)
loss=F.nll_loss(decoder_outputs.to(device),label.to(device),ignore_index=numsequence.PAD)
loss.backward()
optimizer.step()
train_bar.desc = 'train epoch:[{}/{}] loss={:.3f}'.format(epoch + 1, epoches, loss)
if __name__=="__main__":
train(10)