Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/rolling spectrums #191

Merged
merged 24 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 81 additions & 1 deletion endaq/calc/fft.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import typing
from typing import Optional
import warnings

import numpy as np
import pandas as pd
Expand All @@ -10,7 +11,7 @@
from endaq.calc import psd, utils


__all__ = ["aggregate_fft", "fft", "rfft", "dct", "dst"]
__all__ = ["aggregate_fft", "rolling_fft", "fft", "rfft", "dct", "dst"]


def aggregate_fft(df, **kwargs):
Expand All @@ -35,6 +36,85 @@ def aggregate_fft(df, **kwargs):
return psd.welch(df, **kwargs)


def rolling_fft(
df: pd.DataFrame,
bin_width: float = 1.0,
num_slices: int = 100,
indexes: np.array = None,
index_values: np.array = None,
slice_width: float = None,
add_resultant: bool = True,
disable_warnings: bool = True,
**kwargs,
) -> pd.DataFrame:
"""
Compute FFTs for defined slices of a time series data set using :py:func:`~endaq.calc.fft.aggregate_fft()`

:param df: the input dataframe with an index defining the time in seconds or datetime
:param bin_width: the bin width or resolution in Hz for the FFT
:param num_slices: the number of slices to split the time series into, default is 100,
this is ignored if `indexes` is defined
:param indexes: the center index locations (not value) of each slice to compute the FFT
:param index_values: the index values of each peak event to quantify (slower but more intuitive than using `indexes`)
:param slice_width: the time in seconds to center about each slice index,
if none is provided it will calculate one based upon the number of slices
:param add_resultant: if `True` the root sum of squares of each FFT column will
also be computed
:param disable_warnings: if `True` (default) it disables the warnings on the PSD length
:param kwargs: Other parameters to pass directly to :py:func:`~endaq.calc.fft.aggregate_fft()`
:return: a dataframe containing all the FFTs, stacked on each other

See example use cases and syntax at :py:func:`~endaq.plot.spectrum_over_time()`
which visualizes the output of this function in Heatmaps, Waterfall plots,
Surface plots, and Animations

"""
use_spectrogram = True
if (indexes is not None) | (index_values is not None):
use_spectrogram = False

indexes, slice_width, num, length = utils._rolling_slice_definitions(
df,
indexes=indexes,
index_values=index_values,
num_slices=num_slices,
slice_width=slice_width
)

if use_spectrogram:
fft_df = psd.spectrogram(
df=df,
num_slices=num_slices,
bin_width=bin_width,
scaling='unit',
add_resultant=add_resultant,
disable_warnings=disable_warnings,
**kwargs
)
else:
# Loop through and compute fft
fft_df = pd.DataFrame()
for i in indexes:
window_start = max(0, i - num)
window_end = min(length, i + num)
with warnings.catch_warnings():
if disable_warnings:
warnings.filterwarnings('ignore', '.*greater than.*')
slice_fft = aggregate_fft(
df.iloc[window_start:window_end],
bin_width=bin_width,
**kwargs
)
if add_resultant:
slice_fft['Resultant'] = slice_fft.pow(2).sum(axis=1).pow(0.5)

slice_fft = slice_fft.reset_index().melt(id_vars=slice_fft.index.name)
slice_fft['timestamp'] = df.index[i]
fft_df = pd.concat([fft_df, slice_fft])

return fft_df


