Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Download large time period #13

Open
wants to merge 1 commit into
base: example_notebooks
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions download_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import glob
import os
import wave
from datetime import datetime
from orca_hls_utils import DateRangeHLSStream, datetime_utils
from orca_hls_utils.datetime_utils import get_unix_time_from_datetime_pdt

if __name__ == '__main__':
    # Example: download a fixed time window from an Orcasound HLS stream as
    # WAV clips, then merge the clips into a single "<last clip>_concat.wav".

    stream_base = 'https://s3-us-west-2.amazonaws.com/streaming-orcasound-net/rpi_orcasound_lab'
    polling_interval = 2 * 3600  # [s] time period to extract from start_unix_time

    # TODO: make the start date configurable instead of hard-coded.
    start_unix_time = get_unix_time_from_datetime_pdt(
        datetime.strptime("2022-09-03 16:15:00", "%Y-%m-%d %H:%M:%S"))

    end_unix_time = start_unix_time + polling_interval
    wav_dir = 'prout'
    # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
    os.makedirs(wav_dir, exist_ok=True)

    stream = DateRangeHLSStream.DateRangeHLSStream(
        stream_base=stream_base,
        polling_interval=polling_interval,
        start_unix_time=start_unix_time,
        end_unix_time=end_unix_time,
        wav_dir=wav_dir,
        overwrite_output=True,
        real_time=False)

    # Snapshot the folder count first: one clip is requested per folder that
    # was valid when the stream was created.
    nb_folders_ori = len(stream.valid_folders)
    for _ in range(nb_folders_ori):
        stream.get_next_clip()

    if len(stream.valid_folders) > 1:
        concat_suffix = '_concat.wav'
        # glob.glob() returns paths in arbitrary order, so sort them before
        # concatenating. NOTE(review): this assumes clip file names sort
        # chronologically (zero-padded timestamp in the name) - confirm.
        # Outputs of a previous run are excluded so they are not merged again.
        infiles = sorted(
            path for path in glob.glob(os.path.join(wav_dir, '*.wav'))
            if not path.endswith(concat_suffix)
        )

        # Nothing to merge if no clip could be written.
        if infiles:
            # splitext only touches the extension, unlike str.replace which
            # would also rewrite a '.wav' appearing elsewhere in the path.
            outfile = os.path.splitext(infiles[-1])[0] + concat_suffix

            # The output inherits its audio parameters from the first clip.
            with wave.open(infiles[0], 'rb') as first_clip:
                params = first_clip.getparams()

            # Stream clip by clip instead of holding every clip in memory;
            # the context managers close the files even if a read fails.
            with wave.open(outfile, 'wb') as output:
                output.setparams(params)
                for infile in infiles:
                    with wave.open(infile, 'rb') as clip:
                        output.writeframes(clip.readframes(clip.getnframes()))

    print('ok')



# # start_unix_time = 1657540800 # 11/07/2022 8am
# # end_unix_time = 1657569600 # 11/07/2022 4pm
# start_unix_time = 1657551600 # 11/07/2022 11am
# end_unix_time = 1657558800 # 11/07/2022 1pm bucket 1657546219
# # start_unix_time = 1657539000 # 11/07/2022 11am
# # end_unix_time = 1657553400 # 11/07/2022 1pm bucket 1657546219
# # start_unix_time = 1659121800 # 29/07/2022 3pm
# # end_unix_time = 1659132600 # 29/07/2022 6pm 2 buckets: [1659101419, 1659123019]
# # start_unix_time = 1659752820 # 05/08/2022 10:27pm
# # end_unix_time = 1659758400 # 06/08/2022 12am bucket 1659749419
# # start_unix_time = 1653426000 # 24/05/2022 5pm
# # end_unix_time = 1653433200 # 24/05/2022 6pm bucket 1659749419
# # start_unix_time = 1646166600 # 01/03/2022 3:30pm
# # end_unix_time = 1646173800 # 01/03/2022 5:30pm bucket [1646145019=9h30-15h30, 1646166619]
# # annot_unix_time = datetime.strptime("2022-07-11 13:30:00", "%Y-%m-%d %H:%M:%S").timestamp()
# # annot_unix_time = 1657567800 # "2022-07-11 15:30:00"
# start_unix_time = 1662634800 # 080922 6:00
# start_unix_time = 1668269923 # 121122 8:00:23 PDT

# does not work 2022-07-11 18:20:00 / 2022-09-07 18:30:00 /
144 changes: 92 additions & 52 deletions src/orca_hls_utils/DateRangeHLSStream.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,49 @@

from . import s3_utils
import boto3
from botocore import UNSIGNED
from botocore.config import Config

import m3u8
# Native imports
import math
from . import scraper
import ffmpeg
import os
from datetime import datetime
from datetime import timedelta
from pytz import timezone
import time
import sys
from pathlib import Path
from . import datetime_utils
from tempfile import TemporaryDirectory
import shutil

