From 893f1e33425f6ac13561e4b25095e83c4ca5dea4 Mon Sep 17 00:00:00 2001 From: "Ryan D. Lewis" Date: Wed, 11 Oct 2023 15:55:03 -0500 Subject: [PATCH] Paper Freeze (#39) * Refactor: Cleanup for Digital Discovery Paper * testing resetting with errors * Fix: reset state on reconnect --------- Co-authored-by: Dozgulbas --- Makefile | 12 +- README.md | 52 +- docs/source/conf.py | 4 +- ot2_driver/__init__.py | 2 +- ot2_driver/config.py | 8 +- ot2_driver/ot2_driver_http.py | 27 +- ot2_driver/protocol_20230221-123810.py | 15 +- ot2_driver/protopiler/config.py | 26 +- ot2_driver/protopiler/deconstructor.py | 87 ++-- ot2_driver/protopiler/protopiler.py | 371 ++++++++------- ot2_driver/protopiler/resource_manager.py | 98 ++-- .../protopiler/test_configs/basic_config.py | 1 - package.xml | 20 - requirements.txt | 1 + resource/ot2_driver | 0 scripts/delete_all_runs.py | 19 +- scripts/ot2_rest_client.py | 450 +++++++++--------- setup.cfg | 2 +- setup.py | 1 + tests/test_ot2_streaming.py | 4 +- 20 files changed, 614 insertions(+), 586 deletions(-) delete mode 100644 package.xml delete mode 100644 resource/ot2_driver diff --git a/Makefile b/Makefile index 4f8ef51..e7bbe4c 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .DEFAULT_GOAL := all -isort = isort ot2_driver -black = black --target-version py39 ot2_driver +isort = python -m isort . +black = python -m black --target-version py39 . .PHONY: format format: @@ -10,15 +10,15 @@ format: .PHONY: lint lint: $(black) --check --diff - flake8 ot2_driver/ - pydocstyle ot2_driver/ --count + python -m flake8 . + python -m pydocstyle . --count .PHONY: mypy mypy: - mypy --config-file setup.cfg --package ot2_driver/ - mypy --config-file setup.cfg ot2_driver/ + mypy --config-file setup.cfg --package . + mypy --config-file setup.cfg . .PHONY: all all: format lint diff --git a/README.md b/README.md index 4172e3e..f2b92f9 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,18 @@ -### Installation -1. `git clone https://github.com/KPHippe/ot2_driver.git` -2. Switch to my dev branch `git checkout dev-kyle` -3. I would recommend a conda/venv environment. The following assusumes conda. - 1. `conda create -n ot2-driver python=3.9` - 1. `conda activate ot2-driver` - 1. `pip install -r requirements.txt` - 1. `pip install -e .` - +### Installation + +``` +git clone https://github.com/AD-SDL/ot2_module.git +cd ot2_module +pip install -r requirements.txt +pip install -e . +``` + *This installs ot2_driver as a package* +Note: This module was developed using Python 3.9 + ### Getting the OT2 setup for ssh -*This is not required (or used) for the HTTP driver* +*This is not required (or used) for the HTTP driver* When setting up an ssh key to connect to the opentrons, it is helpful to make a new one without a passphrase. For more information on setting up an ssh connection see: *Note, you have to have the Opentrons App installed* @@ -18,11 +20,11 @@ When setting up an ssh key to connect to the opentrons, it is helpful to make a - https://support.opentrons.com/en/articles/3203681-setting-up-ssh-access-to-your-ot-2 - https://support.opentrons.com/en/articles/3287453-connecting-to-your-ot-2-with-ssh -For prototyping in the RPL, connect via the wire and wait for the robot to become visible on the application. Click `settings` then `network settings` and if you intend on running via the wire, use the `wired-ip` in the robot configuration file. If you intend to use the wireless IP, you must connect to the `snowcrash` network, but this does not have internet access. +For prototyping in the RPL, connect via the wire and wait for the robot to become visible on the application. Click `settings` then `network settings` and if you intend on running via the wire, use the `wired-ip` in the robot configuration file. If you intend to use the wireless IP, you must connect to the `snowcrash` network, but this does not have internet access. -### Robot config +### Robot config -Below is an example of what I refer to as the `robot_config` +Below is an example of what I refer to as the `robot_config` ``` # OT2 in lab - ip: IP.ADDRESS @@ -32,7 +34,7 @@ Below is an example of what I refer to as the `robot_config` ``` -### Running the driver +### Running the driver If you would like to use the script as I have made it, I have provided command line descriptions and example commands below @@ -57,14 +59,14 @@ optional arguments: ``` -To run the `protopiler/example_configs/basic_config.yaml` with verbose settings and default outs, run the following +To run the `protopiler/example_configs/basic_config.yaml` with verbose settings and default outs, run the following ``` python ot2_driver_ssh.py -rc [insert/robot/config/path] -pc protopiler/example_configs/basic_config.yaml -v ``` -To run the `protopiler/example_configs/basic_config.yaml` with verbose settings and specify the output files, run the following +To run the `protopiler/example_configs/basic_config.yaml` with verbose settings and specify the output files, run the following ``` -python ot2_driver_ssh.py -rc [insert/robot/config/path] -pc protopiler/example_configs/basic_config.yaml -po ./test_protocol.py -ro ./test_resources.json -v +python ot2_driver_ssh.py -rc [insert/robot/config/path] -pc protopiler/example_configs/basic_config.yaml -po ./test_protocol.py -ro ./test_resources.json -v ``` To run your own protocol.py file, replace data in the `-pc` option with the path to your protocol.py file @@ -72,12 +74,12 @@ To run your own protocol.py file, replace data in the `-pc` option with the path python ot2_driver_ssh.py -rc [insert/robot/config/path] -pc ./test_protocol.py ``` -*The process is the same for the HTTP driver* +*The process is the same for the HTTP driver* If you would like to write your own code using the ot2 driver you can start with something like this ```python # would have to specify path to ot2_driver_ssh.py if not in directory -from ot2_driver_ssh import OT2_Driver +from ot2_driver_ssh import OT2_Driver # Load one ot2 for ot2_raw_cfg in yaml.safe_load(open(`robot_config_path`)): @@ -92,22 +94,22 @@ if "py" not in str(`protocol_config`): protocol_out=`protocol_out`, resource_out=`resource_out`, ) - -# Transfer the protocol to the ot2 + +# Transfer the protocol to the ot2 transfer_returncode = ot2.transfer(protocol_file) if returncode: print("Exception raised when transferring...") -# Execute the protocol +# Execute the protocol ot2.execute(protocol_file) ``` ### Updating the Code on node computer -1. `cd ~/wei_ws/src/ot2_driver` +1. `cd ~/wei_ws/src/ot2_module` 2. `git pull` 3. `pip install -e .` -### Running Dev Tools - +### Running Dev Tools + 1. Install `pip install -r requirements/dev.txt` 2. Run `make` in project root diff --git a/docs/source/conf.py b/docs/source/conf.py index 0df6665..b13fd11 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -6,16 +6,16 @@ # -- Path setup -------------------------------------------------------------- +import datetime + # If extensions (or modules to document with autodoc) are in another directory, # add these directories to sys.path here. If the directory is relative to the # documentation root, use os.path.abspath to make it absolute, like shown here. # import os import sys -import datetime sys.path.insert(0, os.path.abspath("../..")) -import ot2_driver # -- Project information ----------------------------------------------------- diff --git a/ot2_driver/__init__.py b/ot2_driver/__init__.py index 27c5939..0bfe215 100644 --- a/ot2_driver/__init__.py +++ b/ot2_driver/__init__.py @@ -1,3 +1,3 @@ """Test descriptions, I am in the `ot2_driver.__init__`""" -__version__ = "0.0.1a1" +__version__ = "0.2.0" diff --git a/ot2_driver/config.py b/ot2_driver/config.py index 09f211a..cc7502c 100644 --- a/ot2_driver/config.py +++ b/ot2_driver/config.py @@ -1,10 +1,10 @@ """Stores dataclasses/args/config for the ot2 drivers""" -import yaml import json from argparse import ArgumentParser, Namespace from pathlib import Path -from typing import Union, Optional, Type, TypeVar +from typing import Optional, Type, TypeVar, Union +import yaml from pydantic import BaseModel as _BaseModel _T = TypeVar("_T") @@ -13,7 +13,7 @@ class BaseModel(_BaseModel): - """Allows any sub-class to inherit methods allowing for programatic description of protocols + """Allows any sub-class to inherit methods allowing for programmatic description of protocols Can load a yaml into a class and write a class into a yaml file. """ @@ -48,7 +48,7 @@ def json(self, **kwargs) -> str: return super().json(**kwargs) def write_yaml(self, cfg_path: PathLike) -> None: - """Allows programatic creation of ot2util objects and saving them into yaml. + """Allows programmatic creation of ot2util objects and saving them into yaml. Parameters ---------- cfg_path : PathLike diff --git a/ot2_driver/ot2_driver_http.py b/ot2_driver/ot2_driver_http.py index 6116904..337ec54 100644 --- a/ot2_driver/ot2_driver_http.py +++ b/ot2_driver/ot2_driver_http.py @@ -3,13 +3,13 @@ import time from enum import Enum from pathlib import Path -from typing import Dict, List, Optional, Tuple, Any +from typing import Any, Dict, List, Optional, Tuple import requests import yaml from urllib3 import Retry -from ot2_driver.config import PathLike, parse_ot2_args, OT2_Config +from ot2_driver.config import OT2_Config, PathLike, parse_ot2_args from ot2_driver.protopiler.protopiler import ProtoPiler @@ -77,7 +77,12 @@ def __init__( self.change_lights_status(status=True) def compile_protocol( - self, config_path, resource_file=None, resource_path=None, payload: Optional[Dict[str, Any]] = None, protocol_out_path=None + self, + config_path, + resource_file=None, + resource_path=None, + payload: Optional[Dict[str, Any]] = None, + protocol_out_path=None, ) -> Tuple[str, str]: """Compile the protocols via protopiler @@ -97,14 +102,20 @@ def compile_protocol( """ if ".py" not in str(config_path): self.protopiler.load_config( - config_path=config_path, resource_file=resource_file, resource_path=resource_path, protocol_out_path=protocol_out_path + config_path=config_path, + resource_file=resource_file, + resource_path=resource_path, + protocol_out_path=protocol_out_path, ) print("resource_file = {}".format(str(resource_file))) ( protocol_out_path, protocol_resource_file, ) = self.protopiler.yaml_to_protocol( - config_path, resource_file=resource_file, resource_file_out=resource_path, payload=payload + config_path, + resource_file=resource_file, + resource_file_out=resource_path, + payload=payload, ) return protocol_out_path, protocol_resource_file @@ -258,10 +269,10 @@ def get_robot_status(self) -> RobotStatus: Status Either IDLE or RUNNING """ - runs = self.get_runs() - if runs is None: + runs = self.get_runs() + if runs is None: return RobotStatus.OFFLINE.value - + for run in runs: run_status = run["status"] if ( diff --git a/ot2_driver/protocol_20230221-123810.py b/ot2_driver/protocol_20230221-123810.py index d61e8a0..c9d37e6 100644 --- a/ot2_driver/protocol_20230221-123810.py +++ b/ot2_driver/protocol_20230221-123810.py @@ -1,15 +1,14 @@ from opentrons import protocol_api - metadata = { "protocolName": "Color Mixing all", "author": "Kyle khippe@anl.gov", "description": "Mixing red colors", - "apiLevel": "2.12" + "apiLevel": "2.12", } -def run(protocol: protocol_api.ProtocolContext): +def run(protocol: protocol_api.ProtocolContext): deck = {} pipettes = {} @@ -20,8 +19,12 @@ def run(protocol: protocol_api.ProtocolContext): deck["7"] = protocol.load_labware("opentrons_6_tuberack_nest_50ml_conical", "7") deck["10"] = protocol.load_labware("opentrons_96_tiprack_300ul", "10") deck["11"] = protocol.load_labware("opentrons_96_tiprack_20ul", "11") - pipettes["left"] = protocol.load_instrument("p300_single_gen2", "left", tip_racks=[deck["10"]]) - pipettes["right"] = protocol.load_instrument("p20_single_gen2", "right", tip_racks=[deck["11"]]) + pipettes["left"] = protocol.load_instrument( + "p300_single_gen2", "left", tip_racks=[deck["10"]] + ) + pipettes["right"] = protocol.load_instrument( + "p20_single_gen2", "right", tip_racks=[deck["11"]] + ) #################### # execute commands # @@ -60,7 +63,6 @@ def run(protocol: protocol_api.ProtocolContext): pipettes["left"].blow_out() pipettes["left"].drop_tip() - # Mix color 2 pipettes["left"].pick_up_tip(deck["10"].wells()[3]) pipettes["left"].well_bottom_clearance.aspirate = 1.0 @@ -94,7 +96,6 @@ def run(protocol: protocol_api.ProtocolContext): pipettes["right"].blow_out() pipettes["right"].drop_tip() - # Mix color 3 pipettes["right"].pick_up_tip(deck["11"].wells()[2]) pipettes["right"].well_bottom_clearance.aspirate = 1.0 diff --git a/ot2_driver/protopiler/config.py b/ot2_driver/protopiler/config.py index 54076af..155e4e8 100644 --- a/ot2_driver/protopiler/config.py +++ b/ot2_driver/protopiler/config.py @@ -110,9 +110,10 @@ class Transfer(CommandBase): """blow out from tip into current location""" drop_tip: Union[bool, List[bool]] = True """Drop the tip once a transfer is done""" - return_tip:Union[bool, List[bool]] = False + return_tip: Union[bool, List[bool]] = False """puts tip back into tip box""" + class Multi_Transfer(CommandBase): multi_source: Union[str, List[List[str]]] """List of sources to be aspirated, each list within matrix presumed to be in single column""" @@ -133,6 +134,7 @@ class Multi_Transfer(CommandBase): multi_drop_tip: Union[bool, List[bool]] = True """Drop the tip once a transfer is done""" + class Mix(CommandBase): reps: Union[int, List[int]] """how many mix cycles""" @@ -141,26 +143,32 @@ class Mix(CommandBase): location: Union[List[str], str] """mixing destination""" + class Deactivate(CommandBase): deactivate: bool """Deactivates current module""" + class Temperature_Set(CommandBase): change_temp: int """Temperature to set temperature module to""" + class Replace_Tip(CommandBase): replace_tip: bool """Place tip back into tip rack""" + class Clear_Pipette(CommandBase): clear: bool """Blowout and remove any tip on pipette over trash""" + class Move_Pipette(CommandBase): move_to: int """Moves pipette to given deck position""" + # metadata container class Metadata(BaseSettings): """Container for the run metadata""" @@ -182,7 +190,19 @@ class ProtocolConfig(BaseSettings): """The additional resources (currently xls, xlsx files) to be used when compiling a protocol""" equipment: List[Union[Labware, Pipette]] """A list of the equipment you want to use on the OT2""" - commands: List[Union[Transfer, Multi_Transfer, Mix, Deactivate, Temperature_Set, Replace_Tip, Clear_Pipette, Move_Pipette, CommandBase]] + commands: List[ + Union[ + Transfer, + Multi_Transfer, + Mix, + Deactivate, + Temperature_Set, + Replace_Tip, + Clear_Pipette, + Move_Pipette, + CommandBase, + ] + ] """Commands to execute during run""" metadata: Metadata - """Information about the run""" \ No newline at end of file + """Information about the run""" diff --git a/ot2_driver/protopiler/deconstructor.py b/ot2_driver/protopiler/deconstructor.py index 2cc7ecd..5ccf2f1 100644 --- a/ot2_driver/protopiler/deconstructor.py +++ b/ot2_driver/protopiler/deconstructor.py @@ -18,7 +18,7 @@ class Deconstructor: """Pull apart a python protocol into a config""" def __init__(self, opentrons_simulate_bin: str = "opentrons_simulate") -> None: - """Initialize protocol decsonstructor + """Initialize protocol deconstructor Parameters ---------- @@ -58,57 +58,31 @@ def deconstruct( Path to the saved config.yml file, will be created if not given """ sim_command_commands = f"{self.opentrons_simulate_bin} {protocol_path}" - simultation_res = subprocess.run( + simulation_res = subprocess.run( sim_command_commands.split(), capture_output=True, text=True ) - if simultation_res.returncode: - print(f"Simulation failed with error: {simultation_res.stdout}") + if simulation_res.returncode: + print(f"Simulation failed with error: {simulation_res.stdout}") - raw_commands = simultation_res.stdout.strip() + raw_commands = simulation_res.stdout.strip() liminal_commands = [] for raw_command in raw_commands.split("\n"): liminal_commands.append(self._parse_raw_command(raw_command)) - potential_command = None - for command in liminal_commands: - potential_command = self._parse_liminal_command(command, potential_command) - if potential_command is not None: - if ( - potential_command.destination != "NA" - and potential_command.source != "NA" - and potential_command.volume != "NA" - ): - self.commands.append(potential_command) - potential_command = None - - # get the labware - resource_names = {} - for command in liminal_commands: - try: - key = command["info"]["labware_name"] - val = command["info"]["labware_location"] - if key in resource_names: - resource_names[key].add(val) - else: - resource_names[key] = set(val) - except KeyError: - pass - - for name, locations in resource_names.items(): - for loc in locations: - self.resources.append(Labware(name=name, location=loc)) + self._get_commands(liminal_commands) + self._get_labware(liminal_commands) # find the pipette tips sim_command_labware = f"{self.opentrons_simulate_bin} {protocol_path} -l info" - simulatation_labware_res = subprocess.run( + simulation_labware_res = subprocess.run( sim_command_labware.split(), capture_output=True, text=True ) - if simulatation_labware_res.returncode: + if simulation_labware_res.returncode: raise Exception("Cannot run simulation with long info") - for new_pipette in self._find_pipettes(simulatation_labware_res.stdout): + for new_pipette in self._find_pipettes(simulation_labware_res.stdout): occupied = False for resource in self.resources: if ( @@ -130,6 +104,42 @@ def deconstruct( ) protocol_config.dump_yaml(args.config_out) + def _get_commands(self, liminal_commands): + """ + Get the commands from the liminal commands + """ + potential_command = None + for command in liminal_commands: + potential_command = self._parse_liminal_command(command, potential_command) + if potential_command is not None: + if ( + potential_command.destination != "NA" + and potential_command.source != "NA" + and potential_command.volume != "NA" + ): + self.commands.append(potential_command) + potential_command = None + + def _get_labware(self, liminal_commands): + """ + Gets the labware from the commands and adds it to resources + """ + resource_names = {} + for command in liminal_commands: + try: + key = command["info"]["labware_name"] + val = command["info"]["labware_location"] + if key in resource_names: + resource_names[key].add(val) + else: + resource_names[key] = set(val) + except KeyError: + pass + + for name, locations in resource_names.items(): + for loc in locations: + self.resources.append(Labware(name=name, location=loc)) + def _parse_raw_command(self, command: str) -> Dict: # TODO remove brittle solution from parsing if possible for key, strategy in self.commands_parsing_strategy.items(): @@ -203,7 +213,6 @@ def _parse_dropping_tip(self, raw_command: str) -> Dict: def _parse_liminal_command( self, liminal_command: dict, partial_command: Command = None ) -> Optional[Command]: - if "pickup_tip" == liminal_command["command"]: return partial_command @@ -253,9 +262,9 @@ def _find_pipettes(self, simulation_out: str) -> Pipette: def main(args): # noqa: D103 - decon = Deconstructor() + deconstructor = Deconstructor() - decon.deconstruct(protocol_path=args.protocol, config_path=args.config_out) + deconstructor.deconstruct(protocol_path=args.protocol, config_path=args.config_out) if __name__ == "__main__": diff --git a/ot2_driver/protopiler/protopiler.py b/ot2_driver/protopiler/protopiler.py index cc51cc2..478c57f 100644 --- a/ot2_driver/protopiler/protopiler.py +++ b/ot2_driver/protopiler/protopiler.py @@ -1,15 +1,27 @@ """Protopiler is designed to compile a config yaml into a working protocol""" import argparse -from concurrent.futures import process import copy from datetime import datetime from itertools import repeat from pathlib import Path -from typing import List, Optional, Tuple, Union, Dict +from typing import Dict, List, Optional, Tuple, Union import pandas as pd -from ot2_driver.protopiler.config import CommandBase, Transfer, Multi_Transfer, Mix, Deactivate, Temperature_Set, Replace_Tip, Clear_Pipette, Move_Pipette, PathLike, ProtocolConfig, Resource +from ot2_driver.protopiler.config import ( + Clear_Pipette, + CommandBase, + Deactivate, + Mix, + Move_Pipette, + Multi_Transfer, + PathLike, + ProtocolConfig, + Replace_Tip, + Resource, + Temperature_Set, + Transfer, +) from ot2_driver.protopiler.resource_manager import ResourceManager """ Things to do: @@ -56,12 +68,16 @@ def __init__( self.load_config(config_path=config_path, resource_file=resource_file) def load_config( - self, config_path: PathLike, resource_file: Optional[PathLike] = None, resource_path: Optional[PathLike] = None, protocol_out_path: Optional[PathLike] = None + self, + config_path: PathLike, + resource_file: Optional[PathLike] = None, + resource_path: Optional[PathLike] = None, + protocol_out_path: Optional[PathLike] = None, ) -> None: - """Loading the config and generating necesary information for compiling a config + """Loading the config and generating necessary information for compiling a config This is what allows for nothing to be passed in during obj creation, if a user calls - this method, it will load all the necesary things + this method, it will load all the necessary things Parameters ---------- config_path : PathLike @@ -154,11 +170,11 @@ def _postprocess_commands(self) -> None: # Could use more testing and not peek_well[1:].isdigit() and "payload" not in peek_well ): - # read from file new_locations = [] for orig_command, loc in zip( - repeat(command.destination), self.resources[resource_key][peek_well] + repeat(command.destination), + self.resources[resource_key][peek_well], ): orig_deck_location = orig_command.split(":")[0] new_locations.append(f"{orig_deck_location}:{loc}") @@ -191,28 +207,29 @@ def _postprocess_commands(self) -> None: # Could use more testing peek_well: str = peek_elem.split(":")[-1] if isinstance(command.location, list): # No mixing and matching peek_elem = command.location[0] - - #external file + + # external file if ( not peek_well.isdigit() and not peek_well[1:].isdigit() and "payload" not in peek_well ): - # read from file new_locations = [] for orig_command, loc in zip( - repeat(command.location), self.resources[resource_key][peek_well] + repeat(command.location), + self.resources[resource_key][peek_well], ): orig_deck_location = orig_command.split(":")[0] new_locations.append(f"{orig_deck_location}:{loc}") command.location = new_locations - if isinstance(command, Multi_Transfer): if ":[" in command.multi_source: - command.multi_source = self._unpack_multi_alias(command_elem=command.multi_source) + command.multi_source = self._unpack_multi_alias( + command_elem=command.multi_source + ) # Add logic for taking well names from files # peek into the source, check the well destination part @@ -223,39 +240,38 @@ def _postprocess_commands(self) -> None: # Could use more testing # check if it follows naming convention`[A-Z,a-z]?[0-9]{1,3}` # TODO better way to check the naming conventions for the wells peek_well = peek_well.split(", ") - if ( - len(peek_well) == 1 - and "payload" not in peek_well - ): + if len(peek_well) == 1 and "payload" not in peek_well: # read from file new_locations = [] for orig_command, loc in zip( - repeat(command.multi_source), self.resources[resource_key][peek_well[0]] + repeat(command.multi_source), + self.resources[resource_key][peek_well[0]], ): orig_deck_location = orig_command.split(":")[0] new_locations.append(f"{orig_deck_location}:{loc}") command.multi_source = new_locations if ":[" in command.multi_destination: - command.multi_destination = self._unpack_multi_alias(command.multi_destination) + command.multi_destination = self._unpack_multi_alias( + command.multi_destination + ) # Add logic for reading well names from file # peek into the source, check the well destination part peek_elem = command.multi_destination - if isinstance(command.multi_destination, list): # No mixing and matching + if isinstance( + command.multi_destination, list + ): # No mixing and matching peek_elem = command.multi_destination[0] peek_well = peek_elem.split(":")[-1] peek_well = peek_well.split(", ") - if ( - len(peek_well) == 1 - and "payload" not in peek_well - ): - + if len(peek_well) == 1 and "payload" not in peek_well: # read from file new_locations = [] for orig_command, loc in zip( - repeat(command.multi_destination), self.resources[resource_key][peek_well[0]] + repeat(command.multi_destination), + self.resources[resource_key][peek_well[0]], ): orig_deck_location = orig_command.split(":")[0] new_locations.append(f"{orig_deck_location}:{loc}") @@ -280,9 +296,7 @@ def _unpack_alias(self, command_elem: Union[str, List[str]]) -> List[str]: new_locations = [] alias = command_elem.split(":")[0] process_source = copy.deepcopy(command_elem) - process_source = ":".join( - process_source.split(":")[1:] - ) + process_source = ":".join(process_source.split(":")[1:]) # split and rejoin after first colon process_source = process_source.strip("][").split(", ") for location in process_source: @@ -296,14 +310,14 @@ def _unpack_alias(self, command_elem: Union[str, List[str]]) -> List[str]: new_locations.append(location) return new_locations - - def _unpack_multi_alias(self, command_elem: Union[str, List[List[str]]]) -> List[List[str]]: + + def _unpack_multi_alias( + self, command_elem: Union[str, List[List[str]]] + ) -> List[List[str]]: new_locations = [] alias = command_elem.split(":")[0] process_source = copy.deepcopy(command_elem) - process_source = ":".join( - process_source.split(":")[1:] - ) + process_source = ":".join(process_source.split(":")[1:]) process_source = process_source.strip("][").split("], [") for i in range(len(process_source)): @@ -319,7 +333,6 @@ def _unpack_multi_alias(self, command_elem: Union[str, List[List[str]]]) -> List new_locations.append(new_location) return new_locations - def load_resources(self, resources: List[Resource]): """Load the other resources (files) specified in the config @@ -410,10 +423,10 @@ def yaml_to_protocol( ) else: protocol_out = Path( - self.protocol_out_path + f"./protocol_{datetime.now().strftime('%Y%m%d-%H%M%S')}.py" + self.protocol_out_path + + f"./protocol_{datetime.now().strftime('%Y%m%d-%H%M%S')}.py" ) - protocol = [] # Header and run() declaration with initial deck and pipette dicts @@ -552,12 +565,13 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: (self.template_dir / "aspirate_clearance.template") ).read() blow_out_template = open((self.template_dir / "blow_out.template")).read() - temp_change_template = open((self.template_dir / "set_temperature.template")).read() + temp_change_template = open( + (self.template_dir / "set_temperature.template") + ).read() deactivate_template = open((self.template_dir / "deactivate.template")).read() move_template = open((self.template_dir / "move_pipette.template")).read() tip_loaded = {"left": False, "right": False} for i, command_block in enumerate(self.commands): - block_name = ( command_block.name if command_block.name is not None else f"command {i}" ) @@ -565,7 +579,6 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: # TODO: Inject the payload here # Inject the payload if isinstance(payload, dict): - (arg_keys, arg_values) = zip(*command_block.__dict__.items()) for key, value in payload.items(): if "payload." not in key: @@ -592,19 +605,23 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: pass else: # determine which pipette to use - pipette_mount = self.resource_manager.determine_pipette(volume, False) + pipette_mount = self.resource_manager.determine_pipette( + volume, False + ) if pipette_mount is None: raise Exception( f"No pipette available for {block_name} with volume: {volume}" ) - # check for tip + # check for tip print(pipette_mount) if not tip_loaded[pipette_mount]: load_command = pick_tip_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) # TODO: think of some better software design for accessing members of resource manager - pipette_name = self.resource_manager.mount_to_pipette[pipette_mount] + pipette_name = self.resource_manager.mount_to_pipette[ + pipette_mount + ] # TODO: define flag to grab from specific well or just use the ones defined by the OT2 if True: @@ -627,8 +644,10 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: # aspirate and dispense # set aspirate clearance - aspirate_clearance_command = aspirate_clearance_template.replace( - "#pipette#", f'pipettes["{pipette_mount}"]' + aspirate_clearance_command = ( + aspirate_clearance_template.replace( + "#pipette#", f'pipettes["{pipette_mount}"]' + ) ) aspirate_clearance_command = aspirate_clearance_command.replace( "#height#", str(asp_height) @@ -641,7 +660,9 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: aspirate_command = aspirate_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) - aspirate_command = aspirate_command.replace("#volume#", str(volume)) + aspirate_command = aspirate_command.replace( + "#volume#", str(volume) + ) aspirate_command = aspirate_command.replace( "#src#", f'deck["{src_wellplate_location}"]["{src_well}"]' ) @@ -651,13 +672,15 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: ) # set dispense clearance - dispense_clearance_commmand = dispense_clearance_template.replace( - "#pipette#", f'pipettes["{pipette_mount}"]' + dispense_clearance_command = ( + dispense_clearance_template.replace( + "#pipette#", f'pipettes["{pipette_mount}"]' + ) ) - dispense_clearance_commmand = dispense_clearance_commmand.replace( + dispense_clearance_command = dispense_clearance_command.replace( "#height#", str(disp_height) ) - commands.append(dispense_clearance_commmand) + commands.append(dispense_clearance_command) dst_wellplate_location = self._parse_wellplate_location(dst) dst_well = dst.split(":")[ @@ -666,7 +689,9 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: dispense_command = dispense_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) - dispense_command = dispense_command.replace("#volume#", str(volume)) + dispense_command = dispense_command.replace( + "#volume#", str(volume) + ) dispense_command = dispense_command.replace( "#dst#", f'deck["{dst_wellplate_location}"]["{dst_well}"]' ) @@ -682,12 +707,16 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: mix_command = mix_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) - mix_command = mix_command.replace("#volume#", str(mix_vol)) + mix_command = mix_command.replace( + "#volume#", str(mix_vol) + ) mix_command = mix_command.replace( "#loc#", f'deck["{dst_wellplate_location}"]["{dst_well}"]', # same as destination ) - mix_command = mix_command.replace("#reps#", str(mix_cycles)) + mix_command = mix_command.replace( + "#reps#", str(mix_cycles) + ) commands.append(mix_command) @@ -704,7 +733,7 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: ) commands.append(drop_command) tip_loaded[pipette_mount] = False - + if return_tip: return_command = return_tip_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' @@ -728,36 +757,42 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: if volume <= 0: pass else: - # determine which pipette to use - pipette_mount = self.resource_manager.determine_pipette(volume, True) + pipette_mount = self.resource_manager.determine_pipette( + volume, True + ) if pipette_mount is None: raise Exception( f"No pipette available for {block_name} with volume: {volume}" ) # check to make sure that appropriate pipette is a multi-channel - #TODO: need to change, what if we have single and multi channel of same volume? - if "multi" not in self.resource_manager.mount_to_pipette[pipette_mount]: - raise Exception( - f"Selected pipette is not multi-channel" - ) + # TODO: need to change, what if we have single and multi channel of same volume? + if ( + "multi" + not in self.resource_manager.mount_to_pipette[pipette_mount] + ): + raise Exception("Selected pipette is not multi-channel") # check for tip if not tip_loaded[pipette_mount]: load_command = pick_tip_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) - pipette_name = self.resource_manager.mount_to_pipette[pipette_mount] + pipette_name = self.resource_manager.mount_to_pipette[ + pipette_mount + ] new_src = copy.copy(src) new_src = new_src.replace("'", "") new_src = new_src.split(":")[-1] - new_src = new_src.strip('][').split(', ') - + new_src = new_src.strip("][").split(", ") + # TODO: define flag to grab from specific well or just use the ones defined by the OT2 if True: ( rack_location, well_location, - ) = self.resource_manager.get_next_tip(pipette_name, len(new_src)) + ) = self.resource_manager.get_next_tip( + pipette_name, len(new_src) + ) location_string = ( f'deck["{rack_location}"].wells()[{well_location}]' @@ -771,17 +806,19 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: commands.append(load_command) tip_loaded[pipette_mount] = True - + else: new_src = copy.copy(src) new_src = new_src.replace("'", "") new_src = new_src.split(":")[-1] - new_src = new_src.strip('][').split(', ') + new_src = new_src.strip("][").split(", ") # aspirate and dispense # set aspirate clearance - aspirate_clearance_command = aspirate_clearance_template.replace( - "#pipette#", f'pipettes["{pipette_mount}"]' + aspirate_clearance_command = ( + aspirate_clearance_template.replace( + "#pipette#", f'pipettes["{pipette_mount}"]' + ) ) aspirate_clearance_command = aspirate_clearance_command.replace( "#height#", str(asp_height) @@ -794,35 +831,43 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: aspirate_command = aspirate_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) - aspirate_command = aspirate_command.replace("#volume#", str(volume)) + aspirate_command = aspirate_command.replace( + "#volume#", str(volume) + ) aspirate_command = aspirate_command.replace( "#src#", f'deck["{src_wellplate_location}"]["{src_well}"]' ) commands.append(aspirate_command) - + self.resource_manager.update_well_usage( src_wellplate_location, new_src ) # set dispense clearance - dispense_clearance_commmand = dispense_clearance_template.replace( - "#pipette#", f'pipettes["{pipette_mount}"]' + dispense_clearance_command = ( + dispense_clearance_template.replace( + "#pipette#", f'pipettes["{pipette_mount}"]' + ) ) - dispense_clearance_commmand = dispense_clearance_commmand.replace( + dispense_clearance_command = dispense_clearance_command.replace( "#height#", str(disp_height) ) - commands.append(dispense_clearance_commmand) + commands.append(dispense_clearance_command) dst_wellplate_location = self._parse_wellplate_location(dst) new_dst = copy.copy(dst) new_dst = new_dst.replace("'", "") new_dst = new_dst.split(":")[-1] - new_dst = new_dst.strip('][').split(', ') - dst_well = new_dst[0] # should handle things not formed like loc:well + new_dst = new_dst.strip("][").split(", ") + dst_well = new_dst[ + 0 + ] # should handle things not formed like loc:well dispense_command = dispense_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) - dispense_command = dispense_command.replace("#volume#", str(volume)) + dispense_command = dispense_command.replace( + "#volume#", str(volume) + ) dispense_command = dispense_command.replace( "#dst#", f'deck["{dst_wellplate_location}"]["{dst_well}"]' ) @@ -838,12 +883,16 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: mix_command = mix_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) - mix_command = mix_command.replace("#volume#", str(mix_vol)) + mix_command = mix_command.replace( + "#volume#", str(mix_vol) + ) mix_command = mix_command.replace( "#loc#", f'deck["{dst_wellplate_location}"]["{dst_well}"]', # same as destination ) - mix_command = mix_command.replace("#reps#", str(mix_cycles)) + mix_command = mix_command.replace( + "#reps#", str(mix_cycles) + ) commands.append(mix_command) @@ -864,10 +913,8 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: commands.append("") if isinstance(command_block, Temperature_Set): if type(command_block.change_temp) is not int: - raise Exception( - "temperature for module must be an integer" - ) - + raise Exception("temperature for module must be an integer") + temp_change_command = temp_change_template.replace( "#temp#", str(command_block.change_temp) ) @@ -885,10 +932,10 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: mix_command = mix_command.replace( "#volume#", str(command_block.mix_volume) ) - wellplate_location = self._parse_wellplate_location(command_block.location) - well = command_block.location.split(":")[ - -1 - ] + wellplate_location = self._parse_wellplate_location( + command_block.location + ) + well = command_block.location.split(":")[-1] mix_command = mix_command.replace( "#loc#", f'deck["{wellplate_location}"]["{well}"]' ) @@ -896,7 +943,7 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: "#pipette#", f'pipettes["{pipette_mount}"]' ) commands.append(mix_command) - + else: iter_len = 0 if isinstance(command_block.location, list): @@ -904,27 +951,27 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: command_block.location = command_block.location[0] else: iter_len = len(command_block.location) - + if isinstance(command_block.mix_volume, list): if iter_len != 0 and len(command_block.mix_volume) != iter_len: if len(command_block.mix_volume) == 1: command_block.mix_volume = command_block.mix_volume[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) iter_len = len(command_block.mix_volume) - + if isinstance(command_block.reps, list): if iter_len != 0 and len(command_block.reps) != iter_len: if len(command_block.reps) == 1: command_block.reps = command_block.reps[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) iter_len = len(command_block.reps) - + if not isinstance(command_block.location, list): locations = repeat(command_block.location, iter_len) else: @@ -940,25 +987,11 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: else: mix_reps = command_block.reps - for( - loc, - mix_vols, - rep - ) in zip( - locations, - mix_volumes, - mix_reps - ): - mix_command = mix_template.replace( - "#reps#", str(rep) - ) - mix_command = mix_command.replace( - "#volume#", str(mix_vols) - ) + for loc, mix_vols, rep in zip(locations, mix_volumes, mix_reps): + mix_command = mix_template.replace("#reps#", str(rep)) + mix_command = mix_command.replace("#volume#", str(mix_vols)) wellplate_location = self._parse_wellplate_location(loc) - well = loc.split(":")[ - -1 - ] + well = loc.split(":")[-1] mix_command = mix_command.replace( "#loc#", f'deck["{wellplate_location}"]["{well}"]' ) @@ -966,52 +999,39 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: "#pipette#", f'pipettes["{pipette_mount}"]' ) commands.append(mix_command) - if isinstance(command_block, Deactivate): if type(command_block.deactivate) is not bool: - raise Exception( - "deactivate command must be bool" - ) - deactivate_command = deactivate_template.replace( - "#turn_off#", '' - ) + raise Exception("deactivate command must be bool") + deactivate_command = deactivate_template.replace("#turn_off#", "") commands.append(deactivate_command) if isinstance(command_block, Replace_Tip): if type(command_block.replace_tip) is not bool: - raise Exception( - "replace_tip must be bool" - ) + raise Exception("replace_tip must be bool") replace_tip_command = return_tip_template.replace( - "#pipette#", f'pipettes["{pipette_mount}"]' - ) + "#pipette#", f'pipettes["{pipette_mount}"]' + ) commands.append(replace_tip_command) tip_loaded[pipette_mount] = False if isinstance(command_block, Clear_Pipette): if type(command_block.clear) is not bool: - raise Exception( - "clear command must be True or False" - ) + raise Exception("clear command must be True or False") clear_command = drop_tip_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) commands.append(clear_command) tip_loaded[pipette_mount] = False - + if isinstance(command_block, Move_Pipette): if type(command_block.move_to) is not int: - raise Exception( - "Given deck position must be an int" - ) + raise Exception("Given deck position must be an int") if command_block.move_to > 12 or command_block.move_to < 1: - raise Exception( - "number must be a valid deck position 1-12" - ) - #TODO: need to establish pipette mount to move - #pipette_mount = self.resource_manager.pipette_to_mount[] + raise Exception("number must be a valid deck position 1-12") + # TODO: need to establish pipette mount to move + # pipette_mount = self.resource_manager.pipette_to_mount[] move_command = move_template.replace( "#pipette#", f'pipettes["{pipette_mount}"]' ) @@ -1019,9 +1039,6 @@ def _create_commands(self, payload: Optional[Dict]) -> List[str]: "#location#", str(command_block.move_to) ) commands.append(move_command) - - - for mount, status in tip_loaded.items(): if status: @@ -1113,11 +1130,9 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: and type(command_block.drop_tip) is bool and type(command_block.return_tip) is bool ): - yield command_block.volume, command_block.source, command_block.destination, command_block.mix_cycles, command_block.mix_volume, command_block.aspirate_clearance, command_block.dispense_clearance, command_block.blow_out, command_block.drop_tip, command_block.return_tip else: - - # could be one source (either list of volumes or one volume) to many desitnation + # could be one source (either list of volumes or one volume) to many destination # could be many sources (either list of volumes or one volume) to one destination # could be one source/destination, many volumes @@ -1140,7 +1155,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: command_block.source = command_block.source[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) iter_len = len(command_block.source) if isinstance(command_block.destination, list): @@ -1150,7 +1165,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: command_block.destination = command_block.destination[0] else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) iter_len = len(command_block.destination) @@ -1161,7 +1176,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: command_block.mix_cycles = command_block.mix_cycles[0] else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) iter_len = len(command_block.mix_cycles) @@ -1172,7 +1187,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: command_block.mix_volume = command_block.mix_volume[0] else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) iter_len = len(command_block.mix_volume) @@ -1185,7 +1200,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: ) else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) if isinstance(command_block.dispense_clearance, list): @@ -1197,7 +1212,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: ) else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) if isinstance(command_block.blow_out, list): if iter_len != 0 and len(command_block.blow_out) != iter_len: @@ -1206,7 +1221,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: command_block.blow_out = command_block.blow_out[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) iter_len = len(command_block.blow_out) @@ -1217,10 +1232,10 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: command_block.drop_tip = command_block.drop_tip[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) iter_len = len(command_block.drop_tip) - + if isinstance(command_block.return_tip, list): if iter_len != 0 and len(command_block.return_tip) != iter_len: # handle if user forgot to change list of one value to scalar @@ -1228,7 +1243,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: command_block.return_tip = command_block.return_tip[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) iter_len = len(command_block.return_tip) @@ -1283,7 +1298,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: disp_height, blowout, d_tip, - r_tip + r_tip, ) in zip( volumes, sources, @@ -1299,7 +1314,7 @@ def _process_instruction(self, command_block: CommandBase) -> List[str]: yield vol, src, dst, mix_cycles, mix_vol, asp_height, disp_height, blowout, d_tip, r_tip def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: - """ mimics _proccess_instruction but for multi channel transfers""" + """mimics _process_instruction but for multi channel transfers""" iter_len = 0 if isinstance(command_block.multi_volume, list): @@ -1309,18 +1324,18 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: else: iter_len = len(command_block.multi_volume) if isinstance(command_block.multi_source, list): - #organize into list[list[str]] format + # organize into list[list[str]] format if iter_len != 0 and len(command_block.multi_source) != iter_len: # handle if user forgot to change list of one value to scalar if len(command_block.multi_source) == 1: command_block.multi_source = command_block.multi_source[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) # TODO: need smarter fix for resource importing eventually iter_len = len(command_block.multi_source) - + if isinstance(command_block.multi_destination, list): if iter_len != 0 and len(command_block.multi_destination) != iter_len: # handle if user forgot to change list of one value to scalar @@ -1328,7 +1343,7 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: command_block.multi_destination = command_block.multi_destination[0] else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) iter_len = len(command_block.multi_destination) @@ -1339,7 +1354,7 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: command_block.multi_mix_cycles = command_block.multi_mix_cycles[0] else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) iter_len = len(command_block.multi_mix_cycles) @@ -1350,12 +1365,15 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: command_block.multi_mix_volume = command_block.multi_mix_volume[0] else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) iter_len = len(command_block.multi_mix_volume) if isinstance(command_block.multi_aspirate_clearance, list): - if iter_len != 0 and len(command_block.multi_aspirate_clearance) != iter_len: + if ( + iter_len != 0 + and len(command_block.multi_aspirate_clearance) != iter_len + ): # handle if user forgot to change list of one value to scalar if len(command_block.multi_aspirate_clearance) == 1: command_block.multi_aspirate_clearance = ( @@ -1363,11 +1381,14 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: ) else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) if isinstance(command_block.multi_dispense_clearance, list): - if iter_len != 0 and len(command_block.multi_dispense_clearance) != iter_len: + if ( + iter_len != 0 + and len(command_block.multi_dispense_clearance) != iter_len + ): # handle if user forgot to change list of one value to scalar if len(command_block.multi_dispense_clearance) == 1: command_block.multi_dispense_clearance = ( @@ -1375,7 +1396,7 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: ) else: raise Exception( - "Multiple iterables of differnet lengths found, cannot deterine dimension to iterate over" + "Multiple iterables of different lengths found, cannot determine dimension to iterate over" ) if isinstance(command_block.multi_blow_out, list): if iter_len != 0 and len(command_block.multi_blow_out) != iter_len: @@ -1384,7 +1405,7 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: command_block.multi_blow_out = command_block.multi_blow_out[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) iter_len = len(command_block.multi_blow_out) @@ -1395,7 +1416,7 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: command_block.multi_drop_tip = command_block.multi_drop_tip[0] else: raise Exception( - "Multiple iterables found, cannot deterine dimension to iterate over" + "Multiple iterables found, cannot determine dimension to iterate over" ) iter_len = len(command_block.multi_drop_tip) @@ -1420,11 +1441,15 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: else: mixing_volume = command_block.multi_mix_volume if not isinstance(command_block.multi_aspirate_clearance, list): - aspirate_clearance = repeat(command_block.multi_aspirate_clearance, iter_len) + aspirate_clearance = repeat( + command_block.multi_aspirate_clearance, iter_len + ) else: aspirate_clearance = command_block.multi_aspirate_clearance if not isinstance(command_block.multi_dispense_clearance, list): - dispense_clearance = repeat(command_block.multi_dispense_clearance, iter_len) + dispense_clearance = repeat( + command_block.multi_dispense_clearance, iter_len + ) else: dispense_clearance = command_block.multi_dispense_clearance if not isinstance(command_block.multi_blow_out, list): @@ -1436,7 +1461,6 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: else: drop_tip = command_block.multi_drop_tip - for ( vol, src, @@ -1461,16 +1485,13 @@ def _process_multi_instruction(self, command_block: CommandBase) -> List[str]: yield vol, src, dst, mix_cycles, mix_vol, asp_height, disp_height, blowout, d_tip - - - def main(args): # noqa: D103 # TODO: Think about how a user would want to interact with this, do they want to interact with something like a # SeqIO from Biopython? Or more like a interpreter kind of thing? That will guide some of this... not sure where # its going right now - ppiler = ProtoPiler(args.config) + protopiler = ProtoPiler(args.config) - ppiler.yaml_to_protocol( + protopiler.yaml_to_protocol( config_path=args.config, protocol_out=args.protocol_out, resource_file=args.resource_in, diff --git a/ot2_driver/protopiler/resource_manager.py b/ot2_driver/protopiler/resource_manager.py index e5ec6f9..7f1786d 100644 --- a/ot2_driver/protopiler/resource_manager.py +++ b/ot2_driver/protopiler/resource_manager.py @@ -28,7 +28,7 @@ class ResourceManager: def __init__( self, - equiment_config: Optional[List[Union[Labware, Pipette]]] = None, + equipment_config: Optional[List[Union[Labware, Pipette]]] = None, resource_file: Optional[PathLike] = None, ) -> None: """This class manages the resources used as specified by a config @@ -42,9 +42,9 @@ def __init__( """ self.init = False - if equiment_config: + if equipment_config: self.load_equipment( - equipment_config=equiment_config, resource_file=resource_file + equipment_config=equipment_config, resource_file=resource_file ) self.init = True @@ -63,7 +63,7 @@ def load_equipment( path to the resource file, will be loaded if exists, by default None """ self.resource_file = resource_file - # setup the necesary data relationships + # setup the necessary data relationships self._generate_location_name_relationships(equipment_config=equipment_config) # setup the resource tracker, if exists leave as is, else, create it @@ -288,39 +288,40 @@ def get_next_tip(self, pipette_name: str, tip_num: int) -> str: # leveraging the 0 indexing of the rack and the 1 indexing of the count # has to be a str because of the protocol writing - if tip_num ==1: #TODO:, maybe change to if using single channel + if tip_num == 1: # TODO:, maybe change to if using single channel # next_tip = str(self.resources[loc]["used"]) # TODO: need to change, pick up from bottom row for i in range(96): - curr_tip = i if str(i) not in self.resources[loc]["wells_used"]: next_tip = str(i) break self.update_tip_count(loc, next_tip, tip_num) # multi transfer, need to determine uppermost tip pick up point # iterate through "wells_used", find "tip_num" number of tips all still in same column - elif tip_num >1: + elif tip_num > 1: next_tip = self.find_multi_pickup_spot(loc, tip_num) self.update_tip_count(loc, next_tip, tip_num) - # update usage - # self.resources[loc]["used"] += tip_num - # if self.resources[loc]["used"] == capacity: - # self.resources[loc]["depleted"] = True + # update usage + # self.resources[loc]["used"] += tip_num + # if self.resources[loc]["used"] == capacity: + # self.resources[loc]["depleted"] = True next_tip = next_tip[0] return loc, next_tip raise Exception(f"Not enough tips found for '{pipette_name}'...") - - def find_multi_pickup_spot(self, loc: str, tip_num: int) -> str: #TODO iterate through locations not just fixed at one - """ Finds best location to pick up tip for multi transfer - + + def find_multi_pickup_spot( + self, loc: str, tip_num: int + ) -> str: # TODO iterate through locations not just fixed at one + """Finds best location to pick up tip for multi transfer + Parameters ---------- loc: str deck location of chosen tip rack tip_num: int number of tips to be mounted on pipette - + Returns ------- well: int @@ -328,54 +329,53 @@ def find_multi_pickup_spot(self, loc: str, tip_num: int) -> str: #TODO iterate t """ # iterate through sets of tip_num throughout tip rack, checking if values are located in wells_used tips = [] - good = False # signifies valid tip group - for i in range(97-tip_num): # TODO: need more intelligent, faster, way to do this + good = False # signifies valid tip group + for i in range( + 97 - tip_num + ): # TODO: need more intelligent, faster, way to do this tips = [] for j in range(tip_num): - tips.append(i+j) + tips.append(i + j) # check if any tips selected are in wells_used for p in range(len(tips)): if str(tips[p]) in self.resources[loc]["wells_used"]: tips = [] break - for q in range(len(tips)-1): + for q in range(len(tips) - 1): # check if tip grouping exceeds column - if tips[q]%8 == 7: # bottom of column + if tips[q] % 8 == 7: # bottom of column tips = [] break else: - #check if rest of wells below tips are empty - if tips[-1]%8 == 7: # already reaches bottom of tip rack, valid group + # check if rest of wells below tips are empty + if ( + tips[-1] % 8 == 7 + ): # already reaches bottom of tip rack, valid group good = True - + else: - bottom = tips[-1] # bottom of tip group + bottom = tips[-1] # bottom of tip group done = False t = 1 - while done == False: - curr_val = bottom+t - if str(curr_val) in self.resources[loc]["wells_used"]: # check if tip in next position - t = t+1 - if curr_val%8 == 7: # is bottom of column + while not done: + curr_val = bottom + t + if ( + str(curr_val) in self.resources[loc]["wells_used"] + ): # check if tip in next position + t = t + 1 + if curr_val % 8 == 7: # is bottom of column done = True good = True - else: # tips below group, move on + else: # tips below group, move on done = True continue break - if good == True: + if good: break if tips == []: raise Exception("No available group of tips for multi dispensing") return tips - - - - - - - def update_tip_count(self, loc, well, tip_num) -> None: """Tell the resource manager a new tip has been used @@ -407,7 +407,6 @@ def update_tip_count(self, loc, well, tip_num) -> None: if self.resources[loc]["used"] == capacity: self.resources[loc]["depleted"] = True - def update_tip_usage(self, pipette_name: str) -> None: """Tell the resource manager a new tip has been used @@ -451,7 +450,7 @@ def update_well_usage(self, location: str, well: Optional[str] = None) -> None: the location of the well on the wellplate, by default None """ if well: - if type(well) == list: + if isinstance(well, list): for i in range(len(well)): self.resources[location]["wells_used"].add(well[i]) @@ -505,7 +504,7 @@ def find_valid_tipracks(self, pipette_name: str) -> List[str]: return valid_tipracks def determine_pipette(self, target_volume: int, is_multi: bool) -> str: - """Determines which pippette to use for a given volume + """Determines which pipette to use for a given volume Parameters ---------- @@ -521,13 +520,10 @@ def determine_pipette(self, target_volume: int, is_multi: bool) -> str: min_available = float("inf") pip_volume_pattern = re.compile(r"\d{2,}") for mount, name in self.mount_to_pipette.items(): - - - pip_volume = int(pip_volume_pattern.search(name).group()) # TODO: make sure the pipettes can handle the max they are labeled as - if is_multi == True: + if is_multi: if "multi" in name: if pip_volume >= target_volume: if pip_volume < min_available: @@ -541,9 +537,6 @@ def determine_pipette(self, target_volume: int, is_multi: bool) -> str: min_available = pip_volume pipette = mount - - - return pipette def dump_resource_json(self, out_file: Optional[PathLike] = None) -> str: @@ -575,7 +568,10 @@ def dump_resource_json(self, out_file: Optional[PathLike] = None) -> str: else: out_path = self.resource_file else: - out_path = out_file+f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_resources.json" + out_path = ( + out_file + + f"{datetime.now().strftime('%Y%m%d-%H%M%S')}_resources.json" + ) with open(out_path, "w") as f: json.dump(out_resources, f, indent=2) @@ -586,7 +582,7 @@ def dump_resource_json(self, out_file: Optional[PathLike] = None) -> str: def main(args): # noqa: D103 config = ProtocolConfig.from_yaml(args.config) rm = ResourceManager( - equiment_config=config.equipment, resource_file=args.resource_file + equipment_config=config.equipment, resource_file=args.resource_file ) print(rm.resources) diff --git a/ot2_driver/protopiler/test_configs/basic_config.py b/ot2_driver/protopiler/test_configs/basic_config.py index c2d8a76..15df90e 100644 --- a/ot2_driver/protopiler/test_configs/basic_config.py +++ b/ot2_driver/protopiler/test_configs/basic_config.py @@ -9,7 +9,6 @@ def run(protocol: protocol_api.ProtocolContext): - deck = {} pipettes = {} diff --git a/package.xml b/package.xml deleted file mode 100644 index 949dfb6..0000000 --- a/package.xml +++ /dev/null @@ -1,20 +0,0 @@ - - - - ot2_driver - 0.0.0 - TODO: Package description - rpl - TODO: License declaration - - - ament_copyright - ament_flake8 - ament_pep257 - python3-pytest - - - ament_python - - - diff --git a/requirements.txt b/requirements.txt index add0ae5..8c4bb47 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ paramiko==2.8 cryptography==36.0.2 requests pandas +fastapi==0.103.2 diff --git a/resource/ot2_driver b/resource/ot2_driver deleted file mode 100644 index e69de29..0000000 diff --git a/scripts/delete_all_runs.py b/scripts/delete_all_runs.py index ab2146a..d4dbd55 100644 --- a/scripts/delete_all_runs.py +++ b/scripts/delete_all_runs.py @@ -1,6 +1,7 @@ -import requests from argparse import ArgumentParser +import requests + def main(args): base_url = "http://{ip_address}:31950/{extension}" @@ -22,7 +23,9 @@ def main(args): if run["status"] != "running" and run["current"] != "true": delete_resp = requests.delete( - url=base_url.format(ip_address=args.ip_address, extension=f"runs/{run_id}"), + url=base_url.format( + ip_address=args.ip_address, extension=f"runs/{run_id}" + ), headers=headers, ) else: @@ -30,7 +33,9 @@ def main(args): continue if delete_resp.status_code != 200: - print(f"Could not delete run with ID {run_id}, response: {delete_resp.json()}") + print( + f"Could not delete run with ID {run_id}, response: {delete_resp.json()}" + ) else: print(f"Run {run_id} deleted...") @@ -38,8 +43,12 @@ def main(args): if __name__ == "__main__": - parser = ArgumentParser(description="Delete all runs stored on an ot2 with a given ip address") - parser.add_argument("-ip", "--ip_address", help="Robot IP to delete all runs from", type=str) + parser = ArgumentParser( + description="Delete all runs stored on an ot2 with a given ip address" + ) + parser.add_argument( + "-ip", "--ip_address", help="Robot IP to delete all runs from", type=str + ) args = parser.parse_args() main(args) diff --git a/scripts/ot2_rest_client.py b/scripts/ot2_rest_client.py index 3450576..a851beb 100644 --- a/scripts/ot2_rest_client.py +++ b/scripts/ot2_rest_client.py @@ -1,55 +1,30 @@ #! /usr/bin/env python3 -"""OT2 Node""" -import os +"""The server for the OT2 that takes incoming WEI flow requests from the experiment application""" import glob import json +import os +import time import traceback -import yaml -from typing import List, Tuple -from pathlib import Path -from datetime import datetime +from argparse import ArgumentParser +from contextlib import asynccontextmanager from copy import deepcopy -import time - - - - -from ot2_driver.ot2_driver_http import OT2_Config, OT2_Driver -import opentrons.simulate -from opentrons.simulate import format_runlog +from datetime import datetime +from pathlib import Path from urllib.error import HTTPError, URLError -from urllib3.exceptions import ConnectionError, ConnectTimeoutError -from urllib3.connection import HTTPException, HTTPConnection -import requests - -#! /usr/bin/env python3 - - - - -from time import sleep -import threading -import asyncio - -from time import sleep -import json - -from threading import Thread - -"""The server that takes incoming WEI flow requests from the experiment application""" -import json -from argparse import ArgumentParser -from contextlib import asynccontextmanager -import time -from fastapi import FastAPI, File, Form, UploadFile +import requests +import yaml +from fastapi import FastAPI from fastapi.responses import JSONResponse +from urllib3.exceptions import ConnectTimeoutError + +from ot2_driver.ot2_driver_http import OT2_Config, OT2_Driver workcell = None -global sealer, state -serial_port = '/dev/ttyUSB0' -local_ip = 'parker.alcf.anl.gov' -local_port = '8000' +global state +serial_port = "/dev/ttyUSB0" +local_ip = "parker.alcf.anl.gov" +local_port = "8000" global ot2 resources_folder_path = "" @@ -58,58 +33,61 @@ resource_file_path = "" ip = "" + def check_protocols_folder(): - """ - Description: Checks if the protocols folder path exists. Creates the resource folder path if it doesn't already exists - """ - global protocols_folder_path - isPathExist = os.path.exists(protocols_folder_path) - if not isPathExist: - os.makedirs(protocols_folder_path) - + """ + Description: Checks if the protocols folder path exists. Creates the resource folder path if it doesn't already exist + """ + global protocols_folder_path + isPathExist = os.path.exists(protocols_folder_path) + if not isPathExist: + os.makedirs(protocols_folder_path) + def check_resources_folder(): - """ - Description: Checks if the resources folder path exists. Creates the resource folder path if it doesn't alIDLE exists - """ - global resources_folder_path - isPathExist = os.path.exists(resources_folder_path) - if not isPathExist: - os.makedirs(resources_folder_path) - #get_lresource_file_pathth.exists(protocols_folder_path) - if not isPathExist: - os.makedirs(protocols_folder_path) - # get_logger().warn("Protocols path doesn't exists") - print("Creating: " + protocols_folder_path) + """ + Description: Checks if the resources folder path exists. Creates the resource folder path if it doesn't already exist + """ + global resources_folder_path + isPathExist = os.path.exists(resources_folder_path) + if not isPathExist: + os.makedirs(resources_folder_path) + if not isPathExist: + os.makedirs(protocols_folder_path) + print("Creating: " + protocols_folder_path) + def connect_robot(): - global ot2, state, node_name, ip - try: - print(ip) - ot2 = OT2_Driver(OT2_Config(ip = ip)) + global ot2, state, node_name, ip + try: + print(ip) + ot2 = OT2_Driver(OT2_Config(ip=ip)) + state = "IDLE" - except ConnectTimeoutError as connection_err: - state = "ERROR" - print("Connection error code: " + connection_err) - - except HTTPError as http_error: - print("HTTP error code: " + http_error) - - except URLError as url_err: - print("Url error code: " + url_err) - - except requests.exceptions.ConnectionError as conn_err: - print("Connection error code: "+ str(conn_err)) - - except Exception as error_msg: - state = "ERROR" - print("-------" + str(error_msg) + " -------") + except ConnectTimeoutError as connection_err: + state = "ERROR" + print("Connection error code: " + connection_err) - else: - print(str(node_name) + " online") -def download_config_files( protocol_config: str, resource_config = None): + except HTTPError as http_error: + print("HTTP error code: " + http_error) + + except URLError as url_err: + print("Url error code: " + url_err) + + except requests.exceptions.ConnectionError as conn_err: + print("Connection error code: " + str(conn_err)) + + except Exception as error_msg: + state = "ERROR" + print("-------" + str(error_msg) + " -------") + + else: + print(str(node_name) + " online") + + +def download_config_files(protocol_config: str, resource_config=None): """ - Saves protocol_config string to a local yaml file locaton + Saves protocol_config string to a local yaml file location Parameters: ----------- @@ -124,19 +102,14 @@ def download_config_files( protocol_config: str, resource_config = None): global node_name, resource_file_path config_dir_path = Path.home().resolve() / protocols_folder_path config_dir_path.mkdir(exist_ok=True, parents=True) - + resource_dir_path = Path.home().resolve() / resources_folder_path resource_dir_path.mkdir(exist_ok=True, parents=True) - - time_str = datetime.now().strftime('%Y%m%d-%H%m%s') - config_file_path = ( - config_dir_path - / f"protocol-{time_str}.yaml" - ) - print( - "Writing protocol config to {} ...".format(str(config_file_path)) - ) + time_str = datetime.now().strftime("%Y%m%d-%H%m%s") + config_file_path = config_dir_path / f"protocol-{time_str}.yaml" + + print("Writing protocol config to {} ...".format(str(config_file_path))) with open(config_file_path, "w", encoding="utf-8") as pc_file: yaml.dump(protocol_config, pc_file, indent=4, sort_keys=False) @@ -148,11 +121,12 @@ def download_config_files( protocol_config: str, resource_config = None): return config_file_path, resource_file_path else: return config_file_path, None - -def execute(protocol_path, payload=None, resource_config = None): + + +def execute(protocol_path, payload=None, resource_config=None): """ Compiles the yaml at protocol_path into .py file; - Transfers and Exececutes the .py file + Transfers and Executes the .py file Parameters: ----------- @@ -170,33 +144,29 @@ def execute(protocol_path, payload=None, resource_config = None): ( protocol_file_path, resource_file_path, - ) = ot2.compile_protocol(protocol_path, payload=payload, resource_file = resource_config, resource_path = resources_folder_path, protocol_out_path = protocols_folder_path) + ) = ot2.compile_protocol( + protocol_path, + payload=payload, + resource_file=resource_config, + resource_path=resources_folder_path, + protocol_out_path=protocols_folder_path, + ) protocol_file_path = Path(protocol_file_path) print(f"{protocol_file_path.resolve()=}") protocol_id, run_id = ot2.transfer(protocol_file_path) print("OT2 " + node_name + " protocol transfer successful") resp = ot2.execute(run_id) - print("OT2 "+ node_name +" executed a protocol") - # get_logger().warn(str(resp)) + print("OT2 " + node_name + " executed a protocol") if resp["data"]["status"] == "succeeded": # poll_OT2_until_run_completion() - response_msg = "OT2 "+ node_name +" successfully IDLE running a protocol" + response_msg = "OT2 " + node_name + " successfully IDLE running a protocol" return True, response_msg - else: - response_msg = "OT2 "+ node_name +" failed running a protocol" + else: + response_msg = "OT2 " + node_name + " failed running a protocol" return False, response_msg - - # except FileNotFoundError: - # from pathlib import Path - - # response_msg = "Could not find protocol config file at {}, {}".format(protocol_path, Path(protocol_path).exists()) - # print(response_msg) - # stateCallback() - except Exception as err: - if "no route to host" in str(err.args).lower(): response_msg = "No route to host error. Ensure that this container \ has network access to the robot and that the environment \ @@ -208,34 +178,26 @@ def execute(protocol_path, payload=None, resource_config = None): print(response_msg) return False, response_msg - # rclpy.shutdown() ## TODO: Could alternatively indent into the if block. - ## TODO: Changed to as is to forestall any unexpected exceptions def poll_OT2_until_run_completion(): """Queries the OT2 run state until reported as 'succeeded'""" - global run_id + global run_id, state print("Polling OT2 run until completion") while state != "IDLE": - run_status = ot2.get_run(run_id) - if ( - run_status["data"]["status"] - and run_status["data"]["status"] == "succeeded" - ): + if run_status["data"]["status"] and run_status["data"]["status"] == "succeeded": state = "IDLE" print("Stopping Poll") - elif ( - run_status["data"]["status"] - and run_status["data"]["status"] == "running" - ): + elif run_status["data"]["status"] and run_status["data"]["status"] == "running": state = "BUSY" + @asynccontextmanager async def lifespan(app: FastAPI): - global ot2, state, node_name, resources_folder_path, protocols_folder_path, ip - """Initial run function for the app, parses the worcell argument + global ot2, state, node_name, resources_folder_path, protocols_folder_path, ip + """Initial run function for the app, parses the workcell argument Parameters ---------- app : FastApi @@ -244,144 +206,152 @@ async def lifespan(app: FastAPI): Returns ------- None""" - parser = ArgumentParser() - parser.add_argument("--alias", type=str, help="Name of the Node") - parser.add_argument("--host", type=str, help="Host for rest") - parser.add_argument("--ot2_ip", type=str, help="ip value") - parser.add_argument("--port", type=int, help="port value") - args = parser.parse_args() - node_name = args.alias - ip = args.ot2_ip - state = "UNKNOWN" - resources_folder_path = '/home/rpl/.ot2_temp/' + node_name + "/" + "resources/" - protocols_folder_path = '/home/rpl/.ot2_temp/' + node_name + "/" + "protocols/" - check_resources_folder() - check_protocols_folder() - connect_robot() - state = "IDLE" - description = { - "name": node_name, - "type": "", - "actions": { - "execute": "config : %s", ## takes in the yaml content as second string arg - "run_protocol": "config_path: %s", ## Temp inclusion - }, - } - yield - pass + parser = ArgumentParser() + parser.add_argument("--alias", type=str, help="Name of the Node") + parser.add_argument("--host", type=str, help="Host for rest") + parser.add_argument("--ot2_ip", type=str, help="ip value") + parser.add_argument("--port", type=int, help="port value") + args = parser.parse_args() + node_name = args.alias + ip = args.ot2_ip + state = "UNKNOWN" + resources_folder_path = "/home/rpl/.ot2_temp/" + node_name + "/" + "resources/" + protocols_folder_path = "/home/rpl/.ot2_temp/" + node_name + "/" + "protocols/" + check_resources_folder() + check_protocols_folder() + connect_robot() + yield + pass + +app = FastAPI( + lifespan=lifespan, +) -app = FastAPI(lifespan=lifespan, ) @app.get("/state") def get_state(): - global sealer - return JSONResponse(content={"State":state}) + global state + return JSONResponse(content={"State": state}) + @app.get("/description") async def description(): global state return JSONResponse(content={"State": state}) + @app.get("/resources") async def resources(): global resource_file_path resource_info = "" - if not(resource_file_path == ""): - with open(resource_file_path) as f: - resource_info = f.read() + if not (resource_file_path == ""): + with open(resource_file_path) as f: + resource_info = f.read() return JSONResponse(content={"State": resource_info}) @app.post("/action") -def do_action( - action_handle: str, - action_vars -): - global ot2, state - response={"action_response": "", "action_msg": "", "action_log": ""} +def do_action(action_handle: str, action_vars): + global ot2, state + response = {"action_response": "", "action_msg": "", "action_log": ""} + if state == "ERROR": + # Try to reconnect + check_resources_folder() + check_protocols_folder() + connect_robot() if state == "ERROR": - msg = "Can not accept the job! OT2 CONNECTION ERROR" - # get_logger.error(msg) - response["action_response"] = -1 - response["action_msg"] = msg - return response - - while state != "IDLE": - # get_logger().warn("Waiting for OT2 to switch IDLE state...") - time.sleep(0.5) - - state="BUSY" - action_command = action_handle - action_vars = json.loads(action_vars) - print(f"{action_vars=}") - - print(f"In action callback, command: {action_command}") - - if "run_protocol" == action_command: - - protocol_config = action_vars.get("config_path", None) - resource_config = action_vars.get("resource_path", None) #TODO: This will be enbaled in the future - resource_file_flag = action_vars.get("use_existing_resources", "False") #Returns True to use a resource file or False to not use a resource file. - - if resource_file_flag: - try: - list_of_files = glob.glob(resources_folder_path + '*.json') #Get list of files - if len(list_of_files) > 0: - resource_config = max(list_of_files, key=os.path.getctime) #Finding the latest added file - print("Using the resource file: " + resource_config) - - except Exception as er: - print(er) - if protocol_config: - config_file_path, resource_config_path = download_config_files(protocol_config, resource_config) - payload = deepcopy(action_vars) - payload.pop("config_path") - - print(f"ot2 {payload=}") - print(f"config_file_path: {config_file_path}") - - response_flag, response_msg = execute(config_file_path, payload, resource_config_path) - - if response_flag == True: - state = "IDLE" - response["action_response"] = 0 - response["action_msg"] = response_msg - #if resource_config_path: - # response.resources = str(resource_config_path) - - elif response_flag == False: - state = "ERROR" - response["action_response"] = -1 - response["action_msg"] = response_msg - #if resource_config_path: - # response.resources = str(resource_config_path) - - print("Finished Action: " + action_handle) - return response - - else: - response["action_msg"] = ( - "Required 'config' was not specified in action_vars" - ) - response["action_response"] = -1 - print(response["action_msg"]) + msg = "Can not accept the job! OT2 CONNECTION ERROR" + response["action_response"] = -1 + response["action_msg"] = msg + return response + + while state != "IDLE": + # get_logger().warn("Waiting for OT2 to switch IDLE state...") + time.sleep(0.5) + + state = "BUSY" + action_command = action_handle + action_vars = json.loads(action_vars) + print(f"{action_vars=}") + + print(f"In action callback, command: {action_command}") + + if "run_protocol" == action_command: + protocol_config = action_vars.get("config_path", None) + resource_config = action_vars.get( + "resource_path", None + ) # TODO: This will be enabled in the future + resource_file_flag = action_vars.get( + "use_existing_resources", "False" + ) # Returns True to use a resource file or False to not use a resource file. + + if resource_file_flag: + try: + list_of_files = glob.glob( + resources_folder_path + "*.json" + ) # Get list of files + if len(list_of_files) > 0: + resource_config = max( + list_of_files, key=os.path.getctime + ) # Finding the latest added file + print("Using the resource file: " + resource_config) + + except Exception as er: + print(er) + if protocol_config: + config_file_path, resource_config_path = download_config_files( + protocol_config, resource_config + ) + payload = deepcopy(action_vars) + payload.pop("config_path") + + print(f"ot2 {payload=}") + print(f"config_file_path: {config_file_path}") + + response_flag, response_msg = execute( + config_file_path, payload, resource_config_path + ) + + if response_flag: + state = "IDLE" + response["action_response"] = 0 + response["action_msg"] = response_msg + # if resource_config_path: + # response.resources = str(resource_config_path) + + elif not response_flag: state = "ERROR" + response["action_response"] = -1 + response["action_msg"] = response_msg + # if resource_config_path: + # response.resources = str(resource_config_path) + + print("Finished Action: " + action_handle) + return response - return response else: - msg = "UNKOWN ACTION REQUEST! Available actions: run_protocol" + response[ + "action_msg" + ] = "Required 'config' was not specified in action_vars" response["action_response"] = -1 - response["action_msg"]= msg - print('Error: ' + msg) - state = "IDLE" + print(response["action_msg"]) + state = "ERROR" return response - + else: + msg = "UNKNOWN ACTION REQUEST! Available actions: run_protocol" + response["action_response"] = -1 + response["action_msg"] = msg + print("Error: " + msg) + state = "IDLE" + + return response if __name__ == "__main__": import uvicorn + parser = ArgumentParser() parser.add_argument("--alias", type=str, help="Name of the Node") parser.add_argument("--host", type=str, help="Host for rest") @@ -390,4 +360,10 @@ def do_action( args = parser.parse_args() node_name = args.alias ip = args.ot2_ip - uvicorn.run("ot2_rest_client:app", host=args.host, port=args.port, reload=False, ws_max_size=100000000000000000000000000000000000000) + uvicorn.run( + "ot2_rest_client:app", + host=args.host, + port=args.port, + reload=False, + ws_max_size=100000000000000000000000000000000000000, + ) diff --git a/setup.cfg b/setup.cfg index 0ccac8b..baf782e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -16,7 +16,7 @@ install_scripts=$base/lib/ot2_driver [options] packages = find: python_requires = >=3.6 -install_requires = +install_requires = opentrons pydantic==1.8.2 pyyaml diff --git a/setup.py b/setup.py index 96560aa..af054d3 100644 --- a/setup.py +++ b/setup.py @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- from pathlib import Path + from setuptools import setup from setuptools.config import read_configuration diff --git a/tests/test_ot2_streaming.py b/tests/test_ot2_streaming.py index 209d258..3b4f038 100644 --- a/tests/test_ot2_streaming.py +++ b/tests/test_ot2_streaming.py @@ -1,7 +1,9 @@ # TODO: this is not ready to actually be considered a test yet... from argparse import ArgumentParser from pathlib import Path -from ot2_driver.ot2_driver_http import OT2_Driver, OT2_Config + +from ot2_driver.ot2_driver_http import OT2_Config, OT2_Driver + # cant name it test because it does not actually test anything right now... def streaming_t(ot2: OT2_Driver):