from util import *
class RestrictedBoltzmannMachine():
'''
For more details : A Practical Guide to Training Restricted Boltzmann Machines https://www.cs.toronto.edu/~hinton/absps/guideTR.pdf
'''
def __init__(self, ndim_visible, ndim_hidden, is_bottom=False, image_size=[28,28], is_top=False, n_labels=10, batch_size=10):
"""
Args:
ndim_visible: Number of units in visible layer.
ndim_hidden: Number of units in hidden layer.
is_bottom: True only if this rbm is at the bottom of the stack in a deep belief net. Used to interpret visible layer as image data with dimensions "image_size".
image_size: Image dimension for visible layer.
          is_top: True only if this rbm is at the top of the stack in a deep belief net. Used to interpret the visible layer as having "n_labels" units of label data concatenated at the end.
          n_labels: Number of label categories.
batch_size: Size of mini-batch.
"""
self.ndim_visible = ndim_visible
self.ndim_hidden = ndim_hidden
self.is_bottom = is_bottom
if is_bottom : self.image_size = image_size
self.is_top = is_top
        if is_top : self.n_labels = n_labels
self.batch_size = batch_size
self.delta_bias_v = 0
self.delta_weight_vh = 0
self.delta_bias_h = 0
self.bias_v = np.random.normal(loc=0.0, scale=0.01, size=(self.ndim_visible))
self.weight_vh = np.random.normal(loc=0.0, scale=0.01, size=(self.ndim_visible,self.ndim_hidden))
self.bias_h = np.random.normal(loc=0.0, scale=0.01, size=(self.ndim_hidden))
self.delta_weight_v_to_h = 0
self.delta_weight_h_to_v = 0
self.weight_v_to_h = None
self.weight_h_to_v = None
self.learning_rate = 0.01
self.momentum = 0.7
self.print_period = 5000
self.rf = { # receptive-fields. Only applicable when visible layer is input data
"period" : 5000, # iteration period to visualize
"grid" : [5,5], # size of the grid
"ids" : np.random.randint(0,self.ndim_hidden,25) # pick some random hidden units
}
return
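    # Model summary (a sketch of the standard binary RBM this class implements):
    #   energy:               E(v, h) = - b_v . v - b_h . h - v^T W h
    #   hidden given visible: p(h_j = 1 | v) = sigmoid(b_h[j] + v . W[:, j])
    #   visible given hidden: p(v_i = 1 | h) = sigmoid(b_v[i] + W[i, :] . h)
    # get_h_given_v / get_v_given_h below evaluate these conditionals and sample from them.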
def cd1(self, visible_trainset, n_iterations=10000):
"""Contrastive Divergence with k=1 full alternating Gibbs sampling
Args:
visible_trainset: training data for this rbm, shape is (size of training set, size of visible layer)
n_iterations: number of iterations of learning (each iteration learns a mini-batch)
"""
print ("learning CD1")
n_samples = visible_trainset.shape[0]
n_mini_batches = n_samples//self.batch_size
iteration_errors = []
for it in range(n_iterations):
idx_samples = np.arange(n_samples)
np.random.shuffle(idx_samples)
for i_mini_batch in range(n_mini_batches):
low_limit_idx_mini_batch = i_mini_batch*self.batch_size
up_limit_idx_mini_batch = (i_mini_batch+1)*self.batch_size
idx_mini_batch = idx_samples[low_limit_idx_mini_batch:up_limit_idx_mini_batch]
visible_trainset_mini_batch = visible_trainset[idx_mini_batch]
                # k=1 alternating Gibbs sampling: v_0 -> h_0 -> v_1 -> h_1,
                # using the inference functions get_h_given_v and get_v_given_h.
                # Hidden states are sampled, while the reconstruction statistics
                # passed to update_params use probabilities (a single CD step).
                v_0 = visible_trainset_mini_batch
                _, h_0 = self.get_h_given_v(v_0)
                prob_v_1, v_1 = self.get_v_given_h(h_0)
                prob_h_1, _ = self.get_h_given_v(v_1)
                self.update_params(v_0, h_0, prob_v_1, prob_h_1)
            # visualize receptive fields once in a while when the visible layer is image data
            if it % self.rf["period"] == 0 and self.is_bottom:
                viz_rf(weights=self.weight_vh[:,self.rf["ids"]].reshape((self.image_size[0],self.image_size[1],-1)), it=it, grid=self.rf["grid"])
            # track the reconstruction error on the full training set
            _, h = self.get_h_given_v(visible_trainset)
            _, v = self.get_v_given_h(h)
            recon_loss = np.sum((visible_trainset - v)**2) / n_samples
            iteration_errors.append(recon_loss)
            # print progress
            if it % self.print_period == 0 :
                print ("iteration=%7d recon_loss=%4.4f"%(it, recon_loss))
        print(iteration_errors)
return iteration_errors
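    # CD-1 gradient sketch (what update_params computes from the Gibbs chain above;
    # v_k / h_k are the one-step reconstruction statistics, averaged over the mini-batch):
    #   delta W    ~ ( v_0^T h_0  -  v_k^T h_k ) / batch_size
    #   delta b_v  ~ mean(v_0 - v_k, axis=0)
    #   delta b_h  ~ mean(h_0 - h_k, axis=0)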
def update_params(self, v_0, h_0, v_k, h_k, add_momentum=False, add_weight_decay=False):
"""Update the weight and bias parameters.
You could also add weight decay and momentum for weight updates.
Args:
          v_0: activities or probabilities of the visible layer (the data fed to the rbm)
          h_0: activities or probabilities of the hidden layer driven by v_0
          v_k: activities or probabilities of the visible layer after k Gibbs steps (the reconstruction)
          h_k: activities or probabilities of the hidden layer driven by v_k
all args have shape (size of mini-batch, size of respective layer)
"""
        # CD-1 gradients: positive (data-driven) statistics minus negative
        # (reconstruction-driven) statistics, averaged over the mini-batch.
        ###### Weight gradient
        prod_initial_v_h = np.dot(np.transpose(v_0), h_0)/self.batch_size
        prod_hat_v_h = np.dot(np.transpose(v_k), h_k)/self.batch_size
        current_delta_weight_vh = prod_initial_v_h - prod_hat_v_h
        ###### Visible bias gradient (averaged over the mini-batch, like the weights)
        current_delta_bias_v = np.mean(v_0 - v_k, axis=0)
        ###### Hidden bias gradient
        current_delta_bias_h = np.mean(h_0 - h_k, axis=0)
        if add_weight_decay:
            # simple L2 weight decay; the coefficient here is an illustrative default
            current_delta_weight_vh -= 0.0001 * self.weight_vh
if add_momentum:
current_delta_weight_vh += self.momentum * self.delta_weight_vh
current_delta_bias_v += self.momentum * self.delta_bias_v
current_delta_bias_h += self.momentum * self.delta_bias_h
self.delta_weight_vh = self.learning_rate*current_delta_weight_vh
self.delta_bias_v = self.learning_rate*current_delta_bias_v
self.delta_bias_h = self.learning_rate*current_delta_bias_h
"""
# What was written in the code
# With this structure is not possible to multiply the momentum
self.delta_bias_v += self.learning_rate * current_delta_bias_v
self.delta_weight_vh += self.learning_rate * current_delta_bias_h
self.delta_bias_h += self.learning_rate * current_delta_bias_h
"""
self.bias_v += self.delta_bias_v
self.weight_vh += self.delta_weight_vh
self.bias_h += self.delta_bias_h
return
def get_h_given_v(self,visible_minibatch):
"""Compute probabilities p(h|v) and activations h ~ p(h|v)
Uses undirected weight "weight_vh" and bias "bias_h"
Args:
visible_minibatch: shape is (size of mini-batch, size of visible layer)
Returns:
tuple ( p(h|v) , h)
both are shaped (size of mini-batch, size of hidden layer)
"""
assert self.weight_vh is not None
n_samples = visible_minibatch.shape[0]
inside_term = self.bias_h + visible_minibatch @ self.weight_vh
prob_h_given_v = sigmoid(inside_term)
h = sample_binary(prob_h_given_v)
return prob_h_given_v, h
def get_v_given_h(self,hidden_minibatch):
"""Compute probabilities p(v|h) and activations v ~ p(v|h)
Uses undirected weight "weight_vh" and bias "bias_v"
Args:
hidden_minibatch: shape is (size of mini-batch, size of hidden layer)
Returns:
tuple ( p(v|h) , v)
both are shaped (size of mini-batch, size of visible layer)
"""
assert self.weight_vh is not None
n_samples = hidden_minibatch.shape[0]
if self.is_top:
"""
Here visible layer has both data and labels. Compute total input for each unit (identical for both cases), \
and split into two parts, something like support[:, :-self.n_labels] and support[:, -self.n_labels:]. \
Then, for both parts, use the appropriate activation function to get probabilities and a sampling method \
to get activities. The probabilities as well as activities can then be concatenated back into a normal visible layer.
"""
            # Visible layer holds both data and labels: compute the total input once,
            # then apply the appropriate activation to each part.
            inside_term = self.bias_v + hidden_minibatch @ np.transpose(self.weight_vh)
            # Calculate probabilities with the respective activation functions
prob_v_given_h_1 = sigmoid(inside_term[:, :-self.n_labels])
prob_v_given_h_2 = softmax(inside_term[:, -self.n_labels:])
# Sample to get v from respective probabilities + sampling functions
v1 = sample_binary(prob_v_given_h_1)
v2 = sample_categorical(prob_v_given_h_2)
# Concatenate binaries and labels
prob_v_given_h = np.concatenate((prob_v_given_h_1, prob_v_given_h_2), axis = 1)
v = np.concatenate((v1, v2), axis = 1)
else:
inside_term = self.bias_v + hidden_minibatch @ np.transpose(self.weight_vh)
prob_v_given_h = sigmoid(inside_term)
v = sample_binary(prob_v_given_h)
return prob_v_given_h, v
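    # Layout of the top RBM's visible layer (as assumed by the is_top branch above):
    #   v = [ penultimate-layer activities | one-hot label ]  with self.n_labels label units,
    # so v[:, :-n_labels] are sigmoid/binary units and v[:, -n_labels:] form a softmax group.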
""" rbm as a belief layer : the functions below do not have to be changed until running a deep belief net """
def untwine_weights(self):
self.weight_v_to_h = np.copy( self.weight_vh )
self.weight_h_to_v = np.copy( np.transpose(self.weight_vh) )
self.weight_vh = None
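    # After untwining, weight_v_to_h has shape (ndim_visible, ndim_hidden) and
    # weight_h_to_v has shape (ndim_hidden, ndim_visible); the *_dir functions
    # below use these directed copies, while weight_vh is set to None.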
def get_h_given_v_dir(self,visible_minibatch):
"""Compute probabilities p(h|v) and activations h ~ p(h|v)
Uses directed weight "weight_v_to_h" and bias "bias_h"
Args:
visible_minibatch: shape is (size of mini-batch, size of visible layer)
Returns:
tuple ( p(h|v) , h)
both are shaped (size of mini-batch, size of hidden layer)
"""
assert self.weight_v_to_h is not None
        # Same computation as get_h_given_v, but using the directed recognition weights
inside_term = self.bias_h + visible_minibatch @ self.weight_v_to_h
prob_h_given_v = sigmoid(inside_term)
h = sample_binary(prob_h_given_v)
return prob_h_given_v, h
def get_v_given_h_dir(self,hidden_minibatch):
"""Compute probabilities p(v|h) and activations v ~ p(v|h)
Uses directed weight "weight_h_to_v" and bias "bias_v"
Args:
hidden_minibatch: shape is (size of mini-batch, size of hidden layer)
Returns:
tuple ( p(v|h) , v)
both are shaped (size of mini-batch, size of visible layer)
"""
assert self.weight_h_to_v is not None
if self.is_top:
"""
Here visible layer has both data and labels. Compute total input for each unit (identical for both cases), \
and split into two parts, something like support[:, :-self.n_labels] and support[:, -self.n_labels:]. \
Then, for both parts, use the appropriate activation function to get probabilities and a sampling method \
to get activities. The probabilities as well as activities can then be concatenated back into a normal visible layer.
"""
# [TODO TASK 4.2] Note that even though this function performs same computation as 'get_v_given_h' but with directed connections,
# this case should never be executed : when the RBM is a part of a DBN and is at the top, it will have not have directed connections.
# Appropriate code here is to raise an error (replace pass below)
raise Exception("Top RMB: call get_v_given_h instead.")
else:
#print("minibatch shape: ", hidden_minibatch.shape)
#print("weight h to v shape: ", self.weight_h_to_v.shape)
inside_term = self.bias_v + hidden_minibatch @ self.weight_h_to_v
prob_v_given_h = sigmoid(inside_term)
v = sample_binary(prob_v_given_h)
return prob_v_given_h, v
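    # Wake-sleep style delta rule (a sketch of what the two update functions below apply,
    # with "inps" the presynaptic layer, "trgs" the target activities and "preds" the
    # model's own predictions, averaged over the mini-batch):
    #   delta W ~ inps^T (trgs - preds) / batch_size
    #   delta b ~ mean(trgs - preds, axis=0)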
def update_generate_params(self,inps,trgs,preds):
"""Update generative weight "weight_h_to_v" and bias "bias_v"
Args:
inps: activities or probabilities of input unit
trgs: activities or probabilities of output unit (target)
preds: activities or probabilities of output unit (prediction)
all args have shape (size of mini-batch, size of respective layer)
"""
        # Delta rule: move the generative parameters to reduce the difference between
        # the target activities and the model's own predictions (mini-batch average).
        reconstructed_diff = trgs - preds
        self.delta_weight_h_to_v = self.learning_rate * np.transpose(inps) @ reconstructed_diff / inps.shape[0]
        self.delta_bias_v = self.learning_rate * np.mean(reconstructed_diff, axis=0)
self.weight_h_to_v += self.delta_weight_h_to_v
self.bias_v += self.delta_bias_v
return
def update_recognize_params(self,inps,trgs,preds):
"""Update recognition weight "weight_v_to_h" and bias "bias_h"
Args:
inps: activities or probabilities of input unit
trgs: activities or probabilities of output unit (target)
preds: activities or probabilities of output unit (prediction)
all args have shape (size of mini-batch, size of respective layer)
"""
        # Delta rule for the recognition parameters, averaged over the mini-batch.
        reconstructed_diff = trgs - preds
        self.delta_weight_v_to_h = self.learning_rate * np.transpose(inps) @ reconstructed_diff / inps.shape[0]
        self.delta_bias_h = self.learning_rate * np.mean(reconstructed_diff, axis=0)
self.weight_v_to_h += self.delta_weight_v_to_h
self.bias_h += self.delta_bias_h
return
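# Minimal usage sketch: train a small bottom RBM on random binary data just to
# exercise the class (illustrative settings; real data would be binarized image
# vectors of shape (n_samples, 784), e.g. MNIST).
if __name__ == "__main__":
    toy_data = (np.random.rand(500, 28 * 28) > 0.5).astype(float)  # stand-in for binarized images
    rbm = RestrictedBoltzmannMachine(ndim_visible=28 * 28,
                                     ndim_hidden=200,
                                     is_bottom=True,
                                     image_size=[28, 28],
                                     batch_size=10)
    errors = rbm.cd1(toy_data, n_iterations=5)  # each iteration sweeps all mini-batches once
    print("reconstruction error per iteration:", errors)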