Skip to content

Commit

Permalink
Give ISAR responsibility to create ids
Browse files Browse the repository at this point in the history
for mission, task and inspection.
Therefore the ids are removed from the StartMissionDefinitions
and instead generated when the Mission and Tasks are created

Some refactors are also applied, most notably replacing dataclass
with pydantic BaseModels
  • Loading branch information
tsundvoll committed Dec 11, 2024
1 parent 593e4c7 commit c8ff71c
Show file tree
Hide file tree
Showing 36 changed files with 371 additions and 554 deletions.
1 change: 1 addition & 0 deletions src/isar/apis/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
class TaskResponse(BaseModel):
id: str
tag_id: Optional[str] = None
inspection_id: Optional[str] = None
type: TaskTypes


Expand Down
196 changes: 73 additions & 123 deletions src/isar/apis/models/start_mission_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from enum import Enum
from typing import List, Optional

from alitra import Frame, Orientation, Pose, Position
from pydantic import BaseModel, Field

from isar.apis.models.models import InputPose, InputPosition
Expand Down Expand Up @@ -44,188 +43,139 @@ class StartMissionInspectionDefinition(BaseModel):
analysis_type: Optional[str] = None
duration: Optional[float] = None
metadata: Optional[dict] = None
id: Optional[str] = None


class StartMissionTaskDefinition(BaseModel):
type: TaskType = Field(default=TaskType.Inspection)
pose: InputPose
inspection: Optional[StartMissionInspectionDefinition] = None
tag: Optional[str] = None
id: Optional[str] = None
zoom: Optional[ZoomDescription] = None


class StartMissionDefinition(BaseModel):
tasks: List[StartMissionTaskDefinition]
id: Optional[str] = None
name: Optional[str] = None
start_pose: Optional[InputPose] = None
dock: Optional[bool] = None
undock: Optional[bool] = None
dock: Optional[bool] = Field(default=False)
undock: Optional[bool] = Field(default=False)


def to_isar_mission(start_mission_definition: StartMissionDefinition) -> Mission:
def to_isar_mission(
start_mission_definition: StartMissionDefinition,
return_pose: Optional[InputPose] = None,
) -> Mission:
isar_tasks: List[TASKS] = []

for start_mission_task_definition in start_mission_definition.tasks:
task: TASKS = create_isar_task(start_mission_task_definition)
if start_mission_task_definition.id:
task.id = start_mission_task_definition.id
for task_definition in start_mission_definition.tasks:
task: TASKS = to_isar_task(task_definition)
isar_tasks.append(task)

if return_pose:
isar_tasks.append(ReturnToHome(pose=return_pose.to_alitra_pose()))

if not isar_tasks:
raise MissionPlannerError("Mission does not contain any valid tasks")

check_for_duplicate_ids(isar_tasks)

isar_mission: Mission = Mission(tasks=isar_tasks)

isar_mission.dock = start_mission_definition.dock
isar_mission.undock = start_mission_definition.undock

isar_mission_name: str
if start_mission_definition.name:
isar_mission.name = start_mission_definition.name
isar_mission_name = start_mission_definition.name
else:
isar_mission.name = _build_mission_name()

if start_mission_definition.id:
isar_mission.id = start_mission_definition.id
isar_mission_name = _build_mission_name()

start_pose = None
if start_mission_definition.start_pose:
input_pose: InputPose = start_mission_definition.start_pose
input_frame: Frame = Frame(name=input_pose.frame_name)
input_position: Position = Position(
input_pose.position.x,
input_pose.position.y,
input_pose.position.z,
input_frame,
)
input_orientation: Orientation = Orientation(
input_pose.orientation.x,
input_pose.orientation.y,
input_pose.orientation.z,
input_pose.orientation.w,
input_frame,
)
isar_mission.start_pose = Pose(
position=input_position, orientation=input_orientation, frame=input_frame
)

return isar_mission


def check_for_duplicate_ids(items: List[TASKS]):
duplicate_ids = get_duplicate_ids(items=items)
if len(duplicate_ids) > 0:
raise MissionPlannerError(
f"Failed to create as there were duplicate IDs which is not allowed "
f"({duplicate_ids})"
)

start_pose = start_mission_definition.start_pose.to_alitra_pose()

return Mission(
tasks=isar_tasks,
name=isar_mission_name,
start_pose=start_pose,
dock=start_mission_definition.dock,
undock=start_mission_definition.undock,
)

def create_isar_task(start_mission_task_definition) -> TASKS:

