Skip to content

Commit

Permalink
Merge pull request #47 from karan2704/mseed
Browse files Browse the repository at this point in the history
Append ts segments to m3u8 file
  • Loading branch information
mcshicks authored Aug 6, 2022
2 parents 884ea73 + 2637897 commit 9c71d60
Show file tree
Hide file tree
Showing 16 changed files with 464 additions and 104 deletions.
50 changes: 28 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Orcasound's orcastream

This software contains audio tools and scripts for capturing, reformatting, transcoding and uploading audio for Orcasound. The directory structure reflects that we have developed a **base** set of tools and a couple of specific projects, orcanode and orcamseed (in the node and mseed directories). Orcasound hydrophone nodes stream by running the **node** code on Intel (amd64) or [Raspberry Pi](https://www.raspberrypi.org/) (arm32v7) platforms using a soundcard. While any soundcard should work, the most common one in use is the [Pisound](https://blokas.io/pisound/) board on either a Raspberry Pi 3B+ or 4. The other project (in the **mseed** directory) is for converting mseed format data to be streamed via Orcanode through the Orcasound human & machine detection pipeline. This is mainly used for streaming audio data from the [OOI](https://oceanobservatories.org/ "OOI") (NSF-funded Ocean Observatory Initiative) hydrophones off the coast of Oregon. See the README in each of those directories for more info.
This software contains audio tools and scripts for capturing, reformatting, transcoding and uploading audio for Orcasound. There is a base set of tools and a couple of specific projects, orcanode and orcamseed. Orcanode is streaming using Intel (amd64) or Raspberry Pi (arm32v7) platforms using a soundcard. While any soundcard should work, the most common one in use is the pisound board on either a Raspberry Pi 3B+ or 4. The other project orcamseed is for converting mseed format data to be streamed on Orcanode. This is mainly used for the [OOI](https://oceanobservatories.org/ "OOI") network. See the README in each of those directories for more info.

## Background & motivation

Expand All @@ -22,25 +22,7 @@ An ARM or X86 device with a sound card (or other audio input devices) connected

### Installing

Create a base docker image for your architecture by running the script in /base/rpi or /base/amd64 as appropriate. You will need to create a .env file as appropriate for your projects. Here is an example of an .env file (tested/working as of June, 2021)...

```
AWS_METADATA_SERVICE_TIMEOUT=5
AWS_METADATA_SERVICE_NUM_ATTEMPTS=0
REGION=us-west-2
BUCKET_TYPE=dev
NODE_TYPE=hls-only
NODE_NAME=rpi_YOURNODENAME_test
NODE_LOOPBACK=true
SAMPLE_RATE=48000
AUDIO_HW_ID=pisound
CHANNELS=1
FLAC_DURATION=30
SEGMENT_DURATION=10
LC_ALL=C.UTF-8
```

... except that the following fields are excised and will need to be added if you are integrating with the audio and logging systems of Orcasound:
Create a base docker image for your architecture by running the script in /base/rpi or /base/amd64 as appropriate. You will need to create a .env file as appropriate for your projects. Common to to all projects are the need for AWS keys

```
AWSACCESSKEYID=YourAWSaccessKey
Expand All @@ -56,15 +38,39 @@ Here are explanations of some of the .env fields:

* NODE_NAME should indicate your device and it's location, ideally in the form `device_location` (e.g. we call our Raspberry Pi staging device in Seattle `rpi_seattle`.
* NODE_TYPE determines what audio data formats will be generated and transferred to their respective AWS buckets.
* AUDIO_HW_ID is the card, device providing the audio data. Note: you can find your sound device by using the command "arecord -l". It's preferred to use the logical name i.e. pisound, USB, etc, instead of the "0,0" or "1,0" format which can change on reboots.
* AUDIO_HW_ID is the card, device providing the audio data. Note: you can find your sound device by using the command "arecord -l". For Raspberry Pi hardware with pisound just use AUDIO_HW_ID=pisound
* CHANNELS indicates the number of audio channels to expect (1 or 2).
* FLAC_DURATION is the amount of seconds you want in each archived lossless file.
* SEGMENT_DURATION is the amount of seconds you want in each streamed lossy segment.


## Supported combinations


| NODE ARCHITECTURE | node | mseed |
|-------------------|------|-------|
| arm32v7 | Y | N |
| amd64 | Y | Y |



| NODE ARCHITECTURE | hls-only | research | dev-virt |
|-------------------|----------|----------|----------|
| arm32v7 | Y | Y | N |
| amd64 | Y | N | Y |



| NODE Hardware | hls-only | research |
|-------------------|----------|----------|
| RPI4 | Y | Y |
| RPI3 B- | Y | N |



## Running local tests

At the root of the repository directory (where you also put your .env file) first copy the compose file you want to `docker-compose.yml`. For example, if you have a Raspberry Pi and you want to use the prebuilt image, then copy `docker-compose.rpi-pull.yml` to `docker-compose.yml`. Then run `docker-compose up -d`. Watch what happens using `htop`. If you want to verify files are being written to /tmp or /mnt directories, get the name of your streaming service using `docker-compose ps` (in this case `orcanode_streaming_1`) and then do `docker exec -it orcanode_streaming_1 /bin/bash` to get a bash shell within the running container.
In the repository directory (where you also put your .env file) first copy the compose file you want to docker-compose.yml. For example if you are raspberry pi and you want to use the prebuilt image then copy docker-compose.rpi-pull.yml to docker-compose. Then run `docker-compose up -d`. Watch what happens using `htop`. If you want to verify files are being written to /tmp or /mnt directories, get the name of your streaming service using `docker-compose ps` (in this case `orcanode_streaming_1`) and then do `docker exec -it orcanode_streaming_1 /bin/bash` to get a bash shell within the running container.

### Running an end-to-end test

Expand Down
6 changes: 5 additions & 1 deletion base/upload_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import os
import sys

# AWS access key and secret
ACCESS_ID = os.environ["AWSACCESSKEYID"]
ACCESS_KEY = os.environ["AWSSECRETACCESSKEY"]

NODE = os.environ["NODE_NAME"]
BASEPATH = os.path.join("/tmp", NODE)
PATH = os.path.join(BASEPATH, "hls")
Expand Down Expand Up @@ -63,7 +67,7 @@
def s3_copy_file(path, filename):
log.debug('uploading file '+filename+' from '+path+' to bucket '+BUCKET)
try:
resource = boto3.resource('s3', REGION) # Doesn't seem like we have to specify region
resource = boto3.resource('s3', aws_access_key_id=ACCESS_ID, aws_secret_access_key= ACCESS_KEY)
# transfer = S3Transfer(client)
uploadfile = os.path.join(path, filename)
log.debug('upload file: ' + uploadfile)
Expand Down
1 change: 1 addition & 0 deletions mseed/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ MAINTAINER Orcasound <[email protected]>

RUN pip3 install numpy
RUN pip3 install obspy
RUN pip3 install ooipy

############################### Copy files #####################################

Expand Down
10 changes: 10 additions & 0 deletions mseed/createpaths.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

date=$(date '+%Y-%m-%d')

mkdir -p /tmp/$NODE_NAME
mkdir -p /tmp/$NODE_NAME/hls
mkdir -p /tmp/$NODE_NAME/hls/$date

echo "starting upload"

python3 upload_s3.py
31 changes: 10 additions & 21 deletions mseed/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,37 +1,26 @@
version: "3"
services:
pull:
fetch:
image: orcastream/orcamseed
build: ./
# command: tail -F README.md
command: python3 mseedpull.py
command: python3 ooipypull.py
restart: always
env_file: .env
volumes:
- data:/root/data
stream:
- data:/tmp
depends_on:
- upload

upload:
image: orcastream/orcamseed
build: ./
# command: tail -F README.md
command: ./streamfiles.sh
command: tail -F README.md
restart: always
env_file: .env
volumes:
- data:/root/data
- data:/tmp
privileged: true
logspout:
image: gliderlabs/logspout
command: ${SYSLOG_URL}
restart: always
hostname: ${NODE_NAME}
env_file: .env
environment:
- SYSLOG_HOSTNAME=${NODE_NAME}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
ports:
- "8000:8000"



volumes:
data:
75 changes: 59 additions & 16 deletions mseed/mseedpull.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,25 @@ def handle_data(self, data):
if (datestr != datenowstr):
dates.append(datenowstr)
for datestr in dates:
r = None
url = BASE_URL + '{}'.format(datestr)
log.debug("fetching: "+url)
r = requests.get(url)
if r == 'Response [404]':
# Day folder does not exist yet or website down
try:
r = requests.get(url)
except (OSError):
print('OS error. Please check Internet connection.')
time.sleep(10)
except:
log.debug("Unexpected error to get url.")
time.sleep(10)
# If url is none, skip the following operations:
if r is None:
continue
elif r == 'Response [404]':
print("website not responding or file not posted")
parser = MyHTMLParser()
parser.feed(str(r.content))
else:
parser = MyHTMLParser()
parser.feed(str(r.content))
return filelist


Expand All @@ -111,6 +122,22 @@ def fetchAndConvert(files):
if (filedelay < maxdelay and filedelay > mindelay):
toconvert += 1
log.debug(f'files to convert: {toconvert}')

# If no file to convert, capture the next available file time:
if(toconvert == 0):
nextAvailable = None
for file in files:
filetime = file['datetime']
filedelay = now - filetime
if (filedelay < maxdelay):
nextAvailable = filetime
break
if(nextAvailable):
time_to_wait_for_next = filetime - (now - mindelay)
print('The next available filetime: ' + str(filetime) + '; Time delta: ' + str(time_to_wait_for_next))
else:
print('The next available file is cuurently unavailable.')

for file in files:
filetime = file['datetime']
url = file['url']
Expand Down Expand Up @@ -159,15 +186,15 @@ def queueFiles(files):
if (delay + duration < age): # in the past
log.debug('deleting old entry: ' + tsfilename)
if os.path.exists(tsfilename):
os.remove(tsfilename)
os.remove(tsfilename)
filesdone.remove(filepath)
del files[idx]
deleted += 1
if ((delay + duration >= age) and (age > delay)):
# should be playing next
# should be playing next
log.debug('playing : ' + tsfilename)
if os.path.exists(tsfilename):
shutil.move(tsfilename, '/root/data/dummy.ts')
shutil.move(tsfilename, '/root/data/dummy.ts')
played += 1
filesdone.remove(filepath)
del files[idx]
Expand All @@ -183,14 +210,30 @@ def main_loop():
# TODO this converts correctly but after queue files it
# get overwritten by fetchandconver
# you need to change it fetchandconvert appends the exisitng list
# and all timedate stamps are only converted once at most.
log.debug("checking")
files = getFileUrls()
log.debug(f'number of URLS: {len(files)}')
convertedfiles.extend(fetchAndConvert(files))
log.debug(f'number of converted files: {len(files)}')
played, deleted, convertedfiles = queueFiles(convertedfiles)
log.debug(f'played: {played}, deleted: {deleted}')
# and all timedate stamps are only converted once at most.
for i in range(0, 5):
print("Call getFileUrls(). Attempt #" + str(i+1) + " out of 5.")
files = getFileUrls()
if len(files) > 0:
log.debug(f'number of URLS: {len(files)}')
for i in range(0, 3):
print("Call fetchAndConvert(files). Attempt "+str(i+1) + " out of 3")
try:
convertedfiles.extend(fetchAndConvert(files))
except (OSError):
print('OS error. Please check Internet connection.')
time.sleep(10)
except:
logging.exception("Unexpected error to convert files.")
time.sleep(10)
log.debug(f'number of converted files: {len(files)}')
played, deleted, convertedfiles = queueFiles(convertedfiles)
log.debug(f'played: {played}, deleted: {deleted}')
break
else:
print("Unable to get file urls.")
time.sleep(10)
print("The next call will start in 2.5 minutes.")
time.sleep(150.0 - ((time.time() - starttime) % 150.0))


Expand Down
117 changes: 117 additions & 0 deletions mseed/ooipypull.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import ooipy
import os
import datetime
import shutil
import logging
import logging.handlers
import sys

LOGLEVEL = logging.DEBUG
PREFIX = os.environ["TIME_PREFIX"]
DELAY = os.environ["DELAY_SEGMENT"]
NODE = os.environ["NODE_NAME"]
BASEPATH = os.path.join('/tmp', NODE)
PATH = os.path.join(BASEPATH, 'hls')

log = logging.getLogger(__name__)

log.setLevel(LOGLEVEL)

handler = logging.StreamHandler(sys.stdout)

formatter = logging.Formatter('%(module)s.%(funcName)s: %(message)s')
handler.setFormatter(formatter)

log.addHandler(handler)

def fetchData(start_time, segment_length, end_time, node):
os.makedirs(BASEPATH, exist_ok=True)
os.makedirs(PATH, exist_ok=True)
while start_time < end_time:
segment_end = min(start_time + segment_length, end_time)

#paths and filenames
datestr = start_time.strftime("%Y-%m-%dT%H-%M-%S-%f")[:-3]
sub_directory = start_time.strftime("%Y-%m-%d")
file_path = os.path.join(PATH, sub_directory)
wav_name = "{date}.wav".format(date=datestr)
ts_name = "{prefix}{date}.ts".format(prefix=PREFIX, date=datestr)

#create directory and edit latest.txt
if not os.path.exists(file_path):
os.mkdir(file_path)
manifest_file = os.path.join('/root', 'live.m3u8')
if os.path.exists(manifest_file):
os.remove(manifest_file)
if not os.path.exists(os.path.join(BASEPATH, 'latest.txt')):
with open('latest.txt', 'x') as f:
f.write(sub_directory)
shutil.move('latest.txt', BASEPATH )
else:
with open(f'/{BASEPATH}/latest.txt', 'w') as f:
f.write(sub_directory)


#fetch if file doesn't already exist
if(os.path.exists(os.path.join(file_path, ts_name))):
print("EXISTS")
continue
hydrophone_data = ooipy.request.hydrophone_request.get_acoustic_data(
start_time, segment_end, node, verbose=True, data_gap_mode=2
)
if hydrophone_data is None:
print(f"Could not get data from {start_time} to {segment_end}")
start_time = segment_end
continue
print(f"data: {hydrophone_data}")
hydrophone_data.wav_write(wav_name)

writeManifest(wav_name, ts_name)

#move files to tmp for upload
shutil.move(os.path.join('/root', ts_name), os.path.join(file_path, ts_name))
shutil.copy('/root/live.m3u8', os.path.join(file_path, 'live.m3u8'))
os.remove(wav_name)
start_time = segment_end


def writeManifest(wav_name, ts_name):
root_path = os.path.join('/root', 'live.m3u8')
if not os.path.exists(root_path):
os.system("ffmpeg -i {wavfile} -f segment -segment_list 'live.m3u8' -strftime 1 -segment_time 300 -segment_format mpegts -ac 1 -acodec aac {tsfile}".format(wavfile=wav_name, tsfile=ts_name))
else:
os.system('ffmpeg -i {wavfile} -f mpegts -ar 64000 -acodec aac -ac 1 {tsfile}'.format(wavfile=wav_name, tsfile=ts_name))
#remove EXT-X-ENDLIST and write new segment
with open("live.m3u8", "r+") as f:
lines = f.readlines()
f.seek(0)
f.truncate()
f.writelines(lines[:-1])
f.write("#EXTINF:300.000000, \n")
f.write(f'{ts_name} \n')
f.write(f'#EXT-X-ENDLIST \n')

#os.system("ffmpeg -i {wavfile} -hls_playlist_type event -strftime 1 -hls_segment_type mpegts -ac 1 -acodec aac -hls_segment_filename {tsfile} -hls_time 1800 -hls_flags omit_endlist+append_list live.m3u8".format(wavfile=wav_name, tsfile=ts_name))



def _main():

segment_length = datetime.timedelta(minutes = 5)
fixed_delay = datetime.timedelta(hours=8)

while True:
end_time = datetime.datetime.utcnow()
start_time = end_time - datetime.timedelta(hours=8)

#near live fetch
fetchData(start_time, segment_length, end_time, 'PC01A')

#delayed fetch
fetchData(end_time-datetime.timedelta(hours=24), segment_length, end_time, 'PC01A')

start_time, end_time = end_time, datetime.datetime.utcnow()



_main()
Loading

0 comments on commit 9c71d60

Please sign in to comment.