Skip to content

Commit

Permalink
Make extracting stereo channels more efficient
Browse files Browse the repository at this point in the history
  • Loading branch information
mmcauliffe committed Sep 1, 2016
1 parent c3f7a31 commit 7632670
Showing 1 changed file with 37 additions and 41 deletions.
78 changes: 37 additions & 41 deletions aligner/corpus.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,57 +135,54 @@ def get_sample_rate(file_path):
sr = soundf.getframerate()
return sr

def write_file(wav_path, data, sample_rate):
values = []
for d in data:
packed_value = struct.pack('h', d)
values.append(packed_value)
with wave.open(wav_path, 'wb') as f:
f.setnchannels(1)
f.setframerate(sample_rate)
f.setsampwidth(2)
value_str = b''.join(values)
f.writeframes(value_str)

def oneChannel(fname, chanIdx):
""" list with specified channel's data from multichannel wave with 16-bit data
taken from http://stackoverflow.com/questions/23154400/read-the-data-of-a-single-channel-from-a-stereo-wave-file-in-python"""
f = wave.open(fname, 'rb')
chans = f.getnchannels()
samps = f.getnframes()
samplerate = f.getframerate()
sampwidth = f.getsampwidth()
assert sampwidth == 2
s = f.readframes(samps) #read the all the samples from the file into a byte string
f.close()
unpstr = '<{0}h'.format(samps*chans) #little-endian 16-bit samples
x = list(struct.unpack(unpstr, s)) #convert the byte string into a list of ints
return x[chanIdx::chans], samplerate #return the desired channel

def extract_temp_channel(wav_path, channel, temp_directory):
def extract_temp_channels(wav_path, temp_directory):
'''
Extract a single channel from a stereo file to a new mono wav file
Parameters
----------
wav_path : str
Path to stereo wav file
channel : int
Channel of file to extract
temp_directory : str
Directory to save extracted
'''
data, rate = oneChannel(wav_path, channel)
name, ext = os.path.splitext(wav_path)
base = os.path.basename(name)
if channel == 0:
new_ext = '_A.wav'
else:
new_ext = '_B.wav'
new_path = os.path.join(temp_directory, base + new_ext)
if not os.path.exists(new_path):
write_file(new_path, data, rate)
return new_path
A_path = os.path.join(temp_directory, base + '_A.wav')
B_path = os.path.join(temp_directory, base + '_B.wav')
samp_step = 1000000
if not os.path.exists(A_path):
with wave.open(wav_path, 'rb') as inf, \
wave.open(A_path, 'wb') as af, \
wave.open(B_path, 'wb') as bf:
chans = inf.getnchannels()
samps = inf.getnframes()
samplerate = inf.getframerate()
sampwidth = inf.getsampwidth()
assert sampwidth == 2
af.setnchannels(1)
af.setframerate(samplerate)
af.setsampwidth(sampwidth)
bf.setnchannels(1)
bf.setframerate(samplerate)
bf.setsampwidth(sampwidth)
cur_samp = 0
while cur_samp < samps:
s = inf.readframes(samp_step)
cur_samp += samp_step
act = samp_step
if cur_samp > samps:
act -= (cur_samp - samps)

unpstr = '<{0}h'.format(act*chans) #little-endian 16-bit samples
x = list(struct.unpack(unpstr, s)) #convert the byte string into a list of ints
values = [struct.pack('h', d) for d in x[0::chans]]
value_str = b''.join(values)
af.writeframes(value_str)
values = [struct.pack('h', d) for d in x[1::chans]]
value_str = b''.join(values)
bf.writeframes(value_str)
return A_path, B_path

class Corpus(object):
'''
Expand Down Expand Up @@ -281,8 +278,7 @@ def __init__(self, directory, output_directory,
A_name = file_name + "_A"
B_name = file_name + "_B"

A_path = extract_temp_channel(wav_path, 0, self.temp_directory)
B_path = extract_temp_channel(wav_path, 1, self.temp_directory)
A_path, B_path = extract_temp_channels(wav_path, self.temp_directory)
elif n_channels > 2:
raise(Exception('More than two channels'))
for i, ti in enumerate(tg.tiers):
Expand Down

0 comments on commit 7632670

Please sign in to comment.