In this blog, we will generate sequences as strings and train a model to learn to add two numbers. For example, the input string will be "125+50" and the output will be "175".
We will use the Long Short-Term Memory (LSTM) architecture, which can solve general sequence-to-sequence problems.
The idea is to use one LSTM to read the input sequence, one timestep at a time, to obtain a large fixed-dimensional vector representation, and then to use another LSTM to extract the output sequence from that vector. The second LSTM is essentially a recurrent neural network language model, except that it is conditioned on the input sequence. The LSTM’s ability to successfully learn on data with long-range temporal dependencies makes it a natural choice for this application, given the considerable time lag between the inputs and their corresponding outputs.
Our model reads an input sentence "ABC" and produces "WXYZ" as the output sentence. The model stops making predictions after outputting the end-of-sentence token. Note that the LSTM reads the input sentence in reverse, because doing so introduces many short-term dependencies in the data that make the optimization problem much easier.
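Reversing is just string slicing in Python. For example (using the space-padding convention introduced below):
# A padded query '12+345 ' becomes ' 543+21' when reversed.
query = "12+345 "
print(query[::-1])  # prints ' 543+21'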
An encoder/decoder in deep learning is a technique used mainly in text generation. The overall method is called "sequence to sequence": you have an input sequence (in French, for example) and you translate it into English, thereby generating another sequence. Generally, the encoder encodes the input sequence and summarizes the information in something called the internal state vectors or context vector (in the case of an LSTM, these are the hidden state and cell state vectors). The decoder then uses this context to generate the output sequence.
The decoder is an LSTM whose initial states are initialized to the final states of the encoder LSTM, i.e. the context vector of the encoder’s final cell is fed to the first cell of the decoder network. Starting from these initial states, the decoder generates the output sequence, and these outputs are also taken into consideration for future outputs.
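To make this concrete, here is a minimal sketch of such an explicit encoder-decoder in the Keras functional API. The sizes num_tokens and latent_dim are illustrative placeholders, and this is not the model we train below (which uses a simpler RepeatVector-based variant):
# Minimal encoder-decoder sketch (illustrative only, not the model trained below).
from tensorflow import keras
from tensorflow.keras import layers

num_tokens = 12   # assumed vocabulary size (digits, '+', space)
latent_dim = 128  # assumed size of the context vector

# Encoder: keep only its final hidden and cell states (the context vector).
encoder_inputs = keras.Input(shape=(None, num_tokens))
_, state_h, state_c = layers.LSTM(latent_dim, return_state=True)(encoder_inputs)

# Decoder: an LSTM whose initial states are the encoder's final states.
decoder_inputs = keras.Input(shape=(None, num_tokens))
decoder_lstm = layers.LSTM(latent_dim, return_sequences=True)
decoder_outputs = decoder_lstm(decoder_inputs, initial_state=[state_h, state_c])
decoder_outputs = layers.Dense(num_tokens, activation="softmax")(decoder_outputs)

seq2seq = keras.Model([encoder_inputs, decoder_inputs], decoder_outputs)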
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
# Parameters for the model and dataset.
TRAINING_SIZE = 50000
#TRAINING_SIZE = 100
DIGITS = 3
REVERSE = True
If we add three-digit numbers, the maximum length of the input will be 7 (DIGITS + 1 + DIGITS), e.g. "125+950".
MAXLEN = DIGITS + 1 + DIGITS
MAXLEN
class GenerateSequence:
    def __init__(self):
        pass
    pass

gs = GenerateSequence()
print(gs)
class GenerateSequence:
    def __init__(self, chars):
        self.chars = chars
        pass
    def __str__(self):
        s = "Characters: " + str(self.chars)
        return s
        pass
    pass

# All the numbers, plus sign and space for padding.
chars = "0123456789+ "
gs = GenerateSequence(chars)
print(gs)
class GenerateSequence:
    def __init__(self, chars):
        self.chars = chars
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        pass
    def __str__(self):
        s = "Characters: " + str(self.chars) + \
            "\nChar Indices: " + str(self.char_indices) + \
            "\nIndices Char: " + str(self.indices_char)
        return s
        pass
    pass

# All the numbers, plus sign and space for padding.
chars = "0123456789+ "
gs = GenerateSequence(chars)
print(gs)
The encode function encodes a given sentence as a one-hot representation, and the decode function maps a vector or 2D array back to its character output. The input parameters of the decode function are x (the vector or 2D array) and calc_argmax.
calc_argmax: whether to find the character index with the maximum probability; defaults to True.
class GenerateSequence:
    def __init__(self, chars):
        self.chars = chars
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
        pass
    def encode(self, sentence, MAXLEN):
        # One-hot encode the sentence: one row per character position.
        x = np.zeros((MAXLEN, len(self.chars)))
        for i, s in enumerate(sentence):
            x[i, self.char_indices[s]] = 1
        return x
        pass
    def decode(self, x, calc_argmax = True):
        # Map a 2D one-hot array (or a vector of indices) back to a string.
        if calc_argmax:
            x = x.argmax(axis = -1)
        return "".join(self.indices_char[ix] for ix in x)
        pass
    def __str__(self):
        s = "Characters: " + str(self.chars) + \
            "\nChar Indices: " + str(self.char_indices) + \
            "\nIndices Char: " + str(self.indices_char)
        return s
        pass
    pass
# All the numbers, plus sign and space for padding.
chars = "0123456789+ "
gs = GenerateSequence(chars)
print(gs)
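A quick sanity check that decode inverts encode (the example string here is arbitrary, space-padded to MAXLEN):
encoded = gs.encode("12+34  ", MAXLEN)  # arbitrary question, padded to length 7
print(encoded.shape)                    # (7, 12): one one-hot row per character
print(gs.decode(encoded))               # '12+34  '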
questions = []
expected = []
existed = set()
while len(questions) < TRAINING_SIZE:
    f = lambda: int(
        "".join(
            np.random.choice(list("0123456789"))
            for i in range(np.random.randint(1, DIGITS + 1))
        )
    )
    a, b = f(), f()
    # Skip any addition question that already exists, including the
    # mirrored form (x+y == y+x), hence the sorting.
    key = tuple(sorted((a, b)))
    if key in existed:
        continue
    existed.add(key)
    # Pad the data with spaces such that it is always MAXLEN.
    q = "{}+{}".format(a, b)
    query = q + " " * (MAXLEN - len(q))
    ans = str(a + b)
    # Answers can be of maximum size DIGITS + 1.
    ans += " " * (DIGITS + 1 - len(ans))
    if REVERSE:
        # Reverse the query, e.g., '12+345 ' becomes ' 543+21'. (Note the
        # spaces used for padding.)
        query = query[::-1]
    questions.append(query)
    expected.append(ans)
print("Total questions:", len(questions))
print(questions[:20])
print(expected[:20])
x = np.zeros((len(questions), MAXLEN, len(chars)), dtype = bool)
y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype = bool)
x.shape
y.shape
for i, sentence in enumerate(questions):
    x[i] = gs.encode(sentence, MAXLEN)
    pass
for i, sentence in enumerate(expected):
    y[i] = gs.encode(sentence, DIGITS + 1)
    pass
indices = np.arange(len(y))
#print(indices)
np.random.shuffle(indices)
#print(indices)
x = x[indices]
y = y[indices]
x.shape
y.shape
print(x[0][0])
print(y[0][0])
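Since decode inverts the encoding, we can also recover the first (shuffled) question and answer straight from the tensors:
# Decode the first encoded sample back to text; the question is still
# reversed because REVERSE is True.
print(gs.decode(x[0]))
print(gs.decode(y[0]))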
from sklearn.model_selection import train_test_split
x_train, x_validation, y_train, y_validation = train_test_split(x, y, test_size = 0.2, random_state = 50)
print(x_train.shape)
print(y_train.shape)
print(x_validation.shape)
print(y_validation.shape)
This is our training model. Note that although the text above describes passing the encoder states to the decoder explicitly, the Sequential model below uses a simpler variant built on two key features of Keras:
The return_sequences constructor argument, which configures an RNN layer to return its full sequence of outputs, of shape (num_samples, timesteps, output_dim), instead of just the last output. This is used in the decoder LSTM so that a prediction is made for every output timestep; the final Dense layer is then applied to each timestep independently.
The RepeatVector layer, which repeats the encoder's final output vector DIGITS + 1 times. The encoder LSTM summarizes the whole input into this single fixed-size context vector, and the decoder receives it as input at every output timestep, instead of being initialized with the encoder states via return_state and initial_state.
num_layers = 1
model = keras.Sequential()
# Encoder: reads the input sequence and returns its last output,
# a single 128-dimensional context vector.
model.add(layers.LSTM(128, input_shape=(MAXLEN, len(chars))))
# Repeat the context vector once per output timestep.
model.add(layers.RepeatVector(DIGITS + 1))
# The decoder RNN can be multiple layers stacked or a single layer.
for _ in range(num_layers):
    model.add(layers.LSTM(128, return_sequences=True))
# Apply a softmax over the vocabulary at every output timestep.
model.add(layers.Dense(len(chars), activation="softmax"))
model.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])
model.summary()
epochs = 20
batch_size = 32
for epoch in range(1, epochs + 1):
    print("\nIteration", epoch)
    model.fit(
        x_train,
        y_train,
        batch_size = batch_size,
        epochs = 1,
        validation_data=(x_validation, y_validation),
    )
    # Print 10 samples from the validation set at random so we can visualize the errors.
    for i in range(10):
        ind = np.random.randint(0, len(x_validation))
        x_val, y_val = x_validation[np.array([ind])], y_validation[np.array([ind])]
        predictions = np.argmax(model.predict(x_val), axis=-1)
        #print("predictions: ", predictions)
        question = gs.decode(x_val[0])
        correct = gs.decode(y_val[0])
        prediction = gs.decode(predictions[0], calc_argmax = False)
        print("Q: ", question[::-1] if REVERSE else question, end=" ")
        print("T: ", correct, end=" ")
        if correct == prediction:
            print("☑ " + prediction)
        else:
            print("☒ " + prediction)
After training, the model reaches about 99% validation accuracy.
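As a final check, we can ask the trained model a brand-new question end to end (the numbers here are just an example):
# Encode a new question, predict, and decode the answer.
q = "53+678"
query = q + " " * (MAXLEN - len(q))
if REVERSE:
    query = query[::-1]
x_new = np.array([gs.encode(query, MAXLEN)])
pred = np.argmax(model.predict(x_new), axis=-1)
print(q, "=", gs.decode(pred[0], calc_argmax=False).strip())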