Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements to node specific page #221

Merged
merged 6 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 89 additions & 31 deletions OrcanodeMonitor/Core/Fetcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
using System.Threading.Channels;
using Microsoft.AspNetCore.Http.HttpResults;
using Microsoft.Extensions.Logging;
using Microsoft.AspNetCore.Diagnostics;

namespace OrcanodeMonitor.Core
{
Expand Down Expand Up @@ -735,42 +736,77 @@ public static long DateTimeToUnixTimeStamp(DateTime dateTime)
return unixTime;
}

/// <summary>
/// Update the timestamps for a given Orcanode by querying files on S3.
/// </summary>
/// <param name="context">Database context</param>
/// <param name="node">Orcanode to update</param>
/// <param name="logger"></param>
/// <returns></returns>
public async static Task UpdateS3DataAsync(OrcanodeMonitorContext context, Orcanode node, ILogger logger)
public class TimestampResult
{
public string UnixTimestampString { get; }
public DateTimeOffset? Offset { get; }
public TimestampResult(string unixTimestampString, DateTimeOffset? offset)
{
UnixTimestampString = unixTimestampString;
Offset = offset;
}
}

public async static Task<TimestampResult?> GetLatestS3TimestampAsync(Orcanode node, bool updateNode, ILogger logger)
{
string url = "https://" + node.S3Bucket + ".s3.amazonaws.com/" + node.S3NodeName + "/latest.txt";
using HttpResponseMessage response = await _httpClient.GetAsync(url);
if (response.StatusCode == HttpStatusCode.NotFound)
{
logger.LogError($"{node.S3NodeName} not found on S3");

// Absent.
node.LatestRecordedUtc = null;
return;
if (updateNode)
{
node.LatestRecordedUtc = null;
}
return null;
}
if (response.StatusCode == HttpStatusCode.Forbidden)
{
logger.LogError($"{node.S3NodeName} got access denied on S3");

// Access denied.
node.LatestRecordedUtc = DateTime.MinValue;
return;
if (updateNode)
{
node.LatestRecordedUtc = DateTime.MinValue;
}
return null;
}
if (!response.IsSuccessStatusCode)
{
return;
logger.LogError($"{node.S3NodeName} got status {response.StatusCode} on S3");

return null;
}

string content = await response.Content.ReadAsStringAsync();
string unixTimestampString = content.TrimEnd();
var result = new TimestampResult(unixTimestampString, response.Content.Headers.LastModified);
return result;
}

