diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8719545..65cf79c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -8,7 +8,7 @@ repos: - id: trailing-whitespace - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: v0.3.5 + rev: v0.8.4 hooks: - id: ruff args: [--fix, --exit-non-zero-on-fix] diff --git a/pyproject.toml b/pyproject.toml index 7f76e0f..2e19bb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,7 +42,10 @@ pyclean = "^2.0.0" [tool.ruff] lint.ignore = ["B027", "B905", "C901", "E402", "E501", "E731"] lint.select = ["B", "C", "E", "F", "W"] -extend-exclude = ["src/modules/sbstudio/vendor"] +extend-exclude = [ + "src/modules/sbstudio/vendor", + "src/modules/sbstudio/i18n/translations.py" +] [build-system] requires = ["poetry-core>=1.0.0"] diff --git a/src/modules/sbstudio/api/base.py b/src/modules/sbstudio/api/base.py index aa0d6c6..135543f 100644 --- a/src/modules/sbstudio/api/base.py +++ b/src/modules/sbstudio/api/base.py @@ -2,6 +2,7 @@ import re from base64 import b64encode +from collections.abc import Iterator, Sequence from contextlib import contextmanager from gzip import compress from http.client import HTTPResponse @@ -10,7 +11,7 @@ from pathlib import Path from shutil import copyfileobj from ssl import create_default_context, CERT_NONE -from typing import Any, Dict, Iterator, List, Optional, Sequence, Tuple +from typing import Any, Optional from urllib.error import HTTPError from urllib.parse import urljoin from urllib.request import Request, urlopen @@ -321,7 +322,7 @@ def decompose_points( *, min_distance: float, method: str = "greedy", - ) -> List[int]: + ) -> list[int]: """Decomposes a set of points into multiple groups while ensuring that the minimum distance of points within the same group is at least as large as the given threshold. @@ -342,44 +343,51 @@ def decompose_points( def export( self, + *, validation: SafetyCheckParams, - trajectories: Dict[str, Trajectory], - lights: Optional[Dict[str, LightProgram]] = None, - yaw_setpoints: Optional[Dict[str, YawSetpointList]] = None, + trajectories: dict[str, Trajectory], + lights: Optional[dict[str, LightProgram]] = None, + yaw_setpoints: Optional[dict[str, YawSetpointList]] = None, output: Optional[Path] = None, show_title: Optional[str] = None, show_type: str = "outdoor", + show_segments: Optional[dict[str, tuple[float, float]]] = None, ndigits: int = 3, timestamp_offset: Optional[float] = None, time_markers: Optional[TimeMarkers] = None, cameras: Optional[list[Camera]] = None, renderer: str = "skyc", - renderer_params: Optional[Dict[str, Any]] = None, + renderer_params: Optional[dict[str, Any]] = None, ) -> Optional[bytes]: - """Export drone show data into Skybrush Compiled Format (.skyc). + """ + Export drone show data. Parameters: - validation: safety check parameters - trajectories: dictionary of trajectories indexed by drone names - lights: dictionary of light programs indexed by drone names - yaw_setpoints: dictionary of yaw setpoints indexed by drone names - output: the file path where the output should be saved or `None` - if the output must be returned instead of saving it to a file - show_title: arbitrary show title; `None` if no title is needed - show_type: type of the show; must be one of `outdoor` or `indoor` - ndigits: round floats to this precision - timestamp_offset: when specified, adds this timestamp offset to the - metadata of the .skyc file, which is then used later for display - purposes in Skybrush Viewer - time_markers: when specified, time markers will be exported to the - .skyc file as temporal cues - cameras: when specified, list of cameras to include in the environment + validation: Safety check parameters. + trajectories: Dictionary of trajectories indexed by drone names. + lights: Dictionary of light programs indexed by drone names. + yaw_setpoints: Dictionary of yaw setpoints indexed by drone names. + output: The file path where the output should be saved or `None` + if the output must be returned instead of saving it to a file. + show_title: Arbitrary show title; `None` if no title is needed. + show_type: Type of the show; must be one of `outdoor` or `indoor`. + show_segments: Dictionary that maps show segment IDs to a start + (inclusive) and end (exclusive) timestamp pair. + ndigits: Round floats to this precision. + timestamp_offset: When specified, adds this timestamp offset to the + show metadata, which can later be used for display purposes in + Skybrush Viewer. + time_markers: When specified, time markers will be exported as + temporal cues. + cameras: When specified, list of cameras to include in the environment. + renderer: The renderer to use to export the show. + renderer_params: Extra parameters for the renderer. Note: drone names must match in trajectories and lights Returns: - the drone show data in .skyc format or `None` if an output filename - was specified + The exported drone show data or `None` if an `output` filename + was specified. """ meta = {} @@ -389,6 +397,9 @@ def export( if timestamp_offset is not None: meta["timestampOffset"] = timestamp_offset + if show_segments is not None: + meta["segments"] = show_segments + if lights is None: lights = {name: LightProgram() for name in trajectories.keys()} @@ -451,7 +462,7 @@ def create_formation_from_svg( num_points: int, size: float, angle: float, - ) -> Tuple[List[Point3D], List[Color3D]]: + ) -> tuple[list[Point3D], list[Color3D]]: """Samples the path objects of an SVG string into a list of coordinates and corresponding colors. @@ -490,7 +501,7 @@ def create_formation_from_svg( def generate_plots( self, - trajectories: Dict[str, Trajectory], + trajectories: dict[str, Trajectory], output: Path, validation: SafetyCheckParams, plots: Sequence[str] = ("pos", "vel", "nn"), @@ -563,7 +574,7 @@ def match_points( target: Sequence[Coordinate3D], *, radius: Optional[float] = None, - ) -> Tuple[Mapping, Optional[float]]: + ) -> tuple[Mapping, Optional[float]]: """Matches the points of a source point set to the points of a target point set in a way that ensures collision-free straight-line trajectories between the matched points when neither the source nor the @@ -594,7 +605,7 @@ def plan_landing( velocity: float, target_altitude: float = 0, spindown_time: float = 5, - ) -> Tuple[List[int], List[int]]: + ) -> tuple[list[int], list[int]]: """Plans the landing trajectories for a set of drones, assuming that they should maintain a given minimum distance while the motors are running and that they land with constant speed. diff --git a/src/modules/sbstudio/plugin/model/light_effects.py b/src/modules/sbstudio/plugin/model/light_effects.py index 0a2e4ab..764e0b2 100644 --- a/src/modules/sbstudio/plugin/model/light_effects.py +++ b/src/modules/sbstudio/plugin/model/light_effects.py @@ -3,18 +3,10 @@ import types import bpy +from collections.abc import Callable, Iterable, Sequence from functools import partial from operator import itemgetter -from typing import ( - cast, - Callable, - Iterable, - List, - Optional, - Sequence, - Tuple, - TYPE_CHECKING, -) +from typing import cast, Optional from bpy.path import abspath from bpy.props import ( @@ -54,9 +46,6 @@ from .mixins import ListMixin -if TYPE_CHECKING: - from sbstudio.api.types import Mapping - from sbstudio.plugin.model import StoryboardEntry __all__ = ("ColorFunctionProperties", "LightEffect", "LightEffectCollection") @@ -159,8 +148,8 @@ def test_is_in_front_of(plane: Optional[Plane], point: Coordinate3D) -> bool: _always_true = constant(True) -def get_color_function_names(self, context: Context) -> List[Tuple[str, str, str]]: - names: List[str] +def get_color_function_names(self, context: Context) -> list[tuple[str, str, str]]: + names: list[str] if self.path: absolute_path = abspath(self.path) @@ -398,7 +387,7 @@ def apply_on_colors( self, colors: Sequence[MutableRGBAColor], positions: Sequence[Coordinate3D], - mapping: Optional[List[int]], + mapping: Optional[list[int]], *, frame: int, random_seq: RandomSequence, @@ -422,7 +411,7 @@ def get_output_based_on_output_type( output_type: str, mapping_mode: str, output_function, - ) -> Tuple[Optional[List[Optional[float]]], Optional[float]]: + ) -> tuple[Optional[list[Optional[float]]], Optional[float]]: """Get the float output(s) for color ramp or image indexing based on the output type. Args: @@ -432,9 +421,9 @@ def get_output_based_on_output_type( Returns: individual and common outputs """ - outputs: Optional[List[Optional[float]]] = None + outputs: Optional[list[Optional[float]]] = None common_output: Optional[float] = None - order: Optional[List[int]] = None + order: Optional[list[int]] = None if output_type == "FIRST_COLOR": common_output = 0.0 diff --git a/src/modules/sbstudio/plugin/model/storyboard.py b/src/modules/sbstudio/plugin/model/storyboard.py index d97cb00..9117b57 100644 --- a/src/modules/sbstudio/plugin/model/storyboard.py +++ b/src/modules/sbstudio/plugin/model/storyboard.py @@ -1,8 +1,13 @@ from __future__ import annotations -import bpy +import enum import json +from dataclasses import dataclass +from operator import attrgetter +from uuid import uuid4 +from typing import TYPE_CHECKING, Optional +import bpy from bpy.props import ( BoolProperty, CollectionProperty, @@ -12,9 +17,6 @@ StringProperty, ) from bpy.types import PropertyGroup -from operator import attrgetter -from uuid import uuid4 -from typing import Dict, List, Optional, Tuple, TYPE_CHECKING from sbstudio.api.types import Mapping from sbstudio.plugin.constants import ( @@ -32,7 +34,12 @@ if TYPE_CHECKING: from bpy.types import bpy_prop_collection, Collection, Context -__all__ = ("ScheduleOverride", "StoryboardEntry", "Storyboard") +__all__ = ( + "ScheduleOverride", + "StoryboardEntry", + "Storyboard", + "StoryboardEntryPurpose", +) class ScheduleOverride(PropertyGroup): @@ -78,7 +85,7 @@ class ScheduleOverride(PropertyGroup): @property def label(self) -> str: - parts: List[str] = [f"@{self.index}"] + parts: list[str] = [f"@{self.index}"] if self.pre_delay != 0: parts.append(f"dep {self.pre_delay}") if self.post_delay != 0: @@ -108,6 +115,28 @@ def _set_frame_end(self: StoryboardEntry, value: int) -> None: self.duration = value - self.frame_start +@dataclass(frozen=True) +class _StoryboardEntryPurposeMixin: + ui_name: str + order: int + + +class StoryboardEntryPurpose(_StoryboardEntryPurposeMixin, enum.Enum): + """ + Storyboard entry purposes in the order in which they can follow each-other. + + The `name` of the enum is used to identify values in/for Blender. + """ + + TAKEOFF = "Takeoff", 1 + SHOW = "Show", 2 + LANDING = "Landing", 3 + + @property + def bpy_enum_item(self) -> tuple[str, str, str, int]: + return (self.name, self.ui_name, "", self.order) + + class StoryboardEntry(PropertyGroup): """Blender property group representing a single entry in the storyboard of the drone show. @@ -187,6 +216,22 @@ class StoryboardEntry(PropertyGroup): default="SYNCHRONIZED", options=set(), ) + + purpose = EnumProperty( + items=[ + StoryboardEntryPurpose.TAKEOFF.bpy_enum_item, + StoryboardEntryPurpose.SHOW.bpy_enum_item, + StoryboardEntryPurpose.LANDING.bpy_enum_item, + ], + name="Purpose", + description=( + "The purpose of the entry in the show. A valid show must start with 0 or more " + "takeoff entries, followed by any number of show entries, and end with 0 or more " + "landing entries." + ), + default=StoryboardEntryPurpose.SHOW.name, + ) + pre_delay_per_drone_in_frames = FloatProperty( name="Departure delay", description=( @@ -319,13 +364,13 @@ def is_staggered(self) -> bool: """Whether the transition is staggered.""" return self.transition_schedule == "STAGGERED" - def get_enabled_schedule_override_map(self) -> Dict[int, ScheduleOverride]: + def get_enabled_schedule_override_map(self) -> dict[int, ScheduleOverride]: """Returns a dictionary mapping indices of markers in the source formation to the corresponding transition schedule overrides. Only enabled schedule overrides are considered. """ - result: Dict[int, ScheduleOverride] = {} + result: dict[int, ScheduleOverride] = {} if self.schedule_overrides_enabled: for override in self.schedule_overrides: @@ -432,6 +477,7 @@ def add_new_entry( frame_start: Optional[int] = None, duration: Optional[int] = None, *, + purpose: StoryboardEntryPurpose = StoryboardEntryPurpose.SHOW, formation: Optional[Collection] = None, select: bool = False, context: Optional[Context] = None, @@ -446,6 +492,7 @@ def add_new_entry( sensible default duration: the duration of the new entry; `None` chooses a sensible default + purpose: The purpose of the entry. formation: the formation that the newly added entry should refer to select: whether to select the newly added entry after it was created @@ -473,10 +520,11 @@ def add_new_entry( if duration is None or duration < 0: duration = fps * DEFAULT_STORYBOARD_ENTRY_DURATION - entry = self.entries.add() + entry: StoryboardEntry = self.entries.add() entry.frame_start = frame_start entry.duration = duration entry.name = name + entry.purpose = purpose.name if ( formation is None @@ -686,7 +734,7 @@ def get_formation_status_at_frame(self, frame: int) -> str: def get_frame_range_of_formation_or_transition_at_frame( self, frame: int - ) -> Optional[Tuple[int, int]]: + ) -> Optional[tuple[int, int]]: """Returns the start and end frame of the current formation or transition that contains the given frame. @@ -760,7 +808,7 @@ def second_entry(self) -> Optional[StoryboardEntry]: else: return None - def validate_and_sort_entries(self) -> List[StoryboardEntry]: + def validate_and_sort_entries(self) -> list[StoryboardEntry]: """Validates the entries in the storyboard and sorts them by start time, keeping the active entry index point at the same entry as before. @@ -774,8 +822,36 @@ def validate_and_sort_entries(self) -> List[StoryboardEntry]: entries = list(self.entries) entries.sort(key=StoryboardEntry.sort_key) - # Check that entries do not overlap - for index, (entry, next_entry) in enumerate(zip(entries, entries[1:])): + for validator in ( + self._validate_entry_sequence, + self._validate_formation_size_contraints, + ): + validator(entries) + + active_entry = self.active_entry + self._sort_entries() + self.active_entry = active_entry + + # Retrieve the entries again because _sort_entries() might have changed + # the ordering + return sorted(self.entries, key=StoryboardEntry.sort_key) + + def _validate_entry_sequence(self, sorted_entries: list[StoryboardEntry]) -> None: + """ + Validates the given entry sequence by ensuring consecutive items can follow each-other. + + Validation rules: + + - Entries must not overlap. + - Entry purposes are in the correct order. + + Raises: + StoryboardValidationError: If validation fails. + """ + for index, (entry, next_entry) in enumerate( + zip(sorted_entries, sorted_entries[1:]) + ): + # -- No overlap. if entry.frame_end >= next_entry.frame_start: raise StoryboardValidationError( f"Storyboard entry {entry.name!r} at index {index + 1} and " @@ -783,10 +859,30 @@ def validate_and_sort_entries(self) -> List[StoryboardEntry]: f"{next_entry.name!r}" ) - # Check sizes of constraints + # -- Purposes are in the correct order. + entry_purpose, next_purpose = ( + StoryboardEntryPurpose[entry.purpose], + StoryboardEntryPurpose[next_entry.purpose], + ) + if entry_purpose.order > next_purpose.order: + raise StoryboardValidationError( + f"Storyboard entry {entry_purpose.name!r} has purpose " + f"{StoryboardEntryPurpose[entry_purpose].ui_name}, which can not be followed by " + f"a {StoryboardEntryPurpose[ next_purpose].ui_name} entry." + ) + + def _validate_formation_size_contraints( + self, sorted_entries: list[StoryboardEntry] + ) -> None: + """ + Validates that the given entries satisfy formation size constraints. + + Raises: + StoryboardValidationError: If validation fails. + """ drones = Collections.find_drones(create=False) num_drones = len(drones.objects) if drones else 0 - for entry in entries: + for entry in sorted_entries: formation = entry.formation if formation is None: continue @@ -803,14 +899,6 @@ def validate_and_sort_entries(self) -> List[StoryboardEntry]: f"Storyboard entry {entry.name!r} contains a formation with {num_markers} drones but {msg}" ) - active_entry = self.active_entry - self._sort_entries() - self.active_entry = active_entry - - # Retrieve the entries again because _sort_entries() might have changed - # the ordering - return sorted(self.entries, key=StoryboardEntry.sort_key) - def _on_active_entry_moving_down(self, this_entry, next_entry) -> bool: pad = next_entry.frame_start - this_entry.frame_end @@ -838,7 +926,7 @@ def _sort_entries(self) -> None: @with_context -def get_storyboard(context: Optional[Context] = None) -> Storyboard: +def get_storyboard(*, context: Optional[Context] = None) -> Storyboard: """Helper function to retrieve the storyboard of the add-on from the given context object. """ diff --git a/src/modules/sbstudio/plugin/operators/create_new_storyboard_entry.py b/src/modules/sbstudio/plugin/operators/create_new_storyboard_entry.py index ea84448..23fd2fa 100644 --- a/src/modules/sbstudio/plugin/operators/create_new_storyboard_entry.py +++ b/src/modules/sbstudio/plugin/operators/create_new_storyboard_entry.py @@ -1,5 +1,14 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + from .base import StoryboardOperator +if TYPE_CHECKING: + from bpy.types import Context + + from sbstudio.plugin.model.storyboard import Storyboard + __all__ = ("CreateNewStoryboardEntryOperator",) @@ -10,6 +19,6 @@ class CreateNewStoryboardEntryOperator(StoryboardOperator): bl_label = "Create New Storyboard Entry" bl_description = "Creates a new storyboard entry at the end of the storyboard." - def execute_on_storyboard(self, storyboard, context): + def execute_on_storyboard(self, storyboard: Storyboard, context: Context): storyboard.add_new_entry(name="Untitled", select=True) return {"FINISHED"} diff --git a/src/modules/sbstudio/plugin/operators/create_takeoff_grid.py b/src/modules/sbstudio/plugin/operators/create_takeoff_grid.py index 0cf7d1d..09fa92a 100644 --- a/src/modules/sbstudio/plugin/operators/create_takeoff_grid.py +++ b/src/modules/sbstudio/plugin/operators/create_takeoff_grid.py @@ -4,7 +4,6 @@ from bpy.types import Operator from numpy import array, column_stack, mgrid, repeat, tile, zeros -from typing import List from sbstudio.model.types import Coordinate3D from sbstudio.plugin.constants import Collections, Formations, Templates @@ -13,7 +12,7 @@ create_keyframe_for_diffuse_color_of_material, ) from sbstudio.plugin.model.formation import add_points_to_formation, create_formation -from sbstudio.plugin.model.storyboard import get_storyboard +from sbstudio.plugin.model.storyboard import StoryboardEntryPurpose, get_storyboard from sbstudio.plugin.operators.detach_materials_from_template import ( detach_material_from_drone_template, ) @@ -64,7 +63,7 @@ def create_points_of_takeoff_grid( spacing_col: float = 1, intra_slot_spacing_row: float = 0.5, intra_slot_spacing_col: float = 0.5, -) -> List[Coordinate3D]: +) -> list[Coordinate3D]: """Creates the points of a takeoff grid centered at the given coordinate. Parameters: @@ -390,6 +389,7 @@ def _run(self, context): formation=create_formation(Formations.TAKEOFF_GRID, points), frame_start=context.scene.frame_start, duration=0, + purpose=StoryboardEntryPurpose.TAKEOFF, select=True, context=context, ) diff --git a/src/modules/sbstudio/plugin/operators/land.py b/src/modules/sbstudio/plugin/operators/land.py index 35a68c5..c1c76fe 100644 --- a/src/modules/sbstudio/plugin/operators/land.py +++ b/src/modules/sbstudio/plugin/operators/land.py @@ -11,7 +11,11 @@ from sbstudio.plugin.constants import Collections from sbstudio.plugin.model.formation import create_formation from sbstudio.plugin.model.safety_check import get_proximity_warning_threshold -from sbstudio.plugin.model.storyboard import get_storyboard +from sbstudio.plugin.model.storyboard import ( + Storyboard, + StoryboardEntryPurpose, + get_storyboard, +) from sbstudio.plugin.utils.evaluator import create_position_evaluator from sbstudio.plugin.utils.transition import find_transition_constraint_between @@ -68,14 +72,14 @@ class LandOperator(StoryboardOperator): ) @classmethod - def poll(cls, context): + def poll(cls, context: Context): if not super().poll(context): return False drones = Collections.find_drones(create=False) return drones is not None and len(drones.objects) > 0 - def invoke(self, context, event): + def invoke(self, context: Context, event): self.start_frame = max( context.scene.frame_current, get_storyboard(context=context).frame_end ) @@ -89,7 +93,7 @@ def execute_on_storyboard(self, storyboard, entries, context): success = False return {"FINISHED"} if success else {"CANCELLED"} - def _run(self, storyboard, *, context) -> bool: + def _run(self, storyboard: Storyboard, *, context: Context) -> bool: bpy.ops.skybrush.prepare() if not self._validate_start_frame(context): @@ -166,6 +170,7 @@ def _run(self, storyboard, *, context) -> bool: frame_start=end_of_landing, duration=0, select=True, + purpose=StoryboardEntryPurpose.LANDING, context=context, ) assert entry is not None diff --git a/src/modules/sbstudio/plugin/operators/prepare.py b/src/modules/sbstudio/plugin/operators/prepare.py index ba0ccfc..708f63b 100644 --- a/src/modules/sbstudio/plugin/operators/prepare.py +++ b/src/modules/sbstudio/plugin/operators/prepare.py @@ -1,6 +1,6 @@ from bpy.types import Operator -from sbstudio.plugin.constants import Collections, Templates +from sbstudio.plugin.constants import Collections from sbstudio.plugin.objects import link_object_to_scene from sbstudio.plugin.state import get_file_specific_state diff --git a/src/modules/sbstudio/plugin/operators/remove_storyboard_entry.py b/src/modules/sbstudio/plugin/operators/remove_storyboard_entry.py index b239d60..51ed166 100644 --- a/src/modules/sbstudio/plugin/operators/remove_storyboard_entry.py +++ b/src/modules/sbstudio/plugin/operators/remove_storyboard_entry.py @@ -1,9 +1,18 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + from sbstudio.plugin.constants import Collections from sbstudio.plugin.model.storyboard import get_storyboard from sbstudio.plugin.utils.transition import find_transition_constraint_between from .base import StoryboardOperator +if TYPE_CHECKING: + from bpy.types import Context + + from sbstudio.plugin.model.storyboard import Storyboard, StoryboardEntry + __all__ = ("RemoveStoryboardEntryOperator",) @@ -15,19 +24,19 @@ class RemoveStoryboardEntryOperator(StoryboardOperator): bl_description = "Remove the selected entry from the storyboard" @classmethod - def poll(cls, context): + def poll(cls, context: Context): return ( StoryboardOperator.poll(context) and get_storyboard(context=context).active_entry is not None ) - def execute_on_storyboard(self, storyboard, context): + def execute_on_storyboard(self, storyboard: Storyboard, context: Context): remove_constraints_for_storyboard_entry(storyboard.active_entry) storyboard.remove_active_entry() return {"FINISHED"} -def remove_constraints_for_storyboard_entry(entry): +def remove_constraints_for_storyboard_entry(entry: StoryboardEntry): if not entry: return diff --git a/src/modules/sbstudio/plugin/operators/return_to_home.py b/src/modules/sbstudio/plugin/operators/return_to_home.py index 6643a71..621c5e8 100644 --- a/src/modules/sbstudio/plugin/operators/return_to_home.py +++ b/src/modules/sbstudio/plugin/operators/return_to_home.py @@ -1,3 +1,5 @@ +from collections.abc import Sequence + import bpy from bpy.props import FloatProperty, IntProperty, BoolProperty @@ -6,6 +8,7 @@ from math import ceil, sqrt from sbstudio.errors import SkybrushStudioError +from sbstudio.model.types import Coordinate3D from sbstudio.plugin.api import call_api_from_blender_operator from sbstudio.plugin.constants import Collections from sbstudio.plugin.actions import ( @@ -14,7 +17,11 @@ ) from sbstudio.plugin.model.formation import create_formation, get_markers_from_formation from sbstudio.plugin.model.safety_check import get_proximity_warning_threshold -from sbstudio.plugin.model.storyboard import get_storyboard +from sbstudio.plugin.model.storyboard import ( + Storyboard, + StoryboardEntryPurpose, + get_storyboard, +) from sbstudio.plugin.utils.evaluator import create_position_evaluator from .base import StoryboardOperator @@ -96,14 +103,14 @@ class ReturnToHomeOperator(StoryboardOperator): ) @classmethod - def poll(cls, context): + def poll(cls, context: Context): if not super().poll(context): return False drones = Collections.find_drones(create=False) return drones is not None and len(drones.objects) > 0 - def draw(self, context): + def draw(self, context: Context): layout = self.layout layout.use_property_split = True @@ -135,7 +142,7 @@ def invoke(self, context, event): ) return context.window_manager.invoke_props_dialog(self) - def execute_on_storyboard(self, storyboard, entries, context): + def execute_on_storyboard(self, storyboard: Storyboard, entries, context: Context): try: success = self._run(storyboard, context=context) except SkybrushStudioError: @@ -146,7 +153,7 @@ def execute_on_storyboard(self, storyboard, entries, context): def _should_use_smart_rth(self) -> bool: return self.use_smart_rth and is_smart_rth_enabled_globally() - def _run(self, storyboard, *, context) -> bool: + def _run(self, storyboard: Storyboard, *, context: Context) -> bool: bpy.ops.skybrush.prepare() if not self._validate_start_frame(context): @@ -173,134 +180,156 @@ def _run(self, storyboard, *, context) -> bool: min_distance=get_proximity_warning_threshold(context), operator=self, ) + + run_rth = self._run_smart_rth if use_smart_rth else self._run_base_rth + result = run_rth(storyboard, source=source, target=target, context=context) + + # Recalculate the transition leading to the target formation + bpy.ops.skybrush.recalculate_transitions(scope="TO_SELECTED") + return result + + def _run_base_rth( + self, + storyboard: Storyboard, + *, + source: Sequence[Coordinate3D], + target: Sequence[Coordinate3D], + context: Context, + ) -> bool: fps = context.scene.render.fps + diffs = [ + sqrt((s[0] - t[0]) ** 2 + (s[1] - t[1]) ** 2 + (s[2] - t[2]) ** 2) + for s, t in zip(source, target) + ] + + # Calculate RTH duration from max distance to travel and the + # average velocity + max_distance = max(diffs) + rth_duration = ceil((max_distance / self.velocity) * fps) + + # Extend the duration of the last formation to the frame where we want + # to start the RTH maneuver + if len(storyboard.entries) > 0: + storyboard.last_entry.extend_until(self.start_frame) + + # Calculate when the RTH should end + end_of_rth = self.start_frame + rth_duration + + # Add a new storyboard entry with the given formation + storyboard.add_new_entry( + formation=create_formation("Return to home", target), + frame_start=end_of_rth, + duration=0, + select=True, + purpose=StoryboardEntryPurpose.LANDING, + context=context, + ) - if use_smart_rth: - # Set up non-trivial parameters - # TODO: get them as explicit parameter if needed - settings = getattr(context.scene.skybrush, "settings", None) - max_acceleration = settings.max_acceleration if settings else 4 - min_distance = get_proximity_warning_threshold(context) - land_speed = min(self.velocity_z, 0.5) - - # call API to create smart RTH plan - with call_api_from_blender_operator(self) as api: - plan = api.plan_smart_rth( - source, - target, - max_velocity_xy=self.velocity, - max_velocity_z=self.velocity_z, - max_acceleration=max_acceleration, - min_distance=min_distance, - rth_model="straight_line_with_neck", - ) - if not plan.start_times or not plan.durations: - return False - - # Add a new storyboard entry for the smart RTH formation - # TODO: What should happen if there is already a formation with the - # same name? - entry = storyboard.add_new_entry( - formation=create_formation("Smart return to home", source), - frame_start=self.start_frame, - duration=int(ceil((plan.duration + self.altitude / land_speed) * fps)), - select=True, - context=context, - ) - assert entry is not None - markers = get_markers_from_formation(entry.formation) - - # generate smart RTH trajectories in the new formation - for start_time, duration, inner_points, p, q, marker in zip( - plan.start_times, - plan.durations, - plan.inner_points, + def _run_smart_rth( + self, + storyboard: Storyboard, + *, + source: Sequence[Coordinate3D], + target: Sequence[Coordinate3D], + context: Context, + ) -> bool: + fps = context.scene.render.fps + + # Set up non-trivial parameters + # TODO: get them as explicit parameter if needed + settings = getattr(context.scene.skybrush, "settings", None) + max_acceleration = settings.max_acceleration if settings else 4 + min_distance = get_proximity_warning_threshold(context) + land_speed = min(self.velocity_z, 0.5) + + # call API to create smart RTH plan + with call_api_from_blender_operator(self) as api: + plan = api.plan_smart_rth( source, target, - markers, - strict=True, - ): - action = ensure_action_exists_for_object( - marker, name=f"Animation data for {marker.name}", clean=True - ) + max_velocity_xy=self.velocity, + max_velocity_z=self.velocity_z, + max_acceleration=max_acceleration, + min_distance=min_distance, + rth_model="straight_line_with_neck", + ) + if not plan.start_times or not plan.durations: + return False - f_curves = [] - for i in range(3): - f_curve = find_f_curve_for_data_path_and_index( - action, "location", i - ) - if f_curve is None: - f_curve = action.fcurves.new("location", index=i) - else: - # We should clear the keyframes that fall within the - # range of our keyframes. Currently it's not needed because - # it's a freshly created marker so it can't have any - # keyframes that we don't know about. - print(f"Already existing F-curve! {marker.name} {i}") - pass - f_curves.append(f_curve) - insert = [ - partial(f_curve.keyframe_points.insert, options={"FAST"}) - for f_curve in f_curves - ] - path_points = [] - if start_time > 0: - path_points.append((0, *p)) - path_points.append((start_time, *p)) - path_points.extend(tuple(inner_points)) - path_points.extend( + # Add a new storyboard entry for the smart RTH formation + # TODO: What should happen if there is already a formation with the + # same name? + entry = storyboard.add_new_entry( + formation=create_formation("Smart return to home", source), + frame_start=self.start_frame, + duration=int(ceil((plan.duration + self.altitude / land_speed) * fps)), + select=True, + purpose=StoryboardEntryPurpose.LANDING, + context=context, + ) + assert entry is not None + markers = get_markers_from_formation(entry.formation) + + # generate smart RTH trajectories in the new formation + for start_time, duration, inner_points, p, q, marker in zip( + plan.start_times, + plan.durations, + plan.inner_points, + source, + target, + markers, + strict=True, + ): + action = ensure_action_exists_for_object( + marker, name=f"Animation data for {marker.name}", clean=True + ) + + f_curves = [] + for i in range(3): + f_curve = find_f_curve_for_data_path_and_index(action, "location", i) + if f_curve is None: + f_curve = action.fcurves.new("location", index=i) + else: + # We should clear the keyframes that fall within the + # range of our keyframes. Currently it's not needed because + # it's a freshly created marker so it can't have any + # keyframes that we don't know about. + print(f"Already existing F-curve! {marker.name} {i}") + pass + f_curves.append(f_curve) + insert = [ + partial(f_curve.keyframe_points.insert, options={"FAST"}) + for f_curve in f_curves + ] + path_points = [] + if start_time > 0: + path_points.append((0, *p)) + path_points.append((start_time, *p)) + path_points.extend(tuple(inner_points)) + path_points.extend( + ( + (start_time + duration, *q), ( - (start_time + duration, *q), - ( - start_time + duration + self.altitude / land_speed, - q[0], - q[1], - 0, # TODO: starting position would be better than explicit 0 - ), - ) + start_time + duration + self.altitude / land_speed, + q[0], + q[1], + 0, # TODO: starting position would be better than explicit 0 + ), ) - for point in path_points: - frame = int(self.start_frame + point[0] * fps) - keyframes = ( - insert[0](frame, point[1]), - insert[1](frame, point[2]), - insert[2](frame, point[3]), - ) - for keyframe in keyframes: - keyframe.interpolation = "LINEAR" - # Commit the insertions that we've made in "fast" mode - for f_curve in f_curves: - f_curve.update() - else: - diffs = [ - sqrt((s[0] - t[0]) ** 2 + (s[1] - t[1]) ** 2 + (s[2] - t[2]) ** 2) - for s, t in zip(source, target) - ] - - # Calculate RTH duration from max distance to travel and the - # average velocity - max_distance = max(diffs) - rth_duration = ceil((max_distance / self.velocity) * fps) - - # Extend the duration of the last formation to the frame where we want - # to start the RTH maneuver - if len(storyboard.entries) > 0: - storyboard.last_entry.extend_until(self.start_frame) - - # Calculate when the RTH should end - end_of_rth = self.start_frame + rth_duration - - # Add a new storyboard entry with the given formation - storyboard.add_new_entry( - formation=create_formation("Return to home", target), - frame_start=end_of_rth, - duration=0, - select=True, - context=context, ) + for point in path_points: + frame = int(self.start_frame + point[0] * fps) + keyframes = ( + insert[0](frame, point[1]), + insert[1](frame, point[2]), + insert[2](frame, point[3]), + ) + for keyframe in keyframes: + keyframe.interpolation = "LINEAR" + # Commit the insertions that we've made in "fast" mode + for f_curve in f_curves: + f_curve.update() - # Recalculate the transition leading to the target formation - bpy.ops.skybrush.recalculate_transitions(scope="TO_SELECTED") return True def _validate_start_frame(self, context: Context) -> bool: diff --git a/src/modules/sbstudio/plugin/operators/run_full_proximity_check.py b/src/modules/sbstudio/plugin/operators/run_full_proximity_check.py index 0ce6319..e93e501 100644 --- a/src/modules/sbstudio/plugin/operators/run_full_proximity_check.py +++ b/src/modules/sbstudio/plugin/operators/run_full_proximity_check.py @@ -1,4 +1,3 @@ -from numpy import array, float32 from typing import Literal from bpy.types import Context, Operator diff --git a/src/modules/sbstudio/plugin/operators/takeoff.py b/src/modules/sbstudio/plugin/operators/takeoff.py index 6630fa9..0a44c79 100644 --- a/src/modules/sbstudio/plugin/operators/takeoff.py +++ b/src/modules/sbstudio/plugin/operators/takeoff.py @@ -4,7 +4,6 @@ from bpy.types import Context from math import ceil, inf -from sbstudio.api.errors import SkybrushStudioAPIError from sbstudio.errors import SkybrushStudioError from sbstudio.math.nearest_neighbors import find_nearest_neighbors from sbstudio.plugin.api import call_api_from_blender_operator, get_api @@ -14,7 +13,11 @@ ensure_formation_consists_of_points, ) from sbstudio.plugin.model.safety_check import get_proximity_warning_threshold -from sbstudio.plugin.model.storyboard import get_storyboard, Storyboard +from sbstudio.plugin.model.storyboard import ( + Storyboard, + StoryboardEntryPurpose, + get_storyboard, +) from sbstudio.plugin.operators.recalculate_transitions import ( RecalculationTask, recalculate_transitions, @@ -103,7 +106,7 @@ def invoke(self, context: Context, event): self.start_frame = int(max(min(context.scene.frame_current, end), start)) return context.window_manager.invoke_props_dialog(self) - def execute_on_storyboard(self, storyboard, entries, context: Context): + def execute_on_storyboard(self, storyboard: Storyboard, entries, context: Context): try: success = self._run(storyboard, context=context) except SkybrushStudioError: @@ -176,6 +179,7 @@ def _run(self, storyboard: Storyboard, *, context: Context) -> bool: formation=create_formation(Formations.TAKEOFF_GRID, source), frame_start=self.start_frame, duration=0, + purpose=StoryboardEntryPurpose.TAKEOFF, select=False, context=context, ) @@ -193,6 +197,7 @@ def _run(self, storyboard: Storyboard, *, context: Context) -> bool: formation=create_formation(Formations.TAKEOFF, target), frame_start=end_of_takeoff, duration=0, + purpose=StoryboardEntryPurpose.TAKEOFF, select=True, context=context, ) diff --git a/src/modules/sbstudio/plugin/operators/utils.py b/src/modules/sbstudio/plugin/operators/utils.py index d1fb9fb..4a7467f 100644 --- a/src/modules/sbstudio/plugin/operators/utils.py +++ b/src/modules/sbstudio/plugin/operators/utils.py @@ -5,16 +5,22 @@ from bpy.path import basename from bpy.types import Context +from itertools import groupby from natsort import natsorted from operator import attrgetter from pathlib import Path -from typing import Dict, Optional, Tuple +from typing import Any, Optional, cast from sbstudio.api.base import SkybrushStudioAPI from sbstudio.model.light_program import LightProgram from sbstudio.model.safety_check import SafetyCheckParams from sbstudio.model.trajectory import Trajectory from sbstudio.model.yaw import YawSetpointList +from sbstudio.plugin.model.storyboard import ( + StoryboardEntry, + StoryboardEntryPurpose, + get_storyboard, +) from sbstudio.plugin.constants import Collections from sbstudio.plugin.errors import SkybrushStudioExportWarning from sbstudio.model.file_formats import FileFormat @@ -22,6 +28,7 @@ from sbstudio.plugin.tasks.light_effects import suspended_light_effects from sbstudio.plugin.tasks.safety_check import suspended_safety_checks from sbstudio.plugin.utils import with_context +from sbstudio.plugin.utils.cameras import get_cameras_from_context from sbstudio.plugin.utils.sampling import ( frame_range, sample_colors_of_objects, @@ -31,8 +38,7 @@ sample_positions_colors_and_yaw_of_objects, ) from sbstudio.plugin.utils.time_markers import get_time_markers_from_context -from sbstudio.plugin.utils.cameras import get_cameras_from_context - +from sbstudio.utils import get_ends __all__ = ("get_drones_to_export", "export_show_to_file_using_api") @@ -40,6 +46,11 @@ log = logging.getLogger(__name__) +class _default_settings: + output_fps = 4 + light_output_fps = 4 + + ################################################################################ # Helper functions for exporter operators @@ -70,7 +81,7 @@ def get_drones_to_export(selected_only: bool = False): @with_context def _get_frame_range_from_export_settings( settings, *, context: Optional[Context] = None -) -> Optional[Tuple[int, int]]: +) -> Optional[tuple[int, int]]: """Returns the range of frames to export, based on the chosen export settings of the user. @@ -84,14 +95,59 @@ def _get_frame_range_from_export_settings( return resolve_frame_range(settings["frame_range"], context=context) +@with_context +def _get_segments(context: Optional[Context] = None) -> dict[str, tuple[float, float]]: + """ + Returns dictionary that maps show segment IDs to start (inclusive) and end (exclusive) timestamps. + + If invalid configuration is found for a segment, then the segment will be omitted + from the result. + """ + result: dict[str, tuple[float, float]] = {} + storyboard = get_storyboard(context=context) + fps = context.scene.render.fps + + entry_purpose_groups = groupby(storyboard.entries, lambda e: cast(str, e.purpose)) + + takeoff_entries: list[StoryboardEntry] | None = None + show_entries: list[StoryboardEntry] | None = None + landing_entries: list[StoryboardEntry] | None = None + show_valid = True + for purpose, entries in entry_purpose_groups: + if purpose == StoryboardEntryPurpose.TAKEOFF.name: + if not (show_entries is None and landing_entries is None): + show_valid = False + break + + takeoff_entries = list(entries) + elif purpose == StoryboardEntryPurpose.SHOW.name: + if landing_entries is not None: + show_valid = False + break + + show_entries = list(entries) + elif purpose == StoryboardEntryPurpose.LANDING.name: + landing_entries = list(entries) + + if show_valid: + if ends := get_ends(takeoff_entries): + result["takeoff"] = (ends[0].frame_start * fps, ends[1].frame_end * fps) + if ends := get_ends(show_entries): + result["show"] = (ends[0].frame_start * fps, ends[1].frame_end * fps) + if ends := get_ends(landing_entries): + result["landing"] = (ends[0].frame_start * fps, ends[1].frame_end * fps) + + return result + + @with_context def _get_trajectories_and_lights( drones, - settings: Dict, - bounds: Tuple[int, int], + settings: dict[str, Any], + bounds: tuple[int, int], *, context: Optional[Context] = None, -) -> Tuple[Dict[str, Trajectory], Dict[str, LightProgram]]: +) -> tuple[dict[str, Trajectory], dict[str, LightProgram]]: """Get trajectories and LED lights of all selected/picked objects. Parameters: @@ -103,11 +159,11 @@ def _get_trajectories_and_lights( Returns: dictionary of Trajectory and LightProgram objects indexed by object names """ - trajectory_fps = settings.get("output_fps", 4) - light_fps = settings.get("light_output_fps", 4) + trajectory_fps = settings.get("output_fps", _default_settings.output_fps) + light_fps = settings.get("light_output_fps", _default_settings.light_output_fps) - trajectories: Dict[str, Trajectory] - lights: Dict[str, LightProgram] + trajectories: dict[str, Trajectory] + lights: dict[str, LightProgram] if trajectory_fps == light_fps: # This is easy, we can iterate over the show once @@ -156,11 +212,11 @@ def _get_trajectories_and_lights( @with_context def _get_trajectories_lights_and_yaw_setpoints( drones, - settings: Dict, - bounds: Tuple[int, int], + settings: dict[str, Any], + bounds: tuple[int, int], *, context: Optional[Context] = None, -) -> Tuple[Dict[str, Trajectory], Dict[str, LightProgram], Dict[str, YawSetpointList]]: +) -> tuple[dict[str, Trajectory], dict[str, LightProgram], dict[str, YawSetpointList]]: """Get trajectories, LED lights and yaw setpoints of all selected/picked objects. Parameters: @@ -172,12 +228,12 @@ def _get_trajectories_lights_and_yaw_setpoints( Returns: dictionary of Trajectory, LightProgram and YawSetpointList objects indexed by object names """ - trajectory_fps = settings.get("output_fps", 4) - light_fps = settings.get("light_output_fps", 4) + trajectory_fps = settings.get("output_fps", _default_settings.output_fps) + light_fps = settings.get("light_output_fps", _default_settings.light_output_fps) - trajectories: Dict[str, Trajectory] - lights: Dict[str, LightProgram] - yaw_setpoints: Dict[str, YawSetpointList] + trajectories: dict[str, Trajectory] + lights: dict[str, LightProgram] + yaw_setpoints: dict[str, YawSetpointList] if trajectory_fps == light_fps: # This is easy, we can iterate over the show once @@ -236,7 +292,7 @@ def _get_trajectories_lights_and_yaw_setpoints( def export_show_to_file_using_api( api: SkybrushStudioAPI, context: Context, - settings: Dict, + settings: dict[str, Any], filepath: Path, format: FileFormat, ) -> None: @@ -269,7 +325,7 @@ def export_show_to_file_using_api( raise SkybrushStudioExportWarning("Selected frame range is empty") # determine list of drones to export - export_selected_only = settings.get("export_selected", False) + export_selected_only: bool = settings.get("export_selected", False) drones = list(get_drones_to_export(selected_only=export_selected_only)) if not drones: if export_selected_only: @@ -282,7 +338,7 @@ def export_show_to_file_using_api( ) # get yaw control enabled state - use_yaw_control = settings.get("use_yaw_control", False) + use_yaw_control: bool = settings.get("use_yaw_control", False) # get trajectories, light programs and yaw setpoints if use_yaw_control: @@ -335,6 +391,9 @@ def export_show_to_file_using_api( min_distance=safety_check.proximity_warning_threshold if safety_check else 3, ) + # get show segments + show_segments = _get_segments(context=context) + renderer_params = {} if "min_nav_altitude" in settings: renderer_params = {"min_nav_altitude": settings["min_nav_altitude"]} @@ -343,7 +402,7 @@ def export_show_to_file_using_api( if format is FileFormat.PDF: log.info("Exporting validation plots to .pdf") plots = settings.get("plots", ["pos", "vel", "nn"]) - fps = settings.get("output_fps", 4) + fps = settings.get("output_fps", _default_settings.output_fps) api.generate_plots( trajectories=trajectories, output=filepath, @@ -404,6 +463,7 @@ def export_show_to_file_using_api( api.export( show_title=show_title, show_type=show_type, + show_segments=show_segments, validation=validation, trajectories=trajectories, lights=lights, diff --git a/src/modules/sbstudio/plugin/panels/storyboard_editor.py b/src/modules/sbstudio/plugin/panels/storyboard_editor.py index 191b10b..a0e6b3e 100644 --- a/src/modules/sbstudio/plugin/panels/storyboard_editor.py +++ b/src/modules/sbstudio/plugin/panels/storyboard_editor.py @@ -77,6 +77,7 @@ def draw(self, context): col.prop(entry, "duration") col.prop(entry, "frame_end") col.prop(entry, "is_name_customized") + col.prop(entry, "purpose") col.popover( "OBJECT_PT_skybrush_transition_editor_pre", icon="TRACKING_CLEAR_BACKWARDS", diff --git a/src/modules/sbstudio/plugin/tasks/light_effects.py b/src/modules/sbstudio/plugin/tasks/light_effects.py index 066ccdc..95884f2 100644 --- a/src/modules/sbstudio/plugin/tasks/light_effects.py +++ b/src/modules/sbstudio/plugin/tasks/light_effects.py @@ -17,7 +17,6 @@ if TYPE_CHECKING: from bpy.types import Depsgraph, Scene - from sbstudio.api.types import Mapping __all__ = ("UpdateLightEffectsTask",) diff --git a/src/modules/sbstudio/utils.py b/src/modules/sbstudio/utils.py index e54bdf4..5e0ff93 100644 --- a/src/modules/sbstudio/utils.py +++ b/src/modules/sbstudio/utils.py @@ -1,10 +1,10 @@ import importlib.util from collections import OrderedDict -from collections.abc import MutableMapping +from collections.abc import Callable, Iterable, MutableMapping, Sequence from functools import wraps from pathlib import Path -from typing import Any, Callable, Generic, List, Sequence, TypeVar +from typing import Any, Generic, Optional, TypeVar from sbstudio.model.types import Coordinate3D @@ -46,6 +46,28 @@ def distance_sq_of(p: Coordinate3D, q: Coordinate3D) -> float: return (p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2 + (p[2] - q[2]) ** 2 +def get_ends(items: Optional[Iterable[T]]) -> tuple[T, T] | None: + """ + Returns the first and last item from the given iterable as a tuple if the + iterable is not empty, otherwise returns `None`. + + If the iterable contains only one item, then first and last will be that one item. + """ + if items is None: + return None + + iterator = iter(items) + try: + first = last = next(iterator) + except StopIteration: + return None + + for item in iterator: + last = item + + return (first, last) + + def negate(func: Callable[..., bool]) -> Callable[..., bool]: """Decorator that takes a function that returns a Boolean value and returns another function that returns the negation of the result of the original @@ -60,7 +82,7 @@ def new_func(*args, **kwds) -> bool: def simplify_path( - points: Sequence[T], *, eps: float, distance_func: Callable[[List[T], T, T], float] + points: Sequence[T], *, eps: float, distance_func: Callable[[list[T], T, T], float] ) -> Sequence[T]: """Simplifies a sequence of points to a similar sequence with fewer points, using a distance function and an acceptable error term.