Week 3 - RNN.py
import torch
import numpy as np
from torch.autograd import Variable
import matplotlib.pyplot as plt
#data preprocessing
#open our text file and read all the data into the rawtxt variable
with open('lyrics', 'r') as file:
    rawtxt = file.read()
#turn all of the text into lowercase as it makes it easier for our algorithm to learn
rawtxt = rawtxt.lower()
#returns a dictionary that allows us to map from a unique number to a unique character in our text
def create_map(rawtxt):
    letters = list(set(rawtxt)) #returns the list of unique characters in our raw text
    lettermap = dict(enumerate(letters)) #create the dictionary mapping
    return lettermap
num_to_let = create_map(rawtxt) #store the dictionary mapping from numbers to characters in a variable
let_to_num = dict(zip(num_to_let.values(), num_to_let.keys())) #create the reverse mapping so we can map from a character to a unique number
#print(num_to_let)
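#quick sanity check (illustrative addition, not part of the original script):
#the two dictionaries should be exact inverses of each other, so mapping a
#number to its character and back should recover the same number
assert all(let_to_num[num_to_let[i]] == i for i in num_to_let)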
#takes in a string of text and applies the dictionary mapping passed in as a parameter
def maparray(txt, mapdict):
    txt = list(txt)
    #iterate through our text and change the value for each character to its mapped value
    for k, letter in enumerate(txt):
        txt[k] = mapdict[letter]
    txt = np.array(txt)
    return txt
#map our raw text into our input variables using the function defined earlier and passing in the mapping from letters to numbers
X = maparray(rawtxt, let_to_num)
Y = np.roll(X, -1, axis=0) #our label is the next character so roll shifts our array by one timestep
#convert to torch tensors so we can use them in our torch model
X = torch.LongTensor(X)
Y = torch.LongTensor(Y)
#return a random chunk of consecutive characters for training
def random_chunk(chunk_size):
    k = np.random.randint(0, len(X)-chunk_size)
    return X[k:k+chunk_size], Y[k:k+chunk_size]
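#illustrative check (not in the original script): decode a short chunk back into
#characters to confirm that each label in Y is simply the character that follows
#the corresponding input character in X
#x_dbg, y_dbg = random_chunk(5)
#print(''.join(num_to_let[int(i)] for i in x_dbg), '->', ''.join(num_to_let[int(i)] for i in y_dbg))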
nchars = len(num_to_let) #number of unique characters in our text file
#define our model which takes in variables defining its structure as parameters
class rnn(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size, n_layers=1):
        super().__init__()
        #store input parameters in the object so we can use them later on
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.n_layers = n_layers
        #layers used by the model
        self.encoder = torch.nn.Embedding(input_size, hidden_size)
        self.rnn = torch.nn.RNN(hidden_size, hidden_size, n_layers, batch_first=True)
        self.decoder = torch.nn.Linear(hidden_size, output_size)
    def forward(self, x, hidden):
        x = self.encoder(x.view(1, -1)) #encode our input into a vector embedding
        output, hidden = self.rnn(x.view(1, 1, -1), hidden) #calculate the output from our rnn based on our input and previous hidden state
        output = self.decoder(output.view(1, -1)) #calculate our output based on the output of the rnn
        return output, hidden
    def init_hidden(self):
        return Variable(torch.zeros(self.n_layers, 1, self.hidden_size)) #initialize our hidden state to a matrix of 0s
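#shape sanity check (illustrative sketch, not part of the original script):
#a single forward step on one character index should return logits of shape
#(1, output_size) and a hidden state of shape (n_layers, 1, hidden_size)
#_test = rnn(nchars, 10, nchars, 1)
#_out, _h = _test(Variable(torch.LongTensor([0])), _test.init_hidden())
#print(_out.size(), _h.size()) #expect (1, nchars) and (1, 1, 10)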
#hyper-params
lr = 0.003
no_epochs = 50
chunk_size = 100 #the length of the sequences which we will optimize over
myrnn = rnn(nchars, 50, nchars, 2) #instantiate our model from the class defined earlier
criterion = torch.nn.CrossEntropyLoss() #define our cost function
optimizer = torch.optim.Adam(myrnn.parameters(), lr=lr) #choose optimizer
#for plotting costs
costs = []
plt.ion()
fig = plt.figure()
ax = fig.add_subplot(111)
ax.set_xlabel('Epoch')
ax.set_ylabel('Cost')
ax.set_xlim(0, no_epochs)
plt.show()
#training loop
for epoch in range(no_epochs):
    totcost = 0 #stores the cost per epoch
    generated = '' #stores the text generated by our model each epoch
    #given our chunk size, how many chunks do we need to optimize over to have gone through our whole dataset
    for _ in range(len(X)//chunk_size):
        h = myrnn.init_hidden() #initialize our hidden state to 0s
        cost = 0 #cost for this chunk
        x, y = random_chunk(chunk_size) #get a random sequence chunk to train on
        x, y = Variable(x), Variable(y) #turn into variables to be used with our model
        #sequentially input each character in our sequence and calculate loss
        for i in range(chunk_size):
            out, h = myrnn.forward(x[i], h) #calculate outputs based on input and previous hidden state
            #based on our output, what character does our network predict is next?
            _, outl = out.data.max(1)
            letter = num_to_let[outl[0]]
            generated += letter #add the predicted letter to our generated sequence
            cost += criterion(out, y[i]) #add the cost for this input to the cost for the current chunk
        #based on the summed cost for this sequence - backpropagate through time, calculating the gradients and updating our weights
        optimizer.zero_grad()
        cost.backward()
        optimizer.step()
        totcost += cost #add the cost of this sequence to the cost of this epoch
    totcost /= len(X)//chunk_size #divide by the number of chunks per epoch to get the average cost per epoch
    #append the cost to the array and plot
    costs.append(totcost.data[0])
    ax.plot(costs, 'b')
    fig.canvas.draw()
    print('Epoch ', epoch, ' Avg cost/chunk: ', totcost)
    print('Generated text: ', generated[0:750], '\n')
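#--------------------------------------------------------------------------
#sampling sketch (an addition, not part of the original script): once training
#has finished we can seed the network with a character and repeatedly feed its
#own predictions back in to generate new text - the seed character, length and
#temperature below are illustrative assumptions, not values from the course
def generate(model, seed_char='a', length=200, temperature=0.8):
    h = model.init_hidden() #start from a zero hidden state
    idx = let_to_num[seed_char] #index of the seed character
    out_text = seed_char
    for _ in range(length):
        x = Variable(torch.LongTensor([idx])) #wrap the current index for the model
        out, h = model(x, h) #one step of the rnn
        #scale the logits by the temperature and sample the next character from the softmax distribution
        probs = torch.nn.functional.softmax(out.view(-1)/temperature, dim=0)
        idx = int(torch.multinomial(probs.data, 1)[0]) #pick the next character index
        out_text += num_to_let[idx]
    return out_text
#print(generate(myrnn))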