diff --git a/endaq/calc/fft.py b/endaq/calc/fft.py index b6901d9..eaa649a 100644 --- a/endaq/calc/fft.py +++ b/endaq/calc/fft.py @@ -2,6 +2,7 @@ import typing from typing import Optional +import warnings import numpy as np import pandas as pd @@ -10,7 +11,7 @@ from endaq.calc import psd, utils -__all__ = ["aggregate_fft", "fft", "rfft", "dct", "dst"] +__all__ = ["aggregate_fft", "rolling_fft", "fft", "rfft", "dct", "dst"] def aggregate_fft(df, **kwargs): @@ -35,6 +36,85 @@ def aggregate_fft(df, **kwargs): return psd.welch(df, **kwargs) +def rolling_fft( + df: pd.DataFrame, + bin_width: float = 1.0, + num_slices: int = 100, + indexes: np.array = None, + index_values: np.array = None, + slice_width: float = None, + add_resultant: bool = True, + disable_warnings: bool = True, + **kwargs, +) -> pd.DataFrame: + """ + Compute FFTs for defined slices of a time series data set using :py:func:`~endaq.calc.fft.aggregate_fft()` + + :param df: the input dataframe with an index defining the time in seconds or datetime + :param bin_width: the bin width or resolution in Hz for the FFT + :param num_slices: the number of slices to split the time series into, default is 100, + this is ignored if `indexes` is defined + :param indexes: the center index locations (not value) of each slice to compute the FFT + :param index_values: the index values of each peak event to quantify (slower but more intuitive than using `indexes`) + :param slice_width: the time in seconds to center about each slice index, + if none is provided it will calculate one based upon the number of slices + :param add_resultant: if `True` the root sum of squares of each FFT column will + also be computed + :param disable_warnings: if `True` (default) it disables the warnings on the PSD length + :param kwargs: Other parameters to pass directly to :py:func:`~endaq.calc.fft.aggregate_fft()` + :return: a dataframe containing all the FFTs, stacked on each other + + See example use cases and syntax at :py:func:`~endaq.plot.spectrum_over_time()` + which visualizes the output of this function in Heatmaps, Waterfall plots, + Surface plots, and Animations + + """ + use_spectrogram = True + if (indexes is not None) | (index_values is not None): + use_spectrogram = False + + indexes, slice_width, num, length = utils._rolling_slice_definitions( + df, + indexes=indexes, + index_values=index_values, + num_slices=num_slices, + slice_width=slice_width + ) + + if use_spectrogram: + fft_df = psd.spectrogram( + df=df, + num_slices=num_slices, + bin_width=bin_width, + scaling='unit', + add_resultant=add_resultant, + disable_warnings=disable_warnings, + **kwargs + ) + else: + # Loop through and compute fft + fft_df = pd.DataFrame() + for i in indexes: + window_start = max(0, i - num) + window_end = min(length, i + num) + with warnings.catch_warnings(): + if disable_warnings: + warnings.filterwarnings('ignore', '.*greater than.*') + slice_fft = aggregate_fft( + df.iloc[window_start:window_end], + bin_width=bin_width, + **kwargs + ) + if add_resultant: + slice_fft['Resultant'] = slice_fft.pow(2).sum(axis=1).pow(0.5) + + slice_fft = slice_fft.reset_index().melt(id_vars=slice_fft.index.name) + slice_fft['timestamp'] = df.index[i] + fft_df = pd.concat([fft_df, slice_fft]) + + return fft_df + + def fft( df: pd.DataFrame, output: typing.Literal[None, "magnitude", "angle", "complex"] = None, diff --git a/endaq/calc/psd.py b/endaq/calc/psd.py index 6304ad8..0508794 100644 --- a/endaq/calc/psd.py +++ b/endaq/calc/psd.py @@ -4,6 +4,7 @@ import typing import warnings +import datetime import numpy as np import pandas as pd @@ -335,3 +336,265 @@ def vc_curves( # The PSD must already scale by ∆f -> need only scale by √∆f? # TODO make `density` parameter, scale differently depending on mode return np.sqrt(accel_psd.index[1]) * df_vel_oct.apply(np.sqrt) + + +def rolling_psd( + df: pd.DataFrame, + bin_width: float = 1.0, + octave_bins: float = None, + fstart: float = 1.0, + scaling: typing.Literal[None, "density", "spectrum", "parseval", "unit", "rms"] = None, + agg: typing.Union[ + typing.Literal["mean", "sum"], + typing.Callable[[np.ndarray, int], float], + ] = "mean", + freq_splits: np.array = None, + num_slices: int = 100, + indexes: np.array = None, + index_values: np.array = None, + slice_width: float = None, + add_resultant: bool = True, + disable_warnings: bool = True, + **kwargs, +) -> pd.DataFrame: + """ + Compute PSDs for defined slices of a time series data set using :py:func:`~endaq.calc.psd.welch()` + + :param df: the input dataframe with an index defining the time in seconds or datetime + :param bin_width: the bin width or resolution in Hz for the PSD, defaults to 1, + this is ignored if `octave_bins` is defined + :param octave_bins: the number of frequency bins in each octave, defaults to `None` + :param fstart: the lowest frequency for an octave PSD, defaults to 1 + :param scaling: the scaling of the output; `"density"` & `"spectrum"` + correspond to the same options in ``scipy.signal.welch``; `"parseval"` + will maintain the "energy" between the input & output, s.t. + ``welch(df, scaling="parseval").sum(axis="rows")`` is roughly equal to + ``df.abs().pow(2).sum(axis="rows")``; + `"unit"` will maintain the units and scale of the input data. `"rms"` will use `"parseval"` for the PSD + calculations and set `agg` to "sum", but then take the square root at the end + :param agg: the method for aggregating values into bins (only used if converting to octave or jagged); `'mean'` preserves + the PSD's area-under-the-curve, `'sum'` preserves the PSD's "energy" + :param freq_splits: the boundaries of the frequency bins to pass to :py:func:`~endaq.calc.psd.to_jagged()` + :param num_slices: the number of slices to split the time series into, default is 100, + this is ignored if `indexes` is defined + :param indexes: the center index locations (not value) of each slice to compute the PSD + :param index_values: the index values of each peak event to quantify (slower but more intuitive than using `indexes`) + :param slice_width: the time in seconds to center about each slice index, + if none is provided it will calculate one based upon the number of slices + :param add_resultant: if `True` (default) the root sum of squares of each PSD column will + also be computed + :param disable_warnings: if `True` (default) it disables the warnings on the PSD length + :param kwargs: Other parameters to pass directly to :py:func:`psd.welch` + :return: a dataframe containing all the PSDs, stacked on each other + + See example use cases and syntax at :py:func:`~endaq.plot.spectrum_over_time()` + which visualizes the output of this function in Heatmaps, Waterfall plots, + Surface plots, and Animations + + """ + use_spectrogram = True + if (indexes is not None) | (index_values is not None): + use_spectrogram = False + + indexes, slice_width, num, length = utils._rolling_slice_definitions( + df, + indexes=indexes, + index_values=index_values, + num_slices=num_slices, + slice_width=slice_width + ) + + if use_spectrogram: + if "window" not in kwargs: + kwargs["window"] = "hann" + psd_df = spectrogram( + df=df, + bin_width=bin_width, + num_slices=num_slices, + scaling=scaling, + octave_bins=octave_bins, + fstart=fstart, + freq_splits=freq_splits, + add_resultant=add_resultant, + disable_warnings=disable_warnings, + **kwargs + ) + else: + # Allow for RMS calculations + exp = 1 + if scaling == "rms": + scaling = "parseval" + exp = 0.5 + agg = "sum" + + # Define bin_width if octave spacing + if octave_bins is not None: + bin_width = 1 / slice_width + + # Loop through and compute PSD + psd_df = pd.DataFrame() + for i in indexes: + window_start = max(0, i - num) + window_end = min(length, i + num) + with warnings.catch_warnings(): + if disable_warnings: + warnings.filterwarnings('ignore', '.*greater than.*', ) + slice_psd = welch( + df.iloc[window_start:window_end], + bin_width=bin_width, + scaling=scaling, + **kwargs + ) + + if octave_bins is not None or freq_splits is not None: + with warnings.catch_warnings(): + if disable_warnings: + warnings.filterwarnings('ignore', '.*empty frequency.*') + if freq_splits is None: + slice_psd = to_octave(slice_psd, octave_bins=octave_bins, fstart=fstart, agg=agg) + else: + slice_psd = to_jagged(slice_psd, freq_splits=freq_splits, agg=agg) + + if add_resultant: + slice_psd['Resultant'] = slice_psd.sum(axis=1) + + slice_psd = (slice_psd ** exp).reset_index().melt(id_vars=slice_psd.index.name) + slice_psd['timestamp'] = df.index[i] + psd_df = pd.concat([psd_df, slice_psd]) + + return psd_df + + +def spectrogram( + df: pd.DataFrame, + num_slices: int = 100, + scaling: typing.Literal[None, "density", "spectrum", "parseval", "unit", "rms"] = None, + bin_width: float = 1.0, + octave_bins: float = None, + fstart: float = 1.0, + agg: typing.Union[ + typing.Literal["mean", "sum"], + typing.Callable[[np.ndarray, int], float], + ] = "mean", + freq_splits: np.array = None, + add_resultant: bool = True, + disable_warnings: bool = True, + **kwargs, +) -> pd.DataFrame: + """ + Compute a spectrogram for a time series data set using :py:func:`scipy.signal.spectrogram()` + + :param df: the input dataframe with an index defining the time in seconds or datetime + :param num_slices: the number of slices to split the time series into, default is 100 + :param scaling: the scaling of the output; `"density"` & `"spectrum"` + correspond to the same options in ``scipy.signal.welch``; `"parseval"` + will maintain the "energy" between the input & output, s.t. + ``welch(df, scaling="parseval").sum(axis="rows")`` is roughly equal to + ``df.abs().pow(2).sum(axis="rows")``; + `"unit"` will maintain the units and scale of the input data. `"rms"` will use `"parseval"` for the PSD + calculations and set `agg` to "sum", but then take the square root at the end + :param bin_width: the bin width or resolution in Hz for the PSD or FFT, defaults to 1 + :param octave_bins: the number of frequency bins in each octave, defaults to `None` + :param fstart: the lowest frequency for an octave spaced output, defaults to 1 + :param agg: the method for aggregating values into bins (only used if converting to octave or jagged); `'mean'` preserves + the PSD's area-under-the-curve, `'sum'` preserves the PSD's "energy" + :param freq_splits: the boundaries of the frequency bins to pass to :py:func:`~endaq.calc.psd.to_jagged()` + :param add_resultant: if `True` (default) the root sum of squares of each PSD column will + also be computed + :param disable_warnings: if `True` (default) it disables the warnings in helper functions + :param kwargs: Other parameters to pass directly to :py:func:`scipy.signal.spectrogram()` + :return: a dataframe containing all the spectrograms "melted" with columns defining the value, frequency, timeslice, + and original column from the input dataframe + + .. seealso:: + + - `SciPy Spectrogram method `_ + Documentation for the function wrapped internally. + """ + + # Apply Scaling + if scaling == "parseval": + kwargs["scaling"] = "density" + elif scaling == "unit": + kwargs["scaling"] = "density" + elif scaling == "rms": + kwargs["scaling"] = "density" + agg = "sum" + elif scaling is not None: + kwargs["scaling"] = scaling + + # Windowing, Default to Boxcar + if "window" not in kwargs: + kwargs["window"] = "boxcar" + + # Determine Scipy.Spectrogram Stats + fs = 1 / utils.sample_spacing(df) + nperseg = int(fs / bin_width) + num_f = int(len(df) / fs / bin_width) + noverlap = nperseg - (nperseg * num_f // num_slices) + + # Loop Through and Compute Spectrogram + spec_df = pd.DataFrame() + if add_resultant: + res_df = pd.DataFrame() + for c in df.columns: + f, t, spec = scipy.signal.spectrogram( + x=df[c].to_numpy(), + fs=fs, + nperseg=nperseg, + mode='psd', + noverlap=noverlap, + **kwargs + ) + + # create dataframe + spec_df_col = pd.DataFrame( + data=spec, + columns=t, + index=f + ) + spec_df_col.index.name = 'frequency (Hz)' + + # Do octave spacing + if octave_bins is not None or freq_splits is not None: + with warnings.catch_warnings(): + if disable_warnings: + warnings.filterwarnings('ignore', '.*empty frequency.*') + if freq_splits is None: + spec_df_col = to_octave(spec_df_col, octave_bins=octave_bins, fstart=fstart, agg=agg) + else: + spec_df_col = to_jagged(spec_df_col, freq_splits=freq_splits, agg=agg) + + # Melt & concatenate dataframe + spec_df_col = spec_df_col.reset_index().melt(id_vars='frequency (Hz)') + spec_df_col.columns = ['frequency (Hz)', 'timestamp', 'value'] + spec_df_col['variable'] = c + spec_df = pd.concat([spec_df, spec_df_col]) + + # add resultant + if add_resultant: + if len(res_df) == 0: + res_df = spec_df_col + res_df['variable'] = "Resultant" + else: + res_df.value += spec_df_col.value + + # Add resultant + if add_resultant: + spec_df = pd.concat([spec_df, res_df]) + + # Update timestamps to be based on input, deal with datetime + if isinstance(df.index[0], datetime.datetime): + spec_df.timestamp = pd.to_timedelta(spec_df.timestamp.astype(float), unit='s') + df.index[0] + else: + spec_df.timestamp = spec_df.timestamp.astype(float) + df.index[0] + + # Apply scales + if scaling == "parseval": + spec_df.value = spec_df.value * f[1] + if scaling == "rms": + spec_df.value = spec_df.value ** 0.5 + elif scaling == "unit": + spec_df.value = (spec_df.value * 2 * f[1]) ** 0.5 + + return spec_df diff --git a/endaq/calc/shock.py b/endaq/calc/shock.py index a354d0b..8b39965 100644 --- a/endaq/calc/shock.py +++ b/endaq/calc/shock.py @@ -405,21 +405,26 @@ def relative_displacement_static(accel: pd.DataFrame, omega: float, damp: float def shock_spectrum( accel: pd.DataFrame, - freqs: np.ndarray, - damp: float = 0.0, + freqs: np.ndarray = None, + init_freq: float = 0.5, + bins_per_octave: float = 12.0, + damp: float = 0.05, mode: typing.Literal["srs", "pvss"] = "srs", two_sided: bool = False, aggregate_axes: bool = False, ) -> pd.DataFrame: """ Calculate the shock spectrum of an acceleration signal. - + :param accel: the absolute acceleration `y"` - :param freqs: the natural frequencies across which to calculate the spectrum + :param freqs: the natural frequencies across which to calculate the spectrum, + if `None` (the default) it uses `init_freq` and `bins_per_octave` to define them + :param init_freq: the initial frequency in the sequence; if `None`, + use the frequency corresponding to the data's duration, default is 0.5 Hz + :param bins_per_octave: the number of frequencies per octave, default is 12 :param damp: the damping coefficient `ζ`, related to the Q-factor by - `ζ = 1/(2Q)`; defaults to 0 + `ζ = 1/(2Q)`; defaults to 0.05 :param mode: the type of spectrum to calculate: - - `'srs'` (default) specifies the Shock Response Spectrum (SRS) - `'pvss'` specifies the Pseudo-Velocity Shock Spectrum (PVSS) :param two_sided: whether to return for each frequency: @@ -428,9 +433,7 @@ def shock_spectrum( :param aggregate_axes: whether to calculate the column-wise resultant (`True`) or calculate spectra along each column independently (`False`; default) :return: the shock spectrum - .. seealso:: - - `Pseudo Velocity Shock Spectrum Rules For Analysis Of Mechanical Shock, Howard A. Gaberson `_ - `An Introduction To The Shock Response Spectrum, Tom Irvine, 9 July 2012 `_ - `SciPy transfer functions `_ @@ -440,6 +443,8 @@ def shock_spectrum( Documentation for the biquad function used to implement the transfer function. """ + if freqs is None: + freqs = utils.logfreqs(accel, init_freq=init_freq, bins_per_octave=bins_per_octave) if two_sided and aggregate_axes: raise ValueError("cannot enable both options `two_sided` and `aggregate_axes`") freqs = np.asarray(freqs) @@ -506,6 +511,96 @@ def shock_spectrum( ) +def rolling_shock_spectrum( + df: pd.DataFrame, + damp: float = 0.05, + mode: typing.Literal["srs", "pvss"] = "srs", + add_resultant: bool = True, + init_freq: float = 0.5, + bins_per_octave: float = 12.0, + num_slices: int = 100, + indexes: np.array = None, + index_values: np.array = None, + slice_width: float = None, + disable_warnings: bool = True, +) -> pd.DataFrame: + """ + Compute Shock Response Spectrums for defined slices of a time series data set using :py:func:`~endaq.calc.shock.shock_spectrum()` + + :param df: the input dataframe with an index defining the time in seconds or datetime + :param damp: the damping coefficient `ζ`, related to the Q-factor by + `ζ = 1/(2Q)`; defaults to 0.05 + :param mode: the type of spectrum to calculate: + + * `'srs'` (default) specifies the Shock Response Spectrum (SRS) + * `'pvss'` specifies the Pseudo-Velocity Shock Spectrum (PVSS) + :param add_resultant: if `True` (default) the column-wise resultant will + also be computed + :param add_resultant: whether to calculate the column-wise resultant (`True`) + or calculate spectra along each column independently (`False`; default) + :param init_freq: the initial frequency in the sequence; if `None`, + use the frequency corresponding to the data's duration, default is 0.5 Hz + :param bins_per_octave: the number of frequencies per octave, default is 12 + :param num_slices: the number of slices to split the time series into, default is 100, + this is ignored if `indexes` is defined + :param indexes: the center index locations (not value) of each slice to compute the shock spectrum + :param index_values: the index values of each peak event to quantify (slower but more intuitive than using `indexes`) + :param slice_width: the time in seconds to center about each slice index, + if none is provided it will calculate one based upon the number of slices + :param add_resultant: if `True` the root sum of squares of each shock spectrum column will + also be computed, default is `False` + :param disable_warnings: if `True` (default) it disables the warnings on the initial frequency + :return: a dataframe containing all the shock spectrums, stacked on each other + + See example use cases and syntax at :py:func:`~endaq.plot.spectrum_over_time()` + which visualizes the output of this function in Heatmaps, Waterfall plots, + Surface plots, and Animations + + """ + + indexes, slice_width, num, length = utils._rolling_slice_definitions( + df, + indexes=indexes, + index_values=index_values, + num_slices=num_slices, + slice_width=slice_width + ) + + # Loop through and compute shock spectrum + srs = pd.DataFrame() + for i in indexes: + window_start = max(0, i - num) + window_end = min(length, i + num) + with warnings.catch_warnings(): + if disable_warnings: + warnings.filterwarnings('ignore', '.*too short*', ) + slice_srs = shock_spectrum( + df.iloc[window_start:window_end], + mode=mode, + damp=damp, + init_freq=init_freq, + bins_per_octave=bins_per_octave, + ) + if add_resultant: + with warnings.catch_warnings(): + if disable_warnings: + warnings.filterwarnings('ignore', '.*too short*', ) + slice_srs['Resultant'] = shock_spectrum( + df.iloc[window_start:window_end], + mode=mode, + damp=damp, + init_freq=init_freq, + bins_per_octave=bins_per_octave, + aggregate_axes=True + )['resultant'] + + slice_srs = slice_srs.reset_index().melt(id_vars=slice_srs.index.name) + slice_srs['timestamp'] = df.index[i] + srs = pd.concat([srs, slice_srs]) + + return srs + + @dataclass class HalfSineWavePulse: """ diff --git a/endaq/calc/utils.py b/endaq/calc/utils.py index 87e3ebc..a3bffa0 100644 --- a/endaq/calc/utils.py +++ b/endaq/calc/utils.py @@ -160,3 +160,49 @@ def resample(df: pd.DataFrame, sample_rate: Optional[float] = None) -> pd.DataFr resampled_df.index.name = df.index.name return resampled_df + + +def _rolling_slice_definitions( + df: pd.DataFrame, + num_slices: int = 100, + indexes: np.array = None, + index_values: np.array = None, + slice_width: float = None, +): + """ + Compute parameters needed to define index locations and slice width for rolling computations + + :param df: the input dataframe with an index defining the time in seconds or datetime + :param num_slices: the number of slices to split the time series into, default is 100, + this is ignored if `indexes` is defined + :param indexes: the center index locations (not value) of each slice to compute the FFT + :param index_values: the index values of each peak event to quantify (slower but more intuitive than using `indexes`) + :param slice_width: the time in seconds to center about each slice index, + if none is provided it will calculate one based upon the number of slices + :return: a tuple of `indexes`, `slice_width`, `num`, and `length` + + See example use cases and syntax at :py:func:`~endaq.plot.spectrum_over_time()` + which visualizes the output of this function in Heatmaps, Waterfall plots, + Surface plots, and Animations + + """ + + length = len(df) + + # Define center index locations of each slice if not provided + if indexes is None: + if index_values is not None: + indexes = np.zeros(len(index_values), int) + for i in range(len(indexes)): + indexes[i] = int((np.abs(df.index - index_values[i])).argmin()) + else: + indexes = np.linspace(0, length, num_slices, endpoint=False, dtype=int) + indexes = indexes + int(indexes[1] / 2) + + # Calculate slice step size + spacing = sample_spacing(df) + if slice_width is None: + slice_width = spacing * length / len(indexes) + num = int(slice_width / spacing / 2) + + return indexes, slice_width, num, length diff --git a/endaq/plot/plots.py b/endaq/plot/plots.py index a4f1e53..f2b5cd1 100644 --- a/endaq/plot/plots.py +++ b/endaq/plot/plots.py @@ -1,17 +1,22 @@ from __future__ import annotations +import typing +import warnings + import numpy as np import pandas as pd import plotly.graph_objects as go import plotly.express as px +from plotly import colors import scipy.spatial.transform from plotly.subplots import make_subplots from scipy import signal, spatial from typing import Optional from collections.abc import Container +import datetime from endaq.calc import sample_spacing -from endaq.calc.psd import to_octave, welch +from endaq.calc.psd import to_octave, welch, rolling_psd from .utilities import determine_plotly_map_zoom, get_center_of_coordinates from .dashboards import rolling_enveloped_dashboard @@ -26,6 +31,7 @@ 'rolling_min_max_envelope', 'around_peak', 'animate_quaternion', + 'spectrum_over_time' ] DEFAULT_ATTRIBUTES_TO_PLOT_INDIVIDUALLY = np.array([ @@ -273,10 +279,11 @@ def gen_map(df_map: pd.DataFrame, mapbox_access_token: str, filter_points_by_pos def octave_spectrogram(df: pd.DataFrame, window: float, bins_per_octave: int = 3, freq_start: float = 20.0, max_freq: float = float('inf'), db_scale: bool = True, log_scale_y_axis: bool = True - ) -> go.Figure: + ) -> tuple[pd.DataFrame, go.Figure]: """ - Produces an octave spectrogram of the given data. - + Produces an octave spectrogram of the given data, this is a wrapper around + :py:func:`~endaq.calc.psd.rolling_psd()` and :py:func:`~endaq.plot.spectrum_over_time()` + :param df: The dataframe of sensor data. This must only have 1 column. :param window: The time window for each of the columns in the spectrogram :param bins_per_octave: The number of frequency bins per octave @@ -285,55 +292,18 @@ def octave_spectrogram(df: pd.DataFrame, window: float, bins_per_octave: int = 3 :param db_scale: If the spectrogram should be log-scaled for visibility (with 10*log10(x)) :param log_scale_y_axis: If the y-axis of the plot should be log scaled :return: a tuple containing: - - - the frequency bins - - the time bins - - the spectrogram data + - dataframe of the spectrogram data - the corresponding plotly figure """ if len(df.columns) != 1: raise ValueError("The parameter 'df' must have only one column of data!") - ary = df.values.squeeze() - - fs = 1/sample_spacing(df)#(len(df) - 1) / (df.index[-1] - df.index[0]) - N = int(fs * window) #Number of points in the fft - w = signal.blackman(N, False) - - freqs, bins, Pxx = signal.spectrogram(ary, fs, window=w, nperseg=N, noverlap=0) - - time_dfs = [pd.DataFrame({bins[j]: Pxx[:, j]}, index=freqs) for j in range(len(bins))] - - octave_dfs = list(map(lambda x: to_octave(x, freq_start, octave_bins=bins_per_octave, agg='sum'), time_dfs)) - - combined_df = pd.concat(octave_dfs, axis=1) - - freqs = combined_df.index.values - Pxx = combined_df.values - - include_freqs_mask = freqs <= max_freq - Pxx = Pxx[include_freqs_mask] - freqs = freqs[include_freqs_mask] - - if db_scale: - Pxx = 10 * np.log10(Pxx) - - trace = [go.Heatmap(x=bins, y=freqs, z=Pxx, colorscale='Jet')] - layout = go.Layout( - yaxis={'title': 'Frequency (Hz)'}, - xaxis={'title': 'Time (s)'}, - ) - - fig = go.Figure(data=trace, layout=layout) - - if log_scale_y_axis: - fig.update_yaxes(type="log") + num_slices = int(len(df) * sample_spacing(df) / window) - fig.update_traces(showscale=False) + df_psd = rolling_psd(df, num_slices=num_slices, octave_bins=bins_per_octave, + fstart=freq_start) - data_df = pd.DataFrame(Pxx, index=freqs, columns=bins) - - return data_df, fig + return df_psd, spectrum_over_time(df_psd, freq_max=max_freq, log_val=db_scale, log_freq=log_scale_y_axis) def octave_psd_bar_plot(df: pd.DataFrame, bins_per_octave: int = 3, f_start: float = 20.0, yaxis_title: str = '', @@ -538,3 +508,428 @@ def animate_quaternion(df, rate=6., scale=1.): fig.layout.updatemenus[0].buttons[0].args[1]['frame']['duration'] = int(1000//rate) return fig + + +def spectrum_over_time( + df: pd.DataFrame, + plot_type: typing.Literal["Heatmap", + "Surface", + "Waterfall", + "Animation", + "Peak", + "Lines"] = "Heatmap", + var_column: str = 'variable', + var_to_process: str = None, + time_column: str = 'timestamp', + freq_column: str = 'frequency (Hz)', + val_column: str = 'value', + freq_min: float = None, + freq_max: float = None, + log_freq: bool = False, + log_val: bool = False, + round_time: bool = True, + waterfall_line_sequence: bool = True, + zsmooth: str = "best", + waterfall_line_color: str = '#EE7F27', + min_median_max_line_color: str = '#6914F0', +) -> go.Figure: + """ + Generate a 3D Plotly figure from a stacked spectrum to visualize how the frequency content changes over time + + :param df: the input dataframe with columns defining the frequency content, timestamps, values, and variables, + see the following functions which provides outputs that would then be passed into this function as an input: + + * :py:func:`~endaq.calc.fft.rolling_fft()` + * :py:func:`~endaq.calc.psd.rolling_psd()` + * :py:func:`~endaq.calc.shock.rolling_shock_spectrum()` + * :py:func:`~endaq.batch.GetDataBuilder.add_psd()` + * :py:func:`~endaq.batch.GetDataBuilder.add_pvss()` + :param plot_type: the type of plot to display the spectrum, options are: + + * `Heatmap`: a 2D visualization with the color defining the z value (the default) + * `Surface`: similar to `Heatmap` but the z value is also projected "off the page" + * `Waterfall`: distinct lines are plotted per time slice in a 3D view + * `Animation`: a 2D display of the waterfall but the spectrum changes with animation frames + * `Peak`: per timestamp the peak frequency is determined and plotted against time + * `Lines`: the value in each frequency bin is plotted against time + :param var_column: the column name in the dataframe that defines the different variables, default is `"variable"` + :param var_to_process: the variable value in the `var_column` to filter the input df down to, + if none is provided (the default) this function will filter to the first value + :param time_column: the column name in the dataframe that defines the timestamps, default is `"timestamp"` + :param freq_column: the column name in the dataframe that defines the frequency, default is `"frequency (Hz)"` + :param val_column: the column name in the dataframe that defines the values, default is `"value"` + :param freq_min: the minimum of the y axis (frequency) to include in the figure, + default is None meaning it will display all content + :param freq_max: the maximum of the y axis (frequency) to include in the figure, + default is None meaning it will display all content + :param log_freq: if `True` the frequency will be in a log scale, default is `False` + :param log_val: if `True` the values will be in a log scale, default is `False` + :param round_time: if `True` (default) the time values will be rounded to the nearest second for datetimes and + hundredths of a second for floats + :param waterfall_line_sequence: if `True` the waterfall line colors are defined with a color scale, + if `False` all lines will have the same color, default is `True` + :param zsmooth: the Plotly smooth algorithm to use in the heatmap, default is `"best"` which looks ideal but + `"fast"` will be more responsive, or `False` will attempt no smoothing + :param waterfall_line_color: the color to use for all lines in the Waterfall plot if `waterfall_line_sequence` is + `False` + :param min_median_max_line_color: the color to use for the min, max, and median lines in the Animation, if set to + `None` these lines won't be added, default is `'#6914F0'` + :return: a Plotly figure visualizing the spectrum over time + + + Here's a few examples from a dataset recorded with an enDAQ sensor on a motorcycle + as it revved the engine which resulted in changing frequency content + + .. code:: python + + import pandas as pd + import endaq + + # Set Theme + endaq.plot.utilities.set_theme() + + # Get Vibration Data + df_vibe = pd.read_csv('https://info.endaq.com/hubfs/data/motorcycle-vibration-moving-frequency.csv', index_col=0) + + # Calculate a Rolling FFT + fft = endaq.calc.fft.rolling_fft(df_vibe, num_slices=200, add_resultant=True) + + # Visualize the Rolling FFT as a Heatmap + heatmap = endaq.plot.plots.spectrum_over_time(fft, plot_type='Heatmap', freq_max=200, var_to_process='Resultant') + heatmap.show() + + # Plot the Peak Frequency vs Time + peak = endaq.plot.plots.spectrum_over_time(fft, plot_type='Peak', freq_max=200, var_to_process='Resultant') + peak.show() + + # Visualize as a Surface Plot + surface = endaq.plot.plots.spectrum_over_time(fft, plot_type='Surface', freq_max=200, var_to_process='Resultant') + surface.show() + + # Visualize as a Waterfall + waterfall = endaq.plot.plots.spectrum_over_time(fft, plot_type='Waterfall', freq_max=200, var_to_process='Resultant') + waterfall.show() + + .. plotly:: + :fig-vars: heatmap, peak, surface, waterfall + + import pandas as pd + import endaq + + # Set Theme + endaq.plot.utilities.set_theme() + + # Get Vibration Data + df_vibe = pd.read_csv('https://info.endaq.com/hubfs/data/motorcycle-vibration-moving-frequency.csv', index_col=0) + + # Calculate a Rolling FFT + fft = endaq.calc.fft.rolling_fft(df_vibe, num_slices=200, add_resultant=True) + + # Visualize the Rolling FFT as a Heatmap + heatmap = endaq.plot.plots.spectrum_over_time(fft, plot_type='Heatmap', freq_max=200, var_to_process='Resultant') + heatmap.show() + + # Plot the Peak Frequency vs Time + peak = endaq.plot.plots.spectrum_over_time(fft, plot_type='Peak', freq_max=200, var_to_process='Resultant') + peak.show() + + # Visualize as a Surface Plot + surface = endaq.plot.plots.spectrum_over_time(fft, plot_type='Surface', freq_max=200, var_to_process='Resultant') + surface.show() + + # Visualize as a Waterfall + waterfall = endaq.plot.plots.spectrum_over_time(fft, plot_type='Waterfall', freq_max=200, var_to_process='Resultant') + waterfall.show() + + Here's another few examples with a longer dataset with DatetimeIndex of a car engine during a morning commute + + .. code:: python + + import pandas as pd + import endaq + + # Set Theme + endaq.plot.utilities.set_theme() + + # Get a Longer Dataset with DatetimeIndex + engine = endaq.ide.get_primary_sensor_data('https://info.endaq.com/hubfs/data/Commute.ide', measurement_type='accel', + time_mode='datetime') + + # Compute PSD + psd = endaq.calc.psd.rolling_psd(engine, num_slices=500, add_resultant=True, octave_bins=12, fstart=4) + + # Visualize as a Heatmap + heatmap2 = endaq.plot.plots.spectrum_over_time(psd, plot_type='Heatmap', var_to_process='Resultant', zsmooth='best', + log_freq=True, log_val=True) + heatmap2.show() + + # Visualize as an Animation + animation = endaq.plot.plots.spectrum_over_time(psd, plot_type='Animation', var_to_process='Resultant', + log_freq=True, log_val=True) + animation.show() + + # Use Rolling PSD to Calculate RMS in Certain Frequency Bins + rms = endaq.calc.psd.rolling_psd(engine, num_slices=500, scaling='rms', add_resultant=True, + freq_splits=[1, 20, 60, 300, 3000]) + + # Plot the RMS per Frequency Bin Over Time + lines = endaq.plot.plots.spectrum_over_time(rms, plot_type='Lines', log_val=False, var_to_process='Resultant') + lines.show() + + # Compute Pseudo Velocity at Specific Times + pvss = endaq.calc.shock.rolling_shock_spectrum(engine, slice_width=2.0, add_resultant=True, + mode='pvss', init_freq=4, damp=0.05, + index_values=pd.DatetimeIndex(['2016-08-02 12:07:15', + '2016-08-02 12:08:01', + '2016-08-02 12:10:06'], tz='UTC')) + + # Visualize as a Waterfall + waterfall2 = endaq.plot.plots.spectrum_over_time(pvss, plot_type='Waterfall', log_freq=True, log_val=True, + var_to_process='Resultant', waterfall_line_sequence=False) + waterfall2.show() + + .. plotly:: + :fig-vars: heatmap2, animation, lines, waterfall2 + + import pandas as pd + import endaq + + # Set Theme + endaq.plot.utilities.set_theme() + + # Get a Longer Dataset with DatetimeIndex + engine = endaq.ide.get_primary_sensor_data('https://info.endaq.com/hubfs/data/Commute.ide', measurement_type='accel', + time_mode='datetime') + + # Compute PSD + psd = endaq.calc.psd.rolling_psd(engine, num_slices=500, add_resultant=True, octave_bins=12, fstart=4) + + # Visualize as a Heatmap + heatmap2 = endaq.plot.plots.spectrum_over_time(psd, plot_type='Heatmap', var_to_process='Resultant', zsmooth='best', + log_freq=True, log_val=True) + heatmap2.show() + + # Visualize as an Animation + animation = endaq.plot.plots.spectrum_over_time(psd, plot_type='Animation', var_to_process='Resultant', + log_freq=True, log_val=True) + animation.show() + + # Use Rolling PSD to Calculate RMS in Certain Frequency Bins + rms = endaq.calc.psd.rolling_psd(engine, num_slices=500, scaling='rms', add_resultant=True, + freq_splits=[1, 20, 60, 300, 3000]) + + # Plot the RMS per Frequency Bin Over Time + lines = endaq.plot.plots.spectrum_over_time(rms, plot_type='Lines', log_val=False, var_to_process='Resultant') + lines.show() + + # Compute Pseudo Velocity at Specific Times + pvss = endaq.calc.shock.rolling_shock_spectrum(engine, slice_width=2.0, add_resultant=True, + mode='pvss', init_freq=4, damp=0.05, + index_values=pd.DatetimeIndex(['2016-08-02 12:07:15', + '2016-08-02 12:08:01', + '2016-08-02 12:10:06'], tz='UTC')) + + # Visualize as a Waterfall + waterfall2 = endaq.plot.plots.spectrum_over_time(pvss, plot_type='Waterfall', log_freq=True, log_val=True, + var_to_process='Resultant', waterfall_line_sequence=False) + waterfall2.show() + + """ + df = df.copy() + + # Filter to one variable + if var_to_process is None: + var_to_process = df[var_column].unique()[0] + df = df.loc[df[var_column] == var_to_process] + + # Round time + if round_time: + if isinstance(df[time_column].iloc[0], datetime.datetime): + df[time_column] = df[time_column].round('s') + else: + df[time_column] = np.round(df[time_column].to_numpy(), 2) + + # Filter frequency + df = df.loc[df[freq_column] > 0.0] + if freq_max is not None: + df = df.loc[df[freq_column] <= freq_max] + if freq_min is not None: + df = df.loc[df[freq_column] >= freq_min] + + # Remove 0s + df = df.loc[df[val_column] > 0] + + # Check Length of Dataframe + if len(df) > 100000: + warnings.warn( + "plot data is very large, may be unresponsive, suggest limiting frequency range and/or using less slices", + RuntimeWarning, + ) + + # Create pivot table + df_pivot = df.copy() + first_step = df_pivot[freq_column].iloc[1] - df_pivot[freq_column].iloc[0] + second_step = df_pivot[freq_column].iloc[2] - df_pivot[freq_column].iloc[1] + if np.isclose(first_step, second_step): + round_freq = np.round(df_pivot[freq_column].min(), 0) + df_pivot[freq_column] = np.round(df_pivot[freq_column].to_numpy() / round_freq, 0) * round_freq + df_pivot = df_pivot.pivot_table(columns=time_column, index=freq_column, values=val_column) + + # Heatmap & Surface + if plot_type in ["Heatmap", "Surface"]: + # Deal with Datetime + x_type = float + if isinstance(df_pivot.columns[0], datetime.datetime): + if plot_type == "Surface": + df_pivot.columns = (df_pivot.columns - df_pivot.columns[0]).total_seconds() + else: + x_type = str + + # Build Dictionary of Plot Data, Apply Log Scale If Needed + data_dict = { + 'x': df_pivot.columns.astype(x_type), + 'y': df_pivot.index.astype(float), + 'z': df_pivot.to_numpy().astype(float), + 'connectgaps': True + } + if log_val: + data_dict['z'] = np.log10(data_dict['z']) + + # Generate Figures + if plot_type == "Heatmap": + fig = go.Figure(data=go.Heatmap(data_dict, zsmooth=zsmooth)).update_layout( + xaxis_title_text='Timestamp', yaxis_title_text='Frequency (Hz)') + if log_freq: + fig.update_layout(yaxis_type='log') + else: + fig = go.Figure(data=go.Surface(data_dict)) + fig.update_traces(showscale=False) + + # Waterfall + elif plot_type == 'Waterfall': + # Define Colors + if waterfall_line_sequence: + color_sequence = colors.sample_colorscale( + [[0.0, '#6914F0'], + [0.2, '#3764FF'], + [0.4, '#2DB473'], + [0.6, '#FAC85F'], + [0.8, '#EE7F27'], + [1.0, '#D72D2D']], len(df[time_column].unique())) + else: + color_sequence = [waterfall_line_color] + + # Deal with Datetime + df['label_column'] = df[time_column] + if isinstance(df[time_column].iloc[0], datetime.datetime): + df[time_column] = (df[time_column] - df[time_column].iloc[0]).dt.total_seconds() + df['label_column'] = df['label_column'].dt.tz_convert(None).astype(str) + + # Generate Figure + fig = px.line_3d( + df, + x=time_column, + y=freq_column, + z=val_column, + color='label_column', + color_discrete_sequence=color_sequence).update_layout( + legend_orientation='v', + legend_y=1, + legend_title_text='Timestamps' + ) + + # Animation + elif plot_type == 'Animation': + # Deal with Datetime + if isinstance(df[time_column].iloc[0], datetime.datetime): + df[time_column] = df[time_column].dt.tz_convert(None).astype(str) + + # Generate Figure + fig = px.line( + df, + animation_frame=time_column, + x=freq_column, + y=val_column, + log_y=log_val, + log_x=log_freq + ).update_layout( + showlegend=False, + yaxis_title_text='', + xaxis_title_text='Frequency (Hz)' + ) + + # Add min, max, median lines + if min_median_max_line_color is not None: + # Add Max + df_t = df_pivot.max(axis=1).dropna() + fig.add_trace(go.Scatter( + x=df_t.index, + y=df_t, + mode='lines', + line_color=min_median_max_line_color, + )) + + # Add Min + df_t = df_pivot.min(axis=1).dropna() + fig.add_trace(go.Scatter( + x=df_t.index, + y=df_t, + mode='lines', + line_color=min_median_max_line_color, + )) + + # Add Median + df_t = df_pivot.median(axis=1).dropna() + fig.add_trace(go.Scatter( + x=df_t.index, + y=df_t, + mode='lines', + line_color=min_median_max_line_color, + line_dash='dash', + )) + + # Peak Frequencies + elif plot_type == 'Peak': + fig = px.scatter( + df_pivot.idxmax(), + log_y=log_freq + ).update_layout( + showlegend=False, + yaxis_title_text='Peak Frequency (Hz)', + xaxis_title_text='Timestamp' + ) + + # Lines per Frequency Bin + elif plot_type == 'Lines': + df_pivot.index = np.round(df_pivot.index, 2) + fig = px.line( + df_pivot.T, + log_y=log_val + ).update_layout( + legend_title_text='Frequency Bin (Hz)', + yaxis_title_text='', + xaxis_title_text='Timestamp', + legend_orientation='v', + legend_y=1, + ) + + else: + raise ValueError(f"invalid plot type {plot_type}") + + # Add Labels to 3D plots + if plot_type in ["Surface", "Waterfall"]: + fig.update_scenes( + aspectratio_x=2.0, + aspectratio_y=1.0, + aspectratio_z=0.3, + xaxis_title_text='Timestamp', + yaxis_title_text='Frequency (Hz)', + zaxis_title_text='' + ) + if log_freq: + fig.update_scenes(yaxis_type='log') + if log_val: + fig.update_scenes(zaxis_type='log') + fig.update_layout(scene_camera=dict(eye=dict(x=-1.5, y=-1.5, z=1))) + + return fig diff --git a/tests/calc/test_fft.py b/tests/calc/test_fft.py index 7e87768..45e31af 100644 --- a/tests/calc/test_fft.py +++ b/tests/calc/test_fft.py @@ -139,5 +139,59 @@ def test_fft_exceptions(self, normalization, output, n, raises): df = pd.DataFrame(np.arange(10)) with raises: - fft.rfft(df, norm=normalization, output=output, nfft=n) + + +@pytest.fixture +def df_test(): + # Build Time Array + time = np.arange(1000) / 1000 + + # Build Dataframe with Noise + df_accel = pd.DataFrame({ + 'time': np.concatenate((time, time + 1.0)), + 'A': np.concatenate(( + np.sin(2 * np.pi * 10 * time), + np.sin(2 * np.pi * 13 * time)) + ) + }).set_index('time') + + # Add Random + df_accel.A = df_accel.A + np.random.random(2000) * 2 - 1 + + return df_accel + + +class TestRollingFFT: + + def test_using_spectrogram(self, df_test): + df_rolling_fft = fft.rolling_fft( + df_test, + num_slices=2, + add_resultant=False + ) + times = df_rolling_fft.timestamp.unique() + + npt.assert_almost_equal( + df_rolling_fft[df_rolling_fft.timestamp == times[0]].value.to_numpy(), + fft.aggregate_fft(df_test[:1.0])['A'].to_numpy()) + npt.assert_almost_equal( + df_rolling_fft[df_rolling_fft.timestamp == times[1]].value.to_numpy(), + fft.aggregate_fft(df_test[1.0:])['A'].to_numpy()) + + @pytest.mark.filterwarnings("ignore::UserWarning") + def test_defined_slices(self, df_test): + df_rolling_fft = fft.rolling_fft( + df_test, + index_values=[1.0, 1.5], + slice_width=0.5, + add_resultant=False, + ) + + # Do Assertions + npt.assert_almost_equal( + df_rolling_fft[df_rolling_fft.timestamp == 1.0].value.to_numpy(), + fft.aggregate_fft(df_test.iloc[750:1250])['A'].to_numpy()) + npt.assert_almost_equal( + df_rolling_fft[df_rolling_fft.timestamp == 1.5].value.to_numpy(), + fft.aggregate_fft(df_test.iloc[1250:1750])['A'].to_numpy()) diff --git a/tests/calc/test_psd.py b/tests/calc/test_psd.py index 13c003c..663bb03 100644 --- a/tests/calc/test_psd.py +++ b/tests/calc/test_psd.py @@ -159,3 +159,53 @@ def test_to_octave(psd_df, agg, expt_f, expt_array): calc_df = psd.to_octave(psd_df, fstart=1, octave_bins=1, agg=agg) assert calc_df.index.to_numpy().tolist() == expt_f assert calc_df.to_numpy().flatten().tolist() == expt_array + + +@pytest.fixture +def df_test(): + # Build Time Array + time = np.arange(1000) / 1000 + + # Build Dataframe with Noise + df_accel = pd.DataFrame({ + 'time': np.concatenate((time, time + 1.0)), + 'A': np.random.random(2000) + }).set_index('time') + + return df_accel + + +@pytest.mark.filterwarnings("ignore:empty frequency bins:RuntimeWarning") +class TestRollingPSD: + + def test_using_spectrogram(self, df_test): + df_rolling_psd = psd.rolling_psd( + df_test, + num_slices=2, + add_resultant=False + ) + times = df_rolling_psd.timestamp.unique() + + np.testing.assert_almost_equal( + df_rolling_psd[df_rolling_psd.timestamp == times[0]].value.to_numpy(), + psd.welch(df_test[:1.0])['A'].to_numpy()) + np.testing.assert_almost_equal( + df_rolling_psd[df_rolling_psd.timestamp == times[1]].value.to_numpy(), + psd.welch(df_test[1.0:])['A'].to_numpy()) + + @pytest.mark.filterwarnings("ignore::UserWarning") + def test_defined_slices(self, df_test): + df_rolling_psd = psd.rolling_psd( + df_test, + index_values=[1.0, 1.5], + slice_width=0.5, + add_resultant=False, + ) + + # Do Assertions + np.testing.assert_almost_equal( + df_rolling_psd[df_rolling_psd.timestamp == 1.0].value.to_numpy(), + psd.welch(df_test.iloc[750:1250])['A'].to_numpy()) + np.testing.assert_almost_equal( + df_rolling_psd[df_rolling_psd.timestamp == 1.5].value.to_numpy(), + psd.welch(df_test.iloc[1250:1750])['A'].to_numpy()) \ No newline at end of file diff --git a/tests/calc/test_shock.py b/tests/calc/test_shock.py index 37cdcf2..cc7e348 100644 --- a/tests/calc/test_shock.py +++ b/tests/calc/test_shock.py @@ -486,3 +486,69 @@ def test_tuple_like(self): ampl, T = env_half_sine assert ampl == mock.sentinel.amplitude assert T == mock.sentinel.duration + +@pytest.fixture +def df_test(): + half = shock.HalfSineWavePulse( + amplitude=pd.Series([100, 50]), + duration=pd.Series([.1, .05]) + ) + + df_accel = pd.concat([ + half.to_time_series(tstart=0, tstop=1), + half.to_time_series(tstart=1, tstop=2), + ]) + df_accel.columns = ['A', 'B'] + + return df_accel + + +class TestRollingShockSpectrum: + + def test_even_slices(self, df_test): + df_rolling_srs = shock.rolling_shock_spectrum( + df_test, + num_slices=2, + add_resultant=False, + init_freq=2 + ) + + # Get the Individual SRS + first_srs = shock.shock_spectrum(df_test[:1.0], init_freq=2) + second_srs = shock.shock_spectrum(df_test[1.0:], init_freq=2) + + # Do Assertions + npt.assert_almost_equal( + df_rolling_srs[(df_rolling_srs.variable == 'A') & (df_rolling_srs.timestamp == 0.5)]['value'].to_numpy(), + first_srs['A'].to_numpy()) + npt.assert_almost_equal( + df_rolling_srs[(df_rolling_srs.variable == 'B') & (df_rolling_srs.timestamp == 0.5)]['value'].to_numpy(), + first_srs['B'].to_numpy()) + npt.assert_almost_equal( + df_rolling_srs[(df_rolling_srs.variable == 'A') & (df_rolling_srs.timestamp != 0.5)]['value'].to_numpy(), + second_srs['A'].to_numpy()) + npt.assert_almost_equal( + df_rolling_srs[(df_rolling_srs.variable == 'B') & (df_rolling_srs.timestamp != 0.5)]['value'].to_numpy(), + second_srs['B'].to_numpy()) + + def test_defined_slices(self, df_test): + df_rolling_srs = shock.rolling_shock_spectrum( + df_test, + index_values=[1.0], + bins_per_octave=6, + slice_width=0.5, + mode='pvss', + init_freq=4 + ) + + npt.assert_almost_equal( + df_rolling_srs[df_rolling_srs.variable == 'Resultant']['value'].to_numpy(), + shock.shock_spectrum( + df_test[0.75:1.2499], + bins_per_octave=6, + mode='pvss', + aggregate_axes=True, + init_freq=4 + )['resultant'].to_numpy() + ) + diff --git a/tests/calc/test_utils.py b/tests/calc/test_utils.py index 60b42af..e7ad432 100644 --- a/tests/calc/test_utils.py +++ b/tests/calc/test_utils.py @@ -67,3 +67,35 @@ def test_uniform_resample_is_uniform_for_datetime64ns(): times = np.arange('2021-02', '2021-03', np.timedelta64(1, 's'), dtype='datetime64[ns]') df = pd.DataFrame(np.arange(len(times)), index=times) assert len(np.unique(np.diff(utils.resample(df).index))) == 1 + + +def test_rolling_slice_definitions(): + # Build dataframe with 1 second of data + df = pd.DataFrame({ + 'time': np.arange(1000) / 1000, + 'A': np.ones(1000) + }).set_index('time') + + indexes, slice_width, num, length = utils._rolling_slice_definitions( + df=df, + num_slices=2 + ) + assert slice_width == 0.5 + + indexes, slice_width, num, length = utils._rolling_slice_definitions( + df=df, + index_values=[0.1, 0.9] + ) + assert indexes[1] == 900 + + df.index = pd.to_datetime(df.index, unit='s') + indexes, slice_width, num, length = utils._rolling_slice_definitions( + df=df, + ) + assert num == 5 + + indexes, slice_width, num, length = utils._rolling_slice_definitions( + df=df, + index_values=pd.DatetimeIndex(['1970-01-01 00:00:00.9951']) + ) + assert indexes[0] == 995 diff --git a/tests/plot/test_plots.py b/tests/plot/test_plots.py new file mode 100644 index 0000000..f87756c --- /dev/null +++ b/tests/plot/test_plots.py @@ -0,0 +1,63 @@ +import pytest + +import numpy as np +import pandas as pd + +from endaq import plot + + +@pytest.fixture(params=["float", "datetime"]) +def generate_dataframe(request): + freqs = np.arange(10) + times = np.zeros_like(freqs) + + df = pd.DataFrame({ + 'frequency (Hz)': np.concatenate((freqs, freqs, freqs)), + 'timestamp': np.concatenate((times, times + 30, times + 60)), + 'value': np.random.random(30) + }) + df['variable'] = 'X' + + if request.param == 'datetime': + df.timestamp = pd.to_datetime(df.timestamp, unit='D', utc=True) + + return df + + +class TestSpectrumOverTime: + + def test_heatmap(self, generate_dataframe): + fig = plot.spectrum_over_time(generate_dataframe, 'Heatmap') + assert fig['data'][0]['type'] == 'heatmap' + + def test_surface(self, generate_dataframe): + fig = plot.spectrum_over_time(generate_dataframe, 'Surface') + assert fig['data'][0]['type'] == 'surface' + + def test_waterfall(self, generate_dataframe): + fig = plot.spectrum_over_time(generate_dataframe, 'Waterfall') + assert fig['data'][0]['type'] == 'scatter3d' + + def test_animation(self, generate_dataframe): + fig = plot.spectrum_over_time(generate_dataframe, 'Animation') + assert len(fig['data']) == 4 + + def test_lines(self, generate_dataframe): + fig = plot.spectrum_over_time(generate_dataframe, 'Lines') + assert len(fig['data']) == 9 + + def test_peak(self, generate_dataframe): + fig = plot.spectrum_over_time(generate_dataframe, 'Peak') + assert fig['data'][0]['marker']['symbol'] == 'circle' + + +def test_octave_spectrogram(): + time = np.arange(1000) / 1000 + df_accel = pd.DataFrame({ + 'time': np.concatenate((time, time + 1.0, time + 2.0)), + 'A': np.random.random(3000) + }).set_index('time') + + df, fig = plot.octave_spectrogram(df_accel, window=0.1, max_freq=50) + assert fig['data'][0]['type'] == 'heatmap' +