Skip to content

Commit

Permalink
feat: warning markers are now shown on drones moving horizontally bel…
Browse files Browse the repository at this point in the history
…ow min nav altitude
  • Loading branch information
ntamas committed Apr 22, 2024
1 parent c67a1b6 commit 644436a
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 29 deletions.
14 changes: 13 additions & 1 deletion src/modules/sbstudio/plugin/model/safety_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,14 @@ def min_distance_is_valid(self) -> bool:
# on macOS, hence we need to exclude large min distances
return self.min_distance > 0 and self.min_distance < 100000

@property
def min_navigation_altitude_is_valid(self) -> bool:
"""Retuns whether the minimum navigation altitude property can be
considered valid. Right now we use zero to denote cases when there are
no drones in the scene at all.
"""
return self.min_navigation_altitude > 0

@property
def max_altitude_is_valid(self) -> bool:
"""Retuns whether the maximum altitude property can be considered valid.
Expand Down Expand Up @@ -294,7 +302,11 @@ def should_show_altitude_warning(self) -> bool:
return (
self.altitude_warning_enabled
and self.max_altitude_is_valid
and self.max_altitude > self.altitude_warning_threshold
and self.min_navigation_altitude_is_valid
and (
self.max_altitude > self.altitude_warning_threshold
or self.min_altitude < self.min_navigation_altitude
)
)

@property
Expand Down
94 changes: 66 additions & 28 deletions src/modules/sbstudio/plugin/tasks/safety_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
frame.
"""

from __future__ import annotations

import bpy

from contextlib import contextmanager
from math import hypot
from typing import Iterator
from typing import Iterator, TYPE_CHECKING

from sbstudio.math.nearest_neighbors import find_nearest_neighbors
from sbstudio.model.types import Coordinate3D
from sbstudio.plugin.utils.evaluator import get_position_of_object
from sbstudio.plugin.constants import Collections
from sbstudio.utils import LRUCache
Expand All @@ -18,13 +21,20 @@

from .base import Task

if TYPE_CHECKING:
from bpy.types import Scene


# TODO(ntamas): make the nearest-neighbor calculation debounced when we have
# lots of drones, but currently we are good with, say 100 drones

PositionSnapshot = dict[str, Coordinate3D]
VelocitySnapshot = dict[str, Coordinate3D]


#: Cache that stores the positions in the last few frames visited by the user
#: in the hope that we can estimate the velocities from it in the current frame
_position_snapshot_cache = LRUCache(5)
_position_snapshot_cache: LRUCache[int, PositionSnapshot] = LRUCache(5)

#: Suspension counter. Safety checks are suspended if this counter is positive
_suspension_counter = 0
Expand All @@ -34,14 +44,18 @@
_ZERO = (0.0, 0.0, 0.0)


def create_position_snapshot_for_drones_in_collection(collection, *, frame):
def create_position_snapshot_for_drones_in_collection(
collection, *, frame: int
) -> PositionSnapshot:
"""Create a dictionary mapping the names of the drones in the given
collection to their positions.
"""
return {drone.name: get_position_of_object(drone) for drone in collection.objects}


