"""Example: download a date range from an Orcasound HLS stream and join the clips.

Clips are fetched folder by folder with ``DateRangeHLSStream.get_next_clip``.
When the requested range spans more than one stream folder, the resulting
``.wav`` files are concatenated into a single ``*_concat.wav`` file.
"""
import glob
import os
import wave
from datetime import datetime

CONCAT_SUFFIX = '_concat'


def list_clip_wav_files(wav_dir):
    """Return the clip ``.wav`` paths found in *wav_dir*, sorted by name.

    ``glob`` returns entries in arbitrary order, so the result is sorted to
    make the concatenation order deterministic.
    NOTE(review): this assumes clip file names sort chronologically
    (timestamp-based names) -- confirm against the clip naming scheme.

    Files produced by a previous concatenation (``*_concat.wav``) are left
    out so that re-running the script does not feed its own output back in.
    """
    pattern = os.path.join(wav_dir, '*.wav')
    return sorted(
        path for path in glob.glob(pattern)
        if not os.path.splitext(path)[0].endswith(CONCAT_SUFFIX)
    )


def concatenate_wav_files(infiles, outfile):
    """Write the audio frames of every file in *infiles* to *outfile*, in order.

    The output takes its parameters (channels, sample width, frame rate)
    from the first input; the inputs are not checked for a common format.

    Args:
        infiles: Paths (or binary file objects) of the wav files to join.
        outfile: Path (or binary file object) of the wav file to write.

    Returns:
        The number of input files written.

    Raises:
        ValueError: If *infiles* is empty.
    """
    infiles = list(infiles)
    if not infiles:
        raise ValueError("no wav files to concatenate")

    with wave.open(outfile, 'wb') as output:
        for index, infile in enumerate(infiles):
            with wave.open(infile, 'rb') as reader:
                if index == 0:
                    output.setparams(reader.getparams())
                # The frame count in the header is patched when output closes.
                output.writeframes(reader.readframes(reader.getnframes()))
    return len(infiles)


if __name__ == '__main__':
    # Imported here so the helpers above stay importable without the package.
    from orca_hls_utils import DateRangeHLSStream, datetime_utils
    from orca_hls_utils.datetime_utils import get_unix_time_from_datetime_pdt

    stream_base = 'https://s3-us-west-2.amazonaws.com/streaming-orcasound-net/rpi_orcasound_lab'
    polling_interval = 2 * 3600  # [s] time period to extract from start_unix_time

    start_unix_time = get_unix_time_from_datetime_pdt(
        datetime.strptime("2022-09-03 16:15:00", "%Y-%m-%d %H:%M:%S"))  # -> to do

    end_unix_time = start_unix_time + polling_interval
    wav_dir = 'prout'
    os.makedirs(wav_dir, exist_ok=True)

    stream = DateRangeHLSStream.DateRangeHLSStream(stream_base=stream_base,
                                                   polling_interval=polling_interval,
                                                   start_unix_time=start_unix_time,
                                                   end_unix_time=end_unix_time,
                                                   wav_dir=wav_dir,
                                                   overwrite_output=True,
                                                   real_time=False)
    # One clip is requested per folder that was valid when the stream was created.
    nb_folders_ori = len(stream.valid_folders)
    for _ in range(nb_folders_ori):
        stream.get_next_clip()

    # Only a range that spans several folders yields several clips to join.
    if len(stream.valid_folders) > 1:
        infiles = list_clip_wav_files(wav_dir)
        if infiles:
            root, ext = os.path.splitext(infiles[-1])
            concatenate_wav_files(infiles, root + CONCAT_SUFFIX + ext)

    print('ok')


