Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hardware-testing): add provisioning script for the Hepa/UV Module. #14561

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 211 additions & 0 deletions hardware/opentrons_hardware/scripts/provision_hepauv.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
#!/usr/bin/env python3
"""Provisions hepauv EEPROMs.

Check warning on line 2 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L2

Added line #L2 was not covered by tests

This can be used either on a production line or locally.

A log of what has been flashed to the hepauv
/var/log/provision_hepauv.log
"""

import re
import asyncio
import logging
import logging.config
import argparse
import struct
from typing import Any, Tuple, Dict, Optional

Check warning on line 16 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L10-L16

Added lines #L10 - L16 were not covered by tests

from opentrons_hardware.instruments.serial_utils import ensure_serial_length
from opentrons_hardware.drivers.can_bus import build, CanMessenger
from opentrons_hardware.firmware_bindings.arbitration_id import ArbitrationId
from opentrons_hardware.firmware_bindings.utils import UInt16Field
from opentrons_hardware.firmware_bindings.messages.messages import MessageDefinition
from opentrons_hardware.firmware_bindings.messages import (

Check warning on line 23 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L18-L23

Added lines #L18 - L23 were not covered by tests
message_definitions as md,
payloads,
fields,
)
from opentrons_hardware.firmware_bindings.constants import NodeId, MessageId
from opentrons_hardware.scripts.can_args import add_can_args, build_settings

Check warning on line 29 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L28-L29

Added lines #L28 - L29 were not covered by tests


log = logging.getLogger(__name__)

Check warning on line 32 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L32

Added line #L32 was not covered by tests


INFO_REGEX_STRING = (

Check warning on line 35 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L35

Added line #L35 was not covered by tests
"^" # start of string
"HUV" # The characters HUV
r"(?P<model>\d{2})" # "model" group contains exactly 2 digits
r"(?P<code>[\w\d]{0,12})" # "code" group contains 0 to 12 inclusive alphanumeric characters
"$" # end of string
)

SERIAL_RE = re.compile(INFO_REGEX_STRING)

Check warning on line 43 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L43

Added line #L43 was not covered by tests


async def get_serial(prompt: str) -> Tuple[int, bytes]:

Check warning on line 46 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L46

Added line #L46 was not covered by tests
"""Get a serial number that is correct and parseable."""
while True:
serial = input(prompt).strip()

Check warning on line 49 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L48-L49

Added lines #L48 - L49 were not covered by tests

# Match the string
matches = SERIAL_RE.match(serial.strip())
if matches:
model = int(matches.group("model"))
data = ensure_serial_length(matches.group("code").encode("ascii"))
if not serial or "y" not in input(

Check warning on line 56 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L52-L56

Added lines #L52 - L56 were not covered by tests
f"read serial '{serial}', write to hepauv? (y/n): "
):
continue
log.info(f"parsed model {model} datecode {data!r} from {serial}")
return model, data
raise RuntimeError(f"Invalid serial number: {serial}")

Check warning on line 62 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L59-L62

Added lines #L59 - L62 were not covered by tests


async def update_serial_and_confirm(

Check warning on line 65 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L65

Added line #L65 was not covered by tests
messenger: CanMessenger,
model: int,
data: bytes,
attempts: int = 3,
) -> bool:
"""Update and verify the update of serial data."""
hepauv_info: Optional[md.HepaUVInfoResponse] = None
event = asyncio.Event()

Check warning on line 73 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L72-L73

Added lines #L72 - L73 were not covered by tests

def _listener(message: MessageDefinition, _: ArbitrationId) -> None:

Check warning on line 75 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L75

Added line #L75 was not covered by tests
nonlocal hepauv_info
if isinstance(message, md.HepaUVInfoResponse):
hepauv_info = message
event.set()

Check warning on line 79 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L77-L79

Added lines #L77 - L79 were not covered by tests

def _filter(arb_id: ArbitrationId) -> bool:
return (NodeId(arb_id.parts.originating_node_id) == NodeId.hepa_uv) and (

Check warning on line 82 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L81-L82

Added lines #L81 - L82 were not covered by tests
MessageId(arb_id.parts.message_id) == MessageId.hepauv_info_response
)

messenger.add_listener(_listener, _filter)

Check warning on line 86 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L86

Added line #L86 was not covered by tests

serial_bytes = struct.pack(">H16s", model, ensure_serial_length(data))
set_message = md.SetSerialNumber(

Check warning on line 89 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L88-L89

Added lines #L88 - L89 were not covered by tests
payload=payloads.SerialNumberPayload(serial=fields.SerialField(serial_bytes))
)
for attempt in range(attempts):
log.debug(

Check warning on line 93 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L92-L93

Added lines #L92 - L93 were not covered by tests
f"beginning set and confirm attempt {attempt} with bytes {serial_bytes!r}"
)
await messenger.send(NodeId.hepa_uv, set_message)
log.debug(f"Sent set-serial: {set_message}")

