# RNN_Music_Generator.py
!pip install comet_ml > /dev/null 2>&1
import comet_ml
COMET_API_KEY = "YOUR_COMET_API_KEY"  # replace with your own API key from comet.com
# Import Tensorflow 2.0
import tensorflow as tf
# Download and import the MIT Introduction to Deep Learning package
!pip install mitdeeplearning --quiet
import mitdeeplearning as mdl
# Import all remaining packages
import numpy as np
import os
import time
import functools
from IPython import display as ipythondisplay
from tqdm import tqdm
from scipy.io.wavfile import write
!apt-get install abcmidi timidity > /dev/null 2>&1
# Download the dataset
songs = mdl.lab1.load_training_data()
# Join our list of song strings into a single string containing all songs
songs_joined = "\n\n".join(songs)
# Find all unique characters in the joined string
vocab = sorted(set(songs_joined))
print("There are", len(vocab), "unique characters in the dataset")
### Define numerical representation of text ###
# Create a mapping from character to unique index.
# For example, to get the index of the character "d",
# we can evaluate `char2idx["d"]`.
char2idx = {u:i for i, u in enumerate(vocab)}
# Create a mapping from indices to characters. This is
# the inverse of char2idx and allows us to convert back
# from unique index to the character in our vocabulary.
idx2char = np.array(vocab)
### Vectorize the songs string ###
def vectorize_string(string):
    '''
    Map each character of the input string to its vocabulary index.
    NOTE: the output is a np.array with `N` elements, where `N` is
    the number of characters in the input string.
    '''
    vectorized_songs = np.array([char2idx[char] for char in string])
    return vectorized_songs
vectorized_songs = vectorize_string(songs_joined)
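# Sanity check (added): the char-to-index mapping should round-trip, i.e. mapping
# a few characters to indices and back through idx2char recovers the original text.
sample = songs_joined[:10]
assert ''.join(idx2char[vectorize_string(sample)]) == sample
print(repr(sample), "---- maps to ---->", vectorize_string(sample))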
### Batch definition to create training examples ###
def get_batch(vectorized_songs, seq_length, batch_size):
    # the length of the vectorized songs string
    n = vectorized_songs.shape[0] - 1
    # randomly choose the starting indices for the examples in the training batch
    idx = np.random.choice(n - seq_length, batch_size)
    # construct a list of input sequences for the training batch
    input_batch = [vectorized_songs[i:i + seq_length] for i in idx]
    # construct the corresponding target sequences, shifted one character ahead
    output_batch = [vectorized_songs[i + 1:i + seq_length + 1] for i in idx]
    # x_batch, y_batch provide the true inputs and targets for network training
    x_batch = np.reshape(input_batch, [batch_size, seq_length])
    y_batch = np.reshape(output_batch, [batch_size, seq_length])
    return x_batch, y_batch
x_batch, y_batch = get_batch(vectorized_songs, seq_length=5, batch_size=1)
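# Illustration (added): for next-character prediction, the target at timestep t
# should be the input character at timestep t+1.
for t, (input_idx, target_idx) in enumerate(zip(x_batch[0], y_batch[0])):
    print(f"Step {t}: input {repr(idx2char[input_idx])} -> target {repr(idx2char[target_idx])}")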
def LSTM(rnn_units):
    return tf.keras.layers.LSTM(
        rnn_units,
        return_sequences=True,
        recurrent_initializer='glorot_uniform',
        recurrent_activation='sigmoid',
        stateful=True,
    )
### Defining the RNN Model ###
def build_model(vocab_size, embedding_dim, rnn_units, batch_size):
    model = tf.keras.Sequential([
        # Layer 1: Embedding layer to transform indices into dense vectors
        # of a fixed embedding size
        tf.keras.layers.Embedding(vocab_size, embedding_dim, batch_input_shape=[batch_size, None]),
        # Layer 2: LSTM with `rnn_units` number of units.
        LSTM(rnn_units),
        # Layer 3: Dense (fully-connected) layer that transforms the LSTM output
        # into the vocabulary size.
        tf.keras.layers.Dense(units=vocab_size)
    ])
    return model
# Build a simple model with default hyperparameters. You will get the
# chance to change these later.
model = build_model(len(vocab), embedding_dim=256, rnn_units=1024, batch_size=32)
x, y = get_batch(vectorized_songs, seq_length=100, batch_size=32)
pred = model(x)
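# Shape check (added): the untrained model should map a (batch, seq_length)
# batch of indices to (batch, seq_length, vocab_size) logits.
print("Input shape:      ", x.shape)
print("Prediction shape: ", pred.shape)  # (batch_size, sequence_length, vocab_size)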
### Defining the loss function ###
def compute_loss(labels, logits):
    loss = tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=True)
    return loss
example_batch_loss = compute_loss(y, pred)
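# Rough expectation (added): an untrained model predicts roughly uniformly over
# the vocabulary, so the mean cross-entropy should be close to ln(vocab_size).
print("Mean example loss:", example_batch_loss.numpy().mean())
print("Expected loss for uniform predictions (~ln(vocab_size)):", np.log(len(vocab)))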
### Hyperparameter setting and optimization ###
vocab_size = len(vocab)
# Model parameters:
params = dict(
    num_training_iterations=10000,  # Increase this to train longer
    batch_size=16,                  # Experiment between 1 and 64
    seq_length=125,                 # Experiment between 50 and 500
    learning_rate=5e-3,             # Experiment between 1e-5 and 1e-1
    embedding_dim=256,
    rnn_units=512,                  # Experiment between 1 and 2048
)
# Checkpoint location:
checkpoint_dir = './training_checkpoints'
checkpoint_prefix = os.path.join(checkpoint_dir, "my_ckpt")
### Create a Comet experiment to track our training run ###
def create_experiment():
    # end any prior experiment before starting a new one
    if 'experiment' in globals():
        globals()['experiment'].end()
    # initiate the comet experiment for tracking
    experiment = comet_ml.Experiment(
        api_key=COMET_API_KEY,
        project_name="6S191_Lab1_Part2")
    # log our hyperparameters, defined above, to the experiment
    for param, value in params.items():
        experiment.log_parameter(param, value)
    experiment.flush()
    return experiment
### Define optimizer and training operation ###
model = build_model(len(vocab), embedding_dim=params["embedding_dim"], rnn_units=params["rnn_units"], batch_size=params["batch_size"])
optimizer = tf.keras.optimizers.Adam(learning_rate=params["learning_rate"])
@tf.function
def train_step(x, y):
    # Use tf.GradientTape() to record operations for automatic differentiation
    with tf.GradientTape() as tape:
        y_hat = model(x)
        loss = compute_loss(y, y_hat)
    # Now, compute the gradients of the loss with respect to the model parameters
    grads = tape.gradient(loss, model.trainable_variables)
    # Apply the gradients to the optimizer so it can update the model accordingly
    optimizer.apply_gradients(zip(grads, model.trainable_variables))
    return loss
##################
# Begin training!#
##################
history = []
plotter = mdl.util.PeriodicPlotter(sec=2, xlabel='Iterations', ylabel='Loss')
experiment = create_experiment()
if hasattr(tqdm, '_instances'): tqdm._instances.clear() # clear if it exists
for iter in tqdm(range(params["num_training_iterations"])):
    # Grab a batch and propagate it through the network
    x_batch, y_batch = get_batch(vectorized_songs, params["seq_length"], params["batch_size"])
    loss = train_step(x_batch, y_batch)
    # Log the loss to the Comet interface so we can track it there
    experiment.log_metric("loss", loss.numpy().mean(), step=iter)
    # Update the progress bar and also visualize within the notebook
    history.append(loss.numpy().mean())
    plotter.plot(history)
    # Periodically save the model weights as a checkpoint
    if iter % 100 == 0:
        model.save_weights(checkpoint_prefix)
# Save the trained model and the weights
model.save_weights(checkpoint_prefix)
experiment.flush()
model = build_model(vocab_size, embedding_dim=params["embedding_dim"], rnn_units=params["rnn_units"], batch_size=1)
# Restore the model weights for the last checkpoint after training
model.load_weights(tf.train.latest_checkpoint(checkpoint_dir))
model.build(tf.TensorShape([1, None]))
model.summary()
### Prediction of a generated song ###
def generate_text(model, start_string, generation_length=1000):
    # Evaluation step (generating ABC text using the learned RNN model)
    # Convert the start string to a vector of character indices
    input_eval = [char2idx[s] for s in start_string]
    input_eval = tf.expand_dims(input_eval, 0)
    # Empty list to store the generated characters
    text_generated = []
    # Here batch size == 1
    model.reset_states()
    if hasattr(tqdm, '_instances'): tqdm._instances.clear()
    for i in tqdm(range(generation_length)):
        predictions = model(input_eval)
        # Remove the batch dimension
        predictions = tf.squeeze(predictions, 0)
        # Sample the next character index from the predicted distribution
        predicted_id = tf.random.categorical(predictions, num_samples=1)[-1, 0].numpy()
        # Pass the prediction along with the previous hidden state
        # as the next input to the model
        input_eval = tf.expand_dims([predicted_id], 0)
        # Map the predicted index back to a character and store it
        text_generated.append(idx2char[predicted_id])
    return (start_string + ''.join(text_generated))
generated_text = generate_text(model, start_string="X", generation_length=30000)
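# Post-processing sketch (added, not in the original script): the generated ABC
# text can be split into individual tunes and synthesized to audio. The helper
# names below (`mdl.lab1.extract_song_snippet`, `mdl.lab1.play_song`) are assumed
# from the mitdeeplearning lab utilities; adjust if your version differs.
generated_songs = mdl.lab1.extract_song_snippet(generated_text)
for i, song in enumerate(generated_songs):
    # Synthesize the ABC notation to a waveform (assumed to return None on failure)
    waveform = mdl.lab1.play_song(song)
    if waveform:
        print("Generated song", i)
        ipythondisplay.display(waveform)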