Headliner是一个类似于tensorflow_addons、fairseq、OpenNMT的用于构建Seq2Seq模型的开源库。它的代码整体基于TensorFlow 2.0构建,并提供了数据预处理、基准模型和模型评估相关的API,极大地方便用户快速地构建基准模型;同时允许用户根据自己的需求修改源代码、定制化模型,从而避免重复造轮子所浪费的时间。
下面就来看一下整个库的重要部分是如何实现的。Headliner的源代码部分主要分为以下几个部分:
- callbacks:回调函数的定义,包含了模型训练、验证和评估三个阶段
- evaluation:模型的评价指标,目前只有BLEU
- model:基准模型的实现,包含标准的Encoder-Decoder、Encoder-Decoder+Attention和Transformer
- preprocessing:数据预处理部分,包含了常用的预处理操作
- utils:打log部分
- embeddings.py:预训练词嵌入的使用
- losses.py:损失函数,这里只定义了常用的交叉熵
- trainer.py:启动训练的相关部分
下面主要对感兴趣的model、preprocessing和三个.py文件进行简单的解读。
model
- model_basic.py:标准的Encoder-Decoder模型,两者均使用单层LSTM(注意:Encoder端设置go_backwards=True只是反向读取输入序列,并不是双向LSTM;双向需要使用Bidirectional包装)
class Encoder(tf.keras.Model):
    """LSTM encoder of the basic Seq2Seq model.

    NOTE(review): go_backwards=True is NOT a bidirectional LSTM — it is a
    single unidirectional LSTM that reads the input sequence in reverse
    order (true bidirectionality would need tf.keras.layers.Bidirectional).
    """

    def __init__(self,
                 embedding_shape: Tuple[int, int],
                 lstm_size=50,
                 embedding_weights=None,  # optional pretrained embedding matrix
                 embedding_trainable=True) -> None:  # whether embeddings are updated; with embedding_weights=None they are trained from scratch on the corpus
        super(Encoder, self).__init__()
        vocab_size, vec_dim = embedding_shape  # embedding_shape = (vocab_size, embedding_dim), init args for the Embedding layer
        # Keras expects pretrained weights wrapped in a list.
        weights = None if embedding_weights is None else [embedding_weights]
        self.embedding = tf.keras.layers.Embedding(vocab_size,
                                                   vec_dim,
                                                   weights=weights,
                                                   trainable=embedding_trainable)
        # go_backwards=True reverses the input sequence (single direction, not bidirectional).
        self.lstm = tf.keras.layers.LSTM(lstm_size, return_sequences=True, return_state=True, go_backwards=True)
        self.lstm_size = lstm_size

    def call(self,
             sequence: tf.Tensor,
             states: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        embed = self.embedding(sequence)
        output, state_h, state_c = self.lstm(embed, initial_state=states)
        return output, state_h, state_c  # per-step outputs plus final hidden/cell state

    # Zero-initialise the LSTM hidden and cell state for a new batch.
    def init_states(self, batch_size: int) -> Tuple[tf.Tensor, tf.Tensor]:
        return tf.zeros([batch_size, self.lstm_size]), tf.zeros([batch_size, self.lstm_size])
class Decoder(tf.keras.Model):
    """Single-layer LSTM decoder mapping token ids to vocabulary logits."""

    def __init__(self,
                 embedding_shape: Tuple[int, int],
                 lstm_size=50,
                 embedding_weights=None,
                 embedding_trainable=True) -> None:
        super(Decoder, self).__init__()
        self.lstm_size = lstm_size
        vocab_size, vec_dim = embedding_shape
        # Keras expects pretrained weights wrapped in a list.
        weights = [embedding_weights] if embedding_weights is not None else None
        self.embedding = tf.keras.layers.Embedding(vocab_size,
                                                   vec_dim,
                                                   weights=weights,
                                                   trainable=embedding_trainable)
        self.lstm = tf.keras.layers.LSTM(lstm_size, return_sequences=True, return_state=True)
        # Projects every LSTM output step onto the full target vocabulary.
        self.dense = tf.keras.layers.Dense(vocab_size)

    def call(self, sequence: tf.Tensor, state: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
        embedded = self.embedding(sequence)
        outputs, hidden, cell = self.lstm(embedded, state)
        # Return logits over the vocabulary plus the updated LSTM state.
        return self.dense(outputs), hidden, cell
- model_attention.py:在标准的Encoder-Decoder模型中引入了注意力机制,这里使用的是LuongAttention
# Encoder
...
# Attention
class LuongAttention(tf.keras.Model):
    """Luong-style (multiplicative) attention over encoder outputs."""

    def __init__(self, rnn_size):
        super(LuongAttention, self).__init__()
        # Learned projection applied to the encoder states before scoring.
        self.wa = tf.keras.layers.Dense(rnn_size)

    def call(self, decoder_output, encoder_output):
        # score[b, t_dec, t_enc] = decoder_output · Wa(encoder_output)^T
        projected = self.wa(encoder_output)
        score = tf.matmul(decoder_output, projected, transpose_b=True)
        # Normalise over the encoder time axis -> alignment weights.
        alignment = tf.nn.softmax(score, axis=2)
        # Weighted sum of encoder states gives the context vector.
        context = tf.matmul(alignment, encoder_output)
        return context, alignment
# Decoder
...
- model_transformer.py:标准的Transformer的定义
from typing import Tuple
import numpy as np
import tensorflow as tf
# positional encoding计算的相关内容
def get_angles(pos, i, embedding_size):
    """Angle arguments for the sinusoidal positional encoding.

    pos and i broadcast against each other; dimension pairs (2k, 2k+1)
    share one frequency 1 / 10000^(2k / embedding_size).
    """
    exponent = (2 * (i // 2)) / np.float32(embedding_size)
    inv_freq = 1 / np.power(10000, exponent)
    return pos * inv_freq
# positional encoding
def positional_encoding(position, embedding_size):
    """Sinusoidal positional-encoding table of shape (1, position, embedding_size)."""
    positions = np.arange(position)[:, np.newaxis]
    dims = np.arange(embedding_size)[np.newaxis, :]
    table = get_angles(positions, dims, embedding_size)
    # Even dimensions use sine, odd dimensions use cosine.
    table[:, 0::2] = np.sin(table[:, 0::2])
    table[:, 1::2] = np.cos(table[:, 1::2])
    # Leading batch axis so the table broadcasts over a batch of sequences.
    return tf.cast(table[np.newaxis, ...], dtype=tf.float32)
# padding mask ,避免模型关注到补齐的无意义部分
def create_padding_mask(seq):
    """Mask that is 1.0 where seq holds the padding id 0, else 0.0.

    Output shape (batch, 1, 1, seq_len) broadcasts over attention logits.
    """
    is_pad = tf.cast(tf.math.equal(seq, 0), tf.float32)
    # (batch, seq_len) -> (batch, 1, 1, seq_len)
    return tf.expand_dims(tf.expand_dims(is_pad, axis=1), axis=1)
# look ahead mask,避免模型看到将要预测的部分
def create_look_ahead_mask(size):
    """Causal mask: 1s strictly above the diagonal hide future positions."""
    lower_triangular = tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return 1 - lower_triangular
# scaled dot product attention
def scaled_dot_product_attention(q, k, v, mask):
    """Attention(Q, K, V) = softmax(QK^T / sqrt(d_k) + mask penalty) V.

    Returns the attended values and the attention weights.
    """
    logits = tf.matmul(q, k, transpose_b=True)
    depth = tf.cast(tf.shape(k)[-1], tf.float32)
    logits = logits / tf.math.sqrt(depth)
    if mask is not None:
        # Push masked positions towards -inf so softmax gives them ~0 weight.
        logits += (mask * -1e9)
    weights = tf.nn.softmax(logits, axis=-1)
    return tf.matmul(weights, v), weights
# 对应的Dense层
def point_wise_feed_forward_network(embedding_size, feed_forward_dim):
    """Position-wise feed-forward block: expand with ReLU, then project back."""
    expand = tf.keras.layers.Dense(feed_forward_dim, activation='relu')
    project = tf.keras.layers.Dense(embedding_size)
    return tf.keras.Sequential([expand, project])
def create_masks(inp, tar):
    """Build the three masks the Transformer needs for one (inp, tar) batch.

    Returns (encoder padding mask, combined decoder self-attention mask,
    decoder-to-encoder padding mask).
    """
    enc_padding_mask = create_padding_mask(inp)
    # The decoder's cross-attention looks at encoder output, so it masks
    # the *input* padding as well.
    dec_padding_mask = create_padding_mask(inp)
    # Combine the causal mask with the target padding mask element-wise.
    tar_len = tf.shape(tar)[1]
    combined_mask = tf.maximum(create_padding_mask(tar),
                               create_look_ahead_mask(tar_len))
    return enc_padding_mask, combined_mask, dec_padding_mask
# 多头注意力的实现
class MultiHeadAttention(tf.keras.layers.Layer):
    """Multi-head scaled dot-product attention."""

    def __init__(self,
                 embedding_size: int,
                 num_heads: int) -> None:
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.embedding_size = embedding_size
        # The model dimension must split evenly across the heads.
        assert embedding_size % self.num_heads == 0
        self.depth = embedding_size // self.num_heads
        self.wq = tf.keras.layers.Dense(embedding_size)  # query projection
        self.wk = tf.keras.layers.Dense(embedding_size)  # key projection
        self.wv = tf.keras.layers.Dense(embedding_size)  # value projection
        self.dense = tf.keras.layers.Dense(embedding_size)  # output projection

    def split_heads(self, x, batch_size):
        """Reshape (batch, seq, d_model) -> (batch, heads, seq, depth)."""
        per_head = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(per_head, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]
        # Project q, k, v, then split each into per-head sub-spaces.
        q = self.split_heads(self.wq(q), batch_size)
        k = self.split_heads(self.wk(k), batch_size)
        v = self.split_heads(self.wv(v), batch_size)
        attended, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        # (batch, heads, seq, depth) -> (batch, seq, d_model)
        attended = tf.transpose(attended, perm=[0, 2, 1, 3])
        merged = tf.reshape(attended, (batch_size, -1, self.embedding_size))
        return self.dense(merged), attention_weights
# Encoder layer的定义,可根据需要堆叠多层
class EncoderLayer(tf.keras.layers.Layer):
    """One Transformer encoder block: self-attention then feed-forward,
    each followed by dropout, a residual connection and LayerNorm."""

    def __init__(self,
                 embedding_size: int,
                 num_heads: int,
                 feed_forward_dim: int,
                 dropout_rate=0.1) -> None:
        super(EncoderLayer, self).__init__()
        self.mha = MultiHeadAttention(embedding_size, num_heads)
        self.ffn = point_wise_feed_forward_network(embedding_size, feed_forward_dim)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, training, mask):
        # Sub-layer 1: self-attention with residual connection.
        attended, _ = self.mha(x, x, x, mask)
        attended = self.dropout1(attended, training=training)
        normed = self.layernorm1(x + attended)
        # Sub-layer 2: position-wise feed-forward with residual connection.
        transformed = self.dropout2(self.ffn(normed), training=training)
        return self.layernorm2(normed + transformed)
# Decoder layer的定义,可根据需要堆叠多层
class DecoderLayer(tf.keras.layers.Layer):
    """One Transformer decoder block: masked self-attention, cross-attention
    over the encoder output, then feed-forward — each with dropout,
    residual connection and LayerNorm."""

    def __init__(self,
                 embedding_size: int,
                 num_heads: int,
                 feed_forward_dim: int,
                 dropout_rate=0.1) -> None:
        super(DecoderLayer, self).__init__()
        self.mha1 = MultiHeadAttention(embedding_size, num_heads)  # masked self-attention
        self.mha2 = MultiHeadAttention(embedding_size, num_heads)  # attention over encoder output
        self.ffn = point_wise_feed_forward_network(embedding_size, feed_forward_dim)
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm3 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout2 = tf.keras.layers.Dropout(dropout_rate)
        self.dropout3 = tf.keras.layers.Dropout(dropout_rate)

    def call(self,
             x,
             enc_output,
             training,
             look_ahead_mask,
             padding_mask):
        # Masked self-attention over the target sequence.
        self_attended, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        out1 = self.layernorm1(self.dropout1(self_attended, training=training) + x)
        # Cross-attention: queries from the decoder, keys/values from the encoder.
        cross_attended, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        out2 = self.layernorm2(self.dropout2(cross_attended, training=training) + out1)
        # Position-wise feed-forward.
        out3 = self.layernorm3(self.dropout3(self.ffn(out2), training=training) + out2)
        return out3, attn_weights_block1, attn_weights_block2
# Encoder
class Encoder(tf.keras.layers.Layer):
    """Transformer encoder: embedding + positional encoding + stacked EncoderLayers."""

    def __init__(self,
                 num_layers: int,
                 num_heads: int,
                 feed_forward_dim: int,
                 embedding_shape: Tuple[int, int],
                 embedding_trainable=True,
                 embedding_weights=None,
                 dropout_rate=0.1) -> None:
        super(Encoder, self).__init__()
        self.num_layers = num_layers
        vocab_size, vec_dim = embedding_shape
        weights = [embedding_weights] if embedding_weights is not None else None
        self.embedding_size = vec_dim
        self.embedding = tf.keras.layers.Embedding(vocab_size,
                                                   vec_dim,
                                                   weights=weights,
                                                   trainable=embedding_trainable)
        # NOTE(review): the table length is vocab_size, so inputs longer
        # than the vocabulary size would run off the table — confirm upstream.
        self.pos_encoding = positional_encoding(vocab_size, self.embedding_size)
        self.enc_layers = [EncoderLayer(vec_dim, num_heads, feed_forward_dim, dropout_rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self, x, training, mask):
        seq_len = tf.shape(x)[1]
        # Scale the embeddings before adding the positional signal.
        x = self.embedding(x) * tf.math.sqrt(tf.cast(self.embedding_size, tf.float32))
        x = self.dropout(x + self.pos_encoding[:, :seq_len, :], training=training)
        for layer in self.enc_layers:
            x = layer(x, training, mask)
        return x
# Decoder
class Decoder(tf.keras.layers.Layer):
    """Transformer decoder: embedding + positional encoding + stacked DecoderLayers."""

    def __init__(self,
                 num_layers: int,
                 num_heads: int,
                 feed_forward_dim: int,
                 embedding_shape: Tuple[int, int],
                 embedding_trainable=True,
                 embedding_weights=None,
                 dropout_rate=0.1) -> None:
        super(Decoder, self).__init__()
        self.num_layers = num_layers
        vocab_size, vec_dim = embedding_shape
        weights = [embedding_weights] if embedding_weights is not None else None
        self.embedding_size = vec_dim
        self.embedding = tf.keras.layers.Embedding(vocab_size,
                                                   vec_dim,
                                                   weights=weights,
                                                   trainable=embedding_trainable)
        self.pos_encoding = positional_encoding(vocab_size, vec_dim)
        self.dec_layers = [DecoderLayer(vec_dim, num_heads, feed_forward_dim, dropout_rate)
                           for _ in range(num_layers)]
        self.dropout = tf.keras.layers.Dropout(dropout_rate)

    def call(self,
             x,
             enc_output,
             training,
             look_ahead_mask,
             padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}
        # Scale the embeddings before adding the positional signal.
        x = self.embedding(x) * tf.math.sqrt(tf.cast(self.embedding_size, tf.float32))
        x = self.dropout(x + self.pos_encoding[:, :seq_len, :], training=training)
        for index, layer in enumerate(self.dec_layers, start=1):
            x, block1, block2 = layer(x, enc_output, training,
                                      look_ahead_mask, padding_mask)
            # Keep per-layer attention maps for later inspection.
            attention_weights['decoder_layer{}_block1'.format(index)] = block1
            attention_weights['decoder_layer{}_block2'.format(index)] = block2
        return x, attention_weights
# Transformer模型的定义部分
class Transformer(tf.keras.Model):
    """Full encoder-decoder Transformer with a final vocabulary projection."""

    def __init__(self,
                 num_layers: int,
                 num_heads: int,
                 feed_forward_dim: int,
                 embedding_shape_encoder: Tuple[int, int],
                 embedding_shape_decoder: Tuple[int, int],
                 embedding_encoder_trainable=True,
                 embedding_decoder_trainable=True,
                 embedding_weights_encoder=None,
                 embedding_weights_decoder=None,
                 dropout_rate=0.1) -> None:
        super(Transformer, self).__init__()
        self.encoder = Encoder(num_layers,
                               num_heads,
                               feed_forward_dim,
                               embedding_shape_encoder,
                               embedding_trainable=embedding_encoder_trainable,
                               embedding_weights=embedding_weights_encoder,
                               dropout_rate=dropout_rate)
        self.decoder = Decoder(num_layers,
                               num_heads,
                               feed_forward_dim,
                               embedding_shape_decoder,
                               embedding_trainable=embedding_decoder_trainable,
                               embedding_weights=embedding_weights_decoder,
                               dropout_rate=dropout_rate)
        # Maps decoder states to logits over the target vocabulary.
        self.final_layer = tf.keras.layers.Dense(embedding_shape_decoder[0])

    def call(self, inp, tar, training, enc_padding_mask,
             look_ahead_mask, dec_padding_mask):
        encoded = self.encoder(inp, training, enc_padding_mask)
        decoded, attention_weights = self.decoder(
            tar, encoded, training, look_ahead_mask, dec_padding_mask)
        # (batch_size, tar_seq_len, target_vocab_size)
        logits = self.final_layer(decoded)
        return logits, attention_weights
Transformer
淺談神經機器翻譯 & 用 Transformer 與 TensorFlow 2 英翻中 [强推!!!]
上面主要定义了三种常用的模型,整体代码很简洁明了,值得学习~
下面主要是三种Summarizer的定义,包括:
- Summarizer_basic
- Summarizer_attention
- Summarizer_transformer
由于三种模型在定义上会有很多的部分是一致的,因此这里首先在Summarizer.py中使用abc定义了抽象类:
import abc
from abc import abstractmethod
from typing import Callable, Dict, Union
import numpy as np
import tensorflow as tf
from headliner.preprocessing import Preprocessor, Vectorizer
class Summarizer(abc.ABC):
    """Abstract base class shared by all summarizer implementations.

    Concrete subclasses wire up a specific model (basic Encoder-Decoder,
    attention-based, or Transformer) and implement prediction, training-step
    creation and (de)serialization.
    """

    def __init__(self):
        # All three are set by init_model(); None until then.
        self.vectorizer: Union[Vectorizer, None] = None
        self.preprocessor: Union[Preprocessor, None] = None
        self.embedding_size: Union[int, None] = None

    @abstractmethod
    def init_model(self,
                   preprocessor: Preprocessor,
                   vectorizer: Vectorizer,
                   embedding_weights_encoder=None,
                   embedding_weights_decoder=None) -> None:
        """Build and compile the underlying model from the preprocessing components."""
        pass

    @abstractmethod
    def predict(self, text: str) -> str:
        """Return the predicted summary for the given input text."""
        pass

    @abstractmethod
    def predict_vectors(self, input_text: str, target_text: str) -> Dict[str, Union[str, np.ndarray]]:
        """Return detailed prediction output (logits, alignment, token sequence, text)."""
        pass

    @abstractmethod
    def new_train_step(self,
                       loss_function: Callable[[tf.Tensor], tf.Tensor],
                       batch_size: int,
                       apply_gradients=True) -> Callable[[tf.Tensor, tf.Tensor], float]:
        """Create a step function that trains (or, with apply_gradients=False, only evaluates) one batch and returns the loss."""
        pass

    @abstractmethod
    def save(self, out_path: str) -> None:
        """Persist the summarizer and its model weights to out_path."""
        pass

    @staticmethod
    @abstractmethod
    def load(in_path: str):
        """Load a previously saved summarizer from in_path."""
        pass
- Summarizer_basic:
class SummarizerBasic(Summarizer):
    """Summarizer backed by the basic (attention-free) LSTM Encoder/Decoder.

    NOTE(review): this excerpt uses `os` and `pickle`, which are not imported
    in the snippet shown here — presumably imported at the top of the
    original module; verify against the full file.
    """

    def __init__(self, lstm_size=50,
                 max_prediction_len=20,
                 embedding_size=50,
                 embedding_encoder_trainable=True,  # True = no pretrained embeddings, train from scratch
                 embedding_decoder_trainable=True):
        super().__init__()
        self.lstm_size = lstm_size
        # Hard cap on the number of tokens generated during prediction.
        self.max_prediction_len = max_prediction_len
        self.embedding_size = embedding_size
        self.embedding_encoder_trainable = embedding_encoder_trainable
        self.embedding_decoder_trainable = embedding_decoder_trainable
        # All of the following are created in init_model().
        self.encoder = None
        self.decoder = None
        self.optimizer = None
        self.embedding_shape_in = None
        self.embedding_shape_out = None

    # Model initialization
    def init_model(self,
                   preprocessor: Preprocessor,
                   vectorizer: Vectorizer,
                   embedding_weights_encoder=None,
                   embedding_weights_decoder=None) -> None:
        """Build encoder/decoder from the vectorizer's vocabulary sizes and compile them."""
        self.preprocessor = preprocessor
        self.vectorizer = vectorizer
        self.embedding_shape_in = (self.vectorizer.encoding_dim, self.embedding_size)
        self.embedding_shape_out = (self.vectorizer.decoding_dim, self.embedding_size)
        self.encoder = Encoder(self.embedding_shape_in,
                               self.lstm_size,
                               embedding_trainable=self.embedding_encoder_trainable,
                               embedding_weights=embedding_weights_encoder)  # encoder initialization
        self.decoder = Decoder(self.embedding_shape_out,
                               self.lstm_size,
                               embedding_trainable=self.embedding_decoder_trainable,
                               embedding_weights=embedding_weights_decoder)  # decoder initialization
        self.optimizer = SummarizerBasic._new_optimizer()
        # Compile both sub-models with the shared optimizer.
        self.encoder.compile(optimizer=self.optimizer)
        self.decoder.compile(optimizer=self.optimizer)

    def __getstate__(self):
        """ Prevents pickle from serializing encoder and decoder. """
        state = self.__dict__.copy()
        del state['encoder']
        del state['decoder']
        del state['optimizer']
        return state

    # Predict the summary for the given text.
    def predict(self, text: str) -> str:
        return self.predict_vectors(text, '')['predicted_text']

    def predict_vectors(self, input_text: str, target_text: str) -> Dict[str, Union[str, np.ndarray]]:
        """Greedy-decode a summary, returning logits, predicted token ids and text."""
        text_preprocessed = self.preprocessor((input_text, target_text))  # text preprocessing
        en_inputs, _ = self.vectorizer(text_preprocessed)  # vectorization
        en_initial_states = self.encoder.init_states(1)
        en_outputs = self.encoder(tf.constant([en_inputs]), en_initial_states)  # run the encoder
        start_end_seq = self.vectorizer.encode_output(
            ' '.join([self.preprocessor.start_token, self.preprocessor.end_token]))  # sequence holding just the start and end token
        de_start_index, de_end_index = start_end_seq[:1], start_end_seq[-1:]  # indices of the start and end token
        de_input = tf.constant([de_start_index])  # the start token is the decoder's first input
        de_state_h, de_state_c = en_outputs[1:]  # encoder's final hidden/cell state
        output = {'preprocessed_text': text_preprocessed,
                  'logits': [],
                  'alignment': [],
                  'predicted_sequence': []}
        for _ in range(self.max_prediction_len):
            de_output, de_state_h, de_state_c = self.decoder(de_input, (de_state_h, de_state_c))  # one decoder step
            de_input = tf.argmax(de_output, -1)  # greedy decoding: the previous prediction becomes the next input
            pred_token_index = de_input.numpy()[0][0]
            if pred_token_index != 0:  # index 0 is padding; skip it
                output['logits'].append(np.squeeze(de_output.numpy()))
                output['predicted_sequence'].append(pred_token_index)
                if pred_token_index == de_end_index:
                    break
        output['predicted_text'] = self.vectorizer.decode_output(output['predicted_sequence'])  # predicted text
        return output

    # Training-step factory
    def new_train_step(self,
                       loss_function: Callable[[tf.Tensor], tf.Tensor],
                       batch_size: int,
                       apply_gradients=True) -> Callable[[tf.Tensor, tf.Tensor], float]:
        """Return a tf.function that runs one teacher-forced train step.

        With apply_gradients=False the returned step only computes the loss,
        so the same code path can be reused for evaluation.
        """
        train_step_signature = [
            tf.TensorSpec(shape=(batch_size, None), dtype=tf.int32),
            tf.TensorSpec(shape=(batch_size, None), dtype=tf.int32),
        ]
        # Captured as locals so the closure does not have to resolve self.* attributes.
        encoder = self.encoder
        decoder = self.decoder
        optimizer = self.optimizer

        @tf.function(input_signature=train_step_signature)
        def train_step(source_seq: tf.Tensor,
                       target_seq: tf.Tensor) -> float:
            # NOTE(review): uses self.encoder here although `encoder` was captured above — confirm intentional.
            en_initial_states = self.encoder.init_states(source_seq.get_shape()[0])
            with tf.GradientTape() as tape:
                en_outputs = encoder(source_seq, en_initial_states)
                en_states = en_outputs[1:]
                de_states = en_states
                # Teacher forcing: feed the gold target shifted right...
                de_outputs = decoder(target_seq[:, :-1], de_states)
                logits = de_outputs[0]
                # ...and score it against the target shifted left.
                loss = loss_function(target_seq[:, 1:], logits)
            if apply_gradients is True:
                variables = encoder.trainable_variables + decoder.trainable_variables
                gradients = tape.gradient(loss, variables)
                optimizer.apply_gradients(zip(gradients, variables))
            return float(loss)

        return train_step

    # Save the model: configuration as pickle, weights as TF checkpoints.
    def save(self, out_path):
        if not os.path.exists(out_path):
            os.mkdir(out_path)
        summarizer_path = os.path.join(out_path, 'summarizer.pkl')
        encoder_path = os.path.join(out_path, 'encoder')
        decoder_path = os.path.join(out_path, 'decoder')
        with open(summarizer_path, 'wb+') as handle:
            pickle.dump(self, handle, protocol=pickle.HIGHEST_PROTOCOL)
        # __getstate__ excludes the models, so weights are saved separately.
        self.encoder.save_weights(encoder_path, save_format='tf')
        self.decoder.save_weights(decoder_path, save_format='tf')

    # Load a trained model.
    @staticmethod
    def load(in_path):
        summarizer_path = os.path.join(in_path, 'summarizer.pkl')
        encoder_path = os.path.join(in_path, 'encoder')
        decoder_path = os.path.join(in_path, 'decoder')
        with open(summarizer_path, 'rb') as handle:
            summarizer = pickle.load(handle)
        # Rebuild the models (excluded from the pickle) and restore weights.
        summarizer.encoder = Encoder(summarizer.embedding_shape_in,
                                     summarizer.lstm_size,
                                     embedding_trainable=summarizer.embedding_encoder_trainable)
        summarizer.decoder = Decoder(summarizer.embedding_shape_out,
                                     summarizer.lstm_size,
                                     embedding_trainable=summarizer.embedding_decoder_trainable)
        optimizer = SummarizerBasic._new_optimizer()
        summarizer.encoder.compile(optimizer=optimizer)
        summarizer.decoder.compile(optimizer=optimizer)
        summarizer.encoder.load_weights(encoder_path)
        summarizer.decoder.load_weights(decoder_path)
        summarizer.optimizer = summarizer.encoder.optimizer
        return summarizer

    @staticmethod
    def _new_optimizer() -> tf.keras.optimizers.Optimizer:
        # Plain Adam with default hyper-parameters.
        return tf.keras.optimizers.Adam()
后面的两个Summarizer大同小异,只是在模型的选取部分有些不同。