Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(shared-data, api): add absorbance plate reader definition and module control #15167

Merged
merged 20 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/src/opentrons/drivers/absorbance_reader/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .abstract import AbstractAbsorbanceReaderDriver
from .driver import AbsorbanceReaderDriver
from .simulator import SimulatingDriver
from .hid_protocol import AbsorbanceHidInterface

__all__ = [
"AbstractAbsorbanceReaderDriver",
"AbsorbanceReaderDriver",
"SimulatingDriver",
"AbsorbanceHidInterface",
]
45 changes: 45 additions & 0 deletions api/src/opentrons/drivers/absorbance_reader/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from abc import ABC, abstractmethod
from typing import Dict, List
from opentrons.drivers.types import AbsorbanceReaderLidStatus


class AbstractAbsorbanceReaderDriver(ABC):
@abstractmethod
async def connect(self) -> None:
"""Connect to absorbance reader"""
...

@abstractmethod
async def disconnect(self) -> None:
"""Disconnect from absorbance reader"""
...

@abstractmethod
async def is_connected(self) -> bool:
"""Check connection to absorbance reader"""
...

@abstractmethod
async def get_lid_status(self) -> AbsorbanceReaderLidStatus:
...

@abstractmethod
async def get_available_wavelengths(self) -> List[int]:
...

@abstractmethod
async def get_single_measurement(self, wavelength: int) -> List[float]:
...

@abstractmethod
async def initialize_measurement(self, wavelength: int) -> None:
...

@abstractmethod
async def get_status(self) -> None:
...

@abstractmethod
async def get_device_info(self) -> Dict[str, str]:
"""Get device info"""
...
274 changes: 274 additions & 0 deletions api/src/opentrons/drivers/absorbance_reader/async_byonoy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
from __future__ import annotations

import asyncio
import re
import subprocess
from concurrent.futures.thread import ThreadPoolExecutor
from functools import partial
from typing import Optional, List, Dict


from .hid_protocol import AbsorbanceHidInterface as AbsProtocol
from opentrons.drivers.types import (
AbsorbanceReaderLidStatus,
AbsorbanceReaderPlatePresence,
)


SN_PARSER = re.compile(r'ATTRS{serial}=="(?P<serial>.+?)"')


class AsyncByonoy:
"""Async wrapper around Byonoy Device Library."""

@staticmethod
def match_device_with_sn(
sn: str, devices: List[AbsProtocol.Device]
) -> AbsProtocol.Device:
for device in devices:
if device.sn == sn:
return device
raise RuntimeError(f"Unavailble module with serial number: {sn}")

@staticmethod
def serial_number_from_port(port: str) -> str:
"""
Get the serial number from a port using udevadm.

We need to walk up the chain of parent devices to look for the first
serial number value because the hid interface doesn't provide it.
"""
output = subprocess.check_output(
f"udevadm info --name {port} --attribute-walk | grep serial -m1", shell=True
).decode()
m = SN_PARSER.search(output)
if m:
return m.group("serial")
raise RuntimeError(f"Could not find serial number for port: {port}")

