Attention Mechanism
- 4-1. Seq2Seq - Change Word
-
Paper: "Learning Phrase Representations using RNN Encoder–Decoder for Statistical Machine Translation" (2014)
'''
code by Tae Hwan Jung(Jeff Jung) @graykode
reference : https://github.com/golbin/TensorFlow-Tutorials/blob/master/10%20-%20RNN/03%20-%20Seq2Seq.py
'''
import tensorflow as tf
import numpy as np
tf.reset_default_graph()
# S: Symbol that marks the start of the decoder input
# E: Symbol that marks the end of the decoder output
# P: Padding symbol that fills a word shorter than the number of time steps
char_arr = [c for c in 'SEPabcdefghijklmnopqrstuvwxyz']
num_dic = {n: i for i, n in enumerate(char_arr)}
seq_data = [['man', 'women'], ['black', 'white'], ['king', 'queen'], ['girl', 'boy'], ['up', 'down'], ['high', 'low']]

# Seq2Seq Parameter
n_step = 5
n_hidden = 128
n_class = len(num_dic)  # number of class(=number of vocab)


def make_batch(seq_data):
    """Encode (source, target) word pairs as padded arrays for the model.

    Each word is right-padded with 'P' up to ``n_step`` characters. Words
    longer than ``n_step`` are left as they are (no truncation). The caller's
    data is never modified, so pairs may be lists or tuples and the same
    data can be encoded repeatedly.

    Args:
        seq_data: iterable of pairs ``(source_word, target_word)`` made of
            characters present in ``num_dic``.

    Returns:
        A tuple ``(input_batch, output_batch, target_batch)`` of lists with
        one entry per pair:
        - input_batch: one-hot array of the padded source word.
        - output_batch: one-hot array of 'S' + padded target word.
        - target_batch: list of class indices of padded target word + 'E'.

    Raises:
        KeyError: if a word contains a character missing from ``num_dic``.
        IndexError: if a pair has fewer than two words.
    """
    def pad(word):
        # A negative repeat count yields '', so over-long words pass through.
        return word + 'P' * (n_step - len(word))

    one_hot = np.eye(n_class)
    input_batch, output_batch, target_batch = [], [], []
    for seq in seq_data:
        # Pad local copies instead of writing back into ``seq``.
        source_word, target_word = pad(seq[0]), pad(seq[1])

        enc_ids = [num_dic[ch] for ch in source_word]
        dec_ids = [num_dic[ch] for ch in 'S' + target_word]
        target_ids = [num_dic[ch] for ch in target_word + 'E']

        input_batch.append(one_hot[enc_ids])
        output_batch.append(one_hot[dec_ids])
        target_batch.append(target_ids)
    return input_batch, output_batch, target_batch
# Model
# Placeholders for the one-hot encoder/decoder inputs and the integer targets.
enc_input = tf.placeholder(dtype=tf.float32, shape=[None, None, n_class])  # [batch_size, max_len(=encoder_step), n_class]
dec_input = tf.placeholder(dtype=tf.float32, shape=[None, None, n_class])  # [batch_size, max_len+1(=decoder_step) (because of 'S' or 'E'), n_class]
targets = tf.placeholder(dtype=tf.int64, shape=[None, None])  # [batch_size, max_len+1], not one-hot

# NOTE(review): both DropoutWrappers use a constant keep probability of 0.5,
# so dropout is part of every run of this graph, including the runs issued
# by translate() - confirm that this is intended.
with tf.variable_scope('encode'):
    enc_cell = tf.nn.rnn_cell.BasicRNNCell(num_units=n_hidden)
    enc_cell = tf.nn.rnn_cell.DropoutWrapper(cell=enc_cell, output_keep_prob=0.5)
    # Only the final encoder state is kept; it seeds the decoder below.
    # enc_states : [batch_size, n_hidden(=128)]
    _, enc_states = tf.nn.dynamic_rnn(cell=enc_cell, inputs=enc_input, dtype=tf.float32)

