diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0e70236..db41200 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,6 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.5.0 + rev: v4.6.0 hooks: - id: check-yaml - id: check-json @@ -12,12 +12,12 @@ repos: - id: end-of-file-fixer - id: trailing-whitespace - repo: https://github.com/kynan/nbstripout - rev: 0.6.1 + rev: 0.7.1 hooks: - id: nbstripout - repo: https://github.com/astral-sh/ruff-pre-commit # Ruff version. - rev: v0.1.10 + rev: v0.4.10 hooks: # Run the linter. - id: ruff diff --git a/Dockerfile b/Dockerfile index e0d2fb8..7cf5dc7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM ghcr.io/ad-sdl/wei +FROM ghcr.io/ad-sdl/wei:v0.5.9 LABEL org.opencontainers.image.source=https://github.com/AD-SDL/hudson_platecrane_module LABEL org.opencontainers.image.description="Drivers and REST API's for the Hudson Platecrane and Sciclops robots" diff --git a/Makefile b/Makefile index b7d26ba..0c84d78 100644 --- a/Makefile +++ b/Makefile @@ -1,14 +1,11 @@ # Python Configuration PYPROJECT_TOML := pyproject.toml -PROJECT_VERSION := $(shell grep -oP '(?<=version = ")[^"]+' $(PYPROJECT_TOML) | head -n 1) .DEFAULT_GOAL := init .PHONY += init paths checks test clean init: # Do the initial configuration of the project @test -e .env || cp example.env .env - @sed -i 's/^PROJECT_VERSION=.*/PROJECT_VERSION=$(PROJECT_VERSION)/' .env - @sed -i 's/^PROJECT_PATH=.*/PROJECT_PATH=$(shell pwd | sed 's/\//\\\//g')/' .env .env: init diff --git a/example.env b/example.env index 6f16d0b..abe66f8 100644 --- a/example.env +++ b/example.env @@ -1,10 +1,9 @@ # Note: all paths are relative to the docker compose file DEVICE=/dev/ttyUSB2 -PROJECT_PATH= -PROJECT_VERSION=1.2.0 +PROJECT_VERSION=1.3.0 WEI_DATA_DIR=~/.wei WORKCELL_FILENAME=test_workcell.yaml -WORKCELLS_DIR=${PROJECT_PATH}/tests/workcell_defs +WORKCELLS_DIR=./tests/workcell_defs IMAGE=ghcr.io/ad-sdl/hudson_platecrane_module DOCKERFILE=Dockerfile REDIS_DIR=~/.wei/redis diff --git a/pyproject.toml b/pyproject.toml index 03554eb..636d562 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "hudson_platecrane_module" -version = "1.0.0" +version = "1.4.1" description = "Driver for the Platecrane and Sciclops" authors = [ {name = "Ryan D. Lewis", email="ryan.lewis@anl.gov"}, @@ -17,6 +17,7 @@ dependencies = [ "libusb", "pyserial", "ad_sdl.wei", + "pydantic>=2.7", "pytest" ] requires-python = ">=3.8.1" diff --git a/src/platecrane_driver/__init__.py b/src/platecrane_driver/__init__.py index 4501616..e44b21d 100644 --- a/src/platecrane_driver/__init__.py +++ b/src/platecrane_driver/__init__.py @@ -1,2 +1,3 @@ """Driver for Hudson Robotics PlateCranes.""" + __version__ = "0.1.0" diff --git a/src/platecrane_driver/error_codes.py b/src/platecrane_driver/error_codes.py index 91d4229..2efa23c 100644 --- a/src/platecrane_driver/error_codes.py +++ b/src/platecrane_driver/error_codes.py @@ -1,4 +1,5 @@ """Defines exceptions for error codes returned by the plate crane controller.""" + from __future__ import annotations diff --git a/src/platecrane_driver/plate_resources.json b/src/platecrane_driver/plate_resources.json index 9d9dc0c..5eaab88 100644 --- a/src/platecrane_driver/plate_resources.json +++ b/src/platecrane_driver/plate_resources.json @@ -10,10 +10,10 @@ "96_well":{ "plate_above_height": 800, - "plate_pick_steps_module": 1500, - "plate_pick_steps_stack": 2400, + "plate_pick_steps_module": 1700, + "plate_pick_steps_stack": 2700, "plate_lid_steps": 1300, - "lid_destination_height": 1400 + "lid_destination_height": 1300 }, "96_deep_well":{ @@ -50,9 +50,9 @@ }, "test_96_well":{ "plate_above_height": 800, - "plate_pick_steps_module": 1500, - "plate_pick_steps_stack": 2400, - "plate_lid_steps": 1000, + "plate_pick_steps_module": 1700, + "plate_pick_steps_stack": 1500, + "plate_lid_steps": 1300, "lid_destination_height": 1300 } diff --git a/src/platecrane_driver/platecrane_driver.py b/src/platecrane_driver/platecrane_driver.py index 80c0a08..e9224d2 100644 --- a/src/platecrane_driver/platecrane_driver.py +++ b/src/platecrane_driver/platecrane_driver.py @@ -1,145 +1,83 @@ """Handle Proper Interfacing with the PlateCrane""" -import json + import re -from pathlib import Path +import time + +from platecrane_driver.resource_defs import locations, plate_definitions +from platecrane_driver.resource_types import PlateResource +from platecrane_driver.serial_port import ( + SerialPort, # use when running through WEI REST clients +) + +# from serial_port import SerialPort # use when running through the driver +# from resource_defs import locations, plate_definitions +# from resource_types import PlateResource + -from platecrane_driver.serial_port import SerialPort -#from serial_port import SerialPort +""" +# TODOs: + * combine two initialization functions + * Look into how to slow speed of stack pick and place + * should we be using error_codes.py to be doing some of the error checking/raising + + * Crash error outputs 21(R axis),14(z axis), 02 Wrong location name. 1400 (Z axis hits the plate), 00 success + * Need a response handler function. Unknown error messages T1, ATS, TU these are about connection issues (multiple access?) + * Maybe create a plate detect function within pick stack plate function +""" class PlateCrane: - """ - Description: - Python interface that allows remote commands to be executed to the plate_crane. - """ + """Python interface that allows remote commands to be executed to the plate_crane.""" __serial_port: SerialPort - def __init__(self, host_path="/dev/ttyUSB2", baud_rate=9600): - """[Summary] + def __init__(self, host_path="/dev/ttyUSB4", baud_rate=9600): + """Initialization function + + Args: + host_path (str): usb path of PlateCrane EX device + baud_rate (int): baud rate to use for communication with the PlateCrane EX device - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + Returns: + None """ + # define variables self.__serial_port = SerialPort(host_path=host_path, baud_rate=baud_rate) self.robot_error = "NO ERROR" self.status = 0 self.error = "" - self.gripper_length = 0 - self.plate_above_height = 700 - self.plate_pick_steps_stack = 1600 - self.plate_pick_steps_module = 1400 - self.plate_lid_steps = 800 - self.lid_destination_height = 1400 - - self.stack_exchange_Z_height = -31887 - self.stack_exchange_Y_axis_steps = 200 # TODO: Find the correct number of steps to move Y axis from the stack to the exchange location - self.exchange_location = "LidNest2" self.robot_status = "" self.movement_state = "READY" self.platecrane_current_position = None - self.plate_resources = json.load( - open(Path(__file__).parent / "plate_resources.json") - ) - self.stack_resources = json.load( - open(Path(__file__).parent / "stack_resources.json") - ) - + # initialize actions self.initialize() def initialize(self): - """[Summary] - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ - + """Initialization actions""" self.get_status() if self.robot_status == "0": self.home() self.platecrane_current_position = self.get_position() - def home(self, timeout=28): + def home(self): """Homes all of the axes. Returns to neutral position (above exchange) + Args: + timeout (int): Seconds to wait for plate crane response after sending serial command, defaults to 28 seconds. - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + Returns: + None """ # Moves axes to home position command = "HOME\r\n" - self.__serial_port.send_command(command, timeout) - - def get_robot_movement_state(self): - """Summary - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ - - # current_postion = self.get_position() - # print(current_postion) - # print(self.platecrane_current_position) - # if self.platecrane_current_position != current_postion: - # self.movement_state = "BUSY" - # self.platecrane_current_position = current_postion - # else: - # self.movement_state = "READY" - # print(self.movement_state) - self.movement_state = "READY" - - def wait_robot_movement(self): - """Summary - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ - - self.get_robot_movement_state() - if self.movement_state != "READY": - self.wait_robot_movement() + self.__serial_port.send_command(command, timeout=60) def get_status(self): - """Checks status of plate_crane - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ - + """Checks status of plate_crane""" command = "STATUS\r\n" self.robot_status = self.__serial_port.send_command(command) @@ -153,17 +91,22 @@ def lock_joints(self): command = "limp FALSE\r\n" self.__serial_port.send_command(command) - def get_location_list(self): - """Checks status of plate_crane - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + def set_speed(self, speed: int): + """Sets the speed of the plate crane arm. + + Args: + speed (int): (units = % of full speed) Speed at which to move the PlateCrane EX. Appies to all axes + + Returns: + None """ + command = "SPEED " + str(speed) + self.__serial_port.send_command(command, timeout=0, delay=1) + self.get_position() + print(f"SPEED SET TO {speed}%") + + def get_location_list(self): + """Displays all location information stored in the Plate Crane EX robot's memory""" command = "LISTPOINTS\r\n" out_msg = self.__serial_port.send_command(command) @@ -180,45 +123,24 @@ def get_location_list(self): print("Error in get_status") self.robot_error = err - def __update_locations(self, robot_onboard: list, known_locations: list) -> None: - """Checks the location database on the robot and saves the missing locations to robot onboard + def get_location_joint_values(self, location: str = None) -> list: + """Returns list of 4 joint values associated with a position name - :param robot_onboard: List of string locations that are saved on the robot - :type robot_onboard: list + Note: right now this returns the joint values stored in the + PlateCrane EX device memory, not the locations stored in + resource_defs.py - :param known_locations: List of known locations that should exist on robot database - :type known_locations: list + TODO: delete this function or associate it with resource_defs.py - :return: None + Args: + location (str): Name of location - """ - - for loc in known_locations: - if loc not in robot_onboard: - loc_values = loc.replace(",", "").split( - " " - ) # Removing the ',' caracter - loc_values[0] = loc_values[0][ - loc_values[0].find(":") + 1 : - ] # Removing the index number from the location name - self.set_location( - loc_values[0], - int( - loc_values[1], - int(loc_values[2], int(loc_values[3]), int(loc_values[4])), - ), - ) - - def get_location_joint_values(self, location: str = None) -> list: - """Checks status of plate_crane - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + Returns: + joint_values ([int]): [R, Z, P, Y] joint values + - R (base rotation) + - Z (arm vertical axis) + - P (gripper rotation) + - Y (arm extension) """ command = "GETPOINT " + location + "\r\n" @@ -229,49 +151,34 @@ def get_location_joint_values(self, location: str = None) -> list: return joint_values def get_position(self) -> list: - """ - Requests and stores plate_crane position. - Coordinates: - Z: Vertical axis - R: Base turning axis - Y: Extension axis - P: Gripper turning axis - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ - - command = "GETPOS\r\n" - current_position = list(self.__serial_port.send_command(command).split(" ")) - current_position = [eval(x.strip(",")) for x in current_position] - - return current_position + """Returns list of joint values for current position of the PlateCrane EX arm - def get_safe_height_jog_steps(self, location: list) -> int: - """Summary + Args: + None - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + Returns: + current_position ([int]): [R, Z, P, Y] joint values + - R (base rotation) + - Z (arm vertical axis) + - P (gripper rotation) + - Y (arm extension) """ - joint_values = self.get_location_joint_values(location) - current_pos = self.get_position() - - module_safe_height = joint_values[1] + self.plate_above_height + command = "GETPOS\r\n" - height_jog_steps = current_pos[1] - module_safe_height + try: + # collect coordinates of current position + current_position = list(self.__serial_port.send_command(command).split(" ")) + print(current_position) + current_position = [eval(x.strip(",")) for x in current_position] + print(current_position) + except Exception: + # Fall back: overlapping serial responses were detected. Wait 5 seconds then resend latest command + time.sleep(5) + current_position = list(self.__serial_port.send_command(command).split(" ")) + current_position = [eval(x.strip(",")) for x in current_position] - return height_jog_steps + return current_position def set_location( self, @@ -281,15 +188,17 @@ def set_location( P: int = 0, Y: int = 0, ): - """Saves a new location onto robot - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + """Saves a new location into PlateCrane EX device memory + + Args: + location_name (str): Name of location to be saved + R (int): base rotation (units: motor steps) + Z (int): vertical axis (units: motor steps) + P (int): gripper rotation (units: motor steps) + Y (int): arm extension (units: motor steps) + + Returns: + None """ command = "LOADPOINT %s, %s, %s, %s, %s\r\n" % ( @@ -302,117 +211,73 @@ def set_location( self.__serial_port.send_command(command) def delete_location(self, location_name: str = None): - """Deletes a location from the robot's database - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + """Deletes an existing location from the PlateCrane EX device memory + + Args: + location_name (str): Name of location to delete + + Returns: + None """ if not location_name: raise Exception("No location name provided") - command = "DELETEPOINT %s\r\n" % ( - location_name - ) # Command interpreted by Sciclops + command = "DELETEPOINT %s\r\n" % (location_name) self.__serial_port.send_command(command) def gripper_open(self): - """Opens gripper - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ + """Opens gripper""" - command = "OPEN\r\n" # Command interpreted by Sciclops + command = "OPEN\r\n" self.__serial_port.send_command(command) def gripper_close(self): - """Closes gripper - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ + """Closes gripper""" - command = "CLOSE\r\n" # Command interpreted by Sciclops + command = "CLOSE\r\n" self.__serial_port.send_command(command) def check_open(self): - """Checks if gripper is open - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ + """Checks if gripper is open""" - command = "GETGRIPPERISOPEN\r\n" # Command interpreted by Sciclops + command = "GETGRIPPERISOPEN\r\n" self.__serial_port.send_command(command) def check_closed(self): - """Checks if gripper is closed - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] - """ + """Checks if gripper is closed""" - command = "GETGRIPPERISCLOSED\r\n" # Command interpreted by Sciclops + command = "GETGRIPPERISCLOSED\r\n" self.__serial_port.send_command(command) def jog(self, axis, distance) -> None: """Moves the specified axis the specified distance. - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + Args: + axis (str): "R", "Z", "P", or "Y" + distance (int): distance to move along the axis (units = motor steps) + + Returns: + None """ command = "JOG %s,%d\r\n" % (axis, distance) - self.__serial_port.send_command(command, timeout=1.5) + self.__serial_port.send_command(command) def move_joint_angles(self, R: int, Z: int, P: int, Y: int) -> None: - """Moves on a single axis, using an existing location on robot's database - - :param [ParamName]: [ParamDescription], defaults to [DefaultParamVal] - :type [ParamName]: [ParamType](, optional) - ... - :raises [ErrorType]: [ErrorDescription] - ... - :return: [ReturnDescription] - :rtype: [ReturnType] + """Move to a specified location + + Args: + R (int): base rotation (unit = motor steps) + Z (int): vertical axis (unit = motor steps) + P (int): gripper rotation (unit = motor steps) + Y (int): arm extension (unit = motor steps) """ self.set_location("TEMP", R, Z, P, Y) - command = "MOVE TEMP\r\n" try: - self.__serial_port.send_command(command) + self.__serial_port.send_command(command, timeout=60) except Exception as err: print(err) @@ -421,261 +286,464 @@ def move_joint_angles(self, R: int, Z: int, P: int, Y: int) -> None: self.move_status = "COMPLETED" pass - self.deletepoint("TEMP", R, Z, P, Y) + self.delete_location("TEMP") + + def move_single_axis(self, axis: str, loc: str) -> None: + """Moves on a single axis, using an existing location in PlateCrane EX device memory as reference - def move_single_axis(self, axis: str, loc: str, delay_time=1.5) -> None: - """Moves on a single axis using an existing location on robot's database + Args: + axis (str): axis to move along + loc (str): name of location in PlateCrane EX device memory to use as reference - :param axis: Axis name (R,Z,P,Y) - :type axis: str - :param loc: Name of the location. - :type loc: str + Raises: + TODO - :raises [PlateCraneLocationException]: [Error for None type locations] - :return: None + Returns: + None + + TODO: + * Handle errors using error_codes.py + * Reference locations in resource_defs.py, not device memory """ - # TODO:Handle the error raising within error_codes.py if not loc: raise Exception( "PlateCraneLocationException: NoneType variable is not compatible as a location" ) command = "MOVE_" + axis.upper() + " " + loc + "\r\n" - self.__serial_port.send_command(command, timeout=delay_time) + self.__serial_port.send_command(command) self.move_status = "COMPLETED" - def move_location(self, loc: str = None, move_time: float = 4.7) -> None: + def move_location(self, loc: str = None) -> None: """Moves all joint to the given location. - :param loc: Name of the location. - :type loc: str - :param move_time: Number of seconds that will take to complete this movement. Defaults to 4.7 seconds which is the longest possible movement time. - :type move_time: float - :raises [PlateCraneLocationException]: [Error for None type locations] - :return: None - """ + Args: + loc (str): location to move to + + Returns: + None - # TODO:Handle the error raising within error_codes.py + TODO: + * Handle the error raising within error_codes.py + """ if not loc: raise Exception( "PlateCraneLocationException: NoneType variable is not compatible as a location" ) cmd = "MOVE " + loc + "\r\n" - self.__serial_port.send_command(cmd, timeout=move_time) + self.__serial_port.send_command(cmd) def move_tower_neutral(self) -> None: """Moves the tower to neutral position - :return: None + TODO: + * This still creates a TEMP position, moves to it, then deletes it after. + Change this, and other related methods below, to use only access + locations in resource_defs """ - - self.move_single_axis("Z", "Safe", delay_time=1.5) + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=locations["Safe"].joint_angles[1], + P=current_pos[2], + Y=current_pos[3], + ) def move_arm_neutral(self) -> None: - """Moves the arm to neutral position + """Moves the arm to neutral position""" - :return: None - """ - self.move_single_axis("Y", "Safe", delay_time=1) + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=current_pos[2], + Y=locations["Safe"].joint_angles[3], + ) def move_gripper_neutral(self) -> None: - """Moves the gripper to neutral position + """Moves the gripper to neutral position""" - :return: None - """ - self.move_single_axis("P", "Safe", delay_time=0.3) + self.move_single_axis("P", "Safe") - def move_joints_neutral(self) -> None: - """Moves all joints neutral position + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=locations, + Y=current_pos[3], + ) - :return: None - """ + def move_joints_neutral(self) -> None: + """Moves all joints neutral position""" self.move_arm_neutral() self.move_tower_neutral() - def get_module_plate( - self, source: str = None, height_jog_steps: int = 0, height_offset: int = 0 + def pick_plate_safe_approach( + self, + source: str, + plate_type: str, + grip_height_in_steps: int, ) -> None: - """picks up the plate from a module location by moving each joint step by step - - :param source: Name of the source location. - :type source: str - :param height_jog_steps: Number of jogging steps that will be used to move the Z axis to the plate location - :type height_jog_steps: int - :raises [PlateCraneLocationException]: [Error for None type locations] - :return: None + """Picks a plate from a source type "nest" using a safe travel path. + + Args: + source (str): source location name defined in resource_defs.py + plate_type (str): plate definition name defined in resource_defs.py + grip_height_in_steps (int): z axis steps distance from bottom of plate to grip the plate + + Returns: + None """ + print("PICK PLATE SAFE APPROACH CALLED") - # TODO:Handle the error raising within error_codes.py - if not source: - raise Exception( - "PlateCraneLocationException: NoneType variable is not compatible as a location" - ) + # open the gripper + self.gripper_open() - self.move_single_axis("Y", source) - self.jog("Z", -(self.plate_pick_steps_module - height_offset)) - self.gripper_close() - self.jog("Z", self.plate_pick_steps_module) + # Rotate base (R axis) toward plate location + current_pos = self.get_position() + self.move_joint_angles( + R=locations[source].joint_angles[0], + Z=current_pos[1], + P=current_pos[2], + Y=current_pos[3], + ) - def put_module_plate( - self, target: str = None, height_jog_steps: int = 0, height_offset: int = 0 - ) -> None: - """Places the plate onto a module location by moving each joint step by step - - :param target: Name of the target location. - :type target: str - :param height_jog_steps: Number of jogging steps that will be used to move the Z axis to the plate location - :type height_jog_steps: int - :raises [PlateCraneLocationException]: [Error for None type locations] - :return: None - """ + # Rotate gripper + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=locations[source].joint_angles[2], + Y=current_pos[3], + ) - # TODO:Handle the error raising within error_codes.py - if not target: - raise Exception( - "PlateCraneLocationException: NoneType variable is not compatible as a location" - ) + # Lower z axis to safe_approach_z height + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=locations[source].safe_approach_height, + P=current_pos[2], + Y=current_pos[3], + ) - self.move_single_axis("Y", target) - self.jog("Z", -(self.plate_pick_steps_module - height_offset)) - self.gripper_open() - self.jog("Z", self.plate_pick_steps_module) + # extend arm over plate and rotate gripper to correct orientation + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=current_pos[2], + Y=locations[source].joint_angles[3], + ) - def move_module_entry(self, source: str = None, height_jog_steps: int = 0) -> None: - """Moves to the entry location of the location that is given. It moves the R,P and Z joints step by step to aviod collisions. + # Lower arm (z axis) to correct plate grip height + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=locations[source].joint_angles[1] + grip_height_in_steps, + P=current_pos[2], + Y=current_pos[3], + ) - :param source: Name of the source location. - :type source: str - :param height_jog_steps: Number of jogging steps that will be used to move the Z axis to the plate location - :type height_jog_steps: int - :raises [PlateCraneLocationException]: [Error for None type locations] - :return: None - """ - # TODO:Handle the error raising within error_codes.py + # grip the plate + self.gripper_close() - if not source: - raise Exception( - "PlateCraneLocationException: NoneType variable is not compatible as a location" - ) + # Move arm with plate back to safe approach height + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=locations[source].safe_approach_height, + P=current_pos[2], + Y=current_pos[3], + ) - if not height_jog_steps: - height_jog_steps = self.get_safe_height_jog_steps(source) + # retract arm (Y axis) as much as possible (to same Y axis value as Safe location) + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=current_pos[2], + Y=locations["Safe"].joint_angles[3], + ) - self.move_single_axis("R", source) - self.move_single_axis("P", source) - self.jog("Z", -height_jog_steps) + # move rest of joints to neutral location + self.move_tower_neutral() + self.move_arm_neutral() - def pick_module_plate( - self, source: str = None, height_jog_steps: int = 0, height_offset: int = 0 + def place_plate_safe_approach( + self, + target: str, + grip_height_in_steps: int, ) -> None: - """Pick a module plate from a module location. - - :param source: Name of the source location. - :type source: str - :param height_jog_steps: Number of jogging steps that will be used to move the Z axis to the plate location - :type height_jog_steps: int - :raises [PlateCraneLocationException]: [Error for None type locations] - :return: None + """Places a plate to a target location of type "nest" using a safe travel path. + + Args: + target (str): source location name defined in resource_defs.py + plate_type (str): plate definition name defined in resource_defs.py + grip_height_in_steps (int): z axis steps distance from bottom of plate to grip the plate + + Returns: + None """ - if not source: - raise Exception( - "PlateCraneLocationException: NoneType variable is not compatible as a location" - ) - self.move_joints_neutral() + # Rotate base (R axis) toward target location + current_pos = self.get_position() + self.move_joint_angles( + R=locations[target].joint_angles[0], + Z=current_pos[1], + P=current_pos[2], + Y=current_pos[3], + ) + + # Rotate gripper to correct orientation + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=locations[target].joint_angles[2], + Y=current_pos[3], + ) + + # Lower z axis to safe_approach_z height + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=locations[target].safe_approach_height, + P=current_pos[2], + Y=current_pos[3], + ) + + # extend arm over plate + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=current_pos[2], + Y=locations[target].joint_angles[3], + ) + + # lower arm (z axis) to correct plate grip height + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=locations[target].joint_angles[1] + grip_height_in_steps, + P=current_pos[2], + Y=current_pos[3], + ) + self.gripper_open() - self.move_module_entry(source, height_jog_steps) - self.get_module_plate(source, height_jog_steps, height_offset) + # Back away using safe approach path + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=locations[target].safe_approach_height, + P=current_pos[2], + Y=current_pos[3], + ) + + # retract arm (Y axis) as much as possible (to same Y axis value as Safe location) + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=current_pos[2], + Y=locations["Safe"].joint_angles[3], + ) + # move arm to safe location + self.move_tower_neutral() self.move_arm_neutral() - def place_module_plate( - self, target: str = None, height_jog_steps: int = 0, height_offset: int = 0 + def pick_plate_direct( + self, + source: str, + source_type: str, + plate_type: str, + grip_height_in_steps: int, + has_lid: bool, + incremental_lift: bool = False, ) -> None: - """Place a module plate onto a module location. - - :param target: Name of the target location. - :type target: str - :param height_jog_steps: Number of jogging steps that will be used to move the Z axis to the plate location - :type height_jog_steps: int - :raises [PlateCraneLocationException]: [Error for None type locations] - :return: None - """ - if not target: - raise Exception( - "PlateCraneLocationException: NoneType variable is not compatible as a location" - ) + """Picks a plate from a source location of type either "nest" or "stack" using a direct travel path - self.move_joints_neutral() + "nest" transfers: gripper open, direct to grab plate z height + "stack" transfers: gripper closed, touch top of plate, z up, z down to correct grab plate height - self.move_module_entry(target, height_jog_steps) - self.put_module_plate(target, height_jog_steps, height_offset) + Args: + source (str): source location name defined in resource_defs.py + source_type (str): either "nest" or "stack" + plate_type (str): plate definition name defined in resource_defs.py + grip_height_in_steps (int): z axis steps distance from bottom of plate to grip the plate + has_lid (bool): True if plate has lid, False otherwise + incremental_lift (bool): True if you want to use incremental lift, False otherwise (default False) + incremental lift (good for ensuring lids are removed gently and correctly): + - grab plate at grip_height_in_steps + - raise 100 steps along z axis (repeat 5x) + - continue with rest of transfer - self.move_arm_neutral() + Returns: + None + """ - def pick_stack_plate(self, source: str = None, height_offset: int = 0) -> None: - """Pick a stack plate from stack location. + # Rotate R axis (base rotation) over the plate + current_pos = self.get_position() + self.move_joint_angles( + R=locations[source].joint_angles[0], + Z=current_pos[1], + P=current_pos[2], + Y=current_pos[3], + ) - :param source: Name of the source location. - :type source: str - :raises [PlateCraneLocationException]: [Error for None type locations] - :return: None - """ - if not source: - raise Exception( - "PlateCraneLocationException: NoneType variable is not compatible as a location" + if source_type == "stack": + # close the gripper + self.gripper_close() + + # move the arm directly above the stack + self.move_joint_angles( + R=locations[source].joint_angles[0], + Z=current_pos[1], + P=locations[source].joint_angles[2], + Y=locations[source].joint_angles[3], ) - self.move_joints_neutral() - self.move_single_axis("R", source) + # decrease the plate crane speed + self.set_speed(50) + + # move down in z height to tap the top of the plates in stack + self.move_joint_angles( + R=locations[source].joint_angles[0], + Z=locations[source].joint_angles[ + 1 + ], # this is the only axis that should need to move + P=locations[source].joint_angles[2], + Y=locations[source].joint_angles[3], + ) - if "stack" in source.lower(): - self.gripper_close() - self.move_location(source) - self.jog("Z", self.plate_above_height) + # set plate crane back to full speed + self.set_speed(100) + + # Move up, open gripper, grab plate at correct height + self.jog("Z", 1000) self.gripper_open() - self.jog("Z", -self.plate_pick_steps_stack + height_offset) - else: + + # Calculate z travel from grip height with/without lid + if has_lid: + z_jog_down_from_plate_top = ( + PlateResource.convert_to_steps( + plate_definitions[plate_type].plate_height_with_lid + ) + - grip_height_in_steps + ) + else: + z_jog_down_from_plate_top = ( + PlateResource.convert_to_steps( + plate_definitions[plate_type].plate_height + ) + - grip_height_in_steps + ) + + # Move down to correct z height to grip plate + self.jog("Z", -(1000 + z_jog_down_from_plate_top)) + + else: # if source_type == nest: self.gripper_open() - self.move_location(source) - # self.jog("Z", -self.plate_pick_steps_stack + height_offset) + + self.move_joint_angles( + R=locations[source].joint_angles[0], + Z=locations[source].joint_angles[1] + grip_height_in_steps, + P=locations[source].joint_angles[2], + Y=locations[source].joint_angles[3], + ) + + # open the gripper self.gripper_close() + + if incremental_lift: + self.jog("Z", 100) + self.jog("Z", 100) + self.jog("Z", 100) + self.jog("Z", 100) + self.jog("Z", 100) + + # return arm to safe location self.move_tower_neutral() self.move_arm_neutral() - def place_stack_plate(self, target: str = None, height_offset: int = 0) -> None: - """Place a stack plate either onto the exhange location or into a stack + def place_plate_direct( + self, + target: str, + target_type: str, # TODO: use later to slow speed for target_type = "stack" + grip_height_in_steps: str, + ) -> None: + """Places a plate onto a target location of type either "nest" or "stack" using a direct travel path + + Args: + target (str): target location name defined in resource_defs.py + target_type (str): either "nest" or "stack" + plate_type (str): plate definition name defined in resource_defs.py + grip_height_in_steps (int): z axis steps distance from bottom of plate to grip the plate + + Returns: + None - :param target: Name of the target location. Defults to None if target is None, it will be set to exchange location. - :type target: str - :return: None + TODO: + * use target_type variable to slow approach in "stack" transfers to avoid striking other plates """ - self.move_joints_neutral() - self.move_single_axis("R", target) - self.move_location(target) + # Rotate base (R axis) to target location + current_pos = self.get_position() + self.move_joint_angles( + R=locations[target].joint_angles[0], + Z=current_pos[1], + P=current_pos[2], + Y=current_pos[3], + ) + + # Extend arm over plate location (Y axis) and rotate gripper to correct orientation (P axis) + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=current_pos[1], + P=locations[target].joint_angles[2], + Y=locations[target].joint_angles[3], + ) + + if target_type == "stack": + # lower plate crane speed + self.set_speed(50) + + # Lower arm (z axis) to plate grip height + current_pos = self.get_position() + self.move_joint_angles( + R=current_pos[0], + Z=locations[target].joint_angles[1] + grip_height_in_steps, + P=current_pos[2], + Y=current_pos[3], + ) + + if target_type == "stack": + # return plate crane to sull speed + self.set_speed(100) + + # open gripper to release the plate self.gripper_open() + self.move_tower_neutral() - self.move_arm_neutral() + self.move_joints_neutral() def _is_location_joint_values(self, location: str, name: str = "temp") -> str: """ - If the location was provided as joint values, transfer joint values into a saved location on the robot and return the location name. - If location parameter is a name of an already saved location, do nothing. - - :param location: Location to be checked if this is an already saved location on the robot database or a new location with 4 joint values - :type location: string - :param name: Location name to be used to save a new location if the location parameter was provided as 4 joint values - :type name: string - :raises [ErrorType]: [ErrorDescription] - :return: location_name = Returns the location name that is saved on robot database with location joint values - :rtype: str + If the location was provided as joint values, transfer joint values into a saved location + on the robot and return the location name. If location parameter is a name of an already saved + location, do nothing. + + TODO: + * Is there any reason we should keep this function? """ try: - location = eval(location) + # location = eval(location) # replacing with checking config + from platecrane_driver.resource_defs import location except NameError: # Location was given as a location name print(name + ": " + location) @@ -692,269 +760,200 @@ def _is_location_joint_values(self, location: str, name: str = "temp") -> str: def remove_lid( self, - source: str = None, - target: str = "Stack2", - plate_type: str = "96_well", + source: str, + target: str, + plate_type: str, height_offset: int = 0, ) -> None: + """Removes lid from a plate at source location and places lid at target location + + Args: + source (str): source location name defined in resource_defs.py + target (str): target location name defined in resource_defs.py + plate_type (str): plate definition name defined in resource_defs.py + height_offset (int): change in z height to be applied to grip location on the lid (units = mm) + defaults to 0mm + + Returns: + None """ - Remove the plate lid - - :param source: Source location, provided as either a location name or 4 joint values. - :type source: str - :param target: Target location, provided as either a location name or 4 joint values. - :type target: str - :param plate_type: Type of the plate - :type plate_type: str - :raises [ErrorType]: [ErrorDescription] - :return: None - """ - self.get_new_plate_height(plate_type) - - # TESTING - print("LID HEIGHT") - print(self.lid_destination_height) - - target_offset = ( - 2 * self.plate_above_height - self.plate_pick_steps_stack + self.lid_destination_height - # + height_offset - ) # Finding the correct target hight when only transferring the plate lid - target_loc = self.get_location_joint_values(target) - remove_lid_target = "Temp_Lid_Target_Loc" - - self.set_location( - remove_lid_target, - target_loc[0], - target_loc[1] - target_offset, - target_loc[2], - target_loc[3], + + # Calculate grip height in motor steps + source_grip_height_in_steps = PlateResource.convert_to_steps( + plate_definitions[plate_type].lid_removal_grip_height + height_offset + ) + target_grip_height_in_steps = PlateResource.convert_to_steps( + plate_definitions[plate_type].plate_height_with_lid + - plate_definitions[plate_type].lid_height + + height_offset ) - self.plate_pick_steps_stack = self.plate_lid_steps + height_offset + # Pass to transfer function but specify that it is a lid we're transferring self.transfer( source=source, - target=remove_lid_target, - source_type="stack", - target_type="stack", + target=target, + plate_type=plate_type, + height_offset=height_offset, + is_lid=True, + source_grip_height_in_steps=source_grip_height_in_steps, + target_grip_height_in_steps=target_grip_height_in_steps, + incremental_lift=True, ) def replace_lid( self, - source: str = "Stack2", - target: str = None, - plate_type: str = "96_well", + source: str, + target: str, + plate_type: str, height_offset: int = 0, ) -> None: - """ - Replace the lid back to the plate - - :param source: Source location, provided as either a location name or 4 joint values. - :type source: str - :param target: Target location, provided as either a location name or 4 joint values. - :type target: str - :param plate_type: Type of the plate - :type plate_type: str - :raises [ErrorType]: [ErrorDescription] - :return: None - """ + """ "Replaces lid at source location onto a plate at the target location - self.get_new_plate_height(plate_type) + Args: + source (str): source location name defined in resource_defs.py + target (str): target location name defined in resource_defs.py + plate_type (str): plate definition name defined in resource_defs.py + height_offset (int): change in z height to be applied to grip location on the lid (units = mm) + defaults to 0mm - target_offset = ( - 2 * self.plate_above_height - self.plate_pick_steps_stack + self.lid_destination_height + Returns: + None + """ + # Calculate grip height in motor steps + source_grip_height_in_steps = PlateResource.convert_to_steps( + plate_definitions[plate_type].lid_grip_height + height_offset ) - - source_loc = self.get_location_joint_values(source) - remove_lid_source = "Temp_Lid_Source_loc" - - self.set_location( - remove_lid_source, - source_loc[0], - source_loc[1] - target_offset, - source_loc[2], - source_loc[3], + target_grip_height_in_steps = PlateResource.convert_to_steps( + plate_definitions[plate_type].lid_removal_grip_height + height_offset ) - self.plate_pick_steps_stack = self.plate_lid_steps + height_offset + # Pass to transfer function but specify that it is a lid we're transferring self.transfer( - source=remove_lid_source, + source=source, target=target, - source_type="stack", - target_type="stack", + plate_type=plate_type, + height_offset=height_offset, + source_grip_height_in_steps=source_grip_height_in_steps, + target_grip_height_in_steps=target_grip_height_in_steps, + is_lid=True, ) - def stack_transfer( + def transfer( self, - source: str = None, - target: str = None, - source_type: str = "stack", - target_type: str = "module", - height_offset: int = 0, + source: str, + target: str, + plate_type: str, + height_offset: int = 0, # units = mm + is_lid: bool = False, + has_lid: bool = False, + source_grip_height_in_steps: int = None, # if removing/replacing lid + target_grip_height_in_steps: int = None, # if removing/replacing lid + incremental_lift: bool = False, ) -> None: - """ - Transfer a plate plate from a plate stack to the exchange location or make a transfer in between stacks and stack entry locations - - :param source: Source location, provided as either a location name or 4 joint values. - :type source: str - :param target: Target location, provided as either a location name or 4 joint values. - :type target: str - :raises [ErrorType]: [ErrorDescription] - :return: None - """ + """Handles the transfer request + + Args: + source (str): source location name defined in resource_defs.py + target (str): target location name defined in resource_defs.py + plate_type (str): plate definition name defined in resource_defs.py + height_offset (int): change in z height to be applied to grip location on the lid (units = mm) + defaults to 0mm + is_lid (bool): True if transferring a lid, False otherwise + defaults to False + has_lid (bool): True if plate being transferred has a lid, otherwise False + defaults to False + source_grip_height_in_steps (int): z axis steps distance from bottom of plate to grip the plate at source location + defaults to None + only used if transfer function is called from remove/replace_lid functions + target_grip_height_in_steps (int): z axis steps distance from bottom of plate to grip the plate at target location + defaults to None + only used if transfer function is called from remove/replace_lid functions + incremental_lift (bool): True if you want to use incremental lift, False otherwise (default False) + incremental lift (good for ensuring lids are removed gently and correctly): + - grab plate at grip_height_in_steps + - raise 100 steps along z axis (repeat 5x) + - continue with rest of transfer + + Raises: + TODO + + Returns: + None + """ + + # Extract the source and target location_types + source_type = locations[source].location_type + target_type = locations[target].location_type + + # Determine source and target grip heights from bottom of plate (converted from mm to z motor steps) + """If the transfer function is called from either remove_lid() or replace_lid(), + these values will be precalculated and passed in. Otherwise they need to be calculated here.""" + if not is_lid: + grip_height_in_steps = PlateResource.convert_to_steps( + plate_definitions[plate_type].grip_height + height_offset + ) + source_grip_height_in_steps = grip_height_in_steps + target_grip_height_in_steps = grip_height_in_steps - if not source or not target: - print("Please provide a source location") - # TODO: Raise an exception here - return + # is safe approach required for source and/or target? + source_use_safe_approach = ( + False if locations[source].safe_approach_height == 0 else True + ) + target_use_safe_approach = ( + False if locations[target].safe_approach_height == 0 else True + ) - source = self._is_location_joint_values(location=source, name="source") - target = self._is_location_joint_values(location=target, name="target") + # PICK PLATE FROM SOURCE LOCATION + if source_type == "stack": + self.pick_plate_direct( + source=source, + source_type=source_type, # "stack" + plate_type=plate_type, + grip_height_in_steps=source_grip_height_in_steps, + has_lid=has_lid, + incremental_lift=incremental_lift, + ) - if source_type.lower() == "stack": - source_loc = self.get_location_joint_values(source) - if "stack" in source.lower(): - stack_source = "stack_source_loc" - source_offset = self.plate_above_height + height_offset + elif source_type == "nest": + if source_use_safe_approach: + self.pick_plate_safe_approach( + source=source, + plate_type=plate_type, + grip_height_in_steps=source_grip_height_in_steps, + ) else: - stack_source = "source_loc" - source_offset = ( - 2 * self.plate_above_height - - self.plate_pick_steps_stack - + height_offset + self.pick_plate_direct( + source=source, + source_type=source_type, # nest + plate_type=plate_type, + grip_height_in_steps=source_grip_height_in_steps, + has_lid=has_lid, + incremental_lift=incremental_lift, ) - - self.set_location( - stack_source, - source_loc[0], - source_loc[1] + source_offset, - source_loc[2], - source_loc[3], - ) - self.pick_stack_plate(stack_source, height_offset=height_offset) - - elif source_type.lower() == "module": - self.pick_module_plate(source, height_offset=height_offset) - - target_height_jog_steps = self.get_safe_height_jog_steps(target) - if target_type.lower() == "stack": - target_loc = self.get_location_joint_values(target) - target_offset = ( - 2 * self.plate_above_height - - self.plate_pick_steps_stack - + height_offset - ) - stack_target = "target_loc" - self.set_location( - stack_target, - target_loc[0], - target_loc[1] + target_offset, - target_loc[2], - target_loc[3], - ) - self.place_stack_plate(stack_target, height_offset=height_offset) - - elif target_type.lower() == "module": - self.place_module_plate( - target, - height_jog_steps=target_height_jog_steps, - height_offset=height_offset, + else: + raise Exception("Source location type not defined correctly") + + # PLACE PLATE AT TARGET LOCATION + if target_type == "stack": + self.place_plate_direct( + target=target, + target_type=target_type, + grip_height_in_steps=target_grip_height_in_steps, ) - - def module_transfer(self, source: str, target: str, height_offset: int = 0) -> None: - """ - Transfer a plate in between two modules using source and target locations - - :param source: Source location, provided as either a location name or 4 joint values. - :type source: str - :param target: Target location, provided as either a location name or 4 joint values. - :type target: str - :raises [ErrorType]: [ErrorDescription] - :return: None - """ - self.move_joints_neutral() - source = self._is_location_joint_values(location=source, name="source") - target = self._is_location_joint_values(location=target, name="target") - - source_height_jog_steps = self.get_safe_height_jog_steps(source) - target_height_jog_steps = self.get_safe_height_jog_steps(target) - - self.pick_module_plate(source, source_height_jog_steps, height_offset) - self.place_module_plate(target, target_height_jog_steps, height_offset) - - def get_new_plate_height(self, plate_type): - """ - Gets the new plate height values for the given plate_type - :param plate_type: Plate type. - :type source: str - :return: None - """ - if plate_type not in self.plate_resources.keys(): - raise Exception("Unkown plate type") - self.plate_above_height = self.plate_resources[plate_type]["plate_above_height"] - self.plate_lid_steps = self.plate_resources[plate_type]["plate_lid_steps"] - self.plate_pick_steps_stack = self.plate_resources[plate_type][ - "plate_pick_steps_stack" - ] - self.plate_pick_steps_module = self.plate_resources[plate_type][ - "plate_pick_steps_module" - ] - self.lid_destination_height = self.plate_resources[plate_type]["lid_destination_height"] - - def get_stack_resource( - self, - ): - """ - Gets the new plate height values for the given plate_type - :param plate_type: Plate type. - :type source: str - :return: None - """ - pass - - def update_stack_resource(self): - """ - Gets the new plate height values for the given plate_type - :param plate_type: Plate type. - :type source: str - :return: None - """ - pass - - def transfer( - self, - source: str = None, - target: str = None, - source_type: str = "stack", - target_type: str = "stack", - height_offset: int = 0, - plate_type: str = None, - ) -> None: - """ - Handles the transfer request - - :param source: Source location, provided as either a location name or 4 joint values. - :type source: str - :param target: Target location, provided as either a location name or 4 joint values. - :type target: str - :param plate_type: Type of the plate - :type plate_type: str - :raises [ErrorType]: [ErrorDescription] - :return: None - """ - - self.get_stack_resource() - - if plate_type: - self.get_new_plate_height(plate_type) - - if source_type == "stack" or target_type == "stack": - self.stack_transfer(source, target, source_type, target_type, height_offset) - elif source_type == "module" and target_type == "module": - self.module_transfer(source, target, height_offset) - - self.move_joints_neutral() - self.move_location("Safe") - self.update_stack_resource() # + elif target_type == "nest": + if target_use_safe_approach: + self.place_plate_safe_approach( + target=target, + grip_height_in_steps=target_grip_height_in_steps, + ) + else: + self.place_plate_direct( + target=target, + target_type=target_type, + grip_height_in_steps=target_grip_height_in_steps, + ) + else: + raise Exception("Target location type not defined correctly") if __name__ == "__main__": @@ -962,62 +961,3 @@ def transfer( Runs given function. """ s = PlateCrane("/dev/ttyUSB4") - # s.initialize() - # s.home() - stack4 = "Stack4" - stack5 = "Stack5" - solo6 = "Solo.Position6" - solo4 = "Solo.Position4" - solo3 = "Solo.Position3" - target_loc = "HidexNest2" - lidnest3 = "LidNest3" - sealer = "SealerNest" - # s.move_location("Safe") - -# TESTING -# s.pick_stack_plate("Stack1") -# a = s.get_position() -# s.set_location("LidNest3",R=231449,Z=-31500,P=484,Y=-306) - -# s.set_location("Hidex.Nest",R=a[0],Z=a[1],P=a[2],Y=a[3]) -# s.place_module_plate("Hidex.Nest") -# s.move_single_axis("Z","Hidex.Nest") -# s.transfer("Hidex.Nest","Solo.Position1",source_type="module",target_type="stack",height_offset=800) -# s.transfer("Stack1", "PeelerNest",source_type="stack",target_type="stack") - -# s.place_module_plate() -# s.get_location_list() - -# s.move_joints_neutral() -# s.move_single_axis("R", "Safe", delay_time=1) -# s.set_location("Safe",R=195399,Z=0,P=0,Y=0) -# s.set_location("LidNest2",R=131719,Z=-31001,P=-5890,Y=-315) -# s.transfer(source="LidNest1",target="LidNest2",source_type="stack",target_type="stack", plate_type="96_well") - -# s.transfer(source="LidNest2",target="LidNest3",source_type="stack",target_type="stack", plate_type="96_well") -# s.transfer("Stack1","Stack1") -# s.free_joints() -# s.lock_joints() - -# s.set_location("LidNest3",R=99817,Z=-31001,P=-5890,Y=-315) - -# s.get_location_joint_values("HidexNest2") -# s.set_location("HidexNest2", R=210015,Z=-30400,P=490,Y=2323) - -# s.transfer(stack5, solo4, source_type = "stack", target_type = "module", plate_type = "96_deep_well") -# s.transfer(solo4, stack5, source_type = "module", target_type = "stack", plate_type = "96_deep_well") - -# s.remove_lid(source = "LidNest1", target="LidNest2", plate_type="96_well") -# s.transfer("Stack4", solo3, source_type = "stack", target_type = "stack", plate_type = "tip_box_lid_off") -# s.remove_lid(source = solo6, target="LidNest3", plate_type="tip_box_lid_on") -# s.replace_lid(source = "LidNest3", target = solo6, plate_type = "tip_box_lid_on") -# s.replace_lid(source = "LidNest2", target = solo4, plate_type = "96_well") -# s.transfer(solo4, stack5, source_type = "module", target_type = "stack", plate_type = "96_well") -# s.transfer(solo6, "Stack2", source_type = "module", target_type = "stack", plate_type = "tip_box_lid_on") - - -# Crash error outputs 21(R axis),14(z axis), 02 Wrong location name. 1400 (Z axis hits the plate), 00 success -# TODO: Need a response handler function. Unkown error messages T1, ATS, TU these are about connection issues (multiple access?) -# TODO: Slow the arm before hitting the plate in pick_stack_plate -# TODO: Create a plate detect function within pick stack plate function -# TODO: Maybe write another pick stack funtion to remove the plate detect movement diff --git a/src/platecrane_driver/platecrane_joint_limits.py b/src/platecrane_driver/platecrane_joint_limits.py index 50f9c02..55217bb 100644 --- a/src/platecrane_driver/platecrane_joint_limits.py +++ b/src/platecrane_driver/platecrane_joint_limits.py @@ -1,4 +1,5 @@ """Joint limits for the platecrane.""" + platecrane_joint_limits = { "R": [-1200, 10200], "Z": [-13600, 300], diff --git a/src/platecrane_driver/platecrane_locations.py b/src/platecrane_driver/platecrane_locations.py index f86bce2..a7f3583 100644 --- a/src/platecrane_driver/platecrane_locations.py +++ b/src/platecrane_driver/platecrane_locations.py @@ -1,4 +1,4 @@ -""" THIS IS JUST FOR REFERENCE AND IT IS NOT INTEGRATED WITH THE DRIVER""" +"""THIS IS JUST FOR REFERENCE AND IT IS NOT INTEGRATED WITH THE DRIVER""" platecrane_locations = { "Safe": "117902 2349 -5882 0", diff --git a/src/platecrane_driver/resource_defs.py b/src/platecrane_driver/resource_defs.py new file mode 100644 index 0000000..25fd579 --- /dev/null +++ b/src/platecrane_driver/resource_defs.py @@ -0,0 +1,126 @@ +"""Resource definitions for the platecrane in BIO 350.""" + +# from resource_types import Location, PlateResource # through driver +from platecrane_driver.resource_types import Location, PlateResource # through WEI + +# Locations accessible by the PlateCrane EX. [R (base), Z (vertical axis), P (gripper rotation), Y (arm extension)] +locations = { + "Safe": Location( + name="Safe", + joint_angles=[182220, 2500, 460, -308], + location_type="nest", + safe_approach_height=0, + ), + "Stack1": Location( + name="Stack1", + joint_angles=[164672, -32703, 472, 5389], + location_type="stack", + safe_approach_height=0, + ), + "Stack2": Location( + name="Stack2", + joint_angles=[182220, -32703, 460, 5420], + location_type="stack", + safe_approach_height=0, + ), + "Stack3": Location( + name="Stack3", + joint_angles=[199708, -32703, 514, 5484], + location_type="stack", + safe_approach_height=0, + ), + "Stack4": Location( + name="Stack4", + joint_angles=[217401, -32703, 546, 5473], + location_type="stack", + safe_approach_height=0, + ), + "Stack5": Location( + name="Stack5", + joint_angles=[235104, -32703, 532, 5453], + location_type="stack", + safe_approach_height=0, + ), + "LidNest1": Location( + name="LidNest1", + joint_angles=[168355, -31800, 484, -306], + location_type="nest", + safe_approach_height=0, + ), + "LidNest2": Location( + name="LidNest2", + joint_angles=[199805, -31800, 484, -306], + location_type="nest", + safe_approach_height=0, + ), + "LidNest3": Location( + name="LidNest3", + joint_angles=[231449, -31800, 484, -306], + location_type="nest", + safe_approach_height=0, + ), + "Solo.Position1": Location( + name="Solo.Position1", + joint_angles=[36703, -27951, -1000, 3630], + location_type="nest", + safe_approach_height=0, + ), + "Solo.Position2": Location( + name="Solo.Position2", + joint_angles=[53182, -27951, -413, 834], + location_type="nest", + safe_approach_height=0, + ), + "Hidex.Nest": Location( + name="Hidex.Nest", + joint_angles=[102327, -31090, -5840, 2389], + location_type="nest", + safe_approach_height=-27033, + ), + "Sealer.Nest": Location( + name="Sealer.Nest", + joint_angles=[117412, 920, -4766, 4514], + location_type="nest", + safe_approach_height=0, + ), + "Peeler.Nest": Location( + name="Peeler.Nest", + joint_angles=[292635, -31008, -4521, 4235], + location_type="nest", + safe_approach_height=0, + ), + "Liconic.Nest": Location( + name="Liconic.Nest", + joint_angles=[265563, -19800, -5413, 4978], + location_type="nest", + safe_approach_height=0, + ), +} + +# Dimensions of labware used on the BIO_Workcells +plate_definitions = { + "flat_bottom_96well": PlateResource( + plate_height=14, + grip_height=3, + plate_height_with_lid=16, + lid_height=10, + lid_grip_height=4, + lid_removal_grip_height=12, + ), + "tip_box_180uL": PlateResource( + plate_height=0, + grip_height=0, + plate_height_with_lid=0, + lid_height=0, + lid_grip_height=0, + lid_removal_grip_height=0, + ), + "pcr_96well": PlateResource( + plate_height=0, + grip_height=0, + plate_height_with_lid=0, + lid_height=0, + lid_grip_height=0, + lid_removal_grip_height=0, + ), +} diff --git a/src/platecrane_driver/resource_types.py b/src/platecrane_driver/resource_types.py new file mode 100644 index 0000000..68170aa --- /dev/null +++ b/src/platecrane_driver/resource_types.py @@ -0,0 +1,48 @@ +"""This module contains the Pydantic models for the PlateCrane resource types""" + +from typing import List, Optional + +from pydantic import BaseModel + + +class Location(BaseModel): + """A location accessible by the PlateCrane EX""" + + name: str + """Internal name of the location""" + joint_angles: List[int] + """List of 4 joint angles (unit: integer stepper values)""" + location_type: str + """Type of location, either stack or nest. This will be used to determine gripper path for interactions with the location""" + safe_approach_height: Optional[int] = None + """A safe height (unit: integer stepper value for Z axis) from which + to extend the arm when approaching this location.""" + + +class PlateResource(BaseModel): + """A plate resource that can be manipulated by the PlateCrane EX""" + + # Plate Properties + + plate_height: float + """The height measured from the bottom of the plate to the top""" + grip_height: float + """The height at which to grip the plate, measured from the bottom of the plate""" + plate_height_with_lid: Optional[float] = None + """The height of the plate when lidded, measured from the bottom of the plate to the top of the lid. + Only required if the resource supports lids""" + + # Lid Properties + + lid_height: Optional[float] = None + """The height of the lid alone, measured from the bottom of the lid to the top of the lid""" + lid_grip_height: Optional[float] = None + """The height at which to grip the lid itself, measured from the bottom of the lid""" + lid_removal_grip_height: Optional[float] = None + """The height at which to grip the lid when removing it, measured from the bottom of the lidded plate""" + + def convert_to_steps(plate_measurement_in_mm: float) -> int: + """Converts plate measurements in mm to PlateCrane EX motor steps on the z-axis""" + steps_per_mm = 80.5 + steps = int(plate_measurement_in_mm * steps_per_mm) + return steps diff --git a/src/platecrane_driver/sciclops_driver.py b/src/platecrane_driver/sciclops_driver.py index 7e7d80f..1bd7356 100644 --- a/src/platecrane_driver/sciclops_driver.py +++ b/src/platecrane_driver/sciclops_driver.py @@ -1,4 +1,5 @@ """Driver for the Hudson Robotics Sciclops robot.""" + import asyncio import re diff --git a/src/platecrane_driver/serial_port.py b/src/platecrane_driver/serial_port.py index 4d57b38..ef7f2ed 100644 --- a/src/platecrane_driver/serial_port.py +++ b/src/platecrane_driver/serial_port.py @@ -1,4 +1,5 @@ """Provides SerialPort class to interface with the plate_crane.""" + import time from serial import Serial, SerialException @@ -48,12 +49,15 @@ def __disconnect_robot(self): else: print("Robot is successfully disconnected") - def send_command(self, command, timeout=0): + def send_command(self, command, timeout=10, delay=0): """ Sends provided command to Peeler and stores data outputted by the peeler. Indicates when the confirmation that the Peeler received the command by displaying 'ACK TRUE.' """ + print_command = command.strip("\r\n") + print(f"Sending command '{print_command}'") + send_time = time.time() try: self.connection.write(command.encode("utf-8")) @@ -64,14 +68,18 @@ def send_command(self, command, timeout=0): response_msg = "" initial_command_msg = "" - time.sleep(timeout) + time.sleep(delay) - while initial_command_msg == "": - response_msg, initial_command_msg = self.receive_command(timeout) + response_msg, initial_command_msg = self.receive_command( + initial_command_msg=command.strip("\r\n"), timeout=timeout + ) # Print the full output message including the initial command that was sent - print(initial_command_msg) - print(response_msg) + print_command = initial_command_msg.strip("\r\n") + print_response = response_msg.strip("\r\n") + print( + f"Command '{print_command}': {print_response} (elapsed time: {time.time() - send_time} seconds)" + ) error_codes = { "21": "R axis error", @@ -88,7 +96,7 @@ def send_command(self, command, timeout=0): return response_msg - def receive_command(self, time_wait): + def receive_command(self, initial_command_msg="", timeout=0): """ Records the data outputted by the plate_crane and sets it to equal "" if no data is outputted in the provided time. """ @@ -96,16 +104,22 @@ def receive_command(self, time_wait): # response_string = self.connection.read_until(expected=b'\r').decode('utf-8') response = "" response_string = "" - initial_command_msg = "" - - if self.connection.in_waiting != 0: - response = self.connection.readlines() - initial_command_msg = response[0].decode("utf-8").strip("\r\n") - if len(response) > 1: - for line_index in range(1, len(response)): - response_string += "\n" + response[line_index].decode( - "utf-8" - ).strip("\r\n") - else: - response_string = "" - return response_string, initial_command_msg + response_command_msg = "" + + start_wait = time.time() + while True: + if self.connection.in_waiting != 0: + response = self.connection.readlines() + if response[0].decode("utf-8").strip("\r\n") == initial_command_msg: + response_command_msg = initial_command_msg + if len(response) > 1: + for line_index in range(1, len(response)): + response_string += "\n" + response[line_index].decode( + "utf-8" + ).strip("\r\n") + else: + response_string = response[0].decode("utf-8").strip("\r\n") + if time.time() - start_wait > timeout or response_string != "": + break + time.sleep(0.25) + return response_string, response_command_msg diff --git a/src/platecrane_driver/test.py b/src/platecrane_driver/test.py index afc79f4..1d8055f 100644 --- a/src/platecrane_driver/test.py +++ b/src/platecrane_driver/test.py @@ -20,7 +20,59 @@ # print(s.lid_height) - s.remove_lid(source="Solo.Position2", target="LidNest1", plate_type="test_96_well") + # s.home() + + # s.home() + + # print(s.get_location_list()) + # s.set_location("Solo.Position2", 53182, -27797, -413, 834) + # print(s.get_location_list()) + # s.set_location("Solo.Position2", 53182, -27797, -413, 834) + # s.set_location("HidexSafe", 209959, -2500, 490, -262) + # s.transfer("Solo.Position1","Solo.Position2", plate_type="flat_bottom_96well") # works if don't specify plate type (picks up lower) + # s.transfer("Stack1","Solo.Position1", plate_type="flat_bottom_96well", has_lid=True) # works if don't specify plate type (picks up lower) + # s.transfer("Solo.Position1","Hidex.Nest", plate_type="flat_bottom_96well", height_offset = 8) # works if don't specify plate type (picks up lower) + + # s.remove_lid(source="Solo.Position1", target="LidNest1", plate_type="flat_bottom_96well") + # s.replace_lid(source="LidNest1", target="Solo.Position1", plate_type="flat_bottom_96well") + + # s.transfer("Hidex.Nest","Sealer.Nest", plate_type="flat_bottom_96well", height_offset = 8) # works if don't specify plate type (picks up lower) + # s.transfer("Solo.Position2", "Sealer.Nest", plate_type="flat_bottom_96well", has_lid=False) # works if don't specify plate type (picks up lower) + + # DEMO MOVEMENTS + # s.transfer("Stack1", "Solo.Position2", plate_type="flat_bottom_96well", has_lid=True) + # s.remove_lid("Solo.Position2", "LidNest1", plate_type="flat_bottom_96well") + # s.transfer("Solo.Position2", "Hidex.Nest", plate_type="flat_bottom_96well", has_lid=False, height_offset=8) + # s.transfer("Sealer.Nest", "Liconic.Nest", plate_type="flat_bottom_96well", has_lid=False) + # s.transfer("Stack1", "Solo.Position2", plate_type="flat_bottom_96well", has_lid=False) + # s.transfer("Solo.Position2", "Stack1", plate_type="flat_bottom_96well", has_lid=False) + + # s.set_speed(100) + s.transfer( + "Stack1", "Solo.Position2", plate_type="flat_bottom_96well", has_lid=False + ) + # s.transfer("Solo.Position2", "Stack1", plate_type="flat_bottom_96well", has_lid=False) + + # s.transfer("Peeler.Nest", "Hidex.Nest", plate_type="flat_bottom_96well", has_lid=False, height_offset =8) + + # s.transfer("Solo.Position2", "Liconic.Nest", plate_type="flat_bottom_96well", has_lid=False) + + # s.transfer("Peeler.Nest", "Solo.Position2", plate_type="flat_bottom_96well", has_lid=True) # works if don't specify plate type (picks up lower) + # s.transfer("Stack1","Stack2", plate_type="flat_bottom_96well", has_lid=True) # works if don't specify plate type (picks up lower) + + # s.transfer("Solo.Position2","Solo.Position2",source_type="stack",target_type="stack", plate_type="96_well", height_offset=-200) # works if don't specify plate type (picks up lower) + # s.transfer("Stack1","Solo.Position2",source_type="stack",target_type="stack", plate_type="96_well", height_offset=-250) # works if don't specify plate type (picks up lower) + # s.replace_lid("LidNest1", "Solo.Position2", plate_type="flat_bottom_96well") + + # s.transfer("Solo.Position2","Solo.Position1", source_type="stack", target_type="stack", plate_type="flat_bottom_96", height_offset=0) + # s.transfer("Stack1","Solo.Position2",source_type="stack",target_type="stack", plate_type="96_well") # doesn't work with plate type through driver (picks up higher) + # s.get_position() + + # s.transfer("Solo.Position2","Solo.Position1", source_type="stack", target_type="module", plate_type="96_well", height_offset=-250) + # s.transfer("Stack1","Solo.Position2",source_type="stack",target_type="stack", plate_type="96_well") # doesn't work with plate type through driver (picks up higher) + # s.get_position() + + # s.remove_lid(source="Solo.Position2", target="LidNest1", plate_type="test_96_well", height_offset = -100) # print(s.lid_height) # s.move_location("Solo.Position1") @@ -31,7 +83,7 @@ # s.move_location("Safe") # s.move_location("Stack4") - # s.move_single_axis("Z", "Safe", delay_time=1) # move all the way up in z height + # s.move_single_axis("Z", "Safe") # move all the way up in z height # print(s.get_location_list()) @@ -164,7 +216,7 @@ # s.get_location_list() # s.move_joints_neutral() -# s.move_single_axis("R", "Safe", delay_time=1) +# s.move_single_axis("R", "Safe") # s.set_location("Safe",R=195399,Z=0,P=0,Y=0) # s.set_location("LidNest2",R=131719,Z=-31001,P=-5890,Y=-315) # s.transfer(source="LidNest1",target="LidNest2",source_type="stack",target_type="stack", plate_type="96_well") diff --git a/src/platecrane_rest_node.py b/src/platecrane_rest_node.py index b01b8a5..0905208 100644 --- a/src/platecrane_rest_node.py +++ b/src/platecrane_rest_node.py @@ -1,332 +1,135 @@ #! /usr/bin/env python3 """The server for the Hudson Platecrane/Sciclops that takes incoming WEI flow requests from the experiment application""" -import json -import traceback -from argparse import ArgumentParser, Namespace -from contextlib import asynccontextmanager from pathlib import Path +from typing import List, Union -from fastapi import FastAPI -from fastapi.responses import JSONResponse +from fastapi.datastructures import State from platecrane_driver.platecrane_driver import PlateCrane -from wei.core.data_classes import ( - ModuleAbout, - ModuleAction, - ModuleActionArg, - ModuleStatus, - StepResponse, - StepStatus, +from typing_extensions import Annotated +from wei.modules.rest_module import RESTModule +from wei.types.step_types import StepResponse +from wei.utils import extract_version + +rest_module = RESTModule( + name="platecrane_node", + version=extract_version(Path(__file__).parent.parent / "pyproject.toml"), + description="A node to control the Hudson PlateCrane robot", + model="Hudson PlateCrane EX", ) -from wei.helpers import extract_version -global platecrane, state - - -def parse_args() -> Namespace: - """Argument parser""" - parser = ArgumentParser() - parser.add_argument( - "--host", - type=str, - default="0.0.0.0", - help="Hostname that the REST API will be accessible on", +rest_module.arg_parser.add_argument("--device", type=str, default="/dev/ttyUSB0") + +rest_module.state.platecrane = None + + +@rest_module.startup() +def platecrane_startup(state: State): + """Handles initializing the platecrane driver.""" + state.platecrane = None + state.platecrane = PlateCrane(host_path=state.device) + print("PLATECRANE online") + + +@rest_module.action() +def transfer( + state: State, + source: Annotated[ + Union[List[float], str], "The workcell location to grab the plate from" + ], + target: Annotated[ + Union[List[float], str], "The workcell location to place the plate at" + ], + plate_type: Annotated[str, "The type of plate being manipulated"] = "96_well", + height_offset: Annotated[ + int, "Amount to adjust the vertical grip point on the plate, in mm" + ] = 0, + has_lid: Annotated[ + bool, "Whether or not the plate currently has a lid on it" + ] = False, +) -> StepResponse: + """This action picks up a plate from one location and transfers is to another.""" + platecrane: PlateCrane = state.platecrane + platecrane.transfer( + source, + target, + plate_type=plate_type, + height_offset=int(height_offset), + has_lid=has_lid, ) - parser.add_argument("--port", type=int, default=2002) - parser.add_argument("--device", type=str, default="/dev/ttyUSB0") - return parser.parse_args() - - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Initial run function for the app, parses the workcell argument - Parameters - ---------- - app : FastApi - The REST API app being initialized - - Returns - ------- - None""" - global platecrane, state - - args = parse_args() - - try: - platecrane = PlateCrane(host_path=args.device) - - except Exception as error_msg: - traceback.print_exc() - state = "PLATECRANE CONNECTION ERROR" - print("------- PlateCrane Error message: " + str(error_msg) + (" -------")) - - else: - print("PLATECRANE online") - state = ModuleStatus.IDLE - yield - pass - - -app = FastAPI( - lifespan=lifespan, -) - - -@app.get("/state") -def get_state(): - """Returns the current state of the platecrane""" - global platecrane, state - return JSONResponse(content={"State": state}) - - -@app.get("/about") -async def about(): - """Returns a description of the actions and resources the module supports""" - global state - about = ModuleAbout( - name="Hudson Platecrane", - description="Platecrane is a robotic arm module that can pick up and move plates between locations.", - interface="wei_rest_node", - version=extract_version(Path(__file__).parent.parent / "pyproject.toml"), - actions=[ - ModuleAction( - name="transfer", - description="This action picks up a plate from one location and transfers it to another.", - args=[ - ModuleActionArg( - name="source", - description="The workcell location to grab the plate from", - type="List[float], str", - required=True, - ), - ModuleActionArg( - name="target", - description="The workcell location to put the plate at", - type="List[float], str", - required=True, - ), - ModuleActionArg( - name="plate_type", - description="Type of plate.", - type="str", - required=False, - default="96_well", - ), - ], - ), - ModuleAction( - name="remove_lid", - description="This action picks up a plate's lid from one location and places it at another.", - args=[ - ModuleActionArg( - name="source", - description="The workcell location to grab the lid from", - type="List[float], str", - required=True, - ), - ModuleActionArg( - name="target", - description="The workcell location to put the lid", - type="List[float], str", - required=True, - ), - ModuleActionArg( - name="plate_type", - description="Type of plate.", - type="str", - required=False, - default="96_well", - ), - ], - ), - ModuleAction( - name="replace_lid", - description="This action picks up an unattached plate lid and places it on a plate.", - args=[ - ModuleActionArg( - name="source", - description="The workcell location to grab the lid from", - type="List[float], str", - required=True, - ), - ModuleActionArg( - name="target", - description="The workcell location to put the lid", - type="List[float], str", - required=True, - ), - ModuleActionArg( - name="plate_type", - description="Type of plate.", - type="str", - required=False, - default="96_well", - ), - ], - ), - ModuleAction( - name="move_safe", - description="This action moves the arm to a safe location.", - args=[], - ), - ], - resource_pools=[], + return StepResponse.step_succeeded("Transfer complete") + + +@rest_module.action() +def remove_lid( + state: State, + source: Annotated[ + Union[List[float], str], "The workcell location to grab the lib from" + ], + target: Annotated[ + Union[List[float], str], "The workcell location to place the lid at" + ], + plate_type: Annotated[str, "The type of plate the lid is on"] = "96_well", + height_offset: Annotated[ + int, "Amount to adjust the vertical grip point on the plate, in mm" + ] = 0, +): + """This action picks up a plate lid from a plate and transfers is to another location.""" + platecrane: PlateCrane = state.platecrane + platecrane.remove_lid( + source=source, + target=target, + plate_type=plate_type, + height_offset=height_offset, ) - return JSONResponse(content=about.model_dump(mode="json")) - - -@app.get("/resources") -async def resources(): - """Returns the current resources available to the module""" - global platecrane - return JSONResponse(content={"State": platecrane.get_status()}) - - -@app.post("/action") -def do_action(action_handle: str, action_vars): - """ - Runs an action on the module - - Parameters - ---------- - action_handle : str - The name of the action to be performed - action_vars : str - Any arguments necessary to run that action. - This should be a JSON object encoded as a string. - - Returns - ------- - response: StepResponse - A response object containing the result of the action - """ - global state, platecrane - response = StepResponse() - if state == "PLATECRANE CONNECTION ERROR": - message = "Connection error, cannot accept a job!" - response.action_response = StepStatus.FAILED - response.action_log = message - return response - if state == ModuleStatus.ERROR: - return response - - action_args = json.loads(action_vars) - - state = ModuleStatus.BUSY - - source = action_args.get("source") - print("Source location: " + str(source)) - target = action_args.get("target") - print("Target location: " + str(target)) - plate_type = action_args.get("plate_type", "96_well") - print("Plate type: " + str(target)) - height_offset = action_args.get("height_offset", 0) - print("Height Offset: " + str(height_offset)) - - if action_handle == "transfer": - print("Starting the transfer request") - - source_type = action_args.get("source_type", None) - print("Source Type: " + str(source_type)) - - target_type = action_args.get("target_type", None) - print("Target Type: " + str(target_type)) - - if not source_type or not target_type: - print("Please provide source and target transfer types!") - state = ModuleStatus.ERROR + return StepResponse.step_succeeded("Removed lid") + + +@rest_module.action() +def replace_lid( + state: State, + source: Annotated[ + Union[List[float], str], "The workcell location to grab the lib from" + ], + target: Annotated[ + Union[List[float], str], "The workcell location to place the lid at" + ], + plate_type: Annotated[str, "The type of plate the lid is on"] = "96_well", + height_offset: Annotated[ + int, "Amount to adjust the vertical grip point on the plate, in mm" + ] = 0, +): + """This action picks up a plate lid from a location and places it on a plate.""" + platecrane: PlateCrane = state.platecrane + platecrane.replace_lid( + source=source, + target=target, + plate_type=plate_type, + height_offset=height_offset, + ) + return StepResponse.step_succeeded("Replaced lid") - try: - platecrane.transfer( - source, - target, - source_type=source_type.lower(), - target_type=target_type.lower(), - height_offset=int(height_offset), - plate_type=plate_type, - ) - except Exception as err: - response.action_response = StepStatus.FAILED - response.action_log = "Transfer failed. Error:" + str(err) - print(str(err)) - state = ModuleStatus.ERROR - else: - response.action_response = StepStatus.SUCCEEDED - response.action_msg = "Transfer successfully completed" - state = ModuleStatus.IDLE - print("Finished Action: " + action_handle.upper()) - return response - elif action_handle == "remove_lid": - try: - platecrane.remove_lid( - source=source, - target=target, - plate_type=plate_type, - height_offset=height_offset, - ) - except Exception as err: - response.action_response = StepStatus.FAILED - response.action_log = "Remove lid failed. Error:" + str(err) - print(str(err)) - state = ModuleStatus.ERROR - else: - response.action_response = StepStatus.SUCCEEDED - response.action_msg = "Remove lid successfully completed" - state = ModuleStatus.IDLE - print("Finished Action: " + action_handle.upper()) - return response +@rest_module.action() +def move_safe( + state: State, +): + """This action moves the arm to a safe location (the location named "Safe").""" + platecrane: PlateCrane = state.platecrane + platecrane.move_location("Safe") + return StepResponse.step_succeeded("Moved to the Safe position") - elif action_handle == "replace_lid": - try: - platecrane.replace_lid( - source=source, - target=target, - plate_type=plate_type, - height_offset=height_offset, - ) - except Exception as err: - response.action_response = StepStatus.FAILED - response.action_log = "Replace lid failed. Error:" + str(err) - print(str(err)) - state = ModuleStatus.ERROR - else: - response.action_response = StepStatus.SUCCEEDED - response.action_msg = "Replace lid successfully completed" - state = ModuleStatus.IDLE - print("Finished Action: " + action_handle.upper()) - return response - elif action_handle == "move_safe": - try: - platecrane.move_location("Safe") - except Exception as err: - response.action_response = StepStatus.FAILED - response.action_log = "Move Safe Failed. Error:" + str(err) - print(str(err)) - state = ModuleStatus.ERROR - else: - response.action_response = StepStatus.SUCCEEDED - response.action_msg = "Move Safe successfully completed" - state = ModuleStatus.IDLE - print("Finished Action: " + action_handle.upper()) - return response - else: - msg = "UNKNOWN ACTION REQUEST! Available actions: status, home, get_plate" - response.action_response = StepStatus.FAILED - response.action_log = msg - state = ModuleStatus.ERROR - return response +@rest_module.action() +def set_speed( + state: State, + speed: Annotated[int, "The speed at which the arm moves (as a percentage)."], +): + """This action sets the speed at which the plate crane arm moves (as a percentage)""" + platecrane: PlateCrane = state.platecrane + platecrane.set_speed(speed=speed) + return StepResponse.step_succeeded(f"Set speed to {speed}") if __name__ == "__main__": - import uvicorn - - args = parse_args() - - uvicorn.run( - "platecrane_rest_node:app", - host=args.host, - port=args.port, - reload=False, - ) + rest_module.start() diff --git a/src/sciclops_rest_node.py b/src/sciclops_rest_node.py index d266f96..ac1f7ad 100644 --- a/src/sciclops_rest_node.py +++ b/src/sciclops_rest_node.py @@ -8,7 +8,7 @@ from typing_extensions import Annotated from wei.modules.rest_module import RESTModule from wei.types.module_types import ModuleStatus -from wei.types.step_types import ActionRequest, StepResponse, StepStatus +from wei.types.step_types import StepResponse from wei.utils import extract_version rest_module = RESTModule( @@ -42,29 +42,28 @@ def sciclops_startup(state: State): print("SCICLOPS online") state.status = ModuleStatus.IDLE +@rest_module.action(name="status") +def status(state: State): + """Action that forces the sciclops to check its status.""" + sciclops: SCICLOPS = state.sciclops + sciclops.get_status() + return StepResponse.step_succeeded(action_msg="Succesfully got status") -@rest_module.action(name="status", description="force sciclops to check its status") -def status(state: State, action: ActionRequest): - return StepResponse.step_succeeded() - - - -@rest_module.action(name="home", description="force sciclops to check its status") -def home(state: State, action: ActionRequest): +@rest_module.action() +def home(state: State): + """Homes the sciclops""" state.sciclops.home() return StepResponse.step_succeeded() - -@rest_module.action(name="get_plate", description="force sciclops to check its status") +@rest_module.action(name="get_plate") def get_plate( state: State, - action: ActionRequest, pos: Annotated[int, "Stack to get plate from"], lid: Annotated[bool, "Whether plate has a lid or not"] = False, trash: Annotated[bool, "Whether to use the trash"] = False, ): - print(state._state) + """Get a plate from a stack position and move it to transfer point (or trash)""" state.sciclops.get_plate(pos, lid, trash) return StepResponse.step_succeeded() diff --git a/tests/test_module.py b/tests/test_module.py index afa955e..4968814 100644 --- a/tests/test_module.py +++ b/tests/test_module.py @@ -7,7 +7,7 @@ import pytest import requests from wei import ExperimentClient -from wei.core.data_classes import ModuleAbout, WorkcellData, WorkflowStatus +from wei.core.data_classes import ModuleAbout, Workcell, WorkflowStatus class TestWEI_Base(unittest.TestCase): @@ -20,7 +20,7 @@ def __init__(self, *args, **kwargs): self.workcell_file = self.root_dir / Path( "tests/workcell_defs/test_workcell.yaml" ) - self.workcell = WorkcellData.from_yaml(self.workcell_file) + self.workcell = Workcell.from_yaml(self.workcell_file) self.server_host = self.workcell.config.server_host self.server_port = self.workcell.config.server_port self.url = f"http://{self.server_host}:{self.server_port}"