From a9db5895f8460b331bf48d594b6b49a513813baf Mon Sep 17 00:00:00 2001 From: victorlouisdg Date: Tue, 19 Mar 2024 07:29:19 +0100 Subject: [PATCH] attempt at deterministic planning --- airo_planner/ompl/single_arm_ompl_planner.py | 41 ++- airo_planner/ompl/utilities.py | 10 + airo_planner/selection/goal_selection.py | 28 ++ airo_planner/selection/path_selection.py | 17 ++ notebooks/02_planning_to_tcp_poses.ipynb | 290 ++++++++++++++++++- 5 files changed, 371 insertions(+), 15 deletions(-) create mode 100644 airo_planner/selection/goal_selection.py create mode 100644 airo_planner/selection/path_selection.py diff --git a/airo_planner/ompl/single_arm_ompl_planner.py b/airo_planner/ompl/single_arm_ompl_planner.py index 7b77d66..82b1cc5 100644 --- a/airo_planner/ompl/single_arm_ompl_planner.py +++ b/airo_planner/ompl/single_arm_ompl_planner.py @@ -1,3 +1,5 @@ +import sys +import time from typing import Callable, List import numpy as np @@ -13,6 +15,7 @@ from airo_planner import JointBoundsType, SingleArmPlanner, path_from_ompl, state_to_ompl from airo_planner.ompl.utilities import create_simple_setup +from airo_planner.selection.path_selection import choose_shortest_path # TODO move this to airo_typing JointConfigurationsModifierType = Callable[[List[JointConfigurationType]], List[JointConfigurationType]] @@ -35,11 +38,10 @@ def __init__( self, is_state_valid_fn: JointConfigurationCheckerType, joint_bounds: JointBoundsType | None = None, - max_planning_time: float = 5.0, inverse_kinematics_fn: InverseKinematicsFunctionType | None = None, filter_goal_configurations_fn: JointConfigurationsModifierType | None = None, rank_goal_configurations_fn: JointConfigurationsModifierType | None = None, - choose_path_fn: JointPathChooserType | None = None, + choose_path_fn: JointPathChooserType | None = choose_shortest_path, degrees_of_freedom: int = 6, ): """Instiatiate a single-arm motion planner that uses OMPL. This creates @@ -53,7 +55,6 @@ def __init__( Args: is_state_valid_fn: A function that checks if a given joint configuration is valid. inverse_kinematics_fn: A function that computes the inverse kinematics of a given TCP pose. - max_planning_time: The maximum time allowed for planning. """ self.is_state_valid_fn = is_state_valid_fn @@ -64,7 +65,6 @@ def __init__( self.choose_path_fn = choose_path_fn # Planning parameters - self.max_planning_time = max_planning_time self._degrees_of_freedom = degrees_of_freedom if joint_bounds is None: self.joint_bounds = np.full(degrees_of_freedom, -2 * np.pi), np.full(degrees_of_freedom, 2 * np.pi) @@ -95,7 +95,7 @@ def plan_to_joint_configuration( simple_setup = self._simple_setup # Run planning algorithm - simple_setup.solve(self.max_planning_time) + simple_setup.solve() if not simple_setup.haveExactSolutionPath(): return None @@ -126,6 +126,7 @@ def plan_to_tcp_pose( # noqa: C901 return None ik_solutions = self.inverse_kinematics_fn(tcp_pose_in_base) + ik_solutions = [solution.squeeze() for solution in ik_solutions] self._ik_solutions = ik_solutions # Save for debugging if ik_solutions is None or len(ik_solutions) == 0: @@ -152,21 +153,47 @@ def plan_to_tcp_pose( # noqa: C901 else: logger.info(f"Found {len(ik_solutions_valid)}/{len(ik_solutions_within_bounds)} valid solutions.") + # TODO filter the IK solutions -> warn when nothing is left + + # TODO rank the IK solutions + terminate_on_first_success = self.rank_goal_configurations_fn is not None + + logger.info(f"Running OMPL towards {len(ik_solutions_valid)} IK solutions.") + start_time = time.time() # Try solving to each IK solution in joint space. paths = [] - for ik_solution in ik_solutions_valid: + for i, ik_solution in enumerate(ik_solutions_valid): path = self.plan_to_joint_configuration(start_configuration, ik_solution) + + # My attempt to get the OMPL Info: messages to show up in the correct order between the loguru logs + # This however does not seems to work, so I don't know where those prints are buffered + sys.stdout.flush() + if path is not None: + logger.info(f"Successfully found path to the valid IK solution with index {i}.") + if terminate_on_first_success is True: + end_time = time.time() + logger.info(f"Terminating on first success (planning time: {end_time - start_time:.2f} s).") + return path paths.append(path) + end_time = time.time() + logger.info(f"Found {len(paths)} paths towards IK solutions, (planning time: {end_time - start_time:.2f} s).") + self._all_paths = paths # Save for debugging if len(paths) == 0: logger.info("No paths founds towards any IK solutions, returning None.") return None - solution_path = paths[0] + solution_path = self.choose_path_fn(paths) return solution_path + + # TODO: exhaustive mode vs iterative mode (~= greedy mode) + # depending on the helper functions provide to the planner, it will operate in different modes + # by default, it operates in an exhaustive mode, meaning that it will treat all IK solutions the same, and plan a path to them all + # then it will return the shortest path + # path_distances = [np.linalg.norm(path[-1] - start_configuration) for path in paths] # path_desirablities = None diff --git a/airo_planner/ompl/utilities.py b/airo_planner/ompl/utilities.py index a64a5b8..44c37ff 100644 --- a/airo_planner/ompl/utilities.py +++ b/airo_planner/ompl/utilities.py @@ -46,6 +46,13 @@ def bounds_to_ompl(joint_bounds: JointBoundsType) -> ob.RealVectorBounds: return bounds +def allocDeterministicStateSampler(space: ob.StateSpace) -> ob.StateSampler: + from loguru import logger + + logger.info("Allocating RealVectorDeterministicStateSampler") + return ob.RealVectorDeterministicStateSampler(space) + + def create_simple_setup( is_state_valid_fn: JointConfigurationCheckerType, joint_bounds: JointBoundsType ) -> og.SimpleSetup: @@ -65,6 +72,9 @@ def create_simple_setup( is_state_valid_ompl = function_to_ompl(is_state_valid_fn, degrees_of_freedom) + space.setStateSamplerAllocator(ob.StateSamplerAllocator(allocDeterministicStateSampler)) + # space.allocStateSampler() + # Configure the SimpleSetup object simple_setup = og.SimpleSetup(space) simple_setup.setStateValidityChecker(ob.StateValidityCheckerFn(is_state_valid_ompl)) diff --git a/airo_planner/selection/goal_selection.py b/airo_planner/selection/goal_selection.py new file mode 100644 index 0000000..63ab45f --- /dev/null +++ b/airo_planner/selection/goal_selection.py @@ -0,0 +1,28 @@ +import numpy as np +from airo_typing import JointConfigurationType + + +def rank_by_distance_to_desirable_configurations( + configurations: list[JointConfigurationType], desirable_configurations: list[JointConfigurationType] +) -> list[JointConfigurationType]: + """Ranks joint configurations based on their distance to a set of desirable configurations. + + Args: + configurations: The list of joint configurations to be ranked. + desirable_configurations: A list of desirable joint configurations. + + Returns: + A list of joint configurations, sorted in ascending order of their + distance to the closest desirable configuration. + """ + + distances = [] + for config in configurations: + distances_to_desirable = [ + np.linalg.norm(config - desirable_config) for desirable_config in desirable_configurations + ] + min_distance = min(distances_to_desirable) + distances.append(min_distance) + + ranked_configurations = [x for _, x in sorted(zip(distances, configurations))] + return ranked_configurations diff --git a/airo_planner/selection/path_selection.py b/airo_planner/selection/path_selection.py new file mode 100644 index 0000000..d58166f --- /dev/null +++ b/airo_planner/selection/path_selection.py @@ -0,0 +1,17 @@ +import numpy as np +from airo_drake import calculate_joint_path_length +from airo_typing import JointPathType + + +def choose_shortest_path(paths: list[JointPathType]) -> JointPathType: + """Selects the shortest path from a list of possible paths. + + Args: + paths: A list of potential joint paths. + + Returns: + The path from the input list that has the shortest length. + """ + path_lengths = [calculate_joint_path_length(p) for p in paths] + shortest_idx = np.argmin(path_lengths) + return paths[shortest_idx] diff --git a/notebooks/02_planning_to_tcp_poses.ipynb b/notebooks/02_planning_to_tcp_poses.ipynb index b50fa61..3088a23 100644 --- a/notebooks/02_planning_to_tcp_poses.ipynb +++ b/notebooks/02_planning_to_tcp_poses.ipynb @@ -9,7 +9,7 @@ "In the previous notebook we saw how to plan paths between joint configurations.\n", "In this notebook we will explore how planning to TCP poses opens doors to many interesting possibilities, such as:\n", "- ๐Ÿ’ฅ **Filtering** e.g. pregrasp configurations to ensure moving the the grasp pose does not collide with the environment\n", - "- ๐Ÿฅ‡ **Selecting** optimal paths, e.g. the shortest, smoothest or fastest after time parametrization\n", + "- ๐Ÿฅ‡ **Selecting** optimal paths, e.g. the shortest, smoothest, fastest after time parametrization or most clearance\n", "- ๐Ÿ“Š **Ranking** goal joint configurations and planning to them iteratively, selecting the first path that is found. This ranking could be based on the distance of the goal joint configuration to the a desirable joint configuration, e.g the start joint configuration.\n", "\n", "**The 4-Step Process**\n", @@ -22,9 +22,6 @@ "\n", "\n", "\n", - "\n", - "\n", - "\n", "