From 0f11be57fd81d4f0110f11f0480c8d65acfe8841 Mon Sep 17 00:00:00 2001 From: Joep Vanlier Date: Mon, 9 Dec 2024 21:21:20 +0100 Subject: [PATCH] calibration: generalize slice recalibration --- lumicks/pylake/channel.py | 17 ++-- .../tests/test_channels/test_channels.py | 91 +++++++++++-------- 2 files changed, 60 insertions(+), 48 deletions(-) diff --git a/lumicks/pylake/channel.py b/lumicks/pylake/channel.py index 0e419ff48..51f73598e 100644 --- a/lumicks/pylake/channel.py +++ b/lumicks/pylake/channel.py @@ -277,23 +277,22 @@ def recalibrate_force(self, calibration): """ from lumicks.pylake.calibration import ForceCalibrationList - if len(self.calibration) > 1: - raise NotImplementedError( - "Slice contains multiple calibrations. Recalibration is only implemented for slices" - "that do not contain calibration events." - ) - if not self.calibration: raise RuntimeError( "Slice does not contain any calibration items. Is this an unprocessed force " "channel?" ) + # Grab sensitivities. If for whatever reason, we don't know part of the calibration, we + # can't reasonably recalibrate, so we return NaNs for those time points. + timestamps = self.timestamps + force_sensitivities = np.full(len(timestamps), np.nan) + for c in self.calibration: + force_sensitivities[timestamps >= c.applied_at] = c.force_sensitivity + return Slice( self._src._with_data( - self._unpack_other(self) - * calibration.force_sensitivity - / self.calibration[0].force_sensitivity + self._unpack_other(self) * calibration.force_sensitivity / force_sensitivities ), labels=self.labels, calibration=ForceCalibrationList(items=[calibration._with_timestamp(self.start)]), diff --git a/lumicks/pylake/tests/test_channels/test_channels.py b/lumicks/pylake/tests/test_channels/test_channels.py index c7a8fd141..8635fb5fd 100644 --- a/lumicks/pylake/tests/test_channels/test_channels.py +++ b/lumicks/pylake/tests/test_channels/test_channels.py @@ -1011,19 +1011,6 @@ def test_low_level_construction(): def test_recalibrate_force_wrong_number_of_calibrations(): - too_many = ForceCalibrationList._from_items( - items=[ - ForceCalibrationItem({"Calibration Data": 50, "Stop time (ns)": with_offset(50)}), - ForceCalibrationItem({"Calibration Data": 80, "Stop time (ns)": with_offset(80)}), - ], - ) - - cc = channel.Slice( - channel.Continuous(np.arange(100), int(with_offset(40)), 10), calibration=too_many - ) - with pytest.raises(NotImplementedError, match="Slice contains multiple calibrations"): - cc.recalibrate_force(None) - cc = channel.Slice( channel.Continuous(np.arange(100), int(with_offset(40)), 10), calibration=ForceCalibrationList([]), @@ -1032,36 +1019,38 @@ def test_recalibrate_force_wrong_number_of_calibrations(): cc.recalibrate_force(None) +def make_calibration_item(response, applied_at): + return ForceCalibrationItem( + { + "Calibration Data": 50, + "Stop time (ns)": with_offset(50), + "Timestamp (ns)": with_offset(applied_at), + "Response (pN/V)": response, + } + ) + + +def make_calibration_result(calibration_factor): + return psc.CalibrationResults( + model=None, + ps_model=None, + ps_data=None, + params={}, + results={ + "Rf": psc.CalibrationParameter("Rf", calibration_factor, "val"), + "kappa": psc.CalibrationParameter("kappa", 5, "val"), + }, + fitted_params=[], + ) + + def test_recalibrate_force(mock_datetime): slc = channel.Slice( channel.Continuous(np.arange(100), with_offset(40), 10), - calibration=ForceCalibrationList._from_items( - [ - ForceCalibrationItem( - { - "Calibration Data": 50, - "Stop time (ns)": with_offset(50), - "Response (pN/V)": 10, - } - ) - ], - ), + calibration=ForceCalibrationList._from_items([make_calibration_item(10, 40)]), ) - def make_item(calibration_factor): - return psc.CalibrationResults( - model=None, - ps_model=None, - ps_data=None, - params={}, - results={ - "Rf": psc.CalibrationParameter("Rf", calibration_factor, "val"), - "kappa": psc.CalibrationParameter("kappa", 5, "val"), - }, - fitted_params=[], - ) - - slc_recalibrated = slc.recalibrate_force(make_item(5)) + slc_recalibrated = slc.recalibrate_force(make_calibration_result(5)) np.testing.assert_allclose(slc.data, np.arange(100)) np.testing.assert_allclose(slc_recalibrated.data, np.arange(100) * 0.5) assert slc.calibration[0].force_sensitivity == 10 @@ -1076,8 +1065,32 @@ def make_item(calibration_factor): ) slc_recalibrated.calibration._repr_html_() - slc_recalibrate_twice = slc.recalibrate_force(make_item(10)) + slc_recalibrate_twice = slc.recalibrate_force(make_calibration_result(10)) np.testing.assert_allclose(slc_recalibrated.data, np.arange(100) * 0.5) np.testing.assert_allclose(slc_recalibrate_twice.data, np.arange(100)) assert slc_recalibrated.calibration[0].force_sensitivity == 5 assert slc_recalibrate_twice.calibration[0].force_sensitivity == 10 + + +@pytest.mark.parametrize( + "items, ref_forces", + [ + (((10, 40), (20, 60), (40, 80)), [10, 10, 5, 5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5]), + (((10, 40), (40, 60), (40, 80)), [10, 10, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5, 2.5]), + (((10, 40), (20, 60)), [10, 10, 5, 5, 5, 5, 5, 5, 5, 5]), + (((10, 40), (20, 61)), [10, 10, 10, 5, 5, 5, 5, 5, 5, 5]), + (((10, 40), (20, 50000)), [10, 10, 10, 10, 10, 10, 10, 10, 10, 10]), + # If there is no calibration at the start, we cannot meaningfully decalibrate, so we + # emit np.nan + ([(20, 60)], [np.nan, np.nan, 5, 5, 5, 5, 5, 5, 5, 5]), + ], +) +def test_recalibrate_force_multi(items, ref_forces): + slc = channel.Slice( + channel.Continuous(np.full(10, 10), with_offset(40), 10), + calibration=ForceCalibrationList._from_items( + [make_calibration_item(response, applied_at) for response, applied_at in items] + ), + ) + slc_recal = slc.recalibrate_force(make_calibration_result(10)) + np.testing.assert_equal(slc_recal.data, np.array(ref_forces))