Skip to content

Commit

Permalink
focal: improve uniformity API
Browse files Browse the repository at this point in the history
  • Loading branch information
JoepVanlier committed Dec 2, 2024
1 parent 009fa10 commit f11ee6c
Show file tree
Hide file tree
Showing 9 changed files with 107 additions and 101 deletions.
56 changes: 27 additions & 29 deletions lumicks/pylake/calibration.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@
from lumicks.pylake.force_calibration.calibration_item import ForceCalibrationItem


def _filter_calibration(time_field, items, start, stop):
def _filter_calibration(items, start, stop):
"""filter calibration data based on time stamp range [ns]"""
if len(items) == 0:
return []

def timestamp(x):
return x[time_field]
return x.stop if x.stop else x.applied_at # Pylake items do not have a start and stop (yet)

items = sorted(items, key=timestamp)

Expand All @@ -38,25 +38,22 @@ class ForceCalibrationList:
calibration = f.force1x.calibration[1] # Grab a calibration item for force 1x
"""

def __init__(self, time_field, items, slice_start=None, slice_stop=None):
def __init__(self, items, slice_start=None, slice_stop=None):
"""Calibration item
Parameters
----------
time_field : string
name of the field used for time
items : list[ForceCalibrationItem]
list of force calibration items
slice_start, slice_stop : int
Start and stop index of the slice associated with these items
"""
self._time_field = time_field
self._src = items
self._slice_start = slice_start
self._slice_stop = slice_stop

def _with_src(self, _src):
return ForceCalibrationList(self._time_field, _src)
return ForceCalibrationList(_src)

def __getitem__(self, item):
if isinstance(item, slice):
Expand Down Expand Up @@ -86,14 +83,13 @@ def filter_calibration(self, start, stop):
stop : int
time stamp at stop [ns]"""
return ForceCalibrationList(
self._time_field,
_filter_calibration(self._time_field, self._src, start, stop),
_filter_calibration(self._src, start, stop),
start,
stop,
)

@staticmethod
def from_field(hdf5, force_channel, time_field="Stop time (ns)") -> "ForceCalibrationList":
def from_field(hdf5, force_channel) -> "ForceCalibrationList":
"""Fetch force calibration data from the HDF5 file
Parameters
Expand All @@ -102,23 +98,25 @@ def from_field(hdf5, force_channel, time_field="Stop time (ns)") -> "ForceCalibr
A Bluelake HDF5 file.
force_channel : str
Calibration field to access (e.g. "Force 1x").
time_field : str
Attribute which holds the timestamp of the item (e.g. "Stop time (ns)").
"""

if "Calibration" not in hdf5.keys():
return ForceCalibrationList(time_field=time_field, items=[])
return ForceCalibrationList(items=[])

items = []
for calibration_item in hdf5["Calibration"].values():
if force_channel in calibration_item:
attrs = dict(calibration_item[force_channel].attrs)
if time_field in attrs.keys():
# Copy the timestamp at which the calibration was applied into the item
attrs["Timestamp (ns)"] = calibration_item.attrs.get("Timestamp (ns)")
items.append(ForceCalibrationItem(attrs))

return ForceCalibrationList(time_field=time_field, items=items)
# Copy the timestamp at which the calibration was applied into the item
attrs["Timestamp (ns)"] = calibration_item.attrs.get("Timestamp (ns)")
items.append(ForceCalibrationItem(attrs))

return ForceCalibrationList._from_items(items)

@staticmethod
def _from_items(items: list[ForceCalibrationItem]):
return ForceCalibrationList(items=items)

def _print_summary(self, tablefmt):
def format_timestamp(timestamp):
Expand All @@ -139,11 +137,15 @@ def format_timestamp(timestamp):
),
item.hydrodynamically_correct,
item.distance_to_surface is not None,
bool(
self._slice_start
and (item.start >= self._slice_start)
and self._slice_stop
and (item.stop <= self._slice_stop)
(
bool(
self._slice_start
and (item.start >= self._slice_start)
and self._slice_stop
and (item.stop <= self._slice_stop)
)
if item.start and item.stop
else False
),
)
for idx, item in enumerate(self._src)
Expand All @@ -169,7 +171,7 @@ def __str__(self):
return self._print_summary(tablefmt="text")

