import tensorflow as tf
import numpy as np
#%%
def conv(layer_name, x, out_channels, kernel_size=[3,3], stride=[1,1,1,1], is_pretrain=True):
    '''Convolution op wrapper, with ReLU activation after the convolution
    Args:
        layer_name: e.g. conv1, pool1...
        x: input tensor, [batch_size, height, width, channels]
        out_channels: number of output channels (i.e. convolutional kernels)
        kernel_size: size of the convolutional kernel, the VGG paper used [3,3]
        stride: a list of 4 ints, the VGG paper used [1, 1, 1, 1]
        is_pretrain: whether this layer's parameters are trainable (passed to
            tf.get_variable as `trainable`). Set it to False to freeze a layer,
            e.g. when loading pretrained parameters; the parameters of frozen
            layers do not change during training, and different conv layers can
            be frozen independently.
    Returns:
        4D tensor
    '''
    in_channels = x.get_shape()[-1]
    with tf.variable_scope(layer_name):
        w = tf.get_variable(name='weights',
                            trainable=is_pretrain,
                            shape=[kernel_size[0], kernel_size[1], in_channels, out_channels],
                            initializer=tf.contrib.layers.xavier_initializer())  # defaults to uniform distribution initialization
        b = tf.get_variable(name='biases',
                            trainable=is_pretrain,
                            shape=[out_channels],
                            initializer=tf.constant_initializer(0.0))
        x = tf.nn.conv2d(x, w, stride, padding='SAME', name='conv')
        x = tf.nn.bias_add(x, b, name='bias_add')
        x = tf.nn.relu(x, name='relu')
    return x
#%%
def pool(layer_name, x, kernel=[1,2,2,1], stride=[1,2,2,1], is_max_pool=True):
    '''Pooling op
    Args:
        x: input tensor
        kernel: pooling kernel, the VGG paper used [1,2,2,1], i.e. a 2x2 kernel
        stride: stride size, the VGG paper used [1,2,2,1]
        is_max_pool: boolean
            if True: use max pooling
            else: use avg pooling
    '''
    if is_max_pool:
        x = tf.nn.max_pool(x, kernel, strides=stride, padding='SAME', name=layer_name)
    else:
        x = tf.nn.avg_pool(x, kernel, strides=stride, padding='SAME', name=layer_name)
    return x
#%%
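# The sketch below is not part of the original file: it shows how conv() and
# pool() above can be stacked into a VGG-style block. The placeholder shape and
# the layer names are illustrative assumptions.
def _example_vgg_block():
    images = tf.placeholder(tf.float32, shape=[None, 224, 224, 3])  # NHWC, VGG input size
    x = conv('conv1_1', images, 64, kernel_size=[3, 3], stride=[1, 1, 1, 1])
    x = conv('conv1_2', x, 64, kernel_size=[3, 3], stride=[1, 1, 1, 1])
    x = pool('pool1', x, kernel=[1, 2, 2, 1], stride=[1, 2, 2, 1], is_max_pool=True)
    return x  # spatial size halved by pool1: [None, 112, 112, 64]
#%%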
def batch_norm(x):
    '''Batch normalization (without the learnable offset and scale).
    Note: moments are computed over the batch axis only.
    '''
    epsilon = 1e-3
    batch_mean, batch_var = tf.nn.moments(x, [0])
    x = tf.nn.batch_normalization(x,
                                  mean=batch_mean,
                                  variance=batch_var,
                                  offset=None,
                                  scale=None,
                                  variance_epsilon=epsilon)
    return x
#%%
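# Illustrative sketch (not in the original file): since batch_norm() above learns
# no offset/scale and normalizes over the batch axis only, it is simply dropped in
# between a layer's linear part and its activation.
def _example_batch_norm():
    features = tf.placeholder(tf.float32, shape=[None, 4096])  # e.g. an FC feature vector
    return batch_norm(features)  # zero mean, unit variance per feature across the batch
#%%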
def FC_layer(layer_name, x, out_nodes):
    '''Wrapper for fully connected layers, with ReLU activation as default
    Args:
        layer_name: e.g. 'FC1', 'FC2'
        x: input feature map
        out_nodes: number of neurons for the current FC layer
    '''
    shape = x.get_shape()
    if len(shape) == 4:
        size = shape[1].value * shape[2].value * shape[3].value
    else:
        size = shape[-1].value
    with tf.variable_scope(layer_name):
        w = tf.get_variable('weights',
                            shape=[size, out_nodes],
                            initializer=tf.contrib.layers.xavier_initializer())
        b = tf.get_variable('biases',
                            shape=[out_nodes],
                            initializer=tf.constant_initializer(0.0))
        flat_x = tf.reshape(x, [-1, size])  # flatten to [batch_size, size]
        x = tf.nn.bias_add(tf.matmul(flat_x, w), b)
        x = tf.nn.relu(x)
    return x
#%%
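# A minimal sketch of a VGG-style classifier head built from FC_layer() (the
# layer names and sizes are assumptions, not required by this module). Note that
# FC_layer() applies ReLU to every layer, including the last one here.
def _example_fc_head(feature_map, n_classes=1000):
    # feature_map: a 4-D conv output with statically known height/width/channels;
    # FC_layer() flattens it automatically.
    x = FC_layer('fc6', feature_map, out_nodes=4096)
    x = FC_layer('fc7', x, out_nodes=4096)
    logits = FC_layer('fc8', x, out_nodes=n_classes)
    return logits
#%%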
def loss(logits, labels):
    '''Compute loss
    Args:
        logits: logits tensor, [batch_size, n_classes]
        labels: one-hot labels
    '''
    with tf.name_scope('loss') as scope:
        cross_entropy = tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels, name='cross-entropy')
        loss = tf.reduce_mean(cross_entropy, name='loss')
        tf.summary.scalar(scope + '/loss', loss)
    return loss
#%%
def accuracy(logits, labels):
    """Evaluate the quality of the logits at predicting the label.
    Args:
        logits: Logits tensor, float - [batch_size, NUM_CLASSES].
        labels: Labels tensor, one-hot encoded.
    """
    with tf.name_scope('accuracy') as scope:
        correct = tf.equal(tf.argmax(logits, 1), tf.argmax(labels, 1))
        correct = tf.cast(correct, tf.float32)
        accuracy = tf.reduce_mean(correct) * 100.0
        tf.summary.scalar(scope + '/accuracy', accuracy)
    return accuracy
#%%
def num_correct_prediction(logits, labels):
    """Evaluate the quality of the logits at predicting the label.
    Return:
        the number of correct predictions
    """
    correct = tf.equal(tf.argmax(logits, 1), tf.argmax(labels, 1))
    correct = tf.cast(correct, tf.int32)
    n_correct = tf.reduce_sum(correct)
    return n_correct
#%%
def optimize(loss, learning_rate, global_step):
    '''Optimization op, uses Gradient Descent by default
    '''
    with tf.name_scope('optimizer'):
        optimizer = tf.train.GradientDescentOptimizer(learning_rate=learning_rate)
        # optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
        train_op = optimizer.minimize(loss, global_step=global_step)
    return train_op
#%%
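# A minimal sketch (assumed learning rate, not part of the original file) wiring
# loss(), accuracy() and optimize() together for a training graph.
def _example_training_ops(logits, one_hot_labels, learning_rate=0.01):
    total_loss = loss(logits, one_hot_labels)
    acc = accuracy(logits, one_hot_labels)
    global_step = tf.Variable(0, name='global_step', trainable=False)
    train_op = optimize(total_loss, learning_rate, global_step)
    return train_op, total_loss, acc
#%%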
def load(data_path, session):
    '''Assign all pretrained weights and biases from the .npy file to the
    corresponding variables of the already-built graph.
    '''
    # allow_pickle is required for NumPy >= 1.16.3, where it defaults to False
    data_dict = np.load(data_path, encoding='latin1', allow_pickle=True).item()
    keys = sorted(data_dict.keys())
    for key in keys:
        with tf.variable_scope(key, reuse=True):
            for subkey, data in zip(('weights', 'biases'), data_dict[key]):
                session.run(tf.get_variable(subkey).assign(data))
#%%
def test_load():
    data_path = './vgg16_pretrain/vgg16.npy'
    data_dict = np.load(data_path, encoding='latin1', allow_pickle=True).item()
    keys = sorted(data_dict.keys())
    for key in keys:
        weights = data_dict[key][0]
        biases = data_dict[key][1]
        print('\n')
        print(key)
        print('weights shape: ', weights.shape)
        print('biases shape: ', biases.shape)
#%%
def load_with_skip(data_path, session, skip_layer):
    '''Same as load(), but skips the layers listed in skip_layer
    (e.g. layers that will be trained from scratch).
    '''
    data_dict = np.load(data_path, encoding='latin1', allow_pickle=True).item()
    for key in data_dict:
        if key not in skip_layer:
            with tf.variable_scope(key, reuse=True):
                for subkey, data in zip(('weights', 'biases'), data_dict[key]):
                    session.run(tf.get_variable(subkey).assign(data))
#%%
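# A sketch of the usual fine-tuning setup with load_with_skip(): restore every
# pretrained layer except the fully connected ones, which are trained from
# scratch. The layer names assume the standard VGG16 .npy key names, and the
# network graph must already be built so that the variable scopes exist.
def _example_load_pretrained(session):
    skip_layers = ['fc6', 'fc7', 'fc8']
    load_with_skip('./vgg16_pretrain/vgg16.npy', session, skip_layers)
#%%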
def print_all_variables(train_only=True):
    """Print all trainable and non-trainable variables
    without tl.layers.initialize_global_variables(sess)
    Parameters
    ----------
    train_only : boolean
        If True, only print the trainable variables; otherwise print all variables.
    """
    # tvar = tf.trainable_variables() if train_only else tf.all_variables()
    if train_only:
        t_vars = tf.trainable_variables()
        print("  [*] printing trainable variables")
    else:
        try:  # TF 1.0+
            t_vars = tf.global_variables()
        except AttributeError:  # TF 0.12
            t_vars = tf.all_variables()
        print("  [*] printing global variables")
    for idx, v in enumerate(t_vars):
        print("  var {:3}: {:15} {}".format(idx, str(v.get_shape()), v.name))
#%%
##***** the following functions are just for testing the tensor size at different layers *********##
#%%
def weight(kernel_shape, is_uniform=True):
    '''Weight initializer
    Args:
        kernel_shape: the shape of the weight tensor
        is_uniform: boolean.
            if True: use a uniform distribution initializer
            if False: use a normal distribution initializer
    Returns:
        weight tensor
    '''
    w = tf.get_variable(name='weights',
                        shape=kernel_shape,
                        initializer=tf.contrib.layers.xavier_initializer(uniform=is_uniform))
    return w
#%%
def bias(bias_shape):
    '''bias initializer
    '''
    b = tf.get_variable(name='biases',
                        shape=bias_shape,
                        initializer=tf.constant_initializer(0.0))
    return b
#%%
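# Small illustrative sketch (arbitrary names and shapes) of the shape-testing
# helpers above: create a 3x3x3x64 conv kernel and its bias and print their shapes.
def _example_shape_test():
    with tf.variable_scope('test_conv3_64'):
        w = weight([3, 3, 3, 64], is_uniform=True)
        b = bias([64])
    print('kernel shape: ', w.get_shape(), ' bias shape: ', b.get_shape())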