# Reference time ranges used while testing:
# # start_unix_time = 1657540800 # 11/07/2022 8am
# # end_unix_time = 1657569600 # 11/07/2022 4pm
# start_unix_time = 1657551600 # 11/07/2022 11am
# end_unix_time = 1657558800 # 11/07/2022 1pm bucket 1657546219
# # start_unix_time = 1657539000 # 11/07/2022 11am
# # end_unix_time = 1657553400 # 11/07/2022 1pm bucket 1657546219
# # start_unix_time =
1659121800 # 29/07/2022 3pm +# # end_unix_time = 1659132600 # 29/07/2022 6pm 2 buckets: [1659101419, 1659123019] +# # start_unix_time = 1659752820 # 05/08/2022 10:27pm +# # end_unix_time = 1659758400 # 06/08/2022 12am bucket 1659749419 +# # start_unix_time = 1653426000 # 24/05/2022 5pm +# # end_unix_time = 1653433200 # 24/05/2022 6pm bucket 1659749419 +# # start_unix_time = 1646166600 # 01/03/2022 3:30pm +# # end_unix_time = 1646173800 # 01/03/2022 5:30pm bucket [1646145019=9h30-15h30, 1646166619] +# # annot_unix_time = datetime.strptime("2022-07-11 13:30:00", "%Y-%m-%d %H:%M:%S").timestamp() +# # annot_unix_time = 1657567800 # "2022-07-11 15:30:00" +# start_unix_time = 1662634800 # 080922 6:00 +# start_unix_time = 1668269923 # 121122 8:00:23 PDT + +# does not work 2022-07-11 18:20:00 / 2022-09-07 18:30:00 / \ No newline at end of file diff --git a/src/orca_hls_utils/DateRangeHLSStream.py b/src/orca_hls_utils/DateRangeHLSStream.py index 8aa4221..74ae175 100644 --- a/src/orca_hls_utils/DateRangeHLSStream.py +++ b/src/orca_hls_utils/DateRangeHLSStream.py @@ -1,31 +1,49 @@ - -from . import s3_utils -import boto3 -from botocore import UNSIGNED -from botocore.config import Config - -import m3u8 +# Native imports import math -from . import scraper -import ffmpeg import os from datetime import datetime -from datetime import timedelta -from pytz import timezone import time -import sys from pathlib import Path -from . import datetime_utils +from tempfile import TemporaryDirectory +import shutil + +# Required imports +import ffmpeg +import m3u8 +from pytz import timezone + +# Local imports +from . 
class fragile(object):
    """Wrap a context manager so that its ``with`` body can be left early.

    Raising ``fragile.Break`` (or a subclass of it) inside the ``with`` block
    ends the block: the wrapped manager's ``__exit__`` still runs, and the
    exception is then suppressed instead of propagating to the caller.
    Any other exception is handled exactly as the wrapped manager decides.

    See https://stackoverflow.com/questions/11195140/break-or-exit-out-of-with-statement
    """

    class Break(Exception):
        """Break out of the with statement"""

    def __init__(self, value):
        # value: the context manager being wrapped.
        self.value = value

    def __enter__(self):
        # Hand back whatever the wrapped manager yields (e.g. a temp dir path).
        return self.value.__enter__()

    def __exit__(self, etype, value, traceback):
        # Always let the wrapped manager clean up first.
        error = self.value.__exit__(etype, value, traceback)
        # issubclass (rather than ==) so subclasses of Break also end the
        # block quietly; etype is None when the body finished normally.
        if etype is not None and issubclass(etype, self.Break):
            return True
        return error


def get_readable_clipname(hydrophone_id, cliptime_utc):
    """Build a human-readable clip name from a hydrophone id and a UTC time.

    Args:
        hydrophone_id: String prefix for the clip name.
        cliptime_utc: Naive ``datetime`` that is interpreted as UTC.

    Returns:
        A tuple ``(clipname, date)`` where ``clipname`` is
        ``hydrophone_id + "_" + <US/Pacific time as %Y_%m_%d_%H_%M_%S_%Z>``
        and ``date`` is the timezone-aware US/Pacific datetime.
    """
    cliptime_utc = timezone('UTC').localize(cliptime_utc)
    date = cliptime_utc.astimezone(timezone('US/Pacific'))
    date_format = '%Y_%m_%d_%H_%M_%S_%Z'
    clipname = date.strftime(date_format)
    return hydrophone_id + "_" + clipname, date