@staticmethod
def from_dataset(hdf5, n, xy, time_field="Stop time (ns)") -> "ForceCalibrationList":
def from_dataset(hdf5, n, xy) -> "ForceCalibrationList":
"""Fetch the force calibration data from the HDF5 file
Parameters
Expand All @@ -180,14 +182,10 @@ def from_dataset(hdf5, n, xy, time_field="Stop time (ns)") -> "ForceCalibrationL
Trap index.
xy : str
Force axis (e.g. "x").
time_field : str
Attribute which holds the timestamp of the item (e.g. "Stop time (ns)").
"""

if xy:
return ForceCalibrationList.from_field(
hdf5, force_channel=f"Force {n}{xy}", time_field=time_field
)
return ForceCalibrationList.from_field(hdf5, force_channel=f"Force {n}{xy}")
else:
raise NotImplementedError(
"Calibration is currently only implemented for single axis data"
Expand Down
30 changes: 0 additions & 30 deletions lumicks/pylake/force_calibration/calibration_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ def _get_parameter(self, _, bluelake_key):
if bluelake_key in self:
return self[bluelake_key]

@property
def applied_at(self):
"""Time the calibration was applied in nanoseconds since epoch"""
return self.data.get("Timestamp (ns)")

@property
def _fitted_diode(self):
"""Diode parameters were fitted"""
Expand Down Expand Up @@ -241,28 +236,3 @@ def num_points_per_block(self):
"""Number of points per block used for spectral down-sampling"""
if "Points per block" in self.data:
return int(self.data["Points per block"]) # BL returns float which API doesn't accept

@property
def start(self):
"""Starting timestamp of this calibration
Examples
--------
::
import lumicks.pylake as lk
f = lk.File("file.h5")
item = f.force1x.calibration[1] # Grab a calibration item for force 1x
# Slice the data corresponding to this item
calibration_data = f.force1x[item.start : item.stop]
# or alternatively:
calibration_data = f.force1x[item]
"""
return self.data.get("Start time (ns)")

@property
def stop(self):
return self.data.get("Stop time (ns)")
9 changes: 9 additions & 0 deletions lumicks/pylake/force_calibration/calibration_results.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import copy

import numpy as np

from lumicks.pylake.force_calibration.detail.calibration_properties import (
Expand Down Expand Up @@ -48,6 +50,13 @@ def __call__(self, frequency):
"""
return self.model(frequency, *self.fitted_params)

def _with_timestamp(self, applied_timestamp):
"""Return a copy of this item with a timestamp of when it was applied"""
item = copy.copy(self)
item.params = copy.deepcopy(self.params)
item.params["Timestamp (ns)"] = applied_timestamp
return item

def __contains__(self, key):
return key in self.params or key in self.results

Expand Down
31 changes: 31 additions & 0 deletions lumicks/pylake/force_calibration/detail/calibration_properties.py
Original file line number Diff line number Diff line change
Expand Up @@ -567,3 +567,34 @@ def kind(self):
return "Active" if self.active_calibration else "Passive"
else:
return kind if kind is not None else "Unknown"

@property
def applied_at(self):
"""Time the calibration was applied in nanoseconds since epoch"""
return self._get_parameter("Timestamp", "Timestamp (ns)")

@property
def start(self):
"""Starting timestamp of this calibration
Examples
--------
::
import lumicks.pylake as lk
f = lk.File("file.h5")
item = f.force1x.calibration[1] # Grab a calibration item for force 1x
# Slice the data corresponding to this item
calibration_data = f.force1x[item.start : item.stop]
# or alternatively:
calibration_data = f.force1x[item]
"""
return self._get_parameter("Start time", "Start time (ns)")

@property
def stop(self):
"""Stop time stored in the calibration item"""
return self._get_parameter("Stop time", "Stop time (ns)")
10 changes: 4 additions & 6 deletions lumicks/pylake/force_calibration/tests/test_calibration_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,10 @@ def create_item(t_start, t_stop, **kwargs):
create_item(11, 12),
create_item(15, 25),
]
items = ForceCalibrationList("Stop time (ns)", fcs)
same_items = ForceCalibrationList("Stop time (ns)", fcs2)
different_item = ForceCalibrationList(
"Stop time (ns)", fcs2[:-1] + [create_item(15, 25, extra=5)]
)
shorter_list = ForceCalibrationList("Stop time (ns)", fcs[:-1])
items = ForceCalibrationList(fcs)
same_items = ForceCalibrationList(fcs2)
different_item = ForceCalibrationList(fcs2[:-1] + [create_item(15, 25, extra=5)])
shorter_list = ForceCalibrationList(fcs[:-1])

assert len(items) == 3
assert items == same_items
Expand Down
2 changes: 1 addition & 1 deletion lumicks/pylake/piezo_tracking/piezo_tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def piezo_track(self, trap_position, force1, force2, downsampling_factor=None):

trap_trap_dist = self.trap_calibration(trap_position)
bead_displacements = 1e-3 * sum(
sign * force / force.calibration[0]["kappa (pN/nm)"]
sign * force / force.calibration[0].stiffness
for force, sign in zip((force1, force2), self._signs)
)

Expand Down
18 changes: 10 additions & 8 deletions lumicks/pylake/piezo_tracking/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pytest

from lumicks.pylake.channel import Slice, Continuous, TimeSeries
from lumicks.pylake.calibration import ForceCalibrationList
from lumicks.pylake.calibration import ForceCalibrationItem, ForceCalibrationList
from lumicks.pylake.fitting.models import ewlc_odijk_force


Expand Down Expand Up @@ -77,13 +77,13 @@ def piezo_tracking_test_data(poly_baseline_data, camera_calibration_data):
If we assume that the baseline force leads to a real displacement, then our function for the
trap position becomes implicit, since the displacement depends on the baseline which in turn
depends on the trap position:
trap_trap_distance = tether_length + 2 * bead_radius + 2 * displacement_um
And displacement_um is given by (wlc_force + baseline(trap_position)) / stiffness
So we solve the following to obtain the trap position:
displacement = 2 * (wlc_force + baseline(trap_position)) / stiffness
0 = tether_length + 2 * bead_radius + displacement - (trap_position - trap2_ref)
"""
Expand All @@ -95,8 +95,10 @@ def piezo_tracking_test_data(poly_baseline_data, camera_calibration_data):

def implicit_trap_position_equation(x):
trap_trap_dist = x - trap2_ref
displacement = 2 * (force + baseline(x)) / stiffness_um
return (tether_dist + 2 * bead_radius + displacement - trap_trap_dist) ** 2
displacement = 2 * (force + baseline(x)) / stiffness_um # noqa: B023
return (
tether_dist + 2 * bead_radius + displacement - trap_trap_dist # noqa: B023
) ** 2

trap_position.append(minimize_scalar(implicit_trap_position_equation, [12.95, 13.35]).x)

Expand All @@ -105,8 +107,8 @@ def implicit_trap_position_equation(x):
# Add our baseline force (assumption is that the baseline force leads to a real displacement)
force_pn = wlc_force + baseline(trap_position.data)

calibration = ForceCalibrationList(
"Stop time (ns)", [{"Stop time (ns)": 1, "kappa (pN/nm)": stiffness}]
calibration = ForceCalibrationList._from_items(
[ForceCalibrationItem({"Stop time (ns)": 1, "kappa (pN/nm)": stiffness})]
)

force_1x = Slice(Continuous(force_pn, 0, dt), calibration=calibration)
Expand Down
8 changes: 4 additions & 4 deletions lumicks/pylake/tests/test_channels/test_arithmetic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import pytest

from lumicks.pylake.channel import Slice, TimeTags, Continuous, TimeSeries
from lumicks.pylake.calibration import ForceCalibrationList
from lumicks.pylake.calibration import ForceCalibrationItem, ForceCalibrationList

start = 1 + int(1e18)
calibration = ForceCalibrationList(
"Stop time (ns)", [{"Stop time (ns)": start, "kappa (pN/nm)": 0.45}]
calibration = ForceCalibrationList._from_items(
[ForceCalibrationItem({"Stop time (ns)": start, "kappa (pN/nm)": 0.45})]
)
time_series = np.array([1, 2, 3, 4, 5], dtype=np.int64) + int(1e18)
slice_continuous_1 = Slice(Continuous([1, 2, 3, 4, 5], start=start, dt=1), calibration=calibration)
Expand Down Expand Up @@ -181,7 +181,7 @@ def test_negation(channel_slice):

def test_negation_timetags_not_implemented():
with pytest.raises(NotImplementedError):
negated_timetags = -timetags
_ = -timetags


def test_labels_slices():
Expand Down
Loading

0 comments on commit f11ee6c

Please sign in to comment.