diff --git a/src/modules/sbstudio/plugin/model/safety_check.py b/src/modules/sbstudio/plugin/model/safety_check.py index 62f549f..59e93da 100644 --- a/src/modules/sbstudio/plugin/model/safety_check.py +++ b/src/modules/sbstudio/plugin/model/safety_check.py @@ -266,6 +266,14 @@ def min_distance_is_valid(self) -> bool: # on macOS, hence we need to exclude large min distances return self.min_distance > 0 and self.min_distance < 100000 + @property + def min_navigation_altitude_is_valid(self) -> bool: + """Retuns whether the minimum navigation altitude property can be + considered valid. Right now we use zero to denote cases when there are + no drones in the scene at all. + """ + return self.min_navigation_altitude > 0 + @property def max_altitude_is_valid(self) -> bool: """Retuns whether the maximum altitude property can be considered valid. @@ -294,7 +302,11 @@ def should_show_altitude_warning(self) -> bool: return ( self.altitude_warning_enabled and self.max_altitude_is_valid - and self.max_altitude > self.altitude_warning_threshold + and self.min_navigation_altitude_is_valid + and ( + self.max_altitude > self.altitude_warning_threshold + or self.min_altitude < self.min_navigation_altitude + ) ) @property diff --git a/src/modules/sbstudio/plugin/tasks/safety_check.py b/src/modules/sbstudio/plugin/tasks/safety_check.py index d43e3b2..a68a635 100644 --- a/src/modules/sbstudio/plugin/tasks/safety_check.py +++ b/src/modules/sbstudio/plugin/tasks/safety_check.py @@ -3,13 +3,16 @@ frame. """ +from __future__ import annotations + import bpy from contextlib import contextmanager from math import hypot -from typing import Iterator +from typing import Iterator, TYPE_CHECKING from sbstudio.math.nearest_neighbors import find_nearest_neighbors +from sbstudio.model.types import Coordinate3D from sbstudio.plugin.utils.evaluator import get_position_of_object from sbstudio.plugin.constants import Collections from sbstudio.utils import LRUCache @@ -18,13 +21,20 @@ from .base import Task +if TYPE_CHECKING: + from bpy.types import Scene + # TODO(ntamas): make the nearest-neighbor calculation debounced when we have # lots of drones, but currently we are good with, say 100 drones +PositionSnapshot = dict[str, Coordinate3D] +VelocitySnapshot = dict[str, Coordinate3D] + + #: Cache that stores the positions in the last few frames visited by the user #: in the hope that we can estimate the velocities from it in the current frame -_position_snapshot_cache = LRUCache(5) +_position_snapshot_cache: LRUCache[int, PositionSnapshot] = LRUCache(5) #: Suspension counter. Safety checks are suspended if this counter is positive _suspension_counter = 0 @@ -34,14 +44,18 @@ _ZERO = (0.0, 0.0, 0.0) -def create_position_snapshot_for_drones_in_collection(collection, *, frame): +def create_position_snapshot_for_drones_in_collection( + collection, *, frame: int +) -> PositionSnapshot: """Create a dictionary mapping the names of the drones in the given collection to their positions. """ return {drone.name: get_position_of_object(drone) for drone in collection.objects} -def estimate_velocities_of_drones_at_frame(snapshot, *, frame, scene): +def estimate_velocities_of_drones_at_frame( + snapshot: PositionSnapshot, *, frame: int, scene: Scene +) -> VelocitySnapshot: """Attempts to estimate the velocities of the drones in the given frame, using the given snapshot for the frame and the current data in the position snapshot cache. @@ -94,7 +108,7 @@ def estimate_velocities_of_drones_at_frame(snapshot, *, frame, scene): # @debounced(delay=0.1) -def run_safety_check(scene, depsgraph): +def run_safety_check(scene: Scene, depsgraph) -> None: global _suspension_counter if _suspension_counter > 0: return @@ -110,11 +124,15 @@ def run_safety_check(scene, depsgraph): safety_check.clear_safety_check_result() return - # Get the thresholds from the safety check object + # Get the altitude thresholds from the safety check object if safety_check.altitude_warning_threshold: + min_altitude = safety_check.min_navigation_altitude max_altitude = safety_check.altitude_warning_threshold else: + min_altitude = None max_altitude = None + + # Get the velocity thresholds from the safety check object if safety_check.velocity_warning_enabled: max_velocity_xy = safety_check.velocity_xy_warning_threshold max_velocity_z_down = safety_check.effective_velocity_z_threshold_down @@ -130,14 +148,16 @@ def run_safety_check(scene, depsgraph): storyboard = scene.skybrush.storyboard formation_status = storyboard.get_formation_status_at_frame(frame) - # Check min/max altitude + # Prepare position snapshot positions = list(snapshot.values()) - if max_altitude is not None: - drones_over_max_altitude = [ - position for position in positions if position[2] >= max_altitude - ] - else: - drones_over_max_altitude = [] + + # Prepare velocity snapshot + velocity_snapshot = estimate_velocities_of_drones_at_frame( + snapshot, frame=frame, scene=scene + ) + velocities = list(velocity_snapshot.values()) + + # Find min/max altitude for reporting purposes max_altitude_found = ( max(position[2] for position in positions) if positions else 0.0 ) @@ -145,38 +165,55 @@ def run_safety_check(scene, depsgraph): min(position[2] for position in positions) if positions else 0.0 ) + # Check max altitude constraint + if max_altitude is not None: + drones_over_max_altitude = [ + position for position in positions if position[2] >= max_altitude + ] + else: + drones_over_max_altitude = [] + # Check nearest neighbors nearest_neighbors = find_nearest_neighbors(positions) - # Check velocities - max_velocity_xy_found = None - velocity_snapshot = estimate_velocities_of_drones_at_frame( - snapshot, frame=frame, scene=scene - ) - velocities = list(velocity_snapshot.values()) - + # Check velocities in XY direction max_velocity_xy_found = ( max(hypot(vel[0], vel[1]) for vel in velocities) if velocities else 0.0 ) - if max_velocity_xy is not None: - drones_over_max_velocity_xy = [ + drones_over_max_velocity_xy = ( + [ snapshot.get(name, _ZERO) for name, vel in velocity_snapshot.items() if hypot(vel[0], vel[1]) > max_velocity_xy ] - else: - drones_over_max_velocity_xy = [] + if max_velocity_xy is not None + else [] + ) + # Check velocities in Z direction upper = max(0.0, max(vel[2] for vel in velocities)) if velocities else 0.0 lower = min(0.0, min(vel[2] for vel in velocities)) if velocities else 0.0 - if max_velocity_z_up is not None and max_velocity_z_down is not None: - drones_over_max_velocity_z = [ + drones_over_max_velocity_z = ( + [ snapshot.get(name, _ZERO) for name, vel in velocity_snapshot.items() if vel[2] > max_velocity_z_up or vel[2] < -max_velocity_z_down ] - else: - drones_over_max_velocity_z = [] + if max_velocity_z_up is not None and max_velocity_z_down is not None + else [] + ) + + # Find drones moving horizontally below min navigation altitude + drones_below_min_nav_altitude = ( + [ + pos + for name, vel in velocity_snapshot.items() + if (vel[0] != 0 or vel[1] != 0) + and (pos := snapshot.get(name, _ZERO))[2] < min_altitude + ] + if min_altitude is not None and min_altitude_found < min_altitude + else [] + ) safety_check.set_safety_check_result( formation_status=formation_status, @@ -189,6 +226,7 @@ def run_safety_check(scene, depsgraph): max_velocity_z_up=upper, max_velocity_z_down=abs(lower), drones_over_max_velocity_z=drones_over_max_velocity_z, + drones_below_min_nav_altitude=drones_below_min_nav_altitude, )