建立语言模型的目的是为了计算一个句子出现的概率,利用语言模型,可以确定哪个单词序列出现的可能性更大,或者给定若干个单词,预测下一个最可能出现的词语。语言模型的常用评价指标是复杂度,刻画的是通过某一个语言模型估计的一句话出现的概率。例如当已经知道(w1,w2,w3,…,w(m)这句话出现在语料库之中,那么通过语言模型得到这句话的概率越高越好,也就是复杂度越小越好。复杂度perplexity表示的概念是平均分支系数,即模型预测下一个词时的平均可选择数量。计算perplexity值的公式如下:
相比乘积开根号的方式,另一种计算方式使用加法的形式加速计算,也能有效避免概率为0时导致整个计算结果为0的问题:
PTB文本数据集是语言模型学习中目前最广泛使用的数据集,TensorFlow提供了两个函数来帮助实现数据的预处理,将原始数据中的单词转化为单词ID:
ptb_raw_data(DATA_PATH):读取原始数据
ptb_producer(raw_data,batch_size,num_steps):用于将数据组织成大小为 batch_size,长度为 num_steps 的数据组
以下是这两个函数的示例:
#读取数据并转化为单词ID
import tensorflow as tf
from tensorflow.models.tutorials.rnn.ptb import reader
#数据存放路径
DATA_PATH = r"F:\学校事务\论文资料\tensorflow\PTB数据集\simple-examples\data"
#读取原始数据
train_data, valid_data, test_data, _ = reader.ptb_raw_data(DATA_PATH)
print(len(train_data))
print(train_data[:100])
#将训练数据组织成batch大小为4,截断为5的数据组。要放在开启多线程之前.
batch = reader.ptb_producer(train_data, 4, 5)
with tf.Session() as sess:
tf.global_variables_initializer().run()
#开启多线程
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(coord=coord)
# 读取前两个batch,其中包括每个时刻的输入和对应的答案,ptb_producer()会自动迭代
for i in range(2):
x,y = sess.run(batch)
print('X:',x)
print('Y:',y)
# 关闭多线程
coord.request_stop()
coord.join(threads)
输出:
因为ptb_producer() 函数中使用了 tf.train.range_input_producer() 函数,所以需要开启多线程。
以下是完整的TensorFlow样例程序,通过循环神经网络实现语言模型:
import numpy as np
import tensorflow as tf
from tensorflow.models.tutorials.rnn.ptb import reader
#读取存放路径
DATA_PATH = r"F:\学校事务\论文资料\tensorflow\PTB数据集\simple-examples\data"
HIDDEN_SIZE = 200 #隐藏层规模
NUM_LAYERS = 2 #LSTM结构的层数
VOCAB_SIZE = 10000 #词典规模,包含语句结束标识符和稀有单词标识符
LEARNING_RATE = 1.0 #学习速率
TRAIN_BATCH_SIZE = 20 #训练数据batch的大小
TRAIN_NUM_STEP = 35 #训练数据截断长度
#在测试时不需要使用截断,可以将测试数据看成一个超长的序列。
EVAL_BATCH_SIZE = 1 #测试数据batch的大小
EVAL_NUM_STEP = 1 #测试数据截断长度
NUM_EPOCH = 2 #使用训练数据的轮数
KEEP_PROB = 0.5 #节点不被dropout的概率
MAX_GRAD_NORM = 5 #用于控制梯度膨胀的参数
#通过一个PTBModel类来描述模型,这样方便维护循环神经网络中的状态
class PTBModel(object):
def __init__(self, is_training, batch_size, num_steps):
#记录使用的batch大小和截断长度
self.batch_size = batch_size
self.num_steps = num_steps
#定义输入层,输入层维度为batch_size*num_steps,这和ptb_iterator函数输出的训练数据batch一致。
self.input_data = tf.placeholder(tf.int32, [batch_size, num_steps])
#定义预期输出,维度和ptb_iterator函数输出的正确答案维度也是一样的。
self.targets = tf.placeholder(tf.int32, [batch_size, num_steps])
#定义使用LSTM结构为循环体结构且使用dropout的深层循环神经网络。output_keep_prob可以用来控制输出的dropout概率。
lstm_cell = tf.contrib.rnn.BasicLSTMCell(HIDDEN_SIZE)
if is_training:
lstm_cell = tf.contrib.rnn.DropoutWrapper(lstm_cell,output_keep_prob=KEEP_PROB)
cell = tf.contrib.rnn.MultiRNNCell([lstm_cell]*NUM_LAYERS)
#初始化最初的状态,也就是全零的向量。
self.initial_state = cell.zero_state(batch_size, tf.float32)
#将单词id转化为单词向量,总共有VOCAB_SIZE个单词,每个单词向量的维度为HIDDEN_SIZE,所以embedding参数的维度
#为VOCAB_SIZE*HIDDEN_SIZE
embedding = tf.get_variable('embedding',[VOCAB_SIZE,HIDDEN_SIZE])
#将原本batch_size*num_steps个单词ID 转化为单词向量,转化后的输入层维度为batch_size*num_steps*HIDDEN_SIZE
inputs = tf.nn.embedding_lookup(embedding, self.input_data)
#只在训练时使用dropout
if is_training: inputs = tf.nn.dropout(inputs, KEEP_PROB)
#定义输出列表,先将不同时刻LSTM结构的输出收集起来,再通过一个个全连接层得到最终输出。
outputs = []
#state存储不同LSTM中的状态,将其初始化为0.
state = self.initial_state
with tf.variable_scope("RNN"):
for time_step in range(num_steps):
if time_step > 0: tf.get_variable_scope().reuse_variables()
#从输入数据中获取当前时刻获得输入并传入LSTM结构
cell_output, state = cell(inputs[:, time_step, :],state)
#将当前输出加入输出队列
outputs.append(cell_output)
#把输出队列展开成[batch, hidden_size*num_steps]的形状,然后再reshape成[batch*num_steps, hidden_size]的形状
#第1个维度拼接
output = tf.reshape(tf.concat(outputs,1), [-1, HIDDEN_SIZE])
#将从LSTM中得到的输出再经过一个全连接层后得到最后的预测结果,最终的预测结果在每个时刻上都是个长度为VOCAB_SIZE
#的数组,经过softmax层之后表示下一个位置是不同单词的概率。
weight = tf.get_variable("weight", [HIDDEN_SIZE,VOCAB_SIZE])
bias = tf.get_variable("bias", [VOCAB_SIZE])
logits = tf.matmul(output, weight) + bias
#定义交叉熵损失函数,sequence_loss_by_example函数可以计算一个序列交叉熵的和。
loss = tf.contrib.legacy_seq2seq.sequence_loss_by_example(
#预测的结果
[logits],
#期待的正确答案,
[tf.reshape(self.targets,[-1])],
#这里将[batch_size*num_steps]二维数组压缩为一维。
#损失的权重。在这里所有的权重都为1,也就是说不同batch和不同时刻的重要程度是一样的。
[tf.ones([batch_size*num_steps],dtype=tf.float32)])
#计算得到每个batch的平均损失。
self.cost = tf.reduce_sum(loss)/batch_size
self.final_state = state
#只在训练模型时定义反向传播操作。
if not is_training: return
trainable_variables = tf.trainable_variables()
#通过clip_by_global_norm函数控制梯度的大小,避免梯度膨胀问题。
grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, trainable_variables), MAX_GRAD_NORM)
#定义优化方法
optimizer = tf.train.GradientDescentOptimizer(LEARNING_RATE)
#定义训练步骤,zip()函数用于将可迭代的对象作为参数,将对象中对应的元素打包成一个个元组,然后返回由这些元组组成的列表。
self.train_op = optimizer.apply_gradients(zip(grads, trainable_variables))
#使用给定的模型model在数据data上运行train_op并返回在全部数据上的perplexity值。
def run_epoch(session, model, data, train_op, output_log, epoch_size):
#计算perplexity的辅助变量
total_costs = 0.0
iters = 0
state = session.run(model.initial_state)
# 使用当前数据训练或测试模型
for step in range(epoch_size):
# 生成输入和答案
feed_dict = {}
x, y = session.run(data)
feed_dict[model.input_data] = x
feed_dict[model.targets] = y
# 将状态转为字典
for i, (c, h) in enumerate(model.initial_state):
feed_dict[c] = state[i].c
feed_dict[h] = state[i].h
# 获取损失值和下一个状态
cost, state, _ = session.run(
[model.cost, model.final_state, train_op], feed_dict=feed_dict)
#将不同时刻,不同batch的概率加起来就可以得到第二个perplexity公式等号右边的部分,再将这个和做指数运算
#就可以得到perplexity值。
total_costs += cost
iters += model.num_steps
#只有在训练时输出日志
if output_log and step % 100 == 0:
print("在%d轮后,复杂度是%.3f"%(step,np.exp(total_costs/iters)))
#返回给定模型在给定数据上的perplexity值
return np.exp(total_costs/iters)
def main(_):
#读取原始数据
train_data, valid_data, test_data, _ = reader.ptb_raw_data(DATA_PATH)
# 计算一个epoch需要训练的次数
train_data_len = len(train_data) # 数据集的大小
train_batch_len = train_data_len // TRAIN_BATCH_SIZE # batch的个数
train_epoch_size = (train_batch_len - 1) // TRAIN_NUM_STEP # 该epoch的训练次数
valid_data_len = len(valid_data)
valid_batch_len = valid_data_len // EVAL_BATCH_SIZE
valid_epoch_size = (valid_batch_len - 1) // EVAL_NUM_STEP
test_data_len = len(test_data)
test_batch_len = test_data_len // EVAL_BATCH_SIZE
test_epoch_size = (test_batch_len - 1) // EVAL_NUM_STEP
#定义初始化函数,生成均匀分布的随机数,参数有四个(minval=0, maxval=None, seed=None,
#dtype=dtypes.float32),分别用于指定最小值,最大值,随机数种子和类型。
initializer = tf.random_uniform_initializer(-0.05,0.05)
#定义训练用的循环神经网络
with tf.variable_scope("language_model", reuse=None, initializer=initializer):
train_model = PTBModel(True,TRAIN_BATCH_SIZE, TRAIN_NUM_STEP)
#定义评测用的循环神经网络
with tf.variable_scope("language_model",reuse=True,initializer=initializer):
eval_model = PTBModel(False,EVAL_BATCH_SIZE,EVAL_NUM_STEP)
# 生成数据队列,必须放在开启多线程之前
train_queue = reader.ptb_producer(train_data, train_model.batch_size,
train_model.num_steps)
valid_queue = reader.ptb_producer(valid_data, eval_model.batch_size,
eval_model.num_steps)
test_queue = reader.ptb_producer(test_data, eval_model.batch_size,
eval_model.num_steps)
with tf.Session() as session:
tf.global_variables_initializer().run()
# 开启多线程从而支持ptb_producer()使用tf.train.range_input_producer()
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=session, coord=coord)
#使用训练数据训练模型
for i in range(NUM_EPOCH):
print("In iteration: %d" %(i+1))
#在所有训练数据上训练循环神经网络模型
run_epoch(session, train_model, train_queue, train_model.train_op, True, train_epoch_size)
#使用验证数据评测模型效果
valid_perplexity = run_epoch(session, eval_model, valid_queue, tf.no_op(), False, valid_epoch_size)
print("Epoch: %d Validation Perplexity : %.3f" % (i + 1, valid_perplexity))
#最后使用测试数据测试模型效果
test_perplexity = run_epoch(session, eval_model, test_queue, tf.no_op(), False, test_epoch_size)
print("Test Perplexity:%.3f" % test_perplexity)
# 停止所有线程
coord.request_stop()
coord.join(threads)
if __name__ == "__main__":
tf.app.run()
输入如下:
可以看到,经过模型训练后,训练数据上的复杂度大幅下降。
参考文献:https://blog.csdn.net/White_Idiot/article/details/78881261