-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy paths3connection.py
144 lines (127 loc) · 5.68 KB
/
s3connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# -*- coding: utf-8 -*-
"""
Created on Sat Jan 4 12:58:13 2014
@author: Lukasz Tracewski
Module for getting data from given S3 bucket.
"""
import os
import sys
import logging
import boto
import scipy.io.wavfile as wav
import Tkinter
import tkFileDialog
class RecordingsFetcher(object):
    """Provide WAVE recordings from an S3 bucket, a local directory tree or a file dialog."""

    def __init__(self):
        self._log = logging.getLogger('log.html')

    def get_next_recording(self, bucket_name, data_store):
        """
        Generator for getting WAVE files. The data is provided on an on-demand basis.

        Parameters
        -------------
        bucket_name : string
            Name of AWS S3 bucket. If none provided then it is assumed data is available locally.
        data_store : string
            In case a bucket name is provided, this location is used for storing the
            downloaded files. If no bucket was provided, data is read recursively from
            this place. If none provided either, the user has to select a single file
            through a file dialog.

        Yields
        ------------
        samplerate : int
            Sample rate as reported by scipy.io.wavfile.read
        sample : array
            Wave file data cast to float32
        name : string
            Path of the wave file on the local disk
        """
        if bucket_name:  # Download data from a bucket
            self._connect_to_bucket(bucket_name)
            for key in self.Bucket.list():
                if not key.name.endswith('.wav') or key.name.startswith('5mincounts'):
                    continue
                path = os.path.join(data_store, key.name)
                _make_sure_dir_exists(path)
                key.get_contents_to_filename(path)  # Download the file
                yield self._read_wave(path)
        elif data_store:  # Get locally stored data
            for dirpath, _, filenames in os.walk(data_store):
                for filename in filenames:
                    if filename.endswith('.wav'):
                        yield self._read_wave(os.path.join(dirpath, filename))
        else:  # Interactive mode - let user select a single file
            path = self._ask_for_file()
            # A cancelled dialog gives a falsy result; yield nothing in that case
            # instead of failing while trying to read a non-existent file.
            if path:
                yield self._read_wave(path)

    def get_recordings(self, app_config, inq):
        """
        Get data from the selected location and pass it to the queue.

        Parameters
        -------------
        app_config : object
            Configuration object; the attributes read here are ``bucket`` and
            ``data_store`` (same meaning as ``bucket_name`` and ``data_store`` of
            get_next_recording) and ``no_processes``.
        inq : queue-like object
            Every recording is put on it as a ``(samplerate, sample, name)`` tuple.
            Afterwards the string "STOP" is put ``app_config.no_processes`` times.

        Returns
        ------------
        None
        """
        # Reuse the generator so both entry points share one source-selection logic.
        for recording in self.get_next_recording(app_config.bucket, app_config.data_store):
            inq.put(recording)

        for _ in range(app_config.no_processes):
            inq.put("STOP")

    def _connect_to_bucket(self, bucket_name):
        """
        Connect to S3, store the requested bucket in ``self.Bucket`` and return it.

        On any connection or bucket failure a critical message is logged and the
        process is terminated with ``sys.exit(1)``.
        """
        # 'except Exception' (not a bare except) so that KeyboardInterrupt and
        # SystemExit are not misreported as a credentials/bucket problem.
        try:
            self._log.info('Connecting to S3 ...')
            s3 = boto.connect_s3()
        except Exception:
            self._log.critical('Failure while connecting to S3. Check credentials.')
            sys.exit(1)

        try:
            self._log.info('Connection established. Fetching bucket %s...', bucket_name)
            self.Bucket = s3.get_bucket(bucket_name)
        except Exception:
            self._log.critical('Failure while connecting to bucket. Check if bucket exists.')
            sys.exit(1)

        self._log.info('Bucket ready.')
        return self.Bucket

    @staticmethod
    def _read_wave(path):
        """Read the wave file at *path*; return (samplerate, float32 samples, path)."""
        (rate, sample) = wav.read(path)
        return rate, sample.astype('float32'), path

    @staticmethod
    def _ask_for_file():
        """Show a file-open dialog and return the user's choice (falsy when cancelled)."""
        root = Tkinter.Tk()
        root.withdraw()  # Hide the empty root window, only the dialog is wanted
        try:
            return tkFileDialog.askopenfilename()
        finally:
            root.destroy()  # Do not leave the hidden root window behind
def _make_sure_dir_exists(filename):
# Create recursively directory if it does not exist.
dir_name = os.path.dirname(filename)
if not os.path.exists(dir_name):
os.makedirs(dir_name)