Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add smoothing spline baseline #299

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 143 additions & 1 deletion lumicks/pylake/piezo_tracking/baseline.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,90 @@
import numpy as np
import matplotlib.pyplot as plt
import numpy as np
from lumicks.pylake.channel import Slice, Continuous
from csaps import csaps
from sklearn.model_selection import RepeatedKFold
from sklearn.metrics import mean_squared_error


def unique_sorted(trap_position, force):
"""Sort and remove duplicates trap_position data to prepare for fit smoothing spline.
Parameters
----------
trap_position : lumicks.pylake.Slice
Trap mirror position
force : lumicks.pylake.Slice
Force data

Returns
--------
x : np.array
sorted trap position data
force : np.array
sorted force data"""

x = trap_position.data
u, c = np.unique(x, return_counts=True)
m = np.isin(x, [u[c < 2]])
ind = np.argsort(x[m])

return x[m][ind], force.data[m][ind]
Comment on lines +19 to +24
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could consider simplifying this to:

def unique_sorted(trap_position, force):
    """Sort and remove duplicates trap_position data to prepare for fit smoothing spline.

    Parameters
    ----------
    trap_position : array_like
        Trap mirror position
    force : array_like
        Force data
    """

    sorted_position, idx, count = np.unique(trap_position, return_index=True, return_counts=True)
    return sorted_position[count < 2], force[idx][count < 2]

Saves you a search and a sort (note that the result from np.unique is already sorted).

Given that you return the raw numpy arrays rather than slices, I would also consider just taking raw numpy arrays as input and extracting the data where its called (rather than passing a slice).



def optimize_smoothing_factor(
trap_position,
force,
smoothing_factors,
n_repeats,
plot_smoothingfactor_mse,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Considering that the function has smoothing_factor in the name, I reckon plot_mse would be sufficient.

):
"""Find optimal smoothing factor by choosing smoothing factor with lowest mse on test data
Parameters
----------
trap_position : lumicks.pylake.Slice
Trap mirror position data
force : lumicks.pylake.Slice
Force data
smoothing_factors : np.array float
Array of smoothing factors used for optimization fit, 0 <= smoothing_factor <= 1
n_repeats: int
number of times to repeat cross validation
plot_smoothingfactor_mse: bool
plot mse on test data vs smoothing factors used for optimization
"""

mse_test_vals = np.zeros(len(smoothing_factors))
x_sorted, y_sorted = unique_sorted(trap_position, force)
Comment on lines +49 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing I find a bit surprising is that you choose to pass trap position and force to this function, despite already having x_sorted and y_sorted. Is there a specific reason you prefer this?

for i, smooth in enumerate(smoothing_factors):
mse_test_array = np.zeros(n_repeats * 2)

rkf = RepeatedKFold(n_splits=2, n_repeats=n_repeats)
for k, (train_index, test_index) in enumerate(rkf.split(x_sorted)):
x_train, x_test = x_sorted[train_index], x_sorted[test_index]
y_train, y_test = y_sorted[train_index], y_sorted[test_index]

smoothing_result_train_1x = csaps(x_train, y_train, smooth=smooth)
f_test = smoothing_result_train_1x(x_test)
mse_test_array[k] = mean_squared_error(y_test, f_test)

mse_test_vals[i] = np.mean(mse_test_array)
if plot_smoothingfactor_mse:
plot_mse_smoothing_factors(smoothing_factors, mse_test_vals)

return smoothing_factors[np.argmin(mse_test_vals)]


def plot_mse_smoothing_factors(smoothing_factors, mse_test_vals):
plt.figure()
plt.plot(
np.log(1 - smoothing_factors),
mse_test_vals,
label=f"optimal s= {smoothing_factors[np.argmin(mse_test_vals)]:0.6f}",
)
plt.ylabel("mse test")
plt.xticks(np.log(1 - smoothing_factors), smoothing_factors)
plt.xlabel("smoothing factor")
plt.legend()
plt.show()


class ForceBaseLine:
Expand Down Expand Up @@ -77,4 +161,62 @@ def polynomial_baseline(cls, trap_position, force, degree=7, downsampling_factor
)

model = np.poly1d(np.polyfit(trap_position.data, force.data, deg=degree))
return cls(model, trap_position, force)\


@classmethod
def smoothingspline_baseline(
cls,
trap_position,
force,
smoothing_factor=None,
downsampling_factor=None,
smoothing_factors=np.array([0.9, 0.99, 0.999, 0.9999, 0.99999, 0.999999]),
n_repeats=10,
plot_smoothingfactor_mse=False,
):
"""Generate a smoothing spline baseline from data.
Items of xdata in smoothing spline must satisfy: x1 < x2 < ... < xN,
therefore the trap_position data is sorted and duplicates are removed
Parameters
----------
trap_position : lumicks.pylake.Slice
Trap mirror position data
force : lumicks.pylake.Slice
Force data
smoothing_factor : float
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether it would make sense to merge smoothing_factor and smoothing_factors.

If the input is just a value or has only one element, you use it as fixed value and otherwise you perform the optimization. You can use np.atleast_1d to upconvert a value to a numpy array so that you can always use the len operation on it. The default can then just be a default list that generally works.

Smoothing factor for smoothing spline, 0 <= smoothing_factor <= 1
downsampling_factor : int
Factor by which to downsample before baseline determination
smoothing_factors : np.array float
Array of smoothing factors used for optimization fit, 0 <= smoothing_factor <= 1
n_repeats: int
number of times to repeat cross validation
plot_smoothingfactor_mse: bool
plot mse on test data vs smoothing factors used for optimization
"""
if not np.array_equal(force.timestamps, trap_position.timestamps):
raise RuntimeError(
"Provided force and trap position timestamps should match"
)

if downsampling_factor:
trap_position, force = (
ch.downsampled_by(downsampling_factor) for ch in (trap_position, force)
)

x_sorted, y_sorted = unique_sorted(trap_position, force)

if smoothing_factor:
model = csaps(x_sorted, y_sorted, smooth=smoothing_factor)
else:
smoothing_factor = optimize_smoothing_factor(
trap_position,
force,
smoothing_factors=smoothing_factors,
n_repeats=n_repeats,
plot_smoothingfactor_mse=plot_smoothingfactor_mse,
)
model = csaps(x_sorted, y_sorted, smooth=smoothing_factor)

return cls(model, trap_position, force)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One issue with calling the constructor like this is that you don't actually put the data that you used for fitting in now (you fitted to x_sorted and y_sorted but store the original raw data).