Feature/rolling spectrums #191

Merged: 24 commits, Apr 29, 2022
Changes from 8 commits
74 changes: 74 additions & 0 deletions endaq/calc/fft.py
@@ -2,6 +2,7 @@

import typing
from typing import Optional
import warnings

import numpy as np
import pandas as pd
@@ -35,6 +36,79 @@ def aggregate_fft(df, **kwargs):
    return psd.welch(df, **kwargs)


def rolling_fft(
    df: pd.DataFrame,
    bin_width: float = 1.0,
    num_slices: int = 100,
    indexes: np.array = None,
    index_values: np.array = None,
    slice_width: float = None,
    add_resultant: bool = True,
    disable_warnings: bool = True,
    **kwargs,
) -> pd.DataFrame:
    """
    Compute FFTs for defined slices of a time series data set using :py:func:`~endaq.calc.fft.aggregate_fft()`

    :param df: the input dataframe with an index defining the time in seconds or datetime
    :param bin_width: the bin width or resolution in Hz for the FFT
    :param num_slices: the number of slices to split the time series into, default is 100;
        this is ignored if `indexes` is defined
    :param indexes: the center index locations (not values) of each slice on which to compute the FFT
    :param index_values: the index values of each peak event to quantify (slower, but more intuitive than using `indexes`)
    :param slice_width: the time in seconds to center about each slice index;
        if none is provided it will calculate one based upon the number of slices
    :param add_resultant: if `True` the root sum of squares of each FFT column will
        also be computed
    :param disable_warnings: if `True` (default) it disables the warnings on the FFT length
    :param kwargs: other parameters to pass directly to :py:func:`~endaq.calc.fft.aggregate_fft()`
    :return: a dataframe containing all the FFTs, stacked on each other

    See example use cases and syntax at :py:func:`~endaq.plot.plots.spectrum_over_time()`,
    which visualizes the output of this function in heatmaps, waterfall plots,
    surface plots, and animations.
    """
    if disable_warnings:
        warnings.filterwarnings('ignore', '.*greater than.*')

    length = len(df)

    # Define the center index locations of each slice if not provided
    if indexes is None:
        if index_values is not None:
            indexes = np.zeros(len(index_values), int)
            for i in range(len(indexes)):
                indexes[i] = int((np.abs(df.index - index_values[i])).argmin())
        else:
            indexes = np.linspace(0, length, num_slices, endpoint=False, dtype=int)
            indexes = indexes + int(indexes[1] / 2)

    # Calculate the slice step size
    spacing = utils.sample_spacing(df)
    if slice_width is None:
        slice_width = spacing * length / len(indexes)
    num = int(slice_width / spacing / 2)

    # Loop through and compute the FFT of each slice
    fft = pd.DataFrame()
    for i in indexes:
        window_start = max(0, i - num)
        window_end = min(length, i + num)
        slice_fft = aggregate_fft(
            df.iloc[window_start:window_end],
            bin_width=bin_width,
            **kwargs
        )
        if add_resultant:
            slice_fft['Resultant'] = slice_fft.pow(2).sum(axis=1).pow(0.5)

        slice_fft = slice_fft.reset_index().melt(id_vars=slice_fft.index.name)
        slice_fft['timestamp'] = df.index[i]
        fft = pd.concat([fft, slice_fft])

    return fft


def fft(
    df: pd.DataFrame,
    output: typing.Literal[None, "magnitude", "angle", "complex"] = None,
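To make the intended call pattern concrete, here is a minimal usage sketch of `rolling_fft` (the test signal, sample rate, and column names below are illustrative assumptions, not part of the diff):

```python
import numpy as np
import pandas as pd
from endaq.calc import fft

# Two-channel test signal sampled at 1 kHz for 10 s (illustrative)
t = np.arange(0, 10, 1e-3)
df = pd.DataFrame(
    {"X": np.sin(2 * np.pi * 50 * t), "Y": np.sin(2 * np.pi * 120 * t)},
    index=pd.Index(t, name="time (s)"),
)

# One FFT per slice, 20 slices; the output is long-form, with one row per
# (frequency, variable, timestamp) combination for spectrum_over_time()
out = fft.rolling_fft(df, bin_width=2.0, num_slices=20)

# Or center the slices on specific index values instead of an even split
out_events = fft.rolling_fft(df, bin_width=2.0, index_values=np.array([2.5, 7.5]))
```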
100 changes: 100 additions & 0 deletions endaq/calc/psd.py
@@ -335,3 +335,103 @@ def vc_curves(
    # The PSD must already scale by ∆f -> need only scale by √∆f?
    # TODO make `density` parameter, scale differently depending on mode
    return np.sqrt(accel_psd.index[1]) * df_vel_oct.apply(np.sqrt)


def rolling_psd(
    df: pd.DataFrame,
    bin_width: float = 1.0,
    octave_bins: float = None,
    fstart: float = 1.0,
    scaling: typing.Literal[None, "density", "spectrum", "parseval", "unit"] = None,
    agg: typing.Union[
        typing.Literal["mean", "sum"],
        typing.Callable[[np.ndarray, int], float],
    ] = "mean",
    num_slices: int = 100,
    indexes: np.array = None,
    index_values: np.array = None,
    slice_width: float = None,
    add_resultant: bool = True,
    disable_warnings: bool = True,
    **kwargs,
) -> pd.DataFrame:
"""
Compute PSDs for defined slices of a time series data set using :py:func:`~endaq.calc.psd.welch()`
:param df: the input dataframe with an index defining the time in seconds or datetime
:param bin_width: the bin width or resolution in Hz for the PSD, defaults to 1,
this is ignored if `octave_bins` is defined
:param octave_bins: the number of frequency bins in each octave, defaults to `None`
:param fstart: the lowest frequency for an octave PSD, defaults to 1
:param scaling: the scaling of the output; `"density"` & `"spectrum"`
correspond to the same options in ``scipy.signal.welch``; `"parseval"`
will maintain the "energy" between the input & output, s.t.
``welch(df, scaling="parseval").sum(axis="rows")`` is roughly equal to
``df.abs().pow(2).sum(axis="rows")``;
`"unit"` will maintain the units and scale of the input data.
:param agg: the method for aggregating values into bins (only used if converting to octave); `'mean'` preserves
the PSD's area-under-the-curve, `'sum'` preserves the PSD's "energy"
:param num_slices: the number of slices to split the time series into, default is 100,
this is ignored if `indexes` is defined
:param indexes: the center index locations (not value) of each slice to compute the PSD
:param index_values: the index values of each peak event to quantify (slower but more intuitive than using `indexes`)
:param slice_width: the time in seconds to center about each slice index,
if none is provided it will calculate one based upon the number of slices
:param add_resultant: if `True` (default) the root sum of squares of each PSD column will
also be computed
:param disable_warnings: if `True` (default) it disables the warnings on the PSD length
:param kwargs: Other parameters to pass directly to :py:func:`psd.welch`
:return: a dataframe containing all the PSDs, stacked on each other

See example use cases and syntax at :py:func:`~endaq.plot.plots.spectrum_over_time()`
which visualizes the output of this function in Heatmaps, Waterfall plots,
Surface plots, and Animations

"""
    if disable_warnings:
        warnings.filterwarnings('ignore', '.*greater than.*')
        warnings.filterwarnings('ignore', '.*empty frequency.*')

    length = len(df)

    # Define the center index locations of each slice if not provided
    if indexes is None:
        if index_values is not None:
            indexes = np.zeros(len(index_values), int)
            for i in range(len(indexes)):
                indexes[i] = int((np.abs(df.index - index_values[i])).argmin())
        else:
            indexes = np.linspace(0, length, num_slices, endpoint=False, dtype=int)
            indexes = indexes + int(indexes[1] / 2)

    # Calculate the slice step size
    spacing = utils.sample_spacing(df)
    if slice_width is None:
        slice_width = spacing * length / len(indexes)
    num = int(slice_width / spacing / 2)

    # Define bin_width if octave spacing is used
    if octave_bins is not None:
        bin_width = 1 / slice_width

    # Loop through and compute the PSD of each slice
    psd = pd.DataFrame()
    for i in indexes:
        window_start = max(0, i - num)
        window_end = min(length, i + num)
        slice_psd = welch(
            df.iloc[window_start:window_end],
            bin_width=bin_width,
            scaling=scaling,
            **kwargs
        )
        if octave_bins is not None:
            slice_psd = to_octave(slice_psd, octave_bins=octave_bins, fstart=fstart, agg=agg)

        if add_resultant:
            slice_psd['Resultant'] = slice_psd.sum(axis=1)

        slice_psd = slice_psd.reset_index().melt(id_vars=slice_psd.index.name)
        slice_psd['timestamp'] = df.index[i]
        psd = pd.concat([psd, slice_psd])

    return psd
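Likewise, a sketch of how the `rolling_psd` options compose, including the octave and `"parseval"` paths described in the docstring (the signal and parameter values are illustrative assumptions):

```python
import numpy as np
import pandas as pd
from endaq.calc import psd

# White-noise test signal sampled at 1 kHz (illustrative)
rng = np.random.default_rng(0)
t = np.arange(0, 20, 1e-3)
df = pd.DataFrame(
    {"X": rng.standard_normal(len(t))},
    index=pd.Index(t, name="time (s)"),
)

# Linearly spaced rolling PSD: 100 slices at the default 1 Hz bin width
out = psd.rolling_psd(df, num_slices=100)

# Octave-spaced rolling PSD: 3 bins per octave from 1 Hz, with "parseval"
# scaling and agg="sum" so each slice keeps the input's "energy"
out_oct = psd.rolling_psd(df, octave_bins=3, fstart=1.0, agg="sum", scaling="parseval")
```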
115 changes: 109 additions & 6 deletions endaq/calc/shock.py
@@ -405,21 +405,25 @@ def relative_displacement_static(accel: pd.DataFrame, omega: float, damp: float

def shock_spectrum(
    accel: pd.DataFrame,
    freqs: np.ndarray,
    freqs: np.ndarray = None,
    init_freq: float = 0.5,
    bins_per_octave: float = 12.0,
    damp: float = 0.0,
    mode: typing.Literal["srs", "pvss"] = "srs",
    two_sided: bool = False,
    aggregate_axes: bool = False,
) -> pd.DataFrame:
    """
    Calculate the shock spectrum of an acceleration signal.

    :param accel: the absolute acceleration `y"`
    :param freqs: the natural frequencies across which to calculate the spectrum
    :param freqs: the natural frequencies across which to calculate the spectrum,
        if `None` (the default) it uses `init_freq` and `bins_per_octave` to define them
    :param init_freq: the initial frequency in the sequence; if `None`,
        use the frequency corresponding to the data's duration, default is 0.5 Hz
    :param bins_per_octave: the number of frequencies per octave, default is 12
    :param damp: the damping coefficient `ζ`, related to the Q-factor by
        `ζ = 1/(2Q)`; defaults to 0
    :param mode: the type of spectrum to calculate:

        - `'srs'` (default) specifies the Shock Response Spectrum (SRS)
        - `'pvss'` specifies the Pseudo-Velocity Shock Spectrum (PVSS)
    :param two_sided: whether to return for each frequency:
@@ -428,9 +432,7 @@ def shock_spectrum(
    :param aggregate_axes: whether to calculate the column-wise resultant (`True`)
        or calculate spectra along each column independently (`False`; default)
    :return: the shock spectrum

    .. seealso::

        - `Pseudo Velocity Shock Spectrum Rules For Analysis Of Mechanical Shock, Howard A. Gaberson <https://info.endaq.com/hubfs/pvsrs_rules.pdf>`_
        - `An Introduction To The Shock Response Spectrum, Tom Irvine, 9 July 2012 <http://www.vibrationdata.com/tutorials2/srs_intr.pdf>`_
        - `SciPy transfer functions <https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.TransferFunction.html>`_
@@ -440,6 +442,8 @@ def shock_spectrum(
        Documentation for the biquad function used to implement the transfer
        function.
    """
    if freqs is None:
        freqs = utils.logfreqs(accel, init_freq=init_freq, bins_per_octave=bins_per_octave)
    if two_sided and aggregate_axes:
        raise ValueError("cannot enable both options `two_sided` and `aggregate_axes`")
    freqs = np.asarray(freqs)
@@ -506,6 +510,105 @@
    )
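Since `freqs` is now optional, callers can rely on the octave-spaced default; a minimal sketch of both call styles (the input pulse is an illustrative assumption):

```python
import numpy as np
import pandas as pd
from endaq.calc import shock

# Short transient sampled at 10 kHz (illustrative)
t = np.arange(0, 0.5, 1e-4)
accel = pd.DataFrame(
    {"X": np.where(t < 0.01, np.sin(2 * np.pi * 50 * t), 0.0)},
    index=pd.Index(t, name="time (s)"),
)

# Before this PR: natural frequencies had to be passed explicitly
srs_explicit = shock.shock_spectrum(accel, freqs=2 ** np.arange(0, 12, 1 / 3), damp=0.05)

# Now: omit freqs and they default to utils.logfreqs(accel, init_freq, bins_per_octave)
srs_default = shock.shock_spectrum(accel, init_freq=4.0, bins_per_octave=6, damp=0.05)
```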


def rolling_shock_spectrum(
    df: pd.DataFrame,
    damp: float = 0.0,
    mode: typing.Literal["srs", "pvss"] = "srs",
    add_resultant: bool = True,
    init_freq: float = 0.5,
    bins_per_octave: float = 12.0,
    num_slices: int = 100,
    indexes: np.array = None,
    index_values: np.array = None,
    slice_width: float = None,
    multiplier: float = 1.0,
    disable_warnings: bool = True,
) -> pd.DataFrame:
"""
Compute Shock Response Spectrums for defined slices of a time series data set using :py:func:`~endaq.calc.shock.shock_spectrum()`
:param df: the input dataframe with an index defining the time in seconds or datetime
:param damp: the damping coefficient `ζ`, related to the Q-factor by
`ζ = 1/(2Q)`; defaults to 0
:param mode: the type of spectrum to calculate:
- `'srs'` (default) specifies the Shock Response Spectrum (SRS)
- `'pvss'` specifies the Pseudo-Velocity Shock Spectrum (PVSS)
:param add_resultant: if `True` (default) the column-wise resultant will
also be computed
:param aggregate_axes: whether to calculate the column-wise resultant (`True`)
or calculate spectra along each column independently (`False`; default)
:param init_freq: the initial frequency in the sequence; if `None`,
use the frequency corresponding to the data's duration, default is 0.5 Hz
:param bins_per_octave: the number of frequencies per octave, default is 12
:param num_slices: the number of slices to split the time series into, default is 100,
this is ignored if `indexes` is defined
:param indexes: the center index locations (not value) of each slice to compute the shock spectrum
:param index_values: the index values of each peak event to quantify (slower but more intuitive than using `indexes`)
:param slice_width: the time in seconds to center about each slice index,
if none is provided it will calculate one based upon the number of slices
:param add_resultant: if `True` the root sum of squares of each shock spectrum column will
also be computed, default is `False`
:param multiplier: applies a scale to the output
- 386.09 to convert from g to inches (in)
- 9806.65 to convert from g to millimeters (mm)
:param disable_warnings: if `True` (default) it disables the warnings on the initial frequency
:return: a dataframe containing all the shock spectrums, stacked on each other

See example use cases and syntax at :py:func:`~endaq.plot.plots.spectrum_over_time()`
which visualizes the output of this function in Heatmaps, Waterfall plots,
Surface plots, and Animations

"""
    if disable_warnings:
        warnings.filterwarnings('ignore', '.*too short.*')

    length = len(df)

    # Define the center index locations of each slice if not provided
    if indexes is None:
        if index_values is not None:
            indexes = np.zeros(len(index_values), int)
            for i in range(len(indexes)):
                indexes[i] = int((np.abs(df.index - index_values[i])).argmin())
        else:
            indexes = np.linspace(0, length, num_slices, endpoint=False, dtype=int)
            indexes = indexes + int(indexes[1] / 2)

    # Calculate the slice step size
    spacing = utils.sample_spacing(df)
    if slice_width is None:
        slice_width = spacing * length / len(indexes)
    num = int(slice_width / spacing / 2)

    # Loop through and compute the shock spectrum of each slice
    srs = pd.DataFrame()
    for i in indexes:
        window_start = max(0, i - num)
        window_end = min(length, i + num)
        slice_srs = shock_spectrum(
            df.iloc[window_start:window_end] * multiplier,
            mode=mode,
            damp=damp,
            init_freq=init_freq,
            bins_per_octave=bins_per_octave,
        )
        if add_resultant:
            slice_srs['Resultant'] = shock_spectrum(
                df.iloc[window_start:window_end] * multiplier,
                mode=mode,
                damp=damp,
                init_freq=init_freq,
                bins_per_octave=bins_per_octave,
                aggregate_axes=True,
            )['resultant']

        slice_srs = slice_srs.reset_index().melt(id_vars=slice_srs.index.name)
        slice_srs['timestamp'] = df.index[i]
        srs = pd.concat([srs, slice_srs])

    return srs
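And a sketch of the rolling variant, exercising the `multiplier` unit conversion from the docstring (the input signal and parameter choices are illustrative assumptions):

```python
import numpy as np
import pandas as pd
from endaq.calc import shock

# Acceleration in g, sampled at 10 kHz (illustrative)
rng = np.random.default_rng(1)
t = np.arange(0, 2, 1e-4)
df = pd.DataFrame(
    {"X": rng.standard_normal(len(t))},
    index=pd.Index(t, name="time (s)"),
)

# 10 slices of ~0.2 s each; multiplier=386.09 scales g to in/s^2,
# so the PVSS values come out in in/s
out = shock.rolling_shock_spectrum(
    df,
    mode="pvss",
    damp=0.05,
    init_freq=8.0,
    num_slices=10,
    multiplier=386.09,
)
# Long-form output: columns like ['frequency (Hz)', 'variable', 'value', 'timestamp']
```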


@dataclass
class HalfSineWavePulse:
"""