LSTM 模型文件(Models.py)
import torch.nn as nn
import torch.nn.functional as F
class LSTM(nn.Module):
    """Character-level LSTM classifier.

    Runs the input sequence through a stacked LSTM, takes the output at
    the last time step, applies ReLU and maps it to class logits with a
    linear layer.

    :param num_classes: number of output classes (vocabulary size)
    :param input_size: number of features per time step
    :param hidden_size: LSTM hidden-state size
    :param num_layers: number of stacked LSTM layers
    :param dropout: dropout between stacked LSTM layers (only has an
        effect when num_layers > 1)
    """

    def __init__(self, num_classes, input_size, hidden_size, num_layers, dropout=0.8):
        super(LSTM, self).__init__()
        self.num_classes = num_classes
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True,
                            dropout=dropout)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch, num_classes).

        :param x: input tensor of shape (batch, seq_len, input_size)
        """
        # Passing None lets nn.LSTM default h_0/c_0 to zeros, so we do
        # not need to know the batch size here.
        output, (h_n, c_n) = self.lstm(x, None)
        # Summarize the sequence by the output at the last time step.
        h_out = F.relu(output[:, -1, :])
        out = self.fc(h_out)
        return out
数据工具包 Utils 文件(utils.py)
import numpy as np
import torch
from torch.utils.data import DataLoader, TensorDataset
def read_dataset(file):
    """Read a text corpus and return it as a list of characters.

    Every line is lowercased and stripped, the lines are concatenated,
    and every character that is not a lowercase ASCII letter or a space
    is discarded.

    :param file: path to the corpus file
    :return: list of single characters (lowercase letters and spaces)
    """
    # Allowed characters: a-z plus the space character (set membership
    # is O(1) instead of scanning a list for every character).
    letters = set('abcdefghijklmnopqrstuvwxyz ')
    with open(file, 'r') as f:
        raw_text = f.readlines()
    # Lowercase and strip each line, then join into one string
    # (''.join avoids the quadratic cost of repeated '+=').
    text_string = ''.join(line.lower().strip() for line in raw_text)
    # Keep only the allowed characters.
    return [char for char in text_string if char in letters]
def create_dictionary(text):
    """Build char<->index mappings from a character list.

    Indices are assigned in order of first occurrence, starting at 0.

    :param text: iterable of characters
    :return: tuple (char_to_idx, idx_to_char) of mapping dictionaries
    """
    char_to_idx = dict()
    idx_to_char = dict()
    for char in text:
        # 'in dict' instead of 'in dict.keys()' — same result, no
        # intermediate view object.
        if char not in char_to_idx:
            # The next free index is simply the current dict size.
            idx = len(char_to_idx)
            char_to_idx[char] = idx
            idx_to_char[idx] = char
    return char_to_idx, idx_to_char
def build_sequences(text, char_to_idx, window):
    """Build (input, target) training pairs with a sliding window.

    Sample i is the index-encoded window text[i:i+window]; its target
    is the index of the character that immediately follows the window.

    :param text: list of characters
    :param char_to_idx: character-to-index mapping
    :param window: sliding window length
    :return: tuple (x, y) of numpy arrays with shapes (N, window) and (N,)
    """
    x = list()
    y = list()
    for i in range(len(text) - window):
        # Encode the character window as index values.
        sequence = [char_to_idx[char] for char in text[i:i + window]]
        x.append(sequence)
        # The target is the single character right after the window.
        y.append(char_to_idx[text[i + window]])
    return np.array(x), np.array(y)
# Build the train/test DataLoaders.
def data_generator(x_train, y_train, x_test, y_test, batch_size):
    """Wrap numpy train/test splits in DataLoaders.

    Inputs are converted to float32 and class targets to int64 (long),
    the dtype cross-entropy-style losses and accuracy metrics expect.

    :param x_train: training inputs, numpy array
    :param y_train: training integer class targets, numpy array
    :param x_test: test inputs, numpy array
    :param y_test: test integer class targets, numpy array
    :param batch_size: mini-batch size for both loaders
    :return: tuple (train_loader, test_loader)
    """
    train_dataset = TensorDataset(torch.from_numpy(x_train).to(torch.float32),
                                  torch.from_numpy(y_train).to(torch.long))
    # BUG FIX: the test targets were cast to float32 while the train
    # targets were long; classification targets must be integer class
    # indices, so keep both splits consistent.
    test_dataset = TensorDataset(torch.from_numpy(x_test).to(torch.float32),
                                 torch.from_numpy(y_test).to(torch.long))
    train_loader = DataLoader(dataset=train_dataset,
                              batch_size=batch_size,
                              shuffle=False)
    test_loader = DataLoader(dataset=test_dataset,
                             batch_size=batch_size,
                             shuffle=False)
    return train_loader, test_loader
Trainer 文件(训练入口脚本)
import numpy as np
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from torch import nn, optim
import torch.nn.functional as F
from torch.autograd import Variable
from sklearn import metrics
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from Models import LSTM
from utils import *
# Train the model.
def train():
    """Run the training loop: 2 epochs over the global train_loader.

    Relies on module-level globals: ``model``, ``optimizer`` and
    ``train_loader``. Prints loss and batch accuracy every 100
    iterations. Returns None.
    """
    iter_count = 0
    for epoch in range(2):
        for i, (batch_x, batch_y) in enumerate(train_loader):
            # Reshape to (batch, seq_len, input_size=1) as the LSTM
            # expects one scalar feature per time step.
            batch_x = torch.reshape(batch_x, (len(batch_x), -1, 1))
            # NOTE: the deprecated Variable wrappers were removed;
            # tensors track gradients directly in modern PyTorch.
            outputs = model(batch_x)
            loss = F.cross_entropy(outputs, batch_y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            iter_count += 1
            if iter_count % 100 == 0:
                # Accuracy on the current batch only (cheap progress signal).
                train_acc = metrics.accuracy_score(
                    batch_y.detach().numpy(),
                    outputs.detach().numpy().argmax(axis=1))
                # Fixed: this format string was broken across several
                # lines in the original, which is a syntax error.
                msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}'
                print(msg.format(iter_count, loss.item(), train_acc))
# ---- Script entry: build the dataset and the model, then train ----
# Load the corpus as a flat list of characters (letters and spaces).
text_str = read_dataset("./data/shakespeare.txt")
# Character<->index mappings; distinct char count = number of classes.
char_to_idx_dict, idx_to_char_dict = create_dictionary(text_str)
categories = len(char_to_idx_dict)
print(char_to_idx_dict)
# Sliding window of 10 characters predicts the 11th.
x, y = build_sequences(text_str, char_to_idx_dict, 10)
# 80/20 train/test split.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)
train_loader, test_loader = data_generator(x_train, y_train, x_test, y_test, 64)
# input_size=1: each time step feeds one scalar (the char index).
model = LSTM(num_classes=categories, input_size=1,
hidden_size=128, num_layers=2)
# NOTE(review): criterion is unused — train() calls F.cross_entropy directly.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
# train() reads the globals defined above (model, optimizer, train_loader).
train()
目前训练预测的结果不是很理想,可以考虑引入预训练词向量或者尝试更复杂的模型。