Check warning on line 97 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L96-L97

Added lines #L96 - L97 were not covered by tests

# wait some time before confirming
await asyncio.sleep(1)

Check warning on line 100 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L100

Added line #L100 was not covered by tests

# confirm that we set the proper serial number
log.info("Confirming serial number")
await messenger.send(node_id=NodeId.hepa_uv, message=md.InstrumentInfoRequest())
try:
await asyncio.wait_for(event.wait(), 1.0)
if (

Check warning on line 107 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L103-L107

Added lines #L103 - L107 were not covered by tests
hepauv_info
and hepauv_info.payload.model == UInt16Field(model)
and hepauv_info.payload.serial == fields.SerialField(data)
):
log.info(f"serial confirmed on attempt {attempt}")
messenger.remove_listener(_listener)
return True
except asyncio.TimeoutError:
log.warning("Instrument info request timed out")

Check warning on line 116 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L112-L116

Added lines #L112 - L116 were not covered by tests

messenger.remove_listener(_listener)
log.error(f"Could not get HepaInfoResponse after {attempts} retries.")
return False

Check warning on line 120 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L118-L120

Added lines #L118 - L120 were not covered by tests


async def _main(args: argparse.Namespace) -> None:

Check warning on line 123 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L123

Added line #L123 was not covered by tests
"""Script entrypoint."""
async with build.driver(build_settings(args)) as driver, CanMessenger(

Check warning on line 125 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L125

Added line #L125 was not covered by tests
driver
) as messenger:
while True:
try:
model, data = await get_serial("Enter serial for hepauv: ")
success = await update_serial_and_confirm(messenger, model, data)
except KeyboardInterrupt:
log.warning("Keyboard Interrupt!")
break
except RuntimeError as e:
log.error(e)
continue

Check warning on line 137 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L128-L137

Added lines #L128 - L137 were not covered by tests

# print result
log.info(f"SUCCESS,{model},{data!r}") if success else log.error(

Check warning on line 140 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L140

Added line #L140 was not covered by tests
f"FAILURE,{model},{data!r}"
)

# breakout if we only want to program once
if args.once:
break

Check warning on line 146 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L145-L146

Added lines #L145 - L146 were not covered by tests


def log_config(log_level: int) -> Dict[str, Any]:

Check warning on line 149 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L149

Added line #L149 was not covered by tests
"""Configure logging."""
return {

Check warning on line 151 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L151

Added line #L151 was not covered by tests
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"basic": {"format": "%(asctime)s %(name)s %(levelname)s %(message)s"}
},
"handlers": {
"stream_handler": {
"class": "logging.StreamHandler",
"formatter": "basic",
"level": log_level,
},
"file_handler": {
"class": "logging.handlers.RotatingFileHandler",
"formatter": "basic",
"filename": "/var/log/provision_hepauv.log",
"maxBytes": 5000000,
"level": log_level,
"backupCount": 3,
},
},
"loggers": {
"": {
"handlers": ["stream_handler"]
if log_level > logging.INFO
else ["stream_handler", "file_handler"],
"level": log_level,
},
},
}


if __name__ == "__main__":
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(

Check warning on line 185 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L183-L185

Added lines #L183 - L185 were not covered by tests
"-l",
"--log-level",
help=(
"Developer logging level. At DEBUG or below, logs are written "
"to console; at INFO or above, logs are only written to "
"provision_hepauv.log"
),
type=str,
choices=["DEBUG", "INFO", "WARNING", "ERROR"],
default="INFO",
)
parser.add_argument(

Check warning on line 197 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L197

Added line #L197 was not covered by tests
"--once",
action="store_true",
help="Run just once and quit instead of staying in a loop",
)
add_can_args(parser)

Check warning on line 202 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L202

Added line #L202 was not covered by tests

args = parser.parse_args()
logging.config.dictConfig(log_config(getattr(logging, args.log_level)))
try:
asyncio.run(_main(args))
except Exception as e:
log.exception(f"Unexpected exception: {e}")

Check warning on line 209 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L204-L209

Added lines #L204 - L209 were not covered by tests
finally:
log.info("Exiting...")

Check warning on line 211 in hardware/opentrons_hardware/scripts/provision_hepauv.py

View check run for this annotation

Codecov / codecov/patch

hardware/opentrons_hardware/scripts/provision_hepauv.py#L211

Added line #L211 was not covered by tests
Loading