# speech_rec.py -- forked from yashiro32/speech_recognition.
# Single-speaker spoken-word recognition: STFT spectral-peak features
# classified with one Gaussian-emission HMM per word.
# (GitHub line-number gutter removed -- artifact of the page scrape.)
#from utils import progress_bar_downloader
import os
import matplotlib.pyplot as plt

# Fetch and unpack the word-audio dataset (one directory per spoken word).
# Hosting files on my dropbox since downloading from google code is painful
# Original project hosting is here: https://code.google.com/p/hmm-speech-recognition/downloads/list
# Audio is included in the zip file
#
# NOTE(review): progress_bar_downloader is called below but its import is
# commented out above -- on a fresh checkout without ./audio.tar.gz this
# raises NameError; confirm where the helper should come from.
link = 'https://dl.dropboxusercontent.com/u/15378192/audio.tar.gz'
dlname = 'audio.tar.gz'
if not os.path.exists('./%s'%dlname):
    progress_bar_downloader(link, dlname)
    # Shell out to extract into ./audio (trusted, hard-coded filename).
    os.system('tar xzf %s'%dlname)
else:
    # Python 2 print statement -- the whole script is Python 2.
    print '%s already downloaded!'%dlname
# Walk audio/<word>/<clip>.wav collecting every file path, its word label
# (the directory name), and the ordered set of distinct words.
fpaths = []
labels = []
spoken = []
for f in os.listdir('audio'):
    for w in os.listdir('audio/' + f):
        fpaths.append('audio/' + f + '/' + w)
        labels.append(f)  # directory name doubles as the class label
        if f not in spoken:
            spoken.append(f)
print 'Words spoken:',spoken
# Files can be heard in Linux using the following commands from the command line
# cat kiwi07.wav | aplay -f S16_LE -t wav -r 8000
# Files are signed 16 bit raw, sample rate 8000
from scipy.io import wavfile
import numpy as np

# Load every clip into one zero-padded row (32000 samples = 4 s at 8 kHz --
# assumes no clip is longer; TODO confirm), then trim to the longest clip.
data = np.zeros((len(fpaths), 32000))
maxsize = -1
for n,file in enumerate(fpaths):
    _, d = wavfile.read(file)  # sample rate ignored (known to be 8000)
    data[n, :d.shape[0]] = d
    if d.shape[0] > maxsize:
        maxsize = d.shape[0]
data = data[:, :maxsize]

#Each sample file is one row in data, and has one entry in labels
print 'Number of files total:',data.shape[0]

# Encode string labels as integer indices 0..n_words-1.
# NOTE(review): iteration order of set(labels) is arbitrary, so the
# word -> index mapping is not stable across runs; verify downstream use.
all_labels = np.zeros(data.shape[0])
for n, l in enumerate(set(labels)):
    all_labels[np.array([i for i, _ in enumerate(labels) if _ == l])] = n