if start_mission_task_definition.type == TaskType.Inspection:
return create_inspection_task(start_mission_task_definition)
elif start_mission_task_definition.type == TaskType.Localization:
return create_localization_task(start_mission_task_definition)
elif start_mission_task_definition.type == TaskType.ReturnToHome:
return create_return_to_home_task(start_mission_task_definition)
elif start_mission_task_definition.type == TaskType.Dock:
def to_isar_task(task_definition: StartMissionTaskDefinition) -> TASKS:
if task_definition.type == TaskType.Inspection:
return to_inspection_task(task_definition)
elif task_definition.type == TaskType.Localization:
return to_localization_task(task_definition)
elif task_definition.type == TaskType.ReturnToHome:
return create_return_to_home_task(task_definition)
elif task_definition.type == TaskType.Dock:
return create_dock_task()
else:
raise MissionPlannerError(
f"Failed to create task: '{start_mission_task_definition.type}' is not a valid"
f"Failed to create task: '{task_definition.type}' is not a valid"
)


def create_inspection_task(
start_mission_task_definition: StartMissionTaskDefinition,
) -> TASKS:
def to_inspection_task(task_definition: StartMissionTaskDefinition) -> TASKS:
inspection_definition = task_definition.inspection

if start_mission_task_definition.inspection.type == InspectionTypes.image:
if inspection_definition.type == InspectionTypes.image:
return TakeImage(
target=start_mission_task_definition.inspection.inspection_target.to_alitra_position(),
tag_id=start_mission_task_definition.tag,
robot_pose=start_mission_task_definition.pose.to_alitra_pose(),
metadata=start_mission_task_definition.inspection.metadata,
zoom=start_mission_task_definition.zoom,
robot_pose=task_definition.pose.to_alitra_pose(),
tag_id=task_definition.tag,
target=task_definition.inspection.inspection_target.to_alitra_position(),
metadata=task_definition.inspection.metadata,
zoom=task_definition.zoom,
)
elif start_mission_task_definition.inspection.type == InspectionTypes.video:
elif inspection_definition.type == InspectionTypes.video:
return TakeVideo(
target=start_mission_task_definition.inspection.inspection_target.to_alitra_position(),
duration=start_mission_task_definition.inspection.duration,
tag_id=start_mission_task_definition.tag,
robot_pose=start_mission_task_definition.pose.to_alitra_pose(),
metadata=start_mission_task_definition.inspection.metadata,
zoom=start_mission_task_definition.zoom,
robot_pose=task_definition.pose.to_alitra_pose(),
tag_id=task_definition.tag,
target=task_definition.inspection.inspection_target.to_alitra_position(),
duration=inspection_definition.duration,
metadata=task_definition.inspection.metadata,
zoom=task_definition.zoom,
)

elif start_mission_task_definition.inspection.type == InspectionTypes.thermal_image:
elif inspection_definition.type == InspectionTypes.thermal_image:
return TakeThermalImage(
target=start_mission_task_definition.inspection.inspection_target.to_alitra_position(),
tag_id=start_mission_task_definition.tag,
robot_pose=start_mission_task_definition.pose.to_alitra_pose(),
metadata=start_mission_task_definition.inspection.metadata,
zoom=start_mission_task_definition.zoom,
robot_pose=task_definition.pose.to_alitra_pose(),
tag_id=task_definition.tag,
target=task_definition.inspection.inspection_target.to_alitra_position(),
metadata=task_definition.inspection.metadata,
zoom=task_definition.zoom,
)

elif start_mission_task_definition.inspection.type == InspectionTypes.thermal_video:
elif inspection_definition.type == InspectionTypes.thermal_video:
return TakeThermalVideo(
target=start_mission_task_definition.inspection.inspection_target.to_alitra_position(),
duration=start_mission_task_definition.inspection.duration,
tag_id=start_mission_task_definition.tag,
robot_pose=start_mission_task_definition.pose.to_alitra_pose(),
metadata=start_mission_task_definition.inspection.metadata,
zoom=start_mission_task_definition.zoom,
robot_pose=task_definition.pose.to_alitra_pose(),
tag_id=task_definition.tag,
target=task_definition.inspection.inspection_target.to_alitra_position(),
duration=inspection_definition.duration,
metadata=task_definition.inspection.metadata,
zoom=task_definition.zoom,
)

elif start_mission_task_definition.inspection.type == InspectionTypes.audio:
elif inspection_definition.type == InspectionTypes.audio:
return RecordAudio(
target=start_mission_task_definition.inspection.inspection_target.to_alitra_position(),
duration=start_mission_task_definition.inspection.duration,
tag_id=start_mission_task_definition.tag,
robot_pose=start_mission_task_definition.pose.to_alitra_pose(),
metadata=start_mission_task_definition.inspection.metadata,
zoom=start_mission_task_definition.zoom,
robot_pose=task_definition.pose.to_alitra_pose(),
tag_id=task_definition.tag,
target=task_definition.inspection.inspection_target.to_alitra_position(),
duration=inspection_definition.duration,
metadata=task_definition.inspection.metadata,
zoom=task_definition.zoom,
)
else:
raise ValueError(
f"Inspection type '{start_mission_task_definition.inspection.type}' not supported"
f"Inspection type '{inspection_definition.type}' not supported"
)


