rnn.py
import math
import numpy as np
from keras.layers import Input, LSTM, Dense
from keras.models import Model
import keras.backend as K

z_dim = 32   # dimensionality of the latent vector z
a_dim = 3    # dimensionality of the action vector
input_dim = z_dim + a_dim
lstm_units = 256
gaussian_mixtures = 5
mdn_units = gaussian_mixtures * 3 * z_dim  # pi, mu, sigma for every mixture and z dimension

EPOCHS = 20
BATCH_SIZE = 32


def get_mixture_coef(output):
    d = gaussian_mixtures * z_dim
    seq_length = K.shape(output)[1]

    pi = output[:, :, :d]
    mu = output[:, :, d:(2 * d)]
    log_sigma = output[:, :, (2 * d):(3 * d)]

    # Reshape to (batch, seq_length, gaussian_mixtures, z_dim)
    pi = K.reshape(pi, [-1, seq_length, gaussian_mixtures, z_dim])
    mu = K.reshape(mu, [-1, seq_length, gaussian_mixtures, z_dim])
    log_sigma = K.reshape(log_sigma, [-1, seq_length, gaussian_mixtures, z_dim])

    # Pi is put through a softmax so that each mixture weight is positive
    # and the weights sum to one
    pi = K.exp(pi) / K.sum(K.exp(pi), axis=2, keepdims=True)
    sigma = K.exp(log_sigma)

    return pi, mu, sigma


oneDivSqrtTwoPI = 1 / math.sqrt(2 * math.pi)  # normalising constant of the Gaussian pdf


def pdf(y, mu, sigma, pi):
    seq_length = K.shape(y)[1]

    # Tile y so it can be compared against every mixture component
    y = K.tile(y, (1, 1, gaussian_mixtures))
    y = K.reshape(y, [-1, seq_length, gaussian_mixtures, z_dim])

    # Calculate the pdf of y under each individual Gaussian:
    # N(y | mu, sigma) = 1 / (sigma * sqrt(2*pi)) * exp(-(y - mu)^2 / (2 * sigma^2))
    pdfs = oneDivSqrtTwoPI * (1 / sigma) * K.exp(-K.square(y - mu) / (2 * K.square(sigma)))

    # Take the pi-weighted sum of the component pdfs
    return K.sum(pi * pdfs, axis=2)


def r_loss(y, output):
    """
    Reconstruction loss:
    the negative mean log-likelihood of y under the predicted mixture distribution.
    """
    pi, mu, sigma = get_mixture_coef(output)
    return -K.mean(K.log(pdf(y, mu, sigma, pi)), axis=(1, 2))


def kl_loss(y, output):
    """
    Kullback-Leibler divergence loss term:
    measures how far the predicted distribution over z is from an IID Gaussian
    with mean 0 and variance 1.
    """
    pi, mu, sigma = get_mixture_coef(output)
    return -0.5 * K.mean(1 + K.log(sigma) - K.square(mu) - sigma, axis=[1, 2, 3])


def loss(y, output):
    return r_loss(y, output) + kl_loss(y, output)


def train_model():
    # Training model
    inputs = Input(shape=(None, input_dim))
    lstm = LSTM(lstm_units, return_sequences=True, return_state=True)
    lstm_out, _, _ = lstm(inputs)
    outputs = Dense(mdn_units, name='rnn_mdn_out')(lstm_out)
    rnn = Model(inputs, outputs)

    # Prediction model
    # inputs_h - previous LSTM hidden state (the last output)
    # inputs_c - previous LSTM cell state
    #
    # The agent should keep the last hidden and cell states and feed them,
    # together with the current input, into this model to get the next states
    inputs_h = Input(shape=(lstm_units,))
    inputs_c = Input(shape=(lstm_units,))
    _, state_h, state_c = lstm(inputs, initial_state=[inputs_h, inputs_c])
    forward = Model([inputs, inputs_h, inputs_c], [state_h, state_c])

    rnn.compile('adam', loss)

    for i in range(1, 2):
        print('Loading batch %d...' % i)
        z = np.load('./data/z-%i.npy' % i)
        actions = np.load('./data/actions-%i.npy' % i)

        X = []
        Y = []
        for seq_z, seq_a in zip(z, actions):
            seq_za = []
            for frame_z, frame_a in zip(seq_z, seq_a):
                seq_za.append(np.hstack([frame_z, frame_a]))
            # Store x_t as [z_t, a_t] and y_t as z_{t+1}
            X.append(seq_za[:-1])
            Y.append(seq_z[1:])
        X = np.array(X)
        Y = np.array(Y)

        rnn.fit(X, Y, shuffle=True, epochs=EPOCHS, batch_size=BATCH_SIZE,
                validation_split=0.2)

    rnn.save('mdn-rnn.h5')
    forward.save('mdn-rnn-forward.h5')
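

# A minimal inference sketch (not part of the original script): the prediction
# model saved above returns the next hidden and cell states given the current
# (z, action) input and the previous states, as described in the comments in
# train_model(). The helper name `forward_step` and the variables `z_t`, `a_t`
# are illustrative, not from the original file.
def forward_step(forward, z_t, a_t, h, c):
    """
    forward: the prediction model saved as 'mdn-rnn-forward.h5'
    z_t:     latent vector of shape (z_dim,)
    a_t:     action vector of shape (a_dim,)
    h, c:    previous hidden and cell states, each of shape (1, lstm_units)
    Returns the next hidden and cell states.
    """
    za = np.hstack([z_t, a_t]).reshape(1, 1, input_dim)
    next_h, next_c = forward.predict([za, h, c])
    return next_h, next_c

# Example (illustrative):
#   from keras.models import load_model
#   forward = load_model('mdn-rnn-forward.h5', compile=False)
#   h = np.zeros((1, lstm_units))
#   c = np.zeros((1, lstm_units))
#   h, c = forward_step(forward, z_t, a_t, h, c)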