with tf.variable_scope('decode'):
    dec_cell = tf.nn.rnn_cell.BasicRNNCell(num_units=n_hidden)
    dec_cell = tf.nn.rnn_cell.DropoutWrapper(cell=dec_cell, output_keep_prob=0.5)
    # outputs : [batch_size, max_len+1, n_hidden(=128)]
    outputs, _ = tf.nn.dynamic_rnn(cell=dec_cell, inputs=dec_input,
                                   initial_state=enc_states, dtype=tf.float32)

# Project every decoder step onto the vocabulary; model : [batch_size, max_len+1, n_class]
model = tf.layers.dense(inputs=outputs, units=n_class, activation=None)

# Mean cross entropy over every position of every sequence (padding included).
token_loss = tf.nn.sparse_softmax_cross_entropy_with_logits(labels=targets, logits=model)
cost = tf.reduce_mean(token_loss)
optimizer = tf.train.AdamOptimizer(learning_rate=0.001).minimize(cost)
# Training
sess = tf.Session()
sess.run(tf.global_variables_initializer())

# The whole data set forms a single batch that is fed unchanged at every step.
input_batch, output_batch, target_batch = make_batch(seq_data)
train_feed = {enc_input: input_batch, dec_input: output_batch, targets: target_batch}

for epoch in range(5000):
    _, loss = sess.run([optimizer, cost], feed_dict=train_feed)
    step = epoch + 1
    # Report the cost once every 1000 steps.
    if step % 1000 == 0:
        print('Epoch:', '%04d' % step, 'cost =', '{:.6f}'.format(loss))
# Test
def translate(word):
    """Run the trained model on ``word`` and return the predicted word.

    Args:
        word: source word made of characters present in ``num_dic``.

    Returns:
        The predicted characters up to (not including) the first 'E', with
        every padding symbol 'P' removed. If the model predicts no 'E' at
        all, the whole prediction (minus padding) is returned instead of
        raising ValueError.
    """
    # No real target exists at inference time, so the decoder is fed an
    # all-padding word; make_batch turns it into 'S' followed by padding.
    input_batch, output_batch, _ = make_batch([[word, 'P' * len(word)]])

    # Fetch the logits and take the argmax in NumPy. Building tf.argmax here
    # would add a new op to the graph on every call.
    logits = sess.run(model, feed_dict={enc_input: input_batch, dec_input: output_batch})
    decoded = [char_arr[i] for i in np.argmax(logits[0], axis=-1)]

    # Cut at the first end symbol when there is one.
    if 'E' in decoded:
        decoded = decoded[:decoded.index('E')]
    return ''.join(decoded).replace('P', '')
print('test')
# Mix of words seen during training and slightly altered, unseen ones.
for sample in ('man', 'mans', 'king', 'black', 'upp'):
    print(sample + ' ->', translate(sample))
- 4-2. Seq2Seq with Attention - Translate
-
Paper: "Neural Machine Translation by Jointly Learning to Align and Translate"
代码块语法遵循标准markdown代码,例如:
# code by Tae Hwan Jung(Jeff Jung) @graykode
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
tf.reset_default_graph()
# S: Symbol that marks the start of the decoder input
# E: Symbol that marks the end of the decoder output
# P: Padding symbol that fills a sentence shorter than the number of time steps
sentences = ['ich mochte ein bier P', 'S i want a beer', 'i want a beer E']

# Vocabulary: every distinct whitespace-separated token of the sentences.
# It is built through a set, so the index given to each word is arbitrary.
word_list = list(set(' '.join(sentences).split()))
number_dict = dict(enumerate(word_list))  # index -> word
word_dict = {word: index for index, word in number_dict.items()}  # word -> index
n_class = len(word_dict)  # vocab list

# Parameter
n_step = 5  # maximum number of words in one sentence(=number of time steps)
n_hidden = 128
def make_batch(sentences):
    """Encode one (encoder input, decoder input, target) sentence triple.

    Args:
        sentences: sequence whose first three items are the encoder input,
            decoder input and target sentences, as space-separated words
            that all appear in ``word_dict``.

    Returns:
        ``(input_batch, output_batch, target_batch)``, each a list with a
        single entry: one-hot arrays for the two inputs and a list of word
        indices for the target.
    """
    def to_ids(sentence):
        return [word_dict[token] for token in sentence.split()]

    source, decoder_in, target = sentences[0], sentences[1], sentences[2]
    input_batch = [np.eye(n_class)[to_ids(source)]]
    output_batch = [np.eye(n_class)[to_ids(decoder_in)]]
    target_batch = [to_ids(target)]
    return input_batch, output_batch, target_batch
