Skip to content

Commit

Permalink
Add functions used to preprocess waveforms (#79)
Browse files Browse the repository at this point in the history
Co-authored-by: Mike Boyle <[email protected]>

Bump #minor version
  • Loading branch information
EthansGitHub authored May 21, 2024
1 parent fad2030 commit 560d3ab
Show file tree
Hide file tree
Showing 6 changed files with 839 additions and 23 deletions.
441 changes: 441 additions & 0 deletions docs/tutorials/05-PreprocessingForFFTs.ipynb

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ nav:
- "tutorials/01-Metadata.ipynb"
- "tutorials/03-Horizons.ipynb"
- "tutorials/04-Waveforms.ipynb"
- "tutorials/05-PreprocessingForFFTs.ipynb"
- API:
- "Load": "api/load.md"
- "Catalog": "api/catalog.md"
Expand Down
337 changes: 315 additions & 22 deletions sxs/time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def __array_finalize__(self, obj):
return
# # Since ndarray.__array_finalize__ is None, we skip this in its direct descendents:
# super().__array_finalize__(obj)
self._metadata = copy.copy(getattr(self, '_metadata', getattr(obj, '_metadata', {})))
self._metadata = copy.copy(getattr(self, "_metadata", getattr(obj, "_metadata", {})))
if "time" not in self._metadata:
self._metadata["time"] = None # Placeholder about to be updated

Expand Down Expand Up @@ -240,7 +240,7 @@ def normalize_basic_slicing_key(key_normalize, ndim):
if time_key is not None:
new_time = self.time[time_key]
metadata = self._metadata.copy()
metadata.update(**getattr(new_data, '_metadata', {}))
metadata.update(**getattr(new_data, "_metadata", {}))
metadata["time"] = new_time
metadata["time_axis"] = new_time_axis

Expand Down Expand Up @@ -619,37 +619,43 @@ def xor(self, reverse=False, preserve_dtype=False, **kwargs):
def truncate(self, abs_tolerance):
"""Truncate the precision of this object's `data` in place
This function sets bits in the array data to 0 when they have lower
significance than the number given as or returned by `abs_tolerance`. This is
a useful step in compressing data — though it is obviously lossy.
This function sets bits in the array data to 0 when they have
lower significance than the number given as or returned by
`abs_tolerance`. This is a useful step in compressing data —
though it is obviously lossy.
Parameters
----------
abs_tolerance : {callable, float, array-like}
If callable, it is called with this object as the parameter, and the
returned value is treated as a float or array-like would be. Floats are
simply treated as a uniform absolute tolerance to be applied at all times.
Array-like objects must broadcast against this array, and each element is
treated as the absolute tolerance for all the elements it broadcasts to.
If callable, it is called with this object as the
parameter, and the returned value is treated as a float or
array-like would be. Floats are simply treated as a
uniform absolute tolerance to be applied at all times.
Array-like objects must broadcast against this array, and
each element is treated as the absolute tolerance for all
the elements it broadcasts to.
Returns
-------
None
This value is returned to serve as a reminder that this function operates
in place.
This value is returned to serve as a reminder that this
function operates in place.
Notes
-----
The effect is achieved by multiplying the array's data by the same power of 2
that would be required to bring the `abs_tolerance` to between 1 and 2. Thus,
all digits less significant than 1 are less significant than `abs_tolerance` —
meaning that we can apply the standard `round` routine to set these digits to
0. We then divide by that same power of 2 to bring the array data back to
nearly its original value. By working with powers of 2, we ensure that the 0s
at the intermediate stage are represented as 0 bits in the final result.
For floats and array-like objects, all values must be strictly positive, or
`inf` or `nan` will result.
The effect is achieved by multiplying the array's data by the
same power of 2 that would be required to bring the
`abs_tolerance` to between 1 and 2. Thus, all digits less
significant than 1 are less significant than `abs_tolerance` —
meaning that we can apply the standard `round` routine to set
these digits to 0. We then divide by that same power of 2 to
bring the array data back to nearly its original value. By
working with powers of 2, we ensure that the 0s at the
intermediate stage are represented as 0 bits in the final
result.
For floats and array-like objects, all values must be strictly
positive, or `inf` or `nan` will result.
"""
if callable(abs_tolerance):
Expand All @@ -659,3 +665,290 @@ def truncate(self, abs_tolerance):
ndarray *= power_of_2
np.round(ndarray, out=ndarray)
ndarray /= power_of_2

def taper(self, t1, t2, y1=0, y2=1, *, transition_type="smooth"):
"""Smoothly taper (or transition) the data
This is essentially half of a standard window function, using
the C^∞ "compactified tanh" transition function. By default,
this function smoothly transitions the data from 0 before `t1`
to the input data after `t2`, which is useful to prevent Gibbs
effects caused by a sudden "turn on" of a signal. By swapping
the values of `y1` and `y2`, the transition can be reversed.
Parameters
----------
t1 : float
Time before which the output will equal `y1` times the
data.
t2 : float
Time after which the output will equal `y2` times the
data.
y1 : float, optional
Value before `t1`. Default value is 0.
y2 : float, optional
Value after `t2`. Default value is 1.
transition_type : str, optional
Type of transition to apply. Default value is `"smooth"`,
which corresponds to the C^∞ bump (compactified tanh)
function. The other option is `"cosine"`, which
corresponds to the Tukey (cosine-taper) function.
Returns
-------
TimeSeries
New object with transitioned data.
See Also
--------
transition_to_constant : Smoothly transition to a constant
value
sxs.utilities.transition_function : Function that does the
work
"""
from .utilities import transition_function
t2 = t2 or self.time[-1]
if transition_type == "smooth":
τ = transition_function(self.time, t1, t2, y1, y2)
elif transition_type == "cosine":
τ = np.empty_like(self.time)
τ[self.time <= t1] = y1
τ[(self.time > t1) & (self.time < t2)] = (y1 + y2) / 2 - (y2 - y1) / 2 * np.cos(np.pi * (self.time[(self.time > t1) & (self.time < t2)] - t1) / (t2 - t1))
τ[self.time >= t2] = y2
else:
raise ValueError(f"Unknown transition type {transition_type=}")
data = np.moveaxis(self.ndarray.copy(), self.time_axis, -1)
data = data * τ
data = np.moveaxis(data, -1, self.time_axis)
return type(self)(data, **self._metadata.copy())

def transition_to_constant(self, t1, t2=None):
"""Smoothly transition from the data to a constant
This function produces a copy of the input, where the data is
smoothly transitioned from the value at `t1` to a constant
value at `t2` (which is the last time step if not given). The
precise value of this constant will depend on the behavior of
the data between `t1` and `t2`.
Parameters
----------
t1 : float
Time after which the output will equal the input data.
t2 : float, optional
Time after which the output will be constant. Default
value is the last time step.
Returns
-------
TimeSeries
New object with transitioned data.
See Also
--------
taper : Smoothly transition the data
sxs.utilities.transition_to_constant : Function that does
the work
"""
from .utilities import transition_to_constant_inplace
t2 = t2 or self.time[-1]
if self.time_axis != 0:
data = np.moveaxis(self.ndarray.copy(), self.time_axis, 0)
else:
data = self.ndarray.copy()
data = transition_to_constant_inplace(data, self.time, t1, t2)
if self.time_axis != 0:
data = np.moveaxis(data, 0, self.time_axis)
return type(self)(data, **self._metadata.copy())

def window(self, t1, t2, t3, t4, y1=0, y23=1, y4=None, *, window_type="smooth"):
"""Window the data smoothly
This function creates a copy of the input and, by default,
zeroes it outside of the times `t1` and `t4` while reproducing
it precisely between `t2` and `t3`.
Parameters
----------
t1 : float
Time before which the output will be multiplied by `y1`.
t2 : float
Time after which the output will be multiplied by `y23`.
t3 : float
Time before which the output will be multiplied by `y23`.
t4 : float
Time after which the output will be multiplied by `y4`.
y1 : float, optional
Multiplicative value before `t1`. Default value is 0.
y23 : float, optional
Multiplicative value between `t2` and `t3`. Default value
is 1.
y4 : float, optional
Multiplicative value after `t4`. Default value is `y1`.
window_type : str, optional
Type of window to apply. Default value is `"smooth"`,
which corresponds to the C^∞ bump (compactified tanh)
function. The other option is `"cosine"`, which
corresponds to the Tukey (cosine-tapered) window.
Returns
-------
TimeSeries
New object with windowed data.
See Also
--------
sxs.utilities.bump_function : Function that does the work
"""
from .utilities import bump_function
y4 = y4 or y1
if window_type == "smooth":
τ = bump_function(self.time, t1, t2, t3, t4, y1, y23, y4)
elif window_type == "cosine":
t = self.time
τ = np.empty_like(t)
τ[t <= t1] = y1
τ[(t > t1) & (t < t2)] = (y1 + y23) / 2 - (y23 - y1) / 2 * np.cos(np.pi * (t[(t > t1) & (t < t2)] - t1) / (t2 - t1))
τ[(t >= t2) & (t <= t3)] = y23
τ[(t > t3) & (t < t4)] = (y23 + y4) / 2 - (y4 - y23) / 2 * np.cos(np.pi * (t[(t > t3) & (t < t4)] - t3) / (t4 - t3))
τ[t >= t4] = y4
else:
raise ValueError(f"Unknown window type {window_type=}")
data = np.moveaxis(self.ndarray.copy(), self.time_axis, -1)
data = data * τ
data = np.moveaxis(data, -1, self.time_axis)
return type(self)(data, **self._metadata.copy())

def pad(self, pad_length=None, mode="edge", **kwargs):
"""Pad the data values along the time axis
This function is based on `numpy.pad`, but the `pad_length`
argument is given in units of time, rather than numbers of
elements, and the `mode` argument defaults to `"edge"`.
As with `numpy.pad`, the `pad_length` argument may be a single
number, a tuple with one element, or a tuple with two
elements. For just a single number or tuple with one element,
the padding is applied symmetrically to both ends of the data.
Unlike `numpy.pad`, there may only be two elements of this
tuple; padding other dimensions is not allowed.
Parameters
----------
pad_length : {None, float, tuple}
Amount of time to pad on both sides (for a single float)
or the beginning and end (for a tuple) of the data. By
default, the full length of the input data is added to
both the beginning and end of the data; that is, the
length of the data is tripled.
mode : str, optional
Padding mode. Default value is `"edge"`.
kwargs : dict
Additional keyword arguments to pass to `numpy.pad`.
Returns
-------
TimeSeries
New object with padded data.
See Also
--------
numpy.pad : Similar function with padding in number of
elements, rather than time, and a different default
`mode`.
"""
if pad_length is None:
pad_length = self.time[-1] - self.time[0]
pad_length = np.asarray(pad_length)
if pad_length.shape == ():
pad_length = np.array([pad_length, pad_length])
elif pad_length.shape == (1,):
pad_length = np.array([pad_length[0], pad_length[0]])
elif pad_length.shape != (2,):
raise ValueError(
f"Input `pad_length` must be a scalar, a tuple with one element, "
f"or a tuple with two elements; it has shape {pad_length.shape}."
)
dt0 = self.time[1] - self.time[0]
dt1 = self.time[-1] - self.time[-2]
pad_width0 = int(np.ceil(pad_length[0] / dt0))
pad_width1 = int(np.ceil(pad_length[1] / dt1))
pad_width = list(
(0, 0) if self.time_axis != i else (pad_width0, pad_width1)
for i in range(self.ndarray.ndim)
)
data = np.pad(self.ndarray, pad_width, mode, **kwargs)
metadata = self._metadata.copy()
metadata["time"] = np.concatenate(
(
self.time[0] - np.arange(pad_width0, 0, -1) * dt0,
self.time,
self.time[-1] + np.arange(1, pad_width1+1) * dt1
)
)
return type(self)(data, **metadata)

def line_subtraction(self, treat_as_zero="begin"):
"""Subtract a linear function of time from the data
This is very similar to the `detrend` function found in the
signal-processing literature and in SciPy and MATLAB, for
example, except that rather than fitting a line to all of the
data (which is more appropriate for noisy data), this function
subtracts the line connecting the two ends (first and last
time steps) of the data. This ensures that there is no
discontinuity in the data at the boundaries, which is not
guaranteed by the usual `detrend` functions.
Parameters
----------
treat_as_zero : str, optional
Whether to treat the data at the boundaries as zero.
Default value is `"begin"`, meaning that we pretend that
the data at the beginning of the time array is oscillating
around zero, so we can treat it as if it *is* zero. If
the early data has already been tapered, it will probably
already be zero, so this shouldn't matter. In that case,
`"neither"` would also be correct. The option `"end"` is
also accepted, but this will be less likely to be needed.
Returns
-------
TimeSeries
New object with the linear function of time subtracted.
Notes
-----
There is a minor subtlety in the implementation of this
function. It is not quite enough to simply subtract the line
connecting the first and last data points. The discrete
Fourier transform implicitly assumes that the data is
periodic, so that the line should connect the first data point
to *one beyond the last data point*. It is not generally
clear how to define that, but assuming that the data has
roughly settled down to be roughly constant by the end, we can
say that the line should connect `(time[0], data[0])` to
`(time[-1]+dt, data[-1])`, where `dt` is the time-step size.
This is the approach taken here. Even this is only
approximate, but typically this should be such a tiny effect
as to not matter at all, though it could improve results for
very short or highly changing time series.
"""
N = self.n_times
data = np.moveaxis(self.ndarray.copy(), self.time_axis, -1)
data_begin = 0 if treat_as_zero == "begin" else data[..., [0]]
data_end = 0 if treat_as_zero == "end" else data[..., [-1]]
line = (
data_begin
+ (data_end - data_begin)
* ((self.time - self.time[0]) / (self.time[-1] - self.time[0]))
* ((N - 1) / N)
)
data -= line
data = np.moveaxis(data, -1, self.time_axis)
return type(self)(data, **self._metadata.copy())
6 changes: 6 additions & 0 deletions sxs/utilities/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
md5checksum, lock_file_manager, find_simulation_directories, find_files
)
from .dicts import KeyPassingDict
from .smooth_functions import (
transition_function, transition_function_inplace,
transition_function_derivative, transition_function_derivative_inplace,
transition_to_constant, transition_to_constant_inplace,
bump_function, bump_function_inplace
)


def version_info():
Expand Down
Loading

0 comments on commit 560d3ab

Please sign in to comment.