print 'Labels and label indices',all_labels
import scipy
import numpy as np
def stft(x, fftsize=64, overlap_pct=.5):
    """Short-time Fourier transform of a 1-D signal.

    Modified from http://stackoverflow.com/questions/2459295/stft-and-istft-in-python

    Parameters
    ----------
    x : 1-D ndarray
        Input signal.
    fftsize : int
        Window / FFT length in samples.
    overlap_pct : float
        Fractional overlap between consecutive frames (0.5 -> 50%).

    Returns
    -------
    2-D complex ndarray of shape (n_frames, fftsize // 2): one row per
    Hann-windowed frame, positive-frequency bins only (DC..Nyquist-1).
    """
    hop = int(fftsize * (1 - overlap_pct))
    # Periodic Hann window: fftsize+1 points with the last dropped.
    # np.hanning replaces the scipy.hanning alias, which was removed from
    # modern SciPy; values are identical.
    w = np.hanning(fftsize + 1)[:-1]
    raw = np.array([np.fft.rfft(w * x[i:i + fftsize])
                    for i in range(0, len(x) - fftsize, hop)])
    # Floor division: under Python 3, fftsize / 2 is a float and the slice
    # would raise TypeError; // is identical on Python 2 for int inputs.
    return raw[:, :(fftsize // 2)]
# Plot the raw waveform of the first clip.
plt.plot(data[0, :], color='steelblue')
plt.title('Timeseries example for %s'%labels[0])
plt.xlim(0, 3500)
plt.xlabel('Time (samples)')
plt.ylabel('Amplitude (signed 16 bit)')
plt.figure()

# Log magnitude spectrogram via the STFT (rows = frames, cols = freq bins).
log_freq = 20 * np.log(np.abs(stft(data[0, :])))
print log_freq.shape
plt.imshow(log_freq, cmap='gray', interpolation=None)
plt.xlabel('Freq (bin)')
plt.ylabel('Time (overlapped frames)')
# NOTE(review): shape[1] is the number of frequency bins, but this sets the
# y (time-frame) limit -- shape[0] looks like the intended value; confirm.
plt.ylim(log_freq.shape[1])
plt.title('PSD of %s example'%labels[0])
from numpy.lib.stride_tricks import as_strided
# Peak detection using the technique described here:
# http://kkjkok.blogspot.com/2013/12/dsp-snippets_9.html
def peakfind(x, n_peaks, l_size=3, r_size=3, c_size=3, f=np.mean):
    """Locate up to n_peaks local maxima in the 1-D array x.

    A sliding window of l_size + c_size + r_size samples scores its center
    maximum when that maximum sits exactly at the window midpoint and beats
    f() of both flanks; every other window scores -1.

    Returns (heights, locations): the n_peaks best scores in descending
    order and their positions in x (non-peak entries carry height -1).
    """
    win_size = l_size + r_size + c_size
    offset = l_size + int(c_size / 2.)
    # Materialize every contiguous window as a row (same values the
    # original produced via as_strided, without the stride tricks).
    windows = np.array([x[start:start + win_size]
                        for start in range(x.shape[-1] - win_size + 1)])

    def score(win):
        left = win[:l_size]
        center = win[l_size:l_size + c_size]
        right = win[-r_size:]
        max_centered = np.argmax(win) == offset
        dominates = np.max(center) > np.max([f(left), f(right)])
        return np.max(center) if (max_centered and dominates) else -1

    scores = np.array([score(win) for win in windows])
    order = np.argsort(scores, None)[::-1]
    heights = scores[order[:n_peaks]]
    # Window start index -> peak position inside the original signal.
    locations = order[:n_peaks] + offset
    return heights, locations
# Demo: take one frame (row 15) of one clip's magnitude STFT and mark the
# detected spectral peaks on it.
plot_data = np.abs(stft(data[20, :]))[15, :]
values, locs = peakfind(plot_data, n_peaks=6)
# Keep only real peaks -- non-peak slots come back with height -1.
fp = locs[values > -1]
fv = values[values > -1]
plt.plot(plot_data, color='steelblue')
plt.plot(fp, fv, 'x', color='darkred')
plt.title('Peak location example')
plt.xlabel('Frequency (bins)')
plt.ylabel('Amplitude')
# This processing (top freq peaks) only works for single speaker case... need better features for multispeaker!
# MFCC (or deep NN/automatic feature extraction) could be interesting
#
# For every clip: take |STFT|, then per frame keep the locations of the top
# 6 spectral peaks as that frame's 6-D observation vector.
all_obs = []
for i in range(data.shape[0]):
    d = np.abs(stft(data[i, :]))
    n_dim = 6
    obs = np.zeros((n_dim, d.shape[0]))
    for r in range(d.shape[0]):
        _, t = peakfind(d[r, :], n_peaks=n_dim)
        obs[:, r] = t.copy()  # peak *locations* (bin indices), not heights
    if i % 10 == 0:
        print "Processed obs %s"%i
    all_obs.append(obs)
# Stack to (n_examples, 6, n_frames).
all_obs = np.atleast_3d(all_obs)
import scipy.stats as st
class gmmhmm:
    """Hidden Markov Model with one multivariate Gaussian emission per state.

    Converted with modifications from
    https://code.google.com/p/hmm-speech-recognition/source/browse/Word.m
    Trained with the scaled Baum-Welch (EM) recursions (Rabiner 1989).
    """
    def __init__(self, n_states):
        self.n_states = n_states
        # Fixed seed -> reproducible initialization and training.
        self.random_state = np.random.RandomState(0)
        # Normalize random initial state distribution and transition matrix.
        self.prior = self._normalize(self.random_state.rand(self.n_states, 1))
        self.A = self._stochasticize(self.random_state.rand(self.n_states, self.n_states))
        # Emission parameters are allocated lazily in _em_init, once the
        # observation dimensionality is known.
        self.mu = None
        self.covs = None
        self.n_dims = None

    def _forward(self, B):
        """Scaled forward pass. B[s, t] = emission density of frame t in state s.

        Returns (log_likelihood, alpha); each alpha column is normalized and
        the per-step scale factors accumulate into the log-likelihood.
        """
        log_likelihood = 0.
        T = B.shape[1]
        alpha = np.zeros(B.shape)
        for t in range(T):
            if t == 0:
                alpha[:, t] = B[:, t] * self.prior.ravel()
            else:
                alpha[:, t] = B[:, t] * np.dot(self.A.T, alpha[:, t - 1])
            alpha_sum = np.sum(alpha[:, t])
            alpha[:, t] /= alpha_sum
            log_likelihood = log_likelihood + np.log(alpha_sum)
        return log_likelihood, alpha

    def _backward(self, B):
        """Scaled backward pass matching _forward's per-column normalization."""
        T = B.shape[1]
        beta = np.zeros(B.shape)
        beta[:, -1] = np.ones(B.shape[0])
        for t in range(T - 1)[::-1]:
            beta[:, t] = np.dot(self.A, (B[:, t + 1] * beta[:, t + 1]))
            beta[:, t] /= np.sum(beta[:, t])
        return beta

    def _state_likelihood(self, obs):
        """Per-state Gaussian densities for every observation column.

        This function can (and will!) return values >> 1 -- these are
        densities evaluated at points, not probabilities. See the discussion
        of the equivalent matlab function:
        https://groups.google.com/forum/#!topic/comp.soft-sys.matlab/YksWK0T74Ak
        """
        obs = np.atleast_2d(obs)
        B = np.zeros((self.n_states, obs.shape[1]))
        for s in range(self.n_states):
            # Needs scipy 0.14+
            B[s, :] = st.multivariate_normal.pdf(
                obs.T, mean=self.mu[:, s].T, cov=self.covs[:, :, s].T)
        return B

    def _normalize(self, x):
        # Replace exact zeros with ones before dividing to avoid 0/0.
        return (x + (x == 0)) / np.sum(x)

    def _stochasticize(self, x):
        # Make each ROW sum to 1 (a proper transition matrix). keepdims=True
        # fixes the original broadcast, which divided element (i, j) by the
        # sum of row j instead of row i, so rows did not sum to 1.
        return (x + (x == 0)) / np.sum(x, axis=1, keepdims=True)

    def _em_init(self, obs):
        """Lazily initialize emission parameters from the first observations.

        Using this allows for fewer required constructor args.
        """
        if self.n_dims is None:
            self.n_dims = obs.shape[0]
        if self.mu is None:
            # NOTE(review): means are seeded from frames indexed by draws
            # from arange(n_dims), i.e. only the first n_dims frames are
            # candidates -- confirm this matches the original Word.m intent.
            subset = self.random_state.choice(np.arange(self.n_dims), size=self.n_states, replace=False)
            self.mu = obs[:, subset]
        if self.covs is None:
            # Start every state at the diagonal of the sample covariance.
            self.covs = np.zeros((self.n_dims, self.n_dims, self.n_states))
            self.covs += np.diag(np.diag(np.cov(obs)))[:, :, None]
        return self

    def _em_step(self, obs):
        """One Baum-Welch iteration on a single (n_dims, T) observation matrix.

        Returns the log-likelihood under the parameters *before* the update.
        """
        obs = np.atleast_2d(obs)
        B = self._state_likelihood(obs)
        T = obs.shape[1]
        log_likelihood, alpha = self._forward(B)
        beta = self._backward(B)
        xi_sum = np.zeros((self.n_states, self.n_states))
        gamma = np.zeros((self.n_states, T))
        for t in range(T - 1):
            # xi(i, j) ~ alpha_t(i) * A[i, j] * B[j, t+1] * beta_{t+1}(j):
            # this needs an outer product. The original np.dot of two 1-D
            # vectors collapsed to a scalar (and used beta at t, not t+1).
            partial_sum = self.A * np.outer(alpha[:, t], B[:, t + 1] * beta[:, t + 1])
            xi_sum += self._normalize(partial_sum)
            partial_g = alpha[:, t] * beta[:, t]
            gamma[:, t] = self._normalize(partial_g)
        partial_g = alpha[:, -1] * beta[:, -1]
        gamma[:, -1] = self._normalize(partial_g)
        expected_prior = gamma[:, 0]
        expected_A = self._stochasticize(xi_sum)
        expected_mu = np.zeros((self.n_dims, self.n_states))
        expected_covs = np.zeros((self.n_dims, self.n_dims, self.n_states))
        gamma_state_sum = np.sum(gamma, axis=1)
        # Set zeros to 1 before dividing to avoid 0/0 for unused states.
        gamma_state_sum = gamma_state_sum + (gamma_state_sum == 0)
        for s in range(self.n_states):
            gamma_obs = obs * gamma[s, :]
            expected_mu[:, s] = np.sum(gamma_obs, axis=1) / gamma_state_sum[s]
            # E[x x^T] - mu mu^T; mu mu^T must be an outer product -- the
            # original 1-D dot produced a scalar and broadcast it wrongly.
            partial_covs = (np.dot(gamma_obs, obs.T) / gamma_state_sum[s]
                            - np.outer(expected_mu[:, s], expected_mu[:, s]))
            # Symmetrize: triu + triu.T double-counts the diagonal, so
            # subtract the diagonal *matrix* once (the original subtracted
            # the 1-D diagonal, broadcasting it across every row).
            partial_covs = (np.triu(partial_covs) + np.triu(partial_covs).T
                            - np.diag(np.diag(partial_covs)))
            # Store the state covariance -- the original computed
            # partial_covs and then discarded it, so covs degenerated to
            # the diagonal loading term alone.
            expected_covs[:, :, s] = partial_covs
        # Diagonal loading keeps every covariance positive definite.
        expected_covs += .01 * np.eye(self.n_dims)[:, :, None]
        self.prior = expected_prior
        self.mu = expected_mu
        self.covs = expected_covs
        self.A = expected_A
        return log_likelihood

    def fit(self, obs, n_iter=15):
        """Run n_iter EM iterations over the observations.

        Supports 2D and 3D arrays:
        2D: (n_features, n_frames) for a single example file;
        3D: (n_examples, n_features, n_frames) -- e.g. (105, 6, X) for 105
        words with 6 features per frame.
        """
        if len(obs.shape) == 2:
            for i in range(n_iter):
                self._em_init(obs)
                log_likelihood = self._em_step(obs)
        elif len(obs.shape) == 3:
            count = obs.shape[0]
            for n in range(count):
                for i in range(n_iter):
                    self._em_init(obs[n, :, :])
                    log_likelihood = self._em_step(obs[n, :, :])
        return self

    def transform(self, obs):
        """Log-likelihood of obs under the fitted model.

        2D input (n_features, n_frames) -> scalar log-likelihood;
        3D input (n_examples, n_features, n_frames) -> per-example array.
        """
        if len(obs.shape) == 2:
            B = self._state_likelihood(obs)
            log_likelihood, _ = self._forward(B)
            return log_likelihood
        elif len(obs.shape) == 3:
            count = obs.shape[0]
            out = np.zeros((count,))
            for n in range(count):
                B = self._state_likelihood(obs[n, :, :])
                log_likelihood, _ = self._forward(B)
                out[n] = log_likelihood
            return out
if __name__ == "__main__":
    # Commented-out toy sanity check (two random models, kept verbatim).
    """rstate = np.random.RandomState(0)
t1 = np.ones((4, 40)) + .001 * rstate.rand(4, 40)
t1 /= t1.sum(axis=0)
t2 = rstate.rand(*t1.shape)
t2 /= t2.sum(axis=0)
m1 = gmmhmm(2)
m1.fit(t1)
m2 = gmmhmm(2)
m2.fit(t2)
m1t1 = m1.transform(t1)
m2t1 = m2.transform(t1)
print "Likelihoods for test set 1"
print "M1:",m1t1
print "M2:",m2t1
print "Prediction for test set 1"
print "Model", np.argmax([m1t1, m2t1]) + 1
print
m1t2 = m1.transform(t2)
m2t2 = m2.transform(t2)
print "Likelihoods for test set 2"
print "M1:",m1t2
print "M2:",m2t2
print "Prediction for test set 2"
print "Model", np.argmax([m1t2, m2t2]) + 1"""
    # NOTE(review): sklearn.cross_validation was removed in scikit-learn 0.20
    # (replaced by sklearn.model_selection), and this old StratifiedShuffleSplit
    # call signature / direct iteration only works on pre-0.18 scikit-learn.
    from sklearn.cross_validation import StratifiedShuffleSplit
    sss = StratifiedShuffleSplit(all_labels, test_size=0.1, random_state=0)
    # Column-normalize each example's observation matrix before training.
    for n,i in enumerate(all_obs):
        all_obs[n] /= all_obs[n].sum(axis=0)
    # Single split: 90% train / 10% test, stratified by word.
    for train_index, test_index in sss:
        X_train, X_test = all_obs[train_index, ...], all_obs[test_index, ...]
        y_train, y_test = all_labels[train_index], all_labels[test_index]
    print 'Size of training matrix:', X_train.shape
    print 'Size of testing matrix:', X_test.shape
    # One 6-state HMM per word; classify each test clip by whichever model
    # assigns it the highest log-likelihood.
    ys = set(all_labels)
    ms = [gmmhmm(6) for y in ys]
    _ = [m.fit(X_train[y_train == y, :, :]) for m, y in zip(ms, ys)]
    ps = [m.transform(X_test) for m in ms]
    res = np.vstack(ps)
    predicted_labels = np.argmax(res, axis=0)
    missed = (predicted_labels != y_test)
    print 'Test accuracy:%.2f percent'%(100 * (1 - np.mean(missed)))
    from sklearn.metrics import confusion_matrix
    cm = confusion_matrix(y_test, predicted_labels)
    plt.matshow(cm, cmap='gray')
    ax = plt.gca()
    # Leading blank tick label because matshow offsets the first tick.
    _ = ax.set_xticklabels([" "] + [l[:2] for l in spoken])
    _ = ax.set_yticklabels([" "] + spoken)
    plt.title('Confusion matrix, single speaker')
    plt.ylabel('True label')
    plt.xlabel('Predicted label')