def estimate_velocities_of_drones_at_frame(snapshot, *, frame, scene):
def estimate_velocities_of_drones_at_frame(
snapshot: PositionSnapshot, *, frame: int, scene: Scene
) -> VelocitySnapshot:
"""Attempts to estimate the velocities of the drones in the given frame,
using the given snapshot for the frame and the current data in the
position snapshot cache.
Expand Down Expand Up @@ -94,7 +108,7 @@ def estimate_velocities_of_drones_at_frame(snapshot, *, frame, scene):


# @debounced(delay=0.1)
def run_safety_check(scene, depsgraph):
def run_safety_check(scene: Scene, depsgraph) -> None:
global _suspension_counter
if _suspension_counter > 0:
return
Expand All @@ -110,11 +124,15 @@ def run_safety_check(scene, depsgraph):
safety_check.clear_safety_check_result()
return

# Get the thresholds from the safety check object
# Get the altitude thresholds from the safety check object
if safety_check.altitude_warning_threshold:
min_altitude = safety_check.min_navigation_altitude
max_altitude = safety_check.altitude_warning_threshold
else:
min_altitude = None
max_altitude = None

# Get the velocity thresholds from the safety check object
if safety_check.velocity_warning_enabled:
max_velocity_xy = safety_check.velocity_xy_warning_threshold
max_velocity_z_down = safety_check.effective_velocity_z_threshold_down
Expand All @@ -130,53 +148,72 @@ def run_safety_check(scene, depsgraph):
storyboard = scene.skybrush.storyboard
formation_status = storyboard.get_formation_status_at_frame(frame)

# Check min/max altitude
# Prepare position snapshot
positions = list(snapshot.values())
if max_altitude is not None:
drones_over_max_altitude = [
position for position in positions if position[2] >= max_altitude
]
else:
drones_over_max_altitude = []

# Prepare velocity snapshot
velocity_snapshot = estimate_velocities_of_drones_at_frame(
snapshot, frame=frame, scene=scene
)
velocities = list(velocity_snapshot.values())

# Find min/max altitude for reporting purposes
max_altitude_found = (
max(position[2] for position in positions) if positions else 0.0
)
min_altitude_found = (
min(position[2] for position in positions) if positions else 0.0
)

# Check max altitude constraint
if max_altitude is not None:
drones_over_max_altitude = [
position for position in positions if position[2] >= max_altitude
]
else:
drones_over_max_altitude = []

# Check nearest neighbors
nearest_neighbors = find_nearest_neighbors(positions)

# Check velocities
max_velocity_xy_found = None
velocity_snapshot = estimate_velocities_of_drones_at_frame(
snapshot, frame=frame, scene=scene
)
velocities = list(velocity_snapshot.values())

# Check velocities in XY direction
max_velocity_xy_found = (
max(hypot(vel[0], vel[1]) for vel in velocities) if velocities else 0.0
)
if max_velocity_xy is not None:
drones_over_max_velocity_xy = [
drones_over_max_velocity_xy = (
[
snapshot.get(name, _ZERO)
for name, vel in velocity_snapshot.items()
if hypot(vel[0], vel[1]) > max_velocity_xy
]
else:
drones_over_max_velocity_xy = []
if max_velocity_xy is not None
else []
)

# Check velocities in Z direction
upper = max(0.0, max(vel[2] for vel in velocities)) if velocities else 0.0
lower = min(0.0, min(vel[2] for vel in velocities)) if velocities else 0.0
if max_velocity_z_up is not None and max_velocity_z_down is not None:
drones_over_max_velocity_z = [
drones_over_max_velocity_z = (
[
snapshot.get(name, _ZERO)
for name, vel in velocity_snapshot.items()
if vel[2] > max_velocity_z_up or vel[2] < -max_velocity_z_down
]
else:
drones_over_max_velocity_z = []
if max_velocity_z_up is not None and max_velocity_z_down is not None
else []
)

# Find drones moving horizontally below min navigation altitude
drones_below_min_nav_altitude = (
[
pos
for name, vel in velocity_snapshot.items()
if (vel[0] != 0 or vel[1] != 0)
and (pos := snapshot.get(name, _ZERO))[2] < min_altitude
]
if min_altitude is not None and min_altitude_found < min_altitude
else []
)

safety_check.set_safety_check_result(
formation_status=formation_status,
Expand All @@ -189,6 +226,7 @@ def run_safety_check(scene, depsgraph):
max_velocity_z_up=upper,
max_velocity_z_down=abs(lower),
drones_over_max_velocity_z=drones_over_max_velocity_z,
drones_below_min_nav_altitude=drones_below_min_nav_altitude,
)


Expand Down

0 comments on commit 644436a

Please sign in to comment.