Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abr asair sensor #14445

Merged
merged 13 commits into from
Feb 28, 2024
129 changes: 126 additions & 3 deletions hardware-testing/hardware_testing/scripts/abr_asair_sensor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,129 @@
"""ABR Temperature Humidity Sensors."""
# from hardware_testing import data
# from hardware_testing.drivers import asair_sensor

from hardware_testing import data
from hardware_testing.drivers import asair_sensor
import datetime
import sys
import time as t
from typing import List
import os

# if __name__ == "__main__":

def _get_user_input(lst: List[str], some_string: str) -> str:
variable = input(some_string)
while variable not in lst:
print(
f"Your input was {variable}. Expected input is one of the following: {lst}"
)
variable = input(some_string)
return variable


class _ABRAsairSensor:
def __init__(self, robot: str, duration: int, frequency: int) -> None:
try:
sys.path.insert(0, "/var/lib/jupyter/notebooks")
import google_sheets_tool # type: ignore[import]

credentials_path = "/var/lib/jupyter/notebooks/abr.json"
except ImportError:
raise ImportError(
"Run on robot. Make sure google_sheets_tool.py is in jupyter notebook."
)
print(os.path.exists(credentials_path))
test_name = "ABR-Environment-Monitoring"
run_id = data.create_run_id()
file_name = data.create_file_name(test_name, run_id, robot)
sensor = asair_sensor.BuildAsairSensor(False, True)
print(sensor)
env_data = sensor.get_reading()
header = [
"Robot",
"Timestamp",
"Date",
"Time",
"Temp (oC)",
"Relative Humidity (%)",
]
header_str = ",".join(header) + "\n"
data.append_data_to_file(test_name, run_id, file_name, header_str)
# Upload to google has passed
try:
google_sheet = google_sheets_tool.google_sheet(
credentials_path, "ABR Ambient Conditions", tab_number=0
)
print("Connected to the google sheet.")
except FileNotFoundError:
print(
"There is no google sheets credentials. Make sure credentials in jupyter notebook."
)
results_list = [] # type: List
start_time = datetime.datetime.now()
while True:
env_data = sensor.get_reading()
timestamp = datetime.datetime.now()
# Time adjustment for ABR robot timezone
new_timestamp = timestamp - datetime.timedelta(hours=5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this b/c of the timezone? either leaving a comment to describe what this is doing or somehow specifying the system's timezone would make it more clear

date = new_timestamp.date()
time = new_timestamp.time()
temp = env_data.temperature
print(temp)
rh = env_data.relative_humidity
print(rh)
row = [
robot,
str(new_timestamp),
str(date),
str(time),
temp,
rh,
]
results_list.append(row)
# Check if duration elapsed
elapsed_time = datetime.datetime.now() - start_time
if elapsed_time.total_seconds() >= duration * 60:
break
# write to google sheet
try:
google_sheet.write_header(header)
google_sheet.update_row_index()
google_sheet.write_to_row(row)
print("Wrote row")
except RuntimeError:
print("Did not write row.")
# Delay for desired frequency minutes before the next iteration
t.sleep(frequency * 60) # seconds

# Upload to robot testing data folder
for sublist in results_list:
row_str = ", ".join(map(str, sublist)) + "\n" # type: str
save_file_path = data.append_data_to_file(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you are appending the line to all previous lines, and then you appending that string to the file, and doing this for each line, which means that the file you are writing will be huge and filled with repeated lines.

This can be fixed by removing the result_string variable and just passing the row_str into append_data_to_file().

Then also, you can append each line to the CSV at the same time that you write the data to the google sheet. Just move the append_data_to_file() up into while loop above. Then the CSV will be updated in realtime, plus you don't need to store data in the results_list variable anymore.

test_name, run_id, file_name, row_str
)
print(f"Saved to robot: f{save_file_path}.")
print(
f"Done. Ran for {duration} minutes and collected every {frequency} minutes."
)


if __name__ == "__main__":
robot_list = [
"DVT1ABR1",
"DVT1ABR2",
"DVT1ABR3",
"DVT1ABR4",
"DVT2ABR5",
"DVT2ABR6",
"PVT1ABR7",
"PVT1ABR8",
"PVT1ABR9",
"PVT1ABR10",
"PVT1ABR11",
"PVT1ABR12",
"ROOM_339",
"Room_340",
] # type: List
robot = _get_user_input(robot_list, "Robot/Room: ")
duration = int(input("Duration (min): "))
frequency = int(input("Frequency (min): "))
_ABRAsairSensor(robot, duration, frequency)
2 changes: 2 additions & 0 deletions hardware-testing/hardware_testing/scripts/abr_scale.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"PVT1ABR10",
"PVT1ABR11",
"PVT1ABR12",
"ROOM_339",
"ROOM_340",
]
# Labware per Robot
labware_DVT1ABR4 = [
Expand Down
11 changes: 7 additions & 4 deletions hardware-testing/hardware_testing/scripts/analyze_abr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ def _get_user_input(list: List, some_string: str) -> str:
dir_2 = os.path.join(current_dir, folder_of_interest)
new_csv_file_path = os.path.join(current_dir, results_file_name)
file_list_2 = os.listdir(dir_2) # LIST OF individual run folders
# WRITE HEADER
with open(new_csv_file_path, "w", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(header)
for file2 in file_list_2:
raw_data_folder = os.path.join(dir_2, file2)
raw_data_file_csv = os.listdir(raw_data_folder)[0]
plate_state = raw_data_file_csv.split("_")[-1].split("-")[1].split(".")[0]
sample = raw_data_file_csv.split("_")[-1].split("-")[0]
raw_data_file_csv_path = os.path.join(raw_data_folder, raw_data_file_csv)
results_list = []
try:
with open(raw_data_file_csv_path, "r") as f:
for line in f:
Expand All @@ -54,13 +59,11 @@ def _get_user_input(list: List, some_string: str) -> str:
stable_value,
sample,
)
results_list.append(row_data)

pass
except Exception as e:
print(f"Error opening file: {e}")
# WRITE HEADER
with open(new_csv_file_path, "w", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(header)
with open(new_csv_file_path, "a", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
# Write data
Expand Down
Loading