diff --git a/inference/core/workflows/core_steps/analytics/velocity/__init__.py b/inference/core/workflows/core_steps/analytics/velocity/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/inference/core/workflows/core_steps/analytics/velocity/v1.py b/inference/core/workflows/core_steps/analytics/velocity/v1.py new file mode 100644 index 000000000..ed820b216 --- /dev/null +++ b/inference/core/workflows/core_steps/analytics/velocity/v1.py @@ -0,0 +1,216 @@ +from typing import Dict, List, Optional, Tuple, Union + +import numpy as np +import supervision as sv +from pydantic import ConfigDict, Field +from typing_extensions import Literal, Type + +from inference.core.workflows.execution_engine.entities.base import ( + OutputDefinition, + VideoMetadata, +) +from inference.core.workflows.execution_engine.entities.types import ( + FLOAT_KIND, + INSTANCE_SEGMENTATION_PREDICTION_KIND, + OBJECT_DETECTION_PREDICTION_KIND, + StepOutputImageSelector, + StepOutputSelector, + WorkflowImageSelector, + WorkflowParameterSelector, + WorkflowVideoMetadataSelector, +) +from inference.core.workflows.prototypes.block import ( + BlockResult, + WorkflowBlock, + WorkflowBlockManifest, +) + +OUTPUT_KEY: str = "velocity_detections" +SHORT_DESCRIPTION = "Calculate the velocity and speed of tracked objects with smoothing and unit conversion." +LONG_DESCRIPTION = """ +The `VelocityBlock` computes the velocity and speed of objects tracked across video frames. +It includes options to smooth the velocity and speed measurements over time and to convert units from pixels per second to meters per second. +It requires detections from Byte Track with unique `tracker_id` assigned to each object, which persists between frames. +The velocities are calculated based on the displacement of object centers over time. +""" + + +class VelocityManifest(WorkflowBlockManifest): + model_config = ConfigDict( + json_schema_extra={ + "name": "Velocity", + "version": "v1", + "short_description": SHORT_DESCRIPTION, + "long_description": LONG_DESCRIPTION, + "license": "Apache-2.0", + "block_type": "analytics", + } + ) + type: Literal["roboflow_core/velocity@v1"] + metadata: WorkflowVideoMetadataSelector + detections: StepOutputSelector( + kind=[ + OBJECT_DETECTION_PREDICTION_KIND, + INSTANCE_SEGMENTATION_PREDICTION_KIND, + ] + ) = Field( # type: ignore + description="Predictions", + examples=["$steps.object_detection_model.predictions"], + ) + smoothing_alpha: Union[float, WorkflowParameterSelector(kind=[FLOAT_KIND])] = Field( # type: ignore + default=0.5, + description="Smoothing factor (alpha) for exponential moving average (0 < alpha <= 1). Lower alpha means more smoothing.", + examples=[0.5], + ) + pixels_per_meter: Union[float, WorkflowParameterSelector(kind=[FLOAT_KIND])] = Field( # type: ignore + default=1.0, + description="Conversion from pixels to meters. Velocity will be converted to meters per second using this value.", + examples=[0.01], # Example: 1 pixel = 0.01 meters + ) + + @classmethod + def describe_outputs(cls) -> List[OutputDefinition]: + return [ + OutputDefinition( + name=OUTPUT_KEY, + kind=[ + OBJECT_DETECTION_PREDICTION_KIND, + INSTANCE_SEGMENTATION_PREDICTION_KIND, + ], + ), + ] + + @classmethod + def get_execution_engine_compatibility(cls) -> Optional[str]: + return ">=1.0.0,<2.0.0" + + +class VelocityBlockV1(WorkflowBlock): + def __init__(self): + # Store previous positions and timestamps for each tracker_id + self._previous_positions: Dict[ + str, Dict[Union[int, str], Tuple[np.ndarray, float]] + ] = {} + # Store smoothed velocities for each tracker_id + self._smoothed_velocities: Dict[str, Dict[Union[int, str], np.ndarray]] = {} + + @classmethod + def get_manifest(cls) -> Type[WorkflowBlockManifest]: + return VelocityManifest + + def run( + self, + detections: sv.Detections, + metadata: VideoMetadata, + smoothing_alpha: float, + pixels_per_meter: float, + ) -> BlockResult: + if detections.tracker_id is None: + raise ValueError( + "tracker_id not initialized, VelocityBlock requires detections to be tracked" + ) + if not (0 < smoothing_alpha <= 1): + raise ValueError( + "smoothing_alpha must be between 0 (exclusive) and 1 (inclusive)" + ) + if not (pixels_per_meter > 0): + raise ValueError("pixels_per_meter must be greater than 0") + + if metadata.comes_from_video_file and metadata.fps != 0: + ts_current = metadata.frame_number / metadata.fps + else: + ts_current = metadata.frame_timestamp.timestamp() + + video_id = metadata.video_identifier + previous_positions = self._previous_positions.setdefault(video_id, {}) + smoothed_velocities = self._smoothed_velocities.setdefault(video_id, {}) + + num_detections = len(detections) + + # Compute current positions (center of bounding boxes) + bbox_xyxy = detections.xyxy # Shape (num_detections, 4) + x_centers = (bbox_xyxy[:, 0] + bbox_xyxy[:, 2]) / 2 + y_centers = (bbox_xyxy[:, 1] + bbox_xyxy[:, 3]) / 2 + current_positions = np.stack( + [x_centers, y_centers], axis=1 + ) # Shape (num_detections, 2) + + velocities = np.zeros_like(current_positions) # Shape (num_detections, 2) + speeds = np.zeros(num_detections) # Shape (num_detections,) + smoothed_velocities_arr = np.zeros_like(current_positions) + smoothed_speeds = np.zeros(num_detections) + + for i, tracker_id in enumerate(detections.tracker_id): + current_position = current_positions[i] + + # Ensure tracker_id is of type int or str + tracker_id = int(tracker_id) + + if tracker_id in previous_positions: + prev_position, prev_timestamp = previous_positions[tracker_id] + delta_time = ts_current - prev_timestamp + + if delta_time > 0: + displacement = current_position - prev_position + velocity = displacement / delta_time # Pixels per second + speed = np.linalg.norm( + velocity + ) # Speed is the magnitude of velocity vector + else: + velocity = np.array([0, 0]) + speed = 0.0 + else: + velocity = np.array([0, 0]) # No previous position + speed = 0.0 + + # Apply exponential moving average for smoothing + if tracker_id in smoothed_velocities: + prev_smoothed_velocity = smoothed_velocities[tracker_id] + smoothed_velocity = ( + smoothing_alpha * velocity + + (1 - smoothing_alpha) * prev_smoothed_velocity + ) + else: + smoothed_velocity = velocity # Initialize with current velocity + + smoothed_speed = np.linalg.norm(smoothed_velocity) + + # Store current position and timestamp for the next frame + previous_positions[tracker_id] = (current_position, ts_current) + smoothed_velocities[tracker_id] = smoothed_velocity + + # Convert velocities and speeds to meters per second if required + velocity_m_s = velocity / pixels_per_meter + smoothed_velocity_m_s = smoothed_velocity / pixels_per_meter + speed_m_s = speed / pixels_per_meter + smoothed_speed_m_s = smoothed_speed / pixels_per_meter + + velocities[i] = velocity_m_s + speeds[i] = speed_m_s + smoothed_velocities_arr[i] = smoothed_velocity_m_s + smoothed_speeds[i] = smoothed_speed_m_s + + # Add velocity and speed to detections.data + # Ensure that 'data' is a dictionary for each detection + if detections.data is None: + detections.data = {} + + # Initialize dictionaries if not present + if "velocity" not in detections.data: + detections.data["velocity"] = {} + if "speed" not in detections.data: + detections.data["speed"] = {} + if "smoothed_velocity" not in detections.data: + detections.data["smoothed_velocity"] = {} + if "smoothed_speed" not in detections.data: + detections.data["smoothed_speed"] = {} + + # Assign velocity data to the corresponding tracker_id + detections.data["velocity"][tracker_id] = velocity_m_s.tolist() # [vx, vy] + detections.data["speed"][tracker_id] = speed_m_s # Scalar + detections.data["smoothed_velocity"][ + tracker_id + ] = smoothed_velocity_m_s.tolist() # [vx, vy] + detections.data["smoothed_speed"][tracker_id] = smoothed_speed_m_s # Scalar + + return {OUTPUT_KEY: detections} diff --git a/inference/core/workflows/core_steps/loader.py b/inference/core/workflows/core_steps/loader.py index efc67723f..25905313e 100644 --- a/inference/core/workflows/core_steps/loader.py +++ b/inference/core/workflows/core_steps/loader.py @@ -28,6 +28,7 @@ from inference.core.workflows.core_steps.analytics.time_in_zone.v2 import ( TimeInZoneBlockV2, ) +from inference.core.workflows.core_steps.analytics.velocity.v1 import VelocityBlockV1 from inference.core.workflows.core_steps.classical_cv.camera_focus.v1 import ( CameraFocusBlockV1, ) @@ -445,6 +446,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]: ReferencePathVisualizationBlockV1, ByteTrackerBlockV3, WebhookSinkBlockV1, + VelocityBlockV1, ] diff --git a/tests/inference/unit_tests/usage_tracking/test_collector.py b/tests/inference/unit_tests/usage_tracking/test_collector.py index 18f75ff7d..f701a0b73 100644 --- a/tests/inference/unit_tests/usage_tracking/test_collector.py +++ b/tests/inference/unit_tests/usage_tracking/test_collector.py @@ -809,7 +809,11 @@ def test_zip_usage_payloads_with_different_exec_session_ids(): def test_system_info_with_dedicated_deployment_id(): # given - system_info = UsageCollector.system_info(ip_address="w.x.y.z", hostname="hostname01", dedicated_deployment_id="deployment01") + system_info = UsageCollector.system_info( + ip_address="w.x.y.z", + hostname="hostname01", + dedicated_deployment_id="deployment01", + ) # then expected_system_info = { @@ -823,7 +827,9 @@ def test_system_info_with_dedicated_deployment_id(): def test_system_info_with_no_dedicated_deployment_id(): # given - system_info = UsageCollector.system_info(ip_address="w.x.y.z", hostname="hostname01") + system_info = UsageCollector.system_info( + ip_address="w.x.y.z", hostname="hostname01" + ) # then expected_system_info = { diff --git a/tests/workflows/unit_tests/core_steps/analytics/test_velocity.py b/tests/workflows/unit_tests/core_steps/analytics/test_velocity.py new file mode 100644 index 000000000..b2f5a6fb2 --- /dev/null +++ b/tests/workflows/unit_tests/core_steps/analytics/test_velocity.py @@ -0,0 +1,729 @@ +import datetime + +import numpy as np +import pytest +import supervision as sv + +from inference.core.workflows.core_steps.analytics.velocity.v1 import VelocityBlockV1 +from inference.core.workflows.execution_engine.entities.base import VideoMetadata + + +def test_velocity_block_basic_calculation() -> None: + # given + velocity_block = VelocityBlockV1() + + # Initial frame detections + frame1_detections = sv.Detections( + xyxy=np.array( + [ + [100, 100, 110, 110], # Object 1 + [200, 200, 210, 210], # Object 2 + ] + ), + tracker_id=np.array([1, 2]), + ) + + metadata1 = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + # Run on first frame + frame1_result = velocity_block.run( + detections=frame1_detections, + metadata=metadata1, + smoothing_alpha=0.5, + pixels_per_meter=1000, # 1000 pixels = 1 meter + ) + + # Since this is the first frame, velocities should be zero + expected_data_frame1 = { + "velocity": { + 1: [0.0, 0.0], + 2: [0.0, 0.0], + }, + "speed": { + 1: 0.0, + 2: 0.0, + }, + "smoothed_velocity": { + 1: [0.0, 0.0], + 2: [0.0, 0.0], + }, + "smoothed_speed": { + 1: 0.0, + 2: 0.0, + }, + } + assert frame1_result == {"velocity_detections": frame1_detections} + assert ( + frame1_result["velocity_detections"].data["velocity"] + == expected_data_frame1["velocity"] + ) + assert ( + frame1_result["velocity_detections"].data["speed"] + == expected_data_frame1["speed"] + ) + assert ( + frame1_result["velocity_detections"].data["smoothed_velocity"] + == expected_data_frame1["smoothed_velocity"] + ) + assert ( + frame1_result["velocity_detections"].data["smoothed_speed"] + == expected_data_frame1["smoothed_speed"] + ) + + # Second frame detections with movement + frame2_detections = sv.Detections( + xyxy=np.array( + [ + [105, 100, 115, 110], # Object 1 moved +5 px right + [200, 205, 210, 215], # Object 2 moved +5 px down + ] + ), + tracker_id=np.array([1, 2]), + ) + + metadata2 = VideoMetadata( + video_identifier="vid_1", + frame_number=2, + frame_timestamp=datetime.datetime.fromtimestamp(1726570801).astimezone( + tz=datetime.timezone.utc + ), # 1 second later + ) + + # Run on second frame + frame2_result = velocity_block.run( + detections=frame2_detections, + metadata=metadata2, + smoothing_alpha=0.5, + pixels_per_meter=1000, # 1000 pixels = 1 meter + ) + + # Expected velocities: + # Object 1: [5 px/s, 0 px/s] => [0.005 m/s, 0.0 m/s] + # Object 2: [0 px/s, 5 px/s] => [0.0 m/s, 0.005 m/s] + + # Expected smoothed velocities: + # Object 1: 0.5 * [0.005, 0.0] + 0.5 * [0.0, 0.0] = [0.0025, 0.0] + # Object 2: 0.5 * [0.0, 0.005] + 0.5 * [0.0, 0.0] = [0.0, 0.0025] + + expected_data_frame2 = { + "velocity": { + 1: [0.005, 0.0], + 2: [0.0, 0.005], + }, + "speed": { + 1: 0.005, + 2: 0.005, + }, + "smoothed_velocity": { + 1: [0.0025, 0.0], + 2: [0.0, 0.0025], + }, + "smoothed_speed": { + 1: 0.0025, + 2: 0.0025, + }, + } + assert frame2_result == {"velocity_detections": frame2_detections} + assert ( + frame2_result["velocity_detections"].data["velocity"] + == expected_data_frame2["velocity"] + ) + assert ( + frame2_result["velocity_detections"].data["speed"] + == expected_data_frame2["speed"] + ) + assert ( + frame2_result["velocity_detections"].data["smoothed_velocity"] + == expected_data_frame2["smoothed_velocity"] + ) + assert ( + frame2_result["velocity_detections"].data["smoothed_speed"] + == expected_data_frame2["smoothed_speed"] + ) + + +def test_velocity_block_new_tracker_id() -> None: + # given + velocity_block = VelocityBlockV1() + + # Frame 1 detections + frame1_detections = sv.Detections( + xyxy=np.array( + [ + [100, 100, 110, 110], # Object 1 + ] + ), + tracker_id=np.array([1]), + ) + + metadata1 = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + # Run on first frame + velocity_block.run( + detections=frame1_detections, + metadata=metadata1, + smoothing_alpha=0.5, + pixels_per_meter=1000, # 1000 pixels = 1 meter + ) + + # Second frame detections with a new tracker_id + frame2_detections = sv.Detections( + xyxy=np.array( + [ + [105, 100, 115, 110], # Object 1 moved +5 px right + [200, 200, 210, 210], # New Object 2 + ] + ), + tracker_id=np.array([1, 2]), + ) + + metadata2 = VideoMetadata( + video_identifier="vid_1", + frame_number=2, + frame_timestamp=datetime.datetime.fromtimestamp(1726570801).astimezone( + tz=datetime.timezone.utc + ), + ) + + # Run on second frame + frame2_result = velocity_block.run( + detections=frame2_detections, + metadata=metadata2, + smoothing_alpha=0.5, + pixels_per_meter=1000, + ) + + # Expected velocities: + # Object 1: [5 px/s, 0 px/s] => [0.005 m/s, 0.0 m/s] + # Object 2: [0 px/s, 0 px/s] => [0.0 m/s, 0.0 m/s] (first appearance) + + # Expected smoothed velocities: + # Object 1: 0.5 * [0.005, 0.0] + 0.5 * [0.0, 0.0] = [0.0025, 0.0] + # Object 2: [0.0, 0.0] (first appearance) + + expected_data_frame2 = { + "velocity": { + 1: [0.005, 0.0], + 2: [0.0, 0.0], + }, + "speed": { + 1: 0.005, + 2: 0.0, + }, + "smoothed_velocity": { + 1: [0.0025, 0.0], + 2: [0.0, 0.0], + }, + "smoothed_speed": { + 1: 0.0025, + 2: 0.0, + }, + } + assert ( + frame2_result["velocity_detections"].data["velocity"] + == expected_data_frame2["velocity"] + ) + assert ( + frame2_result["velocity_detections"].data["speed"] + == expected_data_frame2["speed"] + ) + assert ( + frame2_result["velocity_detections"].data["smoothed_velocity"] + == expected_data_frame2["smoothed_velocity"] + ) + assert ( + frame2_result["velocity_detections"].data["smoothed_speed"] + == expected_data_frame2["smoothed_speed"] + ) + + +def test_velocity_block_missing_tracker_id() -> None: + # given + velocity_block = VelocityBlockV1() + + # Detections without tracker_id + detections = sv.Detections( + xyxy=np.array([[100, 100, 110, 110]]), + # tracker_id is missing + ) + + metadata = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + # when / then + with pytest.raises( + ValueError, + match="tracker_id not initialized, VelocityBlock requires detections to be tracked", + ): + velocity_block.run( + detections=detections, + metadata=metadata, + smoothing_alpha=0.5, + pixels_per_meter=1000, + ) + + +def test_velocity_block_invalid_smoothing_alpha() -> None: + # given + velocity_block = VelocityBlockV1() + + detections = sv.Detections( + xyxy=np.array([[100, 100, 110, 110]]), + tracker_id=np.array([1]), + ) + + metadata = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + # when / then: smoothing_alpha <= 0 + with pytest.raises( + ValueError, + match="smoothing_alpha must be between 0 \\(exclusive\\) and 1 \\(inclusive\\)", + ): + velocity_block.run( + detections=detections, + metadata=metadata, + smoothing_alpha=0.0, + pixels_per_meter=1000, + ) + + # when / then: smoothing_alpha > 1 + with pytest.raises( + ValueError, + match="smoothing_alpha must be between 0 \\(exclusive\\) and 1 \\(inclusive\\)", + ): + velocity_block.run( + detections=detections, + metadata=metadata, + smoothing_alpha=1.5, + pixels_per_meter=1000, + ) + + +def test_velocity_block_invalid_pixels_per_meter() -> None: + # given + velocity_block = VelocityBlockV1() + + detections = sv.Detections( + xyxy=np.array([[100, 100, 110, 110]]), + tracker_id=np.array([1]), + ) + + metadata = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + # when / then: pixels_per_meter <= 0 + with pytest.raises( + ValueError, + match="pixels_per_meter must be greater than 0", + ): + velocity_block.run( + detections=detections, + metadata=metadata, + smoothing_alpha=0.5, + pixels_per_meter=0.0, + ) + + with pytest.raises( + ValueError, + match="pixels_per_meter must be greater than 0", + ): + velocity_block.run( + detections=detections, + metadata=metadata, + smoothing_alpha=0.5, + pixels_per_meter=-1000, + ) + + +def test_velocity_block_zero_delta_time() -> None: + # given + velocity_block = VelocityBlockV1() + + # Frame 1 detections + frame1_detections = sv.Detections( + xyxy=np.array([[100, 100, 110, 110]]), + tracker_id=np.array([1]), + ) + + metadata1 = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + # Run on first frame + velocity_block.run( + detections=frame1_detections, + metadata=metadata1, + smoothing_alpha=0.5, + pixels_per_meter=1000, + ) + + # Frame 2 with same timestamp (delta_time = 0) + frame2_detections = sv.Detections( + xyxy=np.array([[105, 100, 115, 110]]), # Moved +5 px right + tracker_id=np.array([1]), + ) + + metadata2 = VideoMetadata( + video_identifier="vid_1", + frame_number=2, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), # Same timestamp + ) + + frame2_result = velocity_block.run( + detections=frame2_detections, + metadata=metadata2, + smoothing_alpha=0.5, + pixels_per_meter=1000, + ) + + # Expected velocities: [0.0, 0.0] due to delta_time = 0 + # Smoothed velocities: remains [0.0, 0.0] + + expected_data_frame2 = { + "velocity": { + 1: [0.0, 0.0], + }, + "speed": { + 1: 0.0, + }, + "smoothed_velocity": { + 1: [0.0, 0.0], + }, + "smoothed_speed": { + 1: 0.0, + }, + } + assert ( + frame2_result["velocity_detections"].data["velocity"] + == expected_data_frame2["velocity"] + ) + assert ( + frame2_result["velocity_detections"].data["speed"] + == expected_data_frame2["speed"] + ) + assert ( + frame2_result["velocity_detections"].data["smoothed_velocity"] + == expected_data_frame2["smoothed_velocity"] + ) + assert ( + frame2_result["velocity_detections"].data["smoothed_speed"] + == expected_data_frame2["smoothed_speed"] + ) + + +def test_velocity_block_multiple_objects_with_movement() -> None: + # given + velocity_block = VelocityBlockV1() + + # Frame 1 detections + frame1_detections = sv.Detections( + xyxy=np.array( + [ + [100, 100, 110, 110], # Object 1 + [200, 200, 210, 210], # Object 2 + [300, 300, 310, 310], # Object 3 + ] + ), + tracker_id=np.array([1, 2, 3]), + ) + + metadata1 = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + # Run on first frame + velocity_block.run( + detections=frame1_detections, + metadata=metadata1, + smoothing_alpha=0.3, + pixels_per_meter=1000, # 1000 pixels = 1 meter + ) + + # Frame 2 detections with movements + frame2_detections = sv.Detections( + xyxy=np.array( + [ + [105, 100, 115, 110], # Object 1 moved +5 px right + [200, 205, 210, 215], # Object 2 moved +5 px down + [295, 295, 305, 305], # Object 3 moved -5 px left and up + ] + ), + tracker_id=np.array([1, 2, 3]), + ) + + metadata2 = VideoMetadata( + video_identifier="vid_1", + frame_number=2, + frame_timestamp=datetime.datetime.fromtimestamp(1726570801).astimezone( + tz=datetime.timezone.utc + ), # 1 second later + ) + + frame2_result = velocity_block.run( + detections=frame2_detections, + metadata=metadata2, + smoothing_alpha=0.3, + pixels_per_meter=1000, + ) + + # Expected velocities: + # Object 1: [5 px/s, 0 px/s] => [0.005 m/s, 0.0 m/s] + # Object 2: [0 px/s, 5 px/s] => [0.0 m/s, 0.005 m/s] + # Object 3: [-5 px/s, -5 px/s] => [-0.005 m/s, -0.005 m/s] + + # Expected smoothed velocities: + # Object 1: 0.3 * [0.005, 0.0] + 0.7 * [0.0, 0.0] = [0.0015, 0.0] + # Object 2: 0.3 * [0.0, 0.005] + 0.7 * [0.0, 0.0] = [0.0, 0.0015] + # Object 3: 0.3 * [-0.005, -0.005] + 0.7 * [0.0, 0.0] = [-0.0015, -0.0015] + + expected_data_frame2 = { + "velocity": { + 1: [0.005, 0.0], + 2: [0.0, 0.005], + 3: [-0.005, -0.005], + }, + "speed": { + 1: 0.005, + 2: 0.005, + 3: 0.0070710678118654755, # sqrt(0.005^2 + 0.005^2) + }, + "smoothed_velocity": { + 1: [0.0015, 0.0], + 2: [0.0, 0.0015], + 3: [-0.0015, -0.0015], + }, + "smoothed_speed": { + 1: 0.0015, + 2: 0.0015, + 3: 0.002121320343559643, # sqrt(0.0015^2 + 0.0015^2) + }, + } + assert ( + frame2_result["velocity_detections"].data["velocity"] + == expected_data_frame2["velocity"] + ) + assert frame2_result["velocity_detections"].data["speed"] == pytest.approx( + expected_data_frame2["speed"], rel=1e-5 + ) + assert frame2_result["velocity_detections"].data[ + "smoothed_velocity" + ] == pytest.approx(expected_data_frame2["smoothed_velocity"], rel=1e-5) + assert frame2_result["velocity_detections"].data["smoothed_speed"] == pytest.approx( + expected_data_frame2["smoothed_speed"], rel=1e-5 + ) + + +def test_velocity_block_inconsistent_tracker_ids() -> None: + # given + velocity_block = VelocityBlockV1() + + # Frame 1 detections + frame1_detections = sv.Detections( + xyxy=np.array( + [ + [100, 100, 110, 110], # Object 1 + [200, 200, 210, 210], # Object 2 + ] + ), + tracker_id=np.array([1, 2]), + ) + + metadata1 = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + velocity_block.run( + detections=frame1_detections, + metadata=metadata1, + smoothing_alpha=0.5, + pixels_per_meter=1000, + ) + + # Frame 2 detections with missing tracker_id=2 and new tracker_id=3 + frame2_detections = sv.Detections( + xyxy=np.array( + [ + [105, 100, 115, 110], # Object 1 moved +5 px right + [300, 300, 310, 310], # New Object 3 + ] + ), + tracker_id=np.array([1, 3]), + ) + + metadata2 = VideoMetadata( + video_identifier="vid_1", + frame_number=2, + frame_timestamp=datetime.datetime.fromtimestamp(1726570801).astimezone( + tz=datetime.timezone.utc + ), + ) + + frame2_result = velocity_block.run( + detections=frame2_detections, + metadata=metadata2, + smoothing_alpha=0.5, + pixels_per_meter=1000, + ) + + # Expected velocities: + # Object 1: [5 px/s, 0 px/s] => [0.005 m/s, 0.0 m/s] + # Object 3: [0 px/s, 0 px/s] => [0.0 m/s, 0.0 m/s] (first appearance) + + expected_data_frame2 = { + "velocity": { + 1: [0.005, 0.0], + 3: [0.0, 0.0], + }, + "speed": { + 1: 0.005, + 3: 0.0, + }, + "smoothed_velocity": { + 1: [0.0025, 0.0], + 3: [0.0, 0.0], + }, + "smoothed_speed": { + 1: 0.0025, + 3: 0.0, + }, + } + assert ( + frame2_result["velocity_detections"].data["velocity"] + == expected_data_frame2["velocity"] + ) + assert ( + frame2_result["velocity_detections"].data["speed"] + == expected_data_frame2["speed"] + ) + assert ( + frame2_result["velocity_detections"].data["smoothed_velocity"] + == expected_data_frame2["smoothed_velocity"] + ) + assert ( + frame2_result["velocity_detections"].data["smoothed_speed"] + == expected_data_frame2["smoothed_speed"] + ) + + +def test_velocity_block_large_movement() -> None: + # given + velocity_block = VelocityBlockV1() + + # Frame 1 detections + frame1_detections = sv.Detections( + xyxy=np.array([[100, 100, 110, 110]]), + tracker_id=np.array([1]), + ) + + metadata1 = VideoMetadata( + video_identifier="vid_1", + frame_number=1, + frame_timestamp=datetime.datetime.fromtimestamp(1726570800).astimezone( + tz=datetime.timezone.utc + ), + ) + + # Run on first frame + velocity_block.run( + detections=frame1_detections, + metadata=metadata1, + smoothing_alpha=0.5, + pixels_per_meter=1000, # 1000 pixels = 1 meter + ) + + # Frame 2 detections with large movement + frame2_detections = sv.Detections( + xyxy=np.array([[2000, 2000, 2010, 2010]]), # Moved +1900 px right and down + tracker_id=np.array([1]), + ) + + metadata2 = VideoMetadata( + video_identifier="vid_1", + frame_number=2, + frame_timestamp=datetime.datetime.fromtimestamp(1726570801).astimezone( + tz=datetime.timezone.utc + ), + ) + + frame2_result = velocity_block.run( + detections=frame2_detections, + metadata=metadata2, + smoothing_alpha=0.5, + pixels_per_meter=1000, + ) + + # Expected velocities: + # [1900 px/s, 1900 px/s] => [1.9 m/s, 1.9 m/s] + + # Expected smoothed velocities: + # 0.5 * [1.9, 1.9] + 0.5 * [0.0, 0.0] = [0.95, 0.95] + + expected_data_frame2 = { + "velocity": { + 1: [1.9, 1.9], + }, + "speed": { + 1: 2.68675135, # sqrt(1.9^2 + 1.9^2) + }, + "smoothed_velocity": { + 1: [0.95, 0.95], + }, + "smoothed_speed": { + 1: 1.343375675, # sqrt(0.95^2 + 0.95^2) + }, + } + assert ( + frame2_result["velocity_detections"].data["velocity"] + == expected_data_frame2["velocity"] + ) + assert frame2_result["velocity_detections"].data["speed"] == pytest.approx( + expected_data_frame2["speed"], rel=1e-4 + ) + assert frame2_result["velocity_detections"].data[ + "smoothed_velocity" + ] == pytest.approx(expected_data_frame2["smoothed_velocity"], rel=1e-4) + assert frame2_result["velocity_detections"].data["smoothed_speed"] == pytest.approx( + expected_data_frame2["smoothed_speed"], rel=1e-4 + )