hydrophone".format(len(all_hydrophone_folders))) @@ -75,7 +96,9 @@ def __init__(self, stream_base, polling_interval, start_unix_time, end_unix_time self.current_folder_index = 0 self.current_clip_start_time = self.start_unix_time - def get_next_clip(self, current_clip_name = None): + self.change_folder = False + + def get_next_clip(self, current_clip_name=None): """ """ @@ -96,7 +119,7 @@ def get_next_clip(self, current_clip_name = None): if time_to_sleep > 0: time.sleep(time_to_sleep) - + # read in current m3u8 file # stream_url for the current AWS folder stream_url = "{}/hls/{}/live.m3u8".format( @@ -104,53 +127,62 @@ def get_next_clip(self, current_clip_name = None): stream_obj = m3u8.load(stream_url) num_total_segments = len(stream_obj.segments) target_duration = sum([item.duration for item in stream_obj.segments])/num_total_segments + + if self.change_folder: + self.polling_interval_in_seconds = math.ceil(self.tmp_polling*target_duration) + num_segments_in_wav_duration = math.ceil(self.polling_interval_in_seconds/target_duration) # calculate the start index by computing the current time - start of current folder segment_start_index = math.ceil(datetime_utils.get_difference_between_times_in_seconds(self.current_clip_start_time, current_folder)/target_duration) segment_end_index = segment_start_index + num_segments_in_wav_duration + if segment_end_index < num_total_segments: + self.change_folder = False + if segment_end_index > num_total_segments: - # move to the next folder and increment the current_clip_start_time to the new - self.current_folder_index += 1 - self.current_clip_start_time = self.valid_folders[self.current_folder_index] - return None, None, None + self.tmp_polling = segment_end_index - num_total_segments + segment_end_index = num_total_segments + self.change_folder = True # Can get the whole segment so update the clip_start_time for the next clip # We do this before we actually do the pulling in case there is a problem with this clip 
self.current_clip_start_time = datetime_utils.add_interval_to_unix_time(self.current_clip_start_time, self.polling_interval_in_seconds) # Create tmp path to hold .ts segments - tmp_path = "tmp_path" - os.makedirs(tmp_path,exist_ok=True) - - file_names = [] - for i in range(segment_start_index, segment_end_index): - audio_segment = stream_obj.segments[i] - base_path = audio_segment.base_uri - file_name = audio_segment.uri - audio_url = base_path + file_name - try: - scraper.download_from_url(audio_url,tmp_path) - file_names.append(file_name) - except Exception: - print("Skipping",audio_url,": error.") - - # concatentate all .ts files with ffmpeg - hls_file = (clipname+".ts") - audio_file = (clipname+".wav") - wav_file_path = os.path.join(self.wav_dir, audio_file) - filenames_str = " ".join(file_names) - concat_ts_cmd = "cd {tp} && cat {fstr} > {hls_file}".format(tp=tmp_path, fstr=filenames_str, hls_file=hls_file) - os.system(concat_ts_cmd) - - # read the concatenated .ts and write to wav - stream = ffmpeg.input(os.path.join(tmp_path, Path(hls_file))) - stream = ffmpeg.output(stream, wav_file_path) - ffmpeg.run(stream, overwrite_output=self.overwrite_output, quiet=False) - - # clear the tmp_path - os.system(f'rm -rf {tmp_path}') + with fragile(TemporaryDirectory()) as tmp_path: + os.makedirs(tmp_path, exist_ok=True) + + file_names = [] + for i in range(segment_start_index, segment_end_index): + audio_segment = stream_obj.segments[i] + base_path = audio_segment.base_uri + file_name = audio_segment.uri + audio_url = base_path + file_name + try: + scraper.download_from_url(audio_url, tmp_path) + file_names.append(file_name) + except Exception: + print("Skipping", audio_url, ": error.") + + # concatentate all .ts files + hls_file = os.path.join(tmp_path, Path(clipname+".ts")) + with open(hls_file, 'wb') as wfd: + for f in file_names: + with open(os.path.join(tmp_path, f), 'rb') as fd: + shutil.copyfileobj(fd, wfd) + + # read the concatenated .ts and write to wav + 
def get_unix_time_from_datetime_pdt(dt_pst):
    """Return the Unix timestamp (whole seconds) of a US/Pacific wall-clock time.

    Args:
        dt_pst: ``datetime`` localized here to the 'US/Pacific' zone; it is
            expected to be naive (no tzinfo) since it is passed to
            ``localize``.

    Returns:
        The corresponding Unix time as an ``int``.
    """
    pacific_time = timezone('US/Pacific').localize(dt_pst)
    # Express the same instant in UTC, then truncate its timestamp to seconds.
    return int(pacific_time.astimezone(timezone('UTC')).timestamp())