# Model
# One-hot encoder/decoder inputs; the targets are fixed to a single sentence.
enc_inputs = tf.placeholder(dtype=tf.float32, shape=[None, None, n_class])  # [batch_size, n_step, n_class]
dec_inputs = tf.placeholder(dtype=tf.float32, shape=[None, None, n_class])  # [batch_size, n_step, n_class]
targets = tf.placeholder(dtype=tf.int64, shape=[1, n_step])  # [batch_size(=1), n_step], not one-hot

# Linear for attention
# attn projects an encoder output before it is compared with a decoder output.
attn = tf.Variable(initial_value=tf.random_normal(shape=[n_hidden, n_hidden]))
# out maps the concatenated [decoder output, context] vector to class logits.
out = tf.Variable(initial_value=tf.random_normal(shape=[n_hidden * 2, n_class]))
def get_att_score(dec_output, enc_output):
    """Return the attention score of one encoder step for one decoder step.

    Args:
        dec_output: decoder output whose first two dimensions are of size 1
            (both are squeezed away), i.e. [1, 1, n_hidden].
        enc_output: encoder output of a single time step with a leading
            dimension of size 1, i.e. [1, n_hidden].

    Returns:
        Scalar tensor: the inner product of the decoder output with the
        encoder output projected through ``attn``.
    """
    projected = tf.matmul(enc_output, attn)  # [1, n_hidden]
    enc_vector = tf.squeeze(projected, axis=0)  # [n_hidden]
    dec_vector = tf.squeeze(dec_output, axis=[0, 1])  # [n_hidden]
    # Contracting the single shared axis of two vectors yields a scalar.
    return tf.tensordot(dec_vector, enc_vector, axes=1)
def get_att_weight(dec_output, enc_outputs):
    """Return softmax-normalized attention weights over the encoder steps.

    Args:
        dec_output: decoder output of the current step, passed unchanged to
            ``get_att_score``.
        enc_outputs: encoder outputs, [batch_size, n_step, n_hidden].

    Returns:
        Tensor of shape [1, 1, n_step] holding one weight per encoder step.
    """
    # Put time first so that indexing selects one encoder step:
    # [n_step, batch_size, n_hidden]
    steps_first = tf.transpose(enc_outputs, perm=[1, 0, 2])
    # One scalar score per encoder step.
    attn_scores = [get_att_score(dec_output, steps_first[step]) for step in range(n_step)]
    # Normalize scores to weights in range 0 to 1
    weights = tf.nn.softmax(attn_scores)
    return tf.reshape(weights, [1, 1, -1])  # [1, 1, n_step]
model = []  # logits of every decoder step, each [batch_size(=1), n_class]
Attention = []  # attention weights of every decoder step, each [n_step]

with tf.variable_scope('encode'):
    enc_cell = tf.nn.rnn_cell.BasicRNNCell(n_hidden)
    enc_cell = tf.nn.rnn_cell.DropoutWrapper(enc_cell, output_keep_prob=0.5)
    # enc_outputs : [batch_size(=1), n_step(=encoder_step), n_hidden(=128)]
    # enc_hidden : [batch_size(=1), n_hidden(=128)]
    enc_outputs, enc_hidden = tf.nn.dynamic_rnn(enc_cell, enc_inputs, dtype=tf.float32)