def fft(
df: pd.DataFrame,
output: typing.Literal[None, "magnitude", "angle", "complex"] = None,
Expand Down
263 changes: 263 additions & 0 deletions endaq/calc/psd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import typing
import warnings
import datetime

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -335,3 +336,265 @@ def vc_curves(
# The PSD must already scale by ∆f -> need only scale by √∆f?
# TODO make `density` parameter, scale differently depending on mode
return np.sqrt(accel_psd.index[1]) * df_vel_oct.apply(np.sqrt)


def rolling_psd(
df: pd.DataFrame,
bin_width: float = 1.0,
octave_bins: float = None,
fstart: float = 1.0,
scaling: typing.Literal[None, "density", "spectrum", "parseval", "unit", "rms"] = None,
agg: typing.Union[
typing.Literal["mean", "sum"],
typing.Callable[[np.ndarray, int], float],
] = "mean",
freq_splits: np.array = None,
num_slices: int = 100,
indexes: np.array = None,
index_values: np.array = None,
slice_width: float = None,
add_resultant: bool = True,
disable_warnings: bool = True,
**kwargs,
) -> pd.DataFrame:
"""
Compute PSDs for defined slices of a time series data set using :py:func:`~endaq.calc.psd.welch()`

:param df: the input dataframe with an index defining the time in seconds or datetime
:param bin_width: the bin width or resolution in Hz for the PSD, defaults to 1,
this is ignored if `octave_bins` is defined
:param octave_bins: the number of frequency bins in each octave, defaults to `None`
:param fstart: the lowest frequency for an octave PSD, defaults to 1
:param scaling: the scaling of the output; `"density"` & `"spectrum"`
correspond to the same options in ``scipy.signal.welch``; `"parseval"`
will maintain the "energy" between the input & output, s.t.
``welch(df, scaling="parseval").sum(axis="rows")`` is roughly equal to
``df.abs().pow(2).sum(axis="rows")``;
`"unit"` will maintain the units and scale of the input data. `"rms"` will use `"parseval"` for the PSD
calculations and set `agg` to "sum", but then take the square root at the end
:param agg: the method for aggregating values into bins (only used if converting to octave or jagged); `'mean'` preserves
the PSD's area-under-the-curve, `'sum'` preserves the PSD's "energy"
:param freq_splits: the boundaries of the frequency bins to pass to :py:func:`~endaq.calc.psd.to_jagged()`
:param num_slices: the number of slices to split the time series into, default is 100,
this is ignored if `indexes` is defined
:param indexes: the center index locations (not value) of each slice to compute the PSD
:param index_values: the index values of each peak event to quantify (slower but more intuitive than using `indexes`)
:param slice_width: the time in seconds to center about each slice index,
if none is provided it will calculate one based upon the number of slices
:param add_resultant: if `True` (default) the root sum of squares of each PSD column will
also be computed
:param disable_warnings: if `True` (default) it disables the warnings on the PSD length
:param kwargs: Other parameters to pass directly to :py:func:`psd.welch`
:return: a dataframe containing all the PSDs, stacked on each other

See example use cases and syntax at :py:func:`~endaq.plot.spectrum_over_time()`
which visualizes the output of this function in Heatmaps, Waterfall plots,
Surface plots, and Animations

"""
use_spectrogram = True
if (indexes is not None) | (index_values is not None):
use_spectrogram = False

indexes, slice_width, num, length = utils._rolling_slice_definitions(
df,
indexes=indexes,
index_values=index_values,
num_slices=num_slices,
slice_width=slice_width
)

if use_spectrogram:
if "window" not in kwargs:
kwargs["window"] = "hann"
psd_df = spectrogram(
df=df,
bin_width=bin_width,
num_slices=num_slices,
scaling=scaling,
octave_bins=octave_bins,
fstart=fstart,
freq_splits=freq_splits,
add_resultant=add_resultant,
disable_warnings=disable_warnings,
**kwargs
)
else:
# Allow for RMS calculations
exp = 1
if scaling == "rms":
scaling = "parseval"
exp = 0.5
agg = "sum"

# Define bin_width if octave spacing
if octave_bins is not None:
bin_width = 1 / slice_width

# Loop through and compute PSD
psd_df = pd.DataFrame()
for i in indexes:
window_start = max(0, i - num)
window_end = min(length, i + num)
with warnings.catch_warnings():
if disable_warnings:
warnings.filterwarnings('ignore', '.*greater than.*', )
slice_psd = welch(
df.iloc[window_start:window_end],
bin_width=bin_width,
scaling=scaling,
**kwargs
)

if octave_bins is not None or freq_splits is not None:
with warnings.catch_warnings():
if disable_warnings:
warnings.filterwarnings('ignore', '.*empty frequency.*')
if freq_splits is None:
slice_psd = to_octave(slice_psd, octave_bins=octave_bins, fstart=fstart, agg=agg)
else:
slice_psd = to_jagged(slice_psd, freq_splits=freq_splits, agg=agg)

if add_resultant:
slice_psd['Resultant'] = slice_psd.sum(axis=1)

slice_psd = (slice_psd ** exp).reset_index().melt(id_vars=slice_psd.index.name)
slice_psd['timestamp'] = df.index[i]
psd_df = pd.concat([psd_df, slice_psd])

return psd_df


def spectrogram(
df: pd.DataFrame,
num_slices: int = 100,
scaling: typing.Literal[None, "density", "spectrum", "parseval", "unit", "rms"] = None,
bin_width: float = 1.0,
octave_bins: float = None,
fstart: float = 1.0,
agg: typing.Union[
typing.Literal["mean", "sum"],
typing.Callable[[np.ndarray, int], float],
] = "mean",
freq_splits: np.array = None,
add_resultant: bool = True,
disable_warnings: bool = True,
**kwargs,
) -> pd.DataFrame:
"""
Compute a spectrogram for a time series data set using :py:func:`scipy.signal.spectrogram()`

:param df: the input dataframe with an index defining the time in seconds or datetime
:param num_slices: the number of slices to split the time series into, default is 100
:param scaling: the scaling of the output; `"density"` & `"spectrum"`
correspond to the same options in ``scipy.signal.welch``; `"parseval"`
will maintain the "energy" between the input & output, s.t.
``welch(df, scaling="parseval").sum(axis="rows")`` is roughly equal to
``df.abs().pow(2).sum(axis="rows")``;
`"unit"` will maintain the units and scale of the input data. `"rms"` will use `"parseval"` for the PSD
calculations and set `agg` to "sum", but then take the square root at the end
:param bin_width: the bin width or resolution in Hz for the PSD or FFT, defaults to 1
:param octave_bins: the number of frequency bins in each octave, defaults to `None`
:param fstart: the lowest frequency for an octave spaced output, defaults to 1
:param agg: the method for aggregating values into bins (only used if converting to octave or jagged); `'mean'` preserves
the PSD's area-under-the-curve, `'sum'` preserves the PSD's "energy"
:param freq_splits: the boundaries of the frequency bins to pass to :py:func:`~endaq.calc.psd.to_jagged()`
:param add_resultant: if `True` (default) the root sum of squares of each PSD column will
also be computed
:param disable_warnings: if `True` (default) it disables the warnings in helper functions
:param kwargs: Other parameters to pass directly to :py:func:`scipy.signal.spectrogram()`
:return: a dataframe containing all the spectrograms "melted" with columns defining the value, frequency, timeslice,
and original column from the input dataframe

.. seealso::

- `SciPy Spectrogram method <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.spectrogram.html>`_
Documentation for the function wrapped internally.
"""

# Apply Scaling
if scaling == "parseval":
kwargs["scaling"] = "density"
elif scaling == "unit":
kwargs["scaling"] = "density"
elif scaling == "rms":
kwargs["scaling"] = "density"
agg = "sum"
elif scaling is not None:
kwargs["scaling"] = scaling

# Windowing, Default to Boxcar
if "window" not in kwargs:
kwargs["window"] = "boxcar"

# Determine Scipy.Spectrogram Stats
fs = 1 / utils.sample_spacing(df)
nperseg = int(fs / bin_width)
num_f = int(len(df) / fs / bin_width)
noverlap = nperseg - (nperseg * num_f // num_slices)

# Loop Through and Compute Spectrogram
spec_df = pd.DataFrame()
if add_resultant:
res_df = pd.DataFrame()
for c in df.columns:
f, t, spec = scipy.signal.spectrogram(
x=df[c].to_numpy(),
fs=fs,
nperseg=nperseg,
mode='psd',
noverlap=noverlap,
**kwargs
)

# create dataframe
spec_df_col = pd.DataFrame(
data=spec,
columns=t,
index=f
)
spec_df_col.index.name = 'frequency (Hz)'

# Do octave spacing
if octave_bins is not None or freq_splits is not None:
with warnings.catch_warnings():
if disable_warnings:
warnings.filterwarnings('ignore', '.*empty frequency.*')
if freq_splits is None:
spec_df_col = to_octave(spec_df_col, octave_bins=octave_bins, fstart=fstart, agg=agg)
else:
spec_df_col = to_jagged(spec_df_col, freq_splits=freq_splits, agg=agg)

# Melt & concatenate dataframe
spec_df_col = spec_df_col.reset_index().melt(id_vars='frequency (Hz)')
spec_df_col.columns = ['frequency (Hz)', 'timestamp', 'value']
spec_df_col['variable'] = c
spec_df = pd.concat([spec_df, spec_df_col])

# add resultant
if add_resultant:
if len(res_df) == 0:
res_df = spec_df_col
res_df['variable'] = "Resultant"
else:
res_df.value += spec_df_col.value

# Add resultant
if add_resultant:
spec_df = pd.concat([spec_df, res_df])

# Update timestamps to be based on input, deal with datetime
if isinstance(df.index[0], datetime.datetime):
spec_df.timestamp = pd.to_timedelta(spec_df.timestamp.astype(float), unit='s') + df.index[0]
else:
spec_df.timestamp = spec_df.timestamp.astype(float) + df.index[0]

# Apply scales
if scaling == "parseval":
spec_df.value = spec_df.value * f[1]
if scaling == "rms":
spec_df.value = spec_df.value ** 0.5
elif scaling == "unit":
spec_df.value = (spec_df.value * 2 * f[1]) ** 0.5

return spec_df
Loading