# Required imports
import ffmpeg
import m3u8
from pytz import timezone

# Local imports
from . import s3_utils, datetime_utils, scraper


class fragile(object):
    """Wrap a context manager so its ``with`` body can be left early.

    Raising ``fragile.Break`` (or a subclass of it) inside the ``with`` block
    ends the block: the wrapped manager's ``__exit__`` still runs, and the
    exception is then suppressed so execution continues after the block.
    Any other exception is handled exactly as the wrapped manager decides.

    Based on:
    https://stackoverflow.com/questions/11195140/break-or-exit-out-of-with-statement

    Args:
        value: the context manager to wrap.
    """

    class Break(Exception):
        """Break out of the with statement"""

    def __init__(self, value):
        self.value = value

    def __enter__(self):
        # Hand back whatever the wrapped manager yields for the "as" target.
        return self.value.__enter__()

    def __exit__(self, etype, value, traceback):
        # Always let the wrapped manager clean up first.
        error = self.value.__exit__(etype, value, traceback)
        # etype is None on a normal exit; issubclass also covers subclasses
        # of Break, which a plain equality comparison would let propagate.
        if etype is not None and issubclass(etype, self.Break):
            return True
        return error


def get_readable_clipname(hydrophone_id, cliptime_utc):
    """Build a readable clip name from a hydrophone id and a UTC clip time.

    Args:
        hydrophone_id: string prefix for the clip name.
        cliptime_utc: datetime of the clip, e.g. 2020-09-27T00/16/55.677242Z.
            It is localized to UTC here, so it is presumably expected to
            carry no tzinfo yet - TODO confirm against callers.

    Returns:
        A tuple ``(clipname, date)`` where ``clipname`` is
        ``<hydrophone_id>_<YYYY_mm_dd_HH_MM_SS_TZ>`` expressed in US/Pacific
        time and ``date`` is the corresponding US/Pacific datetime.
    """
    utc_aware = timezone('UTC').localize(cliptime_utc)
    pacific_time = utc_aware.astimezone(timezone('US/Pacific'))
    stamp = pacific_time.strftime('%Y_%m_%d_%H_%M_%S_%Z')
    return "_".join((hydrophone_id, stamp)), pacific_time

#TODO: Handle date ranges that don't exist

# TODO: Handle date ranges that don't exist
class DateRangeHLSStream():
"""
stream_base = 'https://s3-us-west-2.amazonaws.com/streaming-orcasound-net/rpi_orcasound_lab'
Expand All @@ -52,6 +70,9 @@ def __init__(self, stream_base, polling_interval, start_unix_time, end_unix_time
self.real_time = real_time
self.is_end_of_stream = False

# Create wav dir if necessary
Path(self.wav_dir).mkdir(parents=True, exist_ok=True)

# query the stream base for all m3u8 files between the timestamps

# split the stream base into bucket and folder
Expand All @@ -64,7 +85,7 @@ def __init__(self, stream_base, polling_interval, start_unix_time, end_unix_time
self.folder_name = tokens[1]
prefix = self.folder_name + "/hls/"

# returns folder names corresponding to epochs, this grows as more data is added, we should probably maintain a list of
# returns folder names corresponding to epochs, this grows as more data is added, we should probably maintain a list of
# hydrophone folders that exist
all_hydrophone_folders = s3_utils.get_all_folders(self.s3_bucket, prefix=prefix)
print("Found {} folders in all for hydrophone".format(len(all_hydrophone_folders)))
Expand All @@ -75,7 +96,9 @@ def __init__(self, stream_base, polling_interval, start_unix_time, end_unix_time
self.current_folder_index = 0
self.current_clip_start_time = self.start_unix_time

def get_next_clip(self, current_clip_name = None):
self.change_folder = False

def get_next_clip(self, current_clip_name=None):
"""