def create_localization_task(
start_mission_task_definition: StartMissionTaskDefinition,
) -> Localize:
return Localize(
localization_pose=start_mission_task_definition.pose.to_alitra_pose()
)
def to_localization_task(task_definition: StartMissionTaskDefinition) -> Localize:
return Localize(localization_pose=task_definition.pose.to_alitra_pose())


def create_return_to_home_task(
start_mission_task_definition: StartMissionTaskDefinition,
task_definition: StartMissionTaskDefinition,
) -> ReturnToHome:
return ReturnToHome(pose=start_mission_task_definition.pose.to_alitra_pose())
return ReturnToHome(pose=task_definition.pose.to_alitra_pose())


def create_dock_task() -> DockingProcedure:
return DockingProcedure(behavior="dock")


def get_duplicate_ids(items: List[TASKS]) -> List[str]:
unique_ids: List[str] = []
duplicate_ids: List[str] = []
for item in items:
id: str = item.id
if id not in unique_ids:
unique_ids.append(id)
else:
duplicate_ids.append(id)

return duplicate_ids


def _build_mission_name() -> str:
return f"{settings.PLANT_SHORT_NAME}{settings.ROBOT_NAME}{int(time.time())}"
36 changes: 27 additions & 9 deletions src/isar/apis/schedule/scheduling_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
from isar.services.utilities.scheduling_utilities import SchedulingUtilities
from isar.state_machine.states_enum import States
from robot_interface.models.mission.mission import Mission
from robot_interface.models.mission.task import TASKS, Localize, MoveArm, ReturnToHome
from robot_interface.models.mission.task import (
TASKS,
InspectionTask,
Localize,
MoveArm,
ReturnToHome,
)


class SchedulingController:
Expand Down Expand Up @@ -115,7 +121,9 @@ def start_mission(
self.scheduling_utilities.verify_state_machine_ready_to_receive_mission(state)

try:
mission: Mission = to_isar_mission(mission_definition)
mission: Mission = to_isar_mission(
start_mission_definition=mission_definition, return_pose=return_pose
)
except MissionPlannerError as e:
error_message = f"Bad Request - Cannot create ISAR mission: {e}"
self.logger.warning(error_message)
Expand All @@ -127,9 +135,6 @@ def start_mission(
self.scheduling_utilities.verify_robot_capable_of_mission(
mission=mission, robot_capabilities=robot_settings.CAPABILITIES
)
if return_pose:
pose: Pose = return_pose.to_alitra_pose()
mission.tasks.append(ReturnToHome(pose=pose))

initial_pose_alitra: Optional[Pose] = (
initial_pose.to_alitra_pose() if initial_pose else None
Expand Down Expand Up @@ -213,7 +218,9 @@ def drive_to(
self.scheduling_utilities.verify_state_machine_ready_to_receive_mission(state)

pose: Pose = target_pose.to_alitra_pose()
mission: Mission = Mission(tasks=[ReturnToHome(pose=pose)])
mission: Mission = Mission(
name="Drive to pose", tasks=[ReturnToHome(pose=pose)]
)

self.logger.info(
f"Starting drive to mission with ISAR Mission ID: '{mission.id}'"
Expand All @@ -237,7 +244,9 @@ def start_localization_mission(
self.scheduling_utilities.verify_state_machine_ready_to_receive_mission(state)

pose: Pose = localization_pose.to_alitra_pose()
mission: Mission = Mission(tasks=[Localize(localization_pose=pose)])
mission: Mission = Mission(
name="Localization mission", tasks=[Localize(localization_pose=pose)]
)

self.logger.info(
f"Starting localization mission with ISAR Mission ID: '{mission.id}'"
Expand Down Expand Up @@ -284,7 +293,9 @@ def start_move_arm_mission(

self.scheduling_utilities.verify_state_machine_ready_to_receive_mission(state)

mission: Mission = Mission(tasks=[MoveArm(arm_pose=arm_pose_literal)])
mission: Mission = Mission(
name="Move arm mission", tasks=[MoveArm(arm_pose=arm_pose_literal)]
)

self.logger.info(
f"Starting move arm mission with ISAR Mission ID: '{mission.id}'"
Expand All @@ -302,4 +313,11 @@ def _api_response(self, mission: Mission) -> StartMissionResponse:
)

def _task_api_response(self, task: TASKS) -> TaskResponse:
return TaskResponse(id=task.id, tag_id=task.tag_id, type=task.type)
if isinstance(task, InspectionTask):
inspection_id = task.inspection_id
else:
inspection_id = None

return TaskResponse(
id=task.id, tag_id=task.tag_id, inspection_id=inspection_id, type=task.type
)
Loading

0 comments on commit c8ff71c

Please sign in to comment.