Skip to content

Commit

Permalink
calibration: generalize slice recalibration
Browse files Browse the repository at this point in the history
  • Loading branch information
JoepVanlier committed Dec 17, 2024
1 parent 72b6885 commit 0f11be5
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 48 deletions.
17 changes: 8 additions & 9 deletions lumicks/pylake/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,23 +277,22 @@ def recalibrate_force(self, calibration):
"""
from lumicks.pylake.calibration import ForceCalibrationList

if len(self.calibration) > 1:
raise NotImplementedError(
"Slice contains multiple calibrations. Recalibration is only implemented for slices"
"that do not contain calibration events."
)

if not self.calibration:
raise RuntimeError(
"Slice does not contain any calibration items. Is this an unprocessed force "
"channel?"
)

# Grab sensitivities. If for whatever reason, we don't know part of the calibration, we
# can't reasonably recalibrate, so we return NaNs for those time points.
timestamps = self.timestamps
force_sensitivities = np.full(len(timestamps), np.nan)
for c in self.calibration:
force_sensitivities[timestamps >= c.applied_at] = c.force_sensitivity

return Slice(
self._src._with_data(
self._unpack_other(self)
* calibration.force_sensitivity
/ self.calibration[0].force_sensitivity
self._unpack_other(self) * calibration.force_sensitivity / force_sensitivities
),
labels=self.labels,
calibration=ForceCalibrationList(items=[calibration._with_timestamp(self.start)]),
Expand Down
91 changes: 52 additions & 39 deletions lumicks/pylake/tests/test_channels/test_channels.py
Original file line number Diff line number Diff line change
Expand Up @@ -1011,19 +1011,6 @@ def test_low_level_construction():


def test_recalibrate_force_wrong_number_of_calibrations():
too_many = ForceCalibrationList._from_items(
items=[
ForceCalibrationItem({"Calibration Data": 50, "Stop time (ns)": with_offset(50)}),
ForceCalibrationItem({"Calibration Data": 80, "Stop time (ns)": with_offset(80)}),
],
)

cc = channel.Slice(
channel.Continuous(np.arange(100), int(with_offset(40)), 10), calibration=too_many
)
with pytest.raises(NotImplementedError, match="Slice contains multiple calibrations"):
cc.recalibrate_force(None)

cc = channel.Slice(
channel.Continuous(np.arange(100), int(with_offset(40)), 10),
calibration=ForceCalibrationList([]),
Expand All @@ -1032,36 +1019,38 @@ def test_recalibrate_force_wrong_number_of_calibrations():
cc.recalibrate_force(None)


def make_calibration_item(response, applied_at):
return ForceCalibrationItem(
{
"Calibration Data": 50,
"Stop time (ns)": with_offset(50),
"Timestamp (ns)": with_offset(applied_at),
"Response (pN/V)": response,
}
)


def make_calibration_result(calibration_factor):
return psc.CalibrationResults(
model=None,
ps_model=None,
ps_data=None,
params={},
results={
"Rf": psc.CalibrationParameter("Rf", calibration_factor, "val"),
"kappa": psc.CalibrationParameter("kappa", 5, "val"),
},
fitted_params=[],
)


def test_recalibrate_force(mock_datetime):
slc = channel.Slice(
channel.Continuous(np.arange(100), with_offset(40), 10),
calibration=ForceCalibrationList._from_items(
[
ForceCalibrationItem(
{
"Calibration Data": 50,
"Stop time (ns)": with_offset(50),
"Response (pN/V)": 10,
}
)
],
),
calibration=ForceCalibrationList._from_items([make_calibration_item(10, 40)]),
)

def make_item(calibration_factor):
return psc.CalibrationResults(
model=None,
ps_model=None,
ps_data=None,
params={},
results={
"Rf": psc.CalibrationParameter("Rf", calibration_factor, "val"),
"kappa": psc.CalibrationParameter("kappa", 5, "val"),
},
fitted_params=[],
)

slc_recalibrated = slc.recalibrate_force(make_item(5))
slc_recalibrated = slc.recalibrate_force(make_calibration_result(5))
np.testing.assert_allclose(slc.data, np.arange(100))
np.testing.assert_allclose(slc_recalibrated.data, np.arange(100) * 0.5)
assert slc.calibration[0].force_sensitivity == 10
Expand All @@ -1076,8 +1065,32 @@ def make_item(calibration_factor):
)
slc_recalibrated.calibration._repr_html_()

slc_recalibrate_twice = slc.recalibrate_force(make_item(10))
slc_recalibrate_twice = slc.recalibrate_force(make_calibration_result(10))
np.testing.assert_allclose(slc_recalibrated.data, np.arange(100) * 0.5)
np.testing.assert_allclose(slc_recalibrate_twice.data, np.arange(100))
assert slc_recalibrated.calibration[0].force_sensitivity == 5
assert slc_recalibrate_twice.calibration[0].force_sensitivity == 10


@pytest.mark.parametrize(
"items, ref_forces",
[
(((10, 40), (20, 60), (40, 80)), [10, 10, 5, 5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5]),
(((10, 40), (40, 60), (40, 80)), [10, 10, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5]),
(((10, 40), (20, 60)), [10, 10, 5, 5, 5, 5, 5, 5, 5, 5]),
(((10, 40), (20, 61)), [10, 10, 10, 5, 5, 5, 5, 5, 5, 5]),
(((10, 40), (20, 50000)), [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]),
# If there is no calibration at the start, we cannot meaningfully decalibrate, so we
# emit np.nan
([(20, 60)], [np.nan, np.nan, 5, 5, 5, 5, 5, 5, 5, 5]),
],
)
def test_recalibrate_force_multi(items, ref_forces):
slc = channel.Slice(
channel.Continuous(np.full(10, 10), with_offset(40), 10),
calibration=ForceCalibrationList._from_items(
[make_calibration_item(response, applied_at) for response, applied_at in items]
),
)
slc_recal = slc.recalibrate_force(make_calibration_result(10))
np.testing.assert_equal(slc_recal.data, np.array(ref_forces))

0 comments on commit 0f11be5

Please sign in to comment.