"""
Expand All @@ -96,61 +119,70 @@ def get_next_clip(self, current_clip_name = None):

if time_to_sleep > 0:
time.sleep(time_to_sleep)

# read in current m3u8 file
# stream_url for the current AWS folder
stream_url = "{}/hls/{}/live.m3u8".format(
(self.stream_base), (current_folder))
stream_obj = m3u8.load(stream_url)
num_total_segments = len(stream_obj.segments)
target_duration = sum([item.duration for item in stream_obj.segments])/num_total_segments

if self.change_folder:
self.polling_interval_in_seconds = math.ceil(self.tmp_polling*target_duration)

num_segments_in_wav_duration = math.ceil(self.polling_interval_in_seconds/target_duration)

# calculate the start index by computing the current time - start of current folder
segment_start_index = math.ceil(datetime_utils.get_difference_between_times_in_seconds(self.current_clip_start_time, current_folder)/target_duration)
segment_end_index = segment_start_index + num_segments_in_wav_duration

if segment_end_index < num_total_segments:
self.change_folder = False

if segment_end_index > num_total_segments:
# move to the next folder and increment the current_clip_start_time to the new
self.current_folder_index += 1
self.current_clip_start_time = self.valid_folders[self.current_folder_index]
return None, None, None
self.tmp_polling = segment_end_index - num_total_segments
segment_end_index = num_total_segments
self.change_folder = True

# Can get the whole segment so update the clip_start_time for the next clip
# We do this before we actually do the pulling in case there is a problem with this clip
self.current_clip_start_time = datetime_utils.add_interval_to_unix_time(self.current_clip_start_time, self.polling_interval_in_seconds)

# Create tmp path to hold .ts segments
tmp_path = "tmp_path"
os.makedirs(tmp_path,exist_ok=True)

file_names = []
for i in range(segment_start_index, segment_end_index):
audio_segment = stream_obj.segments[i]
base_path = audio_segment.base_uri
file_name = audio_segment.uri
audio_url = base_path + file_name
try:
scraper.download_from_url(audio_url,tmp_path)
file_names.append(file_name)
except Exception:
print("Skipping",audio_url,": error.")

# concatenate all .ts files with ffmpeg
hls_file = (clipname+".ts")
audio_file = (clipname+".wav")
wav_file_path = os.path.join(self.wav_dir, audio_file)
filenames_str = " ".join(file_names)
concat_ts_cmd = "cd {tp} && cat {fstr} > {hls_file}".format(tp=tmp_path, fstr=filenames_str, hls_file=hls_file)
os.system(concat_ts_cmd)

# read the concatenated .ts and write to wav
stream = ffmpeg.input(os.path.join(tmp_path, Path(hls_file)))
stream = ffmpeg.output(stream, wav_file_path)
ffmpeg.run(stream, overwrite_output=self.overwrite_output, quiet=False)

# clear the tmp_path
os.system(f'rm -rf {tmp_path}')
with fragile(TemporaryDirectory()) as tmp_path:
os.makedirs(tmp_path, exist_ok=True)

file_names = []
for i in range(segment_start_index, segment_end_index):
audio_segment = stream_obj.segments[i]
base_path = audio_segment.base_uri
file_name = audio_segment.uri
audio_url = base_path + file_name
try:
scraper.download_from_url(audio_url, tmp_path)
file_names.append(file_name)
except Exception:
print("Skipping", audio_url, ": error.")

# concatenate all .ts files
hls_file = os.path.join(tmp_path, Path(clipname+".ts"))
with open(hls_file, 'wb') as wfd:
for f in file_names:
with open(os.path.join(tmp_path, f), 'rb') as fd:
shutil.copyfileobj(fd, wfd)

# read the concatenated .ts and write to wav
audio_file = (clipname+".wav")
wav_file_path = os.path.join(self.wav_dir, audio_file)

if len(file_names) == 0:
# self.to_pop.append(self.current_folder_index)
raise fragile.Break

stream = ffmpeg.input(os.path.join(tmp_path, Path(hls_file)))
stream = ffmpeg.output(stream, wav_file_path)
ffmpeg.run(stream, overwrite_output=self.overwrite_output, quiet=False)

# If we're in demo mode, we need to fake timestamps to make it seem like the date range is real-time
if current_clip_name:
Expand All @@ -164,6 +196,14 @@ def get_next_clip(self, current_clip_name = None):
# change clip_start_time - this has to be in UTC so that the email can be in PDT
clip_start_time = current_clip_name.isoformat() + "Z"

if self.change_folder:
# move to the next folder and increment the current_clip_start_time to the new
self.current_folder_index += 1
if self.current_folder_index > len(self.valid_folders) - 1:
print("No more valid folders")
else:
self.current_clip_start_time = self.valid_folders[self.current_folder_index]

# Get new index
return wav_file_path, clip_start_time, current_clip_name

Expand Down
10 changes: 10 additions & 0 deletions src/orca_hls_utils/datetime_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,13 @@ def get_unix_time_from_datetime_utc(dt_utc):
unix_time = int(dt_pst.timestamp())

return unix_time

def get_unix_time_from_datetime_pdt(dt_pst):
    """Return the Unix timestamp of a US/Pacific wall-clock datetime.

    Args:
        dt_pst: datetime interpreted as US/Pacific local time. It is passed
            to ``localize``, so it is presumably expected to carry no tzinfo
            - TODO confirm against callers.

    Returns:
        Whole seconds since the Unix epoch, as an int (fraction truncated).
    """
    pacific_aware = timezone('US/Pacific').localize(dt_pst)
    # An aware datetime's timestamp() does not depend on the zone it is
    # expressed in, so no intermediate conversion to UTC is needed.
    return int(pacific_aware.timestamp())