Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abr asair sensor #14445

Merged
merged 13 commits into from
Feb 28, 2024
126 changes: 123 additions & 3 deletions hardware-testing/hardware_testing/scripts/abr_asair_sensor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,126 @@
"""ABR Temperature Humidity Sensors."""
# from hardware_testing import data
# from hardware_testing.drivers import asair_sensor

from hardware_testing import data
from hardware_testing.drivers import asair_sensor
import datetime
import sys
import time as t
from typing import List

# if __name__ == "__main__":

try:
sys.path.insert(0, "/var/lib/jupyter/notebooks")
import google_sheets_tool # type: ignore[import]
except ImportError:
raise ImportError(
"Run on robot. Make sure google_sheets_tool.py is in jupyter notebook."
)


def _get_user_input(list: List, some_string: str) -> str:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list is actually a predefined term in python - it is the name of the list type. defining a variable called list will shadow the global name in the scope where the variable is defined, which would lead to surprising results if you wanted to build a list in this function. Would you mind calling this something else, like lst or input_list or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed to lst

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelatedly, List is a generic container type that is meant to be specialized with the type it contains - for instance, List[str] is a list of strings, List[int] is a list of ints.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added List[str]

variable = input(some_string)
while variable not in list:
print(
f"Your input was {variable}. Expected input is one of the following: {list}"
)
variable = input(some_string)
return variable


class _abr_asair_sensor:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, common python style is for classes to be PascalCase - in this case, _ABRAsairSensor if the class isn't meant to be exported and ABRAsairSensor if it is

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed class name _ABRAsairSensor

def __init__(self, robot: str, duration: int, frequency: int) -> None:
test_name = "ABR-Environment-Monitoring"
run_id = data.create_run_id()
file_name = data.create_file_name(test_name, run_id, robot)
sensor = asair_sensor.BuildAsairSensor(False, True)
env_data = sensor.get_reading()
header = [
"Robot",
"Timestamp",
"Date",
"Time",
"Temp (oC)",
"Relative Humidity (%)",
]
header_str = ",".join(header) + "\n"
data.append_data_to_file(test_name, run_id, file_name, header_str)
# Upload to google has passed
credentials_path = "/var/lib/jupyter/notebooks/abr.json"
try:
google_sheet = google_sheets_tool.google_sheet(
credentials_path, "ABR Ambient Conditions", tab_number=0
)
print("Connected to the google sheet.")
except FileNotFoundError:
print(
"There is no google sheets credentials. Make sure credentials in jupyter notebook."
)
results_list = [] # type: List
start_time = datetime.datetime.now()
while True:
env_data = sensor.get_reading()
timestamp = datetime.datetime.now()
new_timestamp = timestamp - datetime.timedelta(hours=5)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this b/c of the timezone? either leaving a comment to describe what this is doing or somehow specifying the system's timezone would make it more clear

date = new_timestamp.date()
time = new_timestamp.time()
temp = env_data.temperature
print(temp)
rh = env_data.relative_humidity
print(rh)
row = [
robot,
str(new_timestamp),
str(date),
str(time),
temp,
rh,
]
results_list.append(row)
# Check if duration elapsed
elapsed_time = datetime.datetime.now() - start_time
if elapsed_time.total_seconds() >= duration * 60:
break
# write to google sheet
try:
google_sheet.write_header(header)
google_sheet.update_row_index()
google_sheet.write_to_row(row)
print("Wrote row")
except RuntimeError:
print("Did not write row.")
# Delay for desired frequency minutes before the next iteration
t.sleep(frequency * 60) # seconds

# Upload to robot testing data folder
result_string = ""
for sublist in results_list:
row_str = ", ".join(map(str, sublist)) + "\n" # type: str
result_string += row_str
save_file_path = data.append_data_to_file(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here you are appending the line to all previous lines, and then you appending that string to the file, and doing this for each line, which means that the file you are writing will be huge and filled with repeated lines.

This can be fixed by removing the result_string variable and just passing the row_str into append_data_to_file().

Then also, you can append each line to the CSV at the same time that you write the data to the google sheet. Just move the append_data_to_file() up into while loop above. Then the CSV will be updated in realtime, plus you don't need to store data in the results_list variable anymore.

test_name, run_id, file_name, result_string
)
print(f"Saved to robot: f{save_file_path}.")
print(
f"Done. Ran for {duration} minutes and collected every {frequency} minutes."
)


if __name__ == "__main__":
robot_list = [
"DVT1ABR1",
"DVT1ABR2",
"DVT1ABR3",
"DVT1ABR4",
"DVT2ABR5",
"DVT2ABR6",
"PVT1ABR7",
"PVT1ABR8",
"PVT1ABR9",
"PVT1ABR10",
"PVT1ABR11",
"PVT1ABR12",
] # type: List
robot = _get_user_input(robot_list, "Robot: ")
duration = int(input("Duration (min): "))
frequency = int(input("Frequency (min): "))
_abr_asair_sensor(robot, duration, frequency)
11 changes: 7 additions & 4 deletions hardware-testing/hardware_testing/scripts/analyze_abr.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ def _get_user_input(list: List, some_string: str) -> str:
dir_2 = os.path.join(current_dir, folder_of_interest)
new_csv_file_path = os.path.join(current_dir, results_file_name)
file_list_2 = os.listdir(dir_2) # LIST OF individual run folders
# WRITE HEADER
with open(new_csv_file_path, "w", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(header)
for file2 in file_list_2:
raw_data_folder = os.path.join(dir_2, file2)
raw_data_file_csv = os.listdir(raw_data_folder)[0]
plate_state = raw_data_file_csv.split("_")[-1].split("-")[1].split(".")[0]
sample = raw_data_file_csv.split("_")[-1].split("-")[0]
raw_data_file_csv_path = os.path.join(raw_data_folder, raw_data_file_csv)
results_list = []
try:
with open(raw_data_file_csv_path, "r") as f:
for line in f:
Expand All @@ -54,13 +59,11 @@ def _get_user_input(list: List, some_string: str) -> str:
stable_value,
sample,
)
results_list.append(row_data)

pass
except Exception as e:
print(f"Error opening file: {e}")
# WRITE HEADER
with open(new_csv_file_path, "w", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
csv_writer.writerow(header)
with open(new_csv_file_path, "a", newline="") as csv_file:
csv_writer = csv.writer(csv_file)
# Write data
Expand Down
Loading