/// <summary>
/// Update the timestamps for a given Orcanode by querying files on S3.
/// </summary>
/// <param name="context">Database context</param>
/// <param name="node">Orcanode to update</param>
/// <param name="logger"></param>
/// <returns></returns>
public async static Task UpdateS3DataAsync(OrcanodeMonitorContext context, Orcanode node, ILogger logger)
{
TimestampResult? result = await GetLatestS3TimestampAsync(node, true, logger);
if (result == null)
{
return;
}
string unixTimestampString = result.UnixTimestampString;
DateTime? latestRecorded = UnixTimeStampStringToDateTimeUtc(unixTimestampString);
if (latestRecorded.HasValue)
{
node.LatestRecordedUtc = latestRecorded?.ToUniversalTime();

DateTimeOffset? offset = response.Content.Headers.LastModified;
DateTimeOffset? offset = result.Offset;
if (offset.HasValue)
{
node.LatestUploadedUtc = offset.Value.UtcDateTime;
Expand Down Expand Up @@ -864,34 +900,32 @@ private static void AddHydrophoneStreamStatusEvent(OrcanodeMonitorContext contex
AddOrcanodeEvent(context, node, OrcanodeEventTypes.HydrophoneStream, value);
}

/// <summary>
/// Update the ManifestUpdated timestamp for a given Orcanode by querying S3.
/// </summary>
/// <param name="context">Database context</param>
/// <param name="node">Orcanode to update</param>
/// <param name="unixTimestampString">Value in the latest.txt file</param>
/// <param name="logger">Logger</param>
/// <returns></returns>
public async static Task UpdateManifestTimestampAsync(OrcanodeMonitorContext context, Orcanode node, string unixTimestampString, ILogger logger)
public async static Task<FrequencyInfo?> GetLatestAudioSampleAsync(Orcanode node, string unixTimestampString, bool updateNode, ILogger logger)
{
OrcanodeOnlineStatus oldStatus = node.S3StreamStatus;

string url = "https://" + node.S3Bucket + ".s3.amazonaws.com/" + node.S3NodeName + "/hls/" + unixTimestampString + "/live.m3u8";
using HttpResponseMessage response = await _httpClient.GetAsync(url);
if (!response.IsSuccessStatusCode)
{
return;
return null;
}

DateTimeOffset? offset = response.Content.Headers.LastModified;
if (!offset.HasValue)
{
node.LastCheckedUtc = DateTime.UtcNow;
return;
if (updateNode)
{
node.LastCheckedUtc = DateTime.UtcNow;
}
return null;
}

node.ManifestUpdatedUtc = offset.Value.UtcDateTime;
node.LastCheckedUtc = DateTime.UtcNow;
if (updateNode)
{
node.ManifestUpdatedUtc = offset.Value.UtcDateTime;
node.LastCheckedUtc = DateTime.UtcNow;
}

// Download manifest.
Uri baseUri = new Uri(url);
Expand All @@ -908,14 +942,38 @@ public async static Task UpdateManifestTimestampAsync(OrcanodeMonitorContext con
try
{
using Stream stream = await _httpClient.GetStreamAsync(newUri);
node.AudioStreamStatus = await FfmpegCoreAnalyzer.AnalyzeAudioStreamAsync(stream, oldStatus);
node.AudioStandardDeviation = 0.0;
} catch (Exception ex)
FrequencyInfo frequencyInfo = await FfmpegCoreAnalyzer.AnalyzeAudioStreamAsync(stream, oldStatus);
return frequencyInfo;
}
catch (Exception ex)
{
// We couldn't fetch the stream audio so could not update the
// audio standard deviation. Just ignore this for now.
logger.LogError(ex, "Exception in UpdateManifestTimestampAsync");
}
return null;
}

/// <summary>
/// Update the ManifestUpdated timestamp for a given Orcanode by querying S3.
/// </summary>
/// <param name="context">Database context</param>
/// <param name="node">Orcanode to update</param>
/// <param name="unixTimestampString">Value in the latest.txt file</param>
/// <param name="logger">Logger</param>
/// <returns></returns>
public async static Task UpdateManifestTimestampAsync(OrcanodeMonitorContext context, Orcanode node, string unixTimestampString, ILogger logger)
{
OrcanodeOnlineStatus oldStatus = node.S3StreamStatus;

FrequencyInfo? frequencyInfo = await GetLatestAudioSampleAsync(node, unixTimestampString, true, logger);
if (frequencyInfo == null)
{
return;
}

node.AudioStreamStatus = frequencyInfo.Status;
dthaler marked this conversation as resolved.
Show resolved Hide resolved
node.AudioStandardDeviation = 0.0;

OrcanodeOnlineStatus newStatus = node.S3StreamStatus;
if (newStatus != oldStatus)
Expand Down
51 changes: 35 additions & 16 deletions OrcanodeMonitor/Core/FfmpegCoreAnalyzer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@

namespace OrcanodeMonitor.Core
{
public class FrequencyInfo
{
public Dictionary<double, double> FrequencyAmplitudes { get; set; }
public OrcanodeOnlineStatus Status { get; set; }
}

public class FfmpegCoreAnalyzer
{
// We consider anything above this average amplitude as not silence.
Expand Down Expand Up @@ -57,36 +63,47 @@ private static double MinSignalRatio

private static bool IsHumFrequency(double frequency) => (frequency >= MinHumFrequency && frequency <= MaxHumFrequency);

private static OrcanodeOnlineStatus AnalyzeFrequencies(float[] data, int sampleRate, OrcanodeOnlineStatus oldStatus)
public static Dictionary<double, double> ComputeFrequencyAmplitudes(float[] data, int sampleRate)
{
var result = new Dictionary<double, double>();
int n = data.Length;
Complex[] complexData = data.Select(d => new Complex(d, 0)).ToArray();
Fourier.Forward(complexData, FourierOptions.Matlab);
double[] amplitudes = new double[n / 2];
for (int i = 0; i < n / 2; i++)
{
amplitudes[i] = complexData[i].Magnitude;
double amplitude = complexData[i].Magnitude;
double frequency = (((double)i) * sampleRate) / n;
result[frequency] = amplitude;
}
return result;
}

double max = amplitudes.Max();
private static FrequencyInfo AnalyzeFrequencies(float[] data, int sampleRate, OrcanodeOnlineStatus oldStatus)
{
int n = data.Length;
FrequencyInfo frequencyInfo = new FrequencyInfo();
frequencyInfo.FrequencyAmplitudes = ComputeFrequencyAmplitudes(data, sampleRate);

double max = frequencyInfo.FrequencyAmplitudes.Values.Max();
if (max < MinNoiseAmplitude)
{
// File contains mostly silence across all frequencies.
return OrcanodeOnlineStatus.Unintelligible;
frequencyInfo.Status = OrcanodeOnlineStatus.Unintelligible;
return frequencyInfo;
}

if ((max <= MaxSilenceAmplitude) && (oldStatus == OrcanodeOnlineStatus.Unintelligible))
{
// In between the min and max unintelligibility range, so keep previous status.
return OrcanodeOnlineStatus.Unintelligible;
frequencyInfo.Status = OrcanodeOnlineStatus.Unintelligible;
return frequencyInfo;
}

// Find the maximum amplitude outside the audio hum range.
double maxNonHumAmplitude = 0;
for (int i = 0; i < amplitudes.Length; i++)
{
double frequency = (((double)i) * sampleRate) / n;
double amplitude = amplitudes[i];
foreach (var pair in frequencyInfo.FrequencyAmplitudes) {
double frequency = pair.Key;
double amplitude = pair.Value;
if (!IsHumFrequency(frequency))
{
if (maxNonHumAmplitude < amplitude)
Expand All @@ -99,11 +116,13 @@ private static OrcanodeOnlineStatus AnalyzeFrequencies(float[] data, int sampleR
if (maxNonHumAmplitude / max < MinSignalRatio)
{
// Essentially just silence outside the hum range, no signal.
return OrcanodeOnlineStatus.Unintelligible;
frequencyInfo.Status = OrcanodeOnlineStatus.Unintelligible;
return frequencyInfo;
}

// Signal outside the hum range.
return OrcanodeOnlineStatus.Online;
frequencyInfo.Status = OrcanodeOnlineStatus.Online;
return frequencyInfo;
}

/// <summary>
Expand All @@ -112,7 +131,7 @@ private static OrcanodeOnlineStatus AnalyzeFrequencies(float[] data, int sampleR
/// <param name="args">FFMpeg arguments</param>
/// <param name="oldStatus">Previous online status</param>
/// <returns>Status of the most recent audio samples</returns>
private static async Task<OrcanodeOnlineStatus> AnalyzeAsync(FFMpegArguments args, OrcanodeOnlineStatus oldStatus)
private static async Task<FrequencyInfo> AnalyzeAsync(FFMpegArguments args, OrcanodeOnlineStatus oldStatus)
{
var outputStream = new MemoryStream(); // Create an output stream (e.g., MemoryStream)
var pipeSink = new StreamPipeSink(outputStream);
Expand Down Expand Up @@ -142,18 +161,18 @@ private static async Task<OrcanodeOnlineStatus> AnalyzeAsync(FFMpegArguments arg
floatBuffer[i] = BitConverter.ToInt16(byteBuffer, i * sizeof(short)) / 32768f;
}

// Perform FFT and analyze frequencies
// Perform FFT and analyze frequencies.
var status = AnalyzeFrequencies(floatBuffer, waveFormat.SampleRate, oldStatus);
return status;
}

public static async Task<OrcanodeOnlineStatus> AnalyzeFileAsync(string filename, OrcanodeOnlineStatus oldStatus)
public static async Task<FrequencyInfo> AnalyzeFileAsync(string filename, OrcanodeOnlineStatus oldStatus)
{
var args = FFMpegArguments.FromFileInput(filename);
return await AnalyzeAsync(args, oldStatus);
}

public static async Task<OrcanodeOnlineStatus> AnalyzeAudioStreamAsync(Stream stream, OrcanodeOnlineStatus oldStatus)
public static async Task<FrequencyInfo> AnalyzeAudioStreamAsync(Stream stream, OrcanodeOnlineStatus oldStatus)
{
StreamPipeSource streamPipeSource = new StreamPipeSource(stream);
var args = FFMpegArguments.FromPipeInput(streamPipeSource);
Expand Down
Loading
Loading