@classmethod
async def create(
cls,
port: str,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> AsyncByonoy:
"""
Create an AsyncByonoy instance.

Args:
port: url or port name
baud_rate: the baud rate
timeout: optional timeout in seconds
write_timeout: optional write timeout in seconds
loop: optional event loop. if None get_running_loop will be used
reset_buffer_before_write: reset the serial input buffer before
writing to it
"""
loop = loop or asyncio.get_running_loop()
executor = ThreadPoolExecutor(max_workers=1)

import pybyonoy_device_library as byonoy # type: ignore[import-not-found]

interface: AbsProtocol = byonoy

device_sn = cls.serial_number_from_port(port)
found: List[AbsProtocol.Device] = await loop.run_in_executor(
executor=executor, func=byonoy.byonoy_available_devices
)
device = cls.match_device_with_sn(device_sn, found)

return cls(
interface=interface,
device=device,
executor=executor,
loop=loop,
)

def __init__(
self,
interface: AbsProtocol,
device: AbsProtocol.Device,
executor: ThreadPoolExecutor,
loop: asyncio.AbstractEventLoop,
) -> None:
"""
Constructor

Args:
serial: connected Serial object
executor: a thread pool executor
loop: event loop
"""
self._interface = interface
self._device = device
self._executor = executor
self._loop = loop
self._supported_wavelengths: Optional[list[int]] = None
self._device_handle: Optional[int] = None
self._current_config: Optional[AbsProtocol.MeasurementConfig] = None

def _cleanup(self) -> None:
self._device_handle = None

def _open(self) -> None:
err, device_handle = self._interface.byonoy_open_device(self._device)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error opening device: {err}")
self._device_handle = device_handle

def _free(self) -> None:
if self._device_handle:
self._interface.byonoy_free_device(self._device_handle)
self._cleanup()

def verify_device_handle(self) -> int:
assert self._device_handle is not None, RuntimeError(
"Device handle not set up."
)
return self._device_handle

def _get_device_information(self) -> AbsProtocol.DeviceInfo:
handle = self.verify_device_handle()
err, device_info = self._interface.byonoy_get_device_information(handle)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error getting device information: {err}")
return device_info

def _get_device_status(self) -> AbsProtocol.DeviceState:
handle = self.verify_device_handle()
err, status = self._interface.byonoy_get_device_status(handle)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error getting device status: {err}")
return status

def _get_slot_status(self) -> AbsProtocol.SlotState:
handle = self.verify_device_handle()
err, slot_status = self._interface.byonoy_get_device_slot_status(handle)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error getting slot status: {err}")
return slot_status

def _get_lid_status(self) -> bool:
handle = self.verify_device_handle()
lid_on: bool
err, lid_on = self._interface.byonoy_get_device_parts_aligned(handle)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error getting slot status: {err}")
return lid_on

def _get_supported_wavelengths(self) -> List[int]:
handle = self.verify_device_handle()
wavelengths: List[int]
err, wavelengths = self._interface.byonoy_abs96_get_available_wavelengths(
handle
)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error getting supported wavelengths: {err}")
self._supported_wavelengths = wavelengths
return wavelengths

def _initialize_measurement(self, conf: AbsProtocol.MeasurementConfig) -> None:
handle = self.verify_device_handle()
err = self._interface.byonoy_abs96_initialize_single_measurement(handle, conf)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error initializing measurement: {err}")
self._current_config = conf

def _single_measurement(self, conf: AbsProtocol.MeasurementConfig) -> List[float]:
handle = self.verify_device_handle()
measurements: List[float]
err, measurements = self._interface.byonoy_abs96_single_measure(handle, conf)
if err.name != "BYONOY_ERROR_NO_ERROR":
raise RuntimeError(f"Error getting single measurement: {err}")
return measurements

def _set_sample_wavelength(self, wavelength: int) -> AbsProtocol.MeasurementConfig:
if not self._supported_wavelengths:
self._get_supported_wavelengths()
assert self._supported_wavelengths
if wavelength in self._supported_wavelengths:
conf = self._interface.ByonoyAbs96SingleMeasurementConfig()
conf.sample_wavelength = wavelength
return conf
else:
raise ValueError(
f"Unsupported wavelength: {wavelength}, expected: {self._supported_wavelengths}"
)

def _initialize(self, wavelength: int) -> None:
conf = self._set_sample_wavelength(wavelength)
self._initialize_measurement(conf)

def _get_single_measurement(self, wavelength: int) -> List[float]:
initialized = self._current_config
assert initialized and initialized.sample_wavelength == wavelength
return self._single_measurement(initialized)

async def open(self) -> None:
"""
Open the connection.

Returns: None
"""
return await self._loop.run_in_executor(
executor=self._executor, func=self._open
)

async def close(self) -> None:
"""
Close the connection

Returns: None
"""
await self._loop.run_in_executor(executor=self._executor, func=self._free)

async def is_open(self) -> bool:
"""
Check if connection is open.

Returns: boolean
"""
return self._device_handle is not None

async def get_device_static_info(self) -> Dict[str, str]:
return {
"serial": self._device.sn,
"model": "ABS96",
}

async def get_device_information(self) -> Dict[str, str]:
device_info = await self._loop.run_in_executor(
executor=self._executor, func=self._get_device_information
)
return {
"serial_number": device_info.sn,
"reference_number": device_info.ref_no,
"version": device_info.version,
}

async def get_lid_status(self) -> AbsorbanceReaderLidStatus:
lid_info = await self._loop.run_in_executor(
executor=self._executor, func=self._get_lid_status
)
return (
AbsorbanceReaderLidStatus.ON if lid_info else AbsorbanceReaderLidStatus.OFF
)

async def get_supported_wavelengths(self) -> list[int]:
return await self._loop.run_in_executor(
executor=self._executor, func=self._get_supported_wavelengths
)

async def initialize(self, wavelength: int) -> None:
return await self._loop.run_in_executor(
executor=self._executor, func=partial(self._initialize, wavelength)
)

async def get_single_measurement(self, wavelength: int) -> List[float]:
return await self._loop.run_in_executor(
executor=self._executor,
func=partial(self._get_single_measurement, wavelength),
)

async def get_plate_presence(self) -> AbsorbanceReaderPlatePresence:
return AbsorbanceReaderPlatePresence.UNKNOWN
61 changes: 61 additions & 0 deletions api/src/opentrons/drivers/absorbance_reader/driver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations

import asyncio
from typing import Dict, Optional, List

from opentrons.drivers.types import AbsorbanceReaderLidStatus
from opentrons.drivers.absorbance_reader.abstract import AbstractAbsorbanceReaderDriver
from .async_byonoy import AsyncByonoy


class AbsorbanceReaderDriver(AbstractAbsorbanceReaderDriver):
@classmethod
async def create(
cls,
port: str,
loop: Optional[asyncio.AbstractEventLoop],
) -> AbsorbanceReaderDriver:
"""Create an absorbance reader driver."""
from .async_byonoy import AsyncByonoy

connection = await AsyncByonoy.create(port=port, loop=loop)
return cls(connection=connection)

def __init__(self, connection: AsyncByonoy) -> None:
self._connection = connection

async def get_device_info(self) -> Dict[str, str]:
"""Get device info"""
connected = await self.is_connected()
if not connected:
info = await self._connection.get_device_static_info()
else:
info = await self._connection.get_device_information()
return info

async def connect(self) -> None:
"""Connect to absorbance reader"""
await self._connection.open()

async def disconnect(self) -> None:
"""Disconnect from absorbance reader"""
await self._connection.close()

async def is_connected(self) -> bool:
"""Check connection to absorbance reader"""
return await self._connection.is_open()

async def get_lid_status(self) -> AbsorbanceReaderLidStatus:
return await self._connection.get_lid_status()

async def get_available_wavelengths(self) -> List[int]:
return await self._connection.get_supported_wavelengths()

async def get_single_measurement(self, wavelength: int) -> List[float]:
return await self._connection.get_single_measurement(wavelength)

async def initialize_measurement(self, wavelength: int) -> None:
await self._connection.initialize(wavelength)

async def get_status(self) -> None:
pass
Loading
Loading