with tf.variable_scope('decode'):
    dec_cell = tf.nn.rnn_cell.BasicRNNCell(n_hidden)
    dec_cell = tf.nn.rnn_cell.DropoutWrapper(dec_cell, output_keep_prob=0.5)

    inputs = tf.transpose(dec_inputs, [1, 0, 2])  # [n_step, batch_size(=1), n_class]
    hidden = enc_hidden
    # The decoder is unrolled one step at a time so that attention over the
    # encoder outputs can be computed from every single decoder output.
    for i in range(n_step):
        # time_major=True means inputs shape: [max_time, batch_size, ...];
        # here both leading dimensions are 1 (one step, batch_size of 1).
        dec_output, hidden = tf.nn.dynamic_rnn(dec_cell, tf.expand_dims(inputs[i], 1),
                                               initial_state=hidden, dtype=tf.float32, time_major=True)
        attn_weights = get_att_weight(dec_output, enc_outputs)  # attn_weights : [1, 1, n_step]
        Attention.append(tf.squeeze(attn_weights))

        # matrix-matrix product of matrices [1, 1, n_step] x [1, n_step, n_hidden] = [1, 1, n_hidden]
        context = tf.matmul(attn_weights, enc_outputs)
        dec_output = tf.squeeze(dec_output, 0)  # [1, n_hidden]
        context = tf.squeeze(context, 1)  # [1, n_hidden]

        # Logits of this step from the concatenated [decoder output, context].
        model.append(tf.matmul(tf.concat((dec_output, context), 1), out))  # [batch_size(=1), n_class]

# Stack every collected step instead of five hard-coded entries, so the
# attention matrix follows n_step: [n_step(decoder), n_step(encoder)].
trained_attn = tf.stack(Attention, 0)  # to show attention matrix
# The list of per-step logits is [n_step, batch_size(=1), n_class]; put batch first.
model = tf.transpose(model, [1, 0, 2])  # model : [batch_size(=1), n_step, n_class]
prediction = tf.argmax(model, 2)
cost = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(logits=model, labels=targets))
optimizer = tf.train.AdamOptimizer(0.001).minimize(cost)
# Training and Test
with tf.Session() as sess:
    init = tf.global_variables_initializer()
    sess.run(init)

    # The single training example never changes, so it is encoded once
    # instead of once per epoch.
    input_batch, output_batch, target_batch = make_batch(sentences)
    train_feed = {enc_inputs: input_batch, dec_inputs: output_batch, targets: target_batch}

    for epoch in range(2000):
        # attention keeps the weights of the most recent training step; the
        # value left after the loop is what gets plotted below.
        _, loss, attention = sess.run([optimizer, cost, trained_attn], feed_dict=train_feed)

        if (epoch + 1) % 400 == 0:
            print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))

    # Test: the decoder is fed padding symbols only.
    predict_batch = [np.eye(n_class)[[word_dict[n] for n in 'P P P P P'.split()]]]
    result = sess.run(prediction, feed_dict={enc_inputs: input_batch, dec_inputs: predict_batch})
    print(sentences[0].split(), '->', [number_dict[n] for n in result[0]])

    # Show Attention
    fig = plt.figure(figsize=(5, 5))
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')
    # The leading '' is the label of the first tick, ahead of the word labels.
    ax.set_xticklabels([''] + sentences[0].split(), fontdict={'fontsize': 14})
    ax.set_yticklabels([''] + sentences[2].split(), fontdict={'fontsize': 14})
    plt.show()
'''
code by Tae Hwan Jung(Jeff Jung) @graykode
Reference : https://github.com/prakashpandey9/Text-Classification-Pytorch/blob/master/models/LSTM_Attn.py
'''
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
tf.reset_default_graph()
# Bi-LSTM(Attention) Parameters
embedding_dim = 2
n_hidden = 5  # number of hidden units in one cell
n_step = 3  # every sentence consists of 3 words
n_class = 2  # 0 or 1

# 3 words sentences (=sequence_length is 3)
sentences = ["i love you", "he loves me", "she likes baseball", "i hate you", "sorry for that", "this is awful"]
labels = [1, 1, 1, 0, 0, 0]  # 1 is good, 0 is not good.

# Vocabulary of distinct words; it is built through a set, so the index
# assigned to each word is arbitrary.
word_list = list(set(" ".join(sentences).split()))
word_dict = dict(zip(word_list, range(len(word_list))))
vocab_size = len(word_dict)

# Each sentence becomes an array of word indices.
input_batch = [np.asarray([word_dict[token] for token in sentence.split()])
               for sentence in sentences]

# ONE-HOT : labels are one-hot encoded for the softmax loss function
target_batch = [np.eye(n_class)[label] for label in labels]
# LSTM Model
X = tf.placeholder(tf.int32, [None, n_step])  # word indices
Y = tf.placeholder(tf.int32, [None, n_class])  # one-hot labels

out = tf.Variable(tf.random_normal([n_hidden * 2, n_class]))
embedding = tf.Variable(tf.random_uniform([vocab_size, embedding_dim]))
# Named `embedded` rather than `input` so the builtin input() is not shadowed.
embedded = tf.nn.embedding_lookup(embedding, X)  # [batch_size, len_seq, embedding_dim]

lstm_fw_cell = tf.nn.rnn_cell.LSTMCell(n_hidden)
lstm_bw_cell = tf.nn.rnn_cell.LSTMCell(n_hidden)

# output : (fw, bw), each [batch_size, len_seq, n_hidden]
# final_state : (fw_state, bw_state), each an LSTMStateTuple(c, h) with c and h : [batch_size, n_hidden]
output, final_state = tf.nn.bidirectional_dynamic_rnn(lstm_fw_cell, lstm_bw_cell, embedded, dtype=tf.float32)

# Attention
output = tf.concat([output[0], output[1]], 2)  # output[0] : lstm_fw, output[1] : lstm_bw
# Concatenate the final hidden state h of BOTH directions. Indexing
# final_state[1][0] and final_state[1][1] would instead pair the cell state
# and the hidden state of the backward cell only.
final_hidden_state = tf.concat([final_state[0].h, final_state[1].h], 1)  # final_hidden_state : [batch_size, n_hidden * num_directions(=2)]
final_hidden_state = tf.expand_dims(final_hidden_state, 2)  # final_hidden_state : [batch_size, n_hidden * num_directions(=2), 1]

# Score every time step against the final hidden state, then normalize over time.
attn_weights = tf.squeeze(tf.matmul(output, final_hidden_state), 2)  # attn_weights : [batch_size, n_step]
soft_attn_weights = tf.nn.softmax(attn_weights, 1)
# Attention-weighted sum of the outputs over time.
context = tf.matmul(tf.transpose(output, [0, 2, 1]), tf.expand_dims(soft_attn_weights, 2))  # context : [batch_size, n_hidden * num_directions(=2), 1]
context = tf.squeeze(context, 2)  # [batch_size, n_hidden * num_directions(=2)]

model = tf.matmul(context, out)  # logits : [batch_size, n_class]
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=model, labels=Y))
optimizer = tf.train.AdamOptimizer(0.001).minimize(cost)

# Model-Predict
hypothesis = tf.nn.softmax(model)
predictions = tf.argmax(hypothesis, 1)
# Training
with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())

    # All six sentences are fed as one batch at every step.
    train_feed = {X: input_batch, Y: target_batch}
    for epoch in range(5000):
        # attention keeps the weights of the most recent step for the plot below.
        _, loss, attention = sess.run([optimizer, cost, soft_attn_weights], feed_dict=train_feed)
        step = epoch + 1
        if step % 1000 == 0:
            print('Epoch:', '%06d' % step, 'cost =', '{:.6f}'.format(loss))

    # Test
    test_text = 'sorry hate you'
    tests = [np.asarray([word_dict[n] for n in test_text.split()])]

    # predictions holds one class index per fed sentence; only one is fed here.
    result = sess.run(predictions, feed_dict={X: tests})[0]
    verdict = "is Bad Mean..." if result == 0 else "is Good Mean!!"
    print(test_text, verdict)

    # Attention weights of the last training step, one row per training sentence.
    fig = plt.figure(figsize=(6, 3))  # [batch_size, n_step]
    ax = fig.add_subplot(1, 1, 1)
    ax.matshow(attention, cmap='viridis')
    word_labels = ['first_word', 'second_word', 'third_word']
    batch_labels = ['batch_1', 'batch_2', 'batch_3', 'batch_4', 'batch_5', 'batch_6']
    ax.set_xticklabels([''] + word_labels, fontdict={'fontsize': 14}, rotation=90)
    ax.set_yticklabels([''] + batch_labels, fontdict={'fontsize': 14})
    plt.show()