Skip to content

Commit

Permalink
attempt at deterministic planning
Browse files Browse the repository at this point in the history
  • Loading branch information
Victorlouisdg committed Mar 19, 2024
1 parent 741c34b commit a9db589
Show file tree
Hide file tree
Showing 5 changed files with 371 additions and 15 deletions.
41 changes: 34 additions & 7 deletions airo_planner/ompl/single_arm_ompl_planner.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import sys
import time
from typing import Callable, List

import numpy as np
Expand All @@ -13,6 +15,7 @@

from airo_planner import JointBoundsType, SingleArmPlanner, path_from_ompl, state_to_ompl
from airo_planner.ompl.utilities import create_simple_setup
from airo_planner.selection.path_selection import choose_shortest_path

# TODO move this to airo_typing
JointConfigurationsModifierType = Callable[[List[JointConfigurationType]], List[JointConfigurationType]]
Expand All @@ -35,11 +38,10 @@ def __init__(
self,
is_state_valid_fn: JointConfigurationCheckerType,
joint_bounds: JointBoundsType | None = None,
max_planning_time: float = 5.0,
inverse_kinematics_fn: InverseKinematicsFunctionType | None = None,
filter_goal_configurations_fn: JointConfigurationsModifierType | None = None,
rank_goal_configurations_fn: JointConfigurationsModifierType | None = None,
choose_path_fn: JointPathChooserType | None = None,
choose_path_fn: JointPathChooserType | None = choose_shortest_path,
degrees_of_freedom: int = 6,
):
"""Instiatiate a single-arm motion planner that uses OMPL. This creates
Expand All @@ -53,7 +55,6 @@ def __init__(
Args:
is_state_valid_fn: A function that checks if a given joint configuration is valid.
inverse_kinematics_fn: A function that computes the inverse kinematics of a given TCP pose.
max_planning_time: The maximum time allowed for planning.
"""
self.is_state_valid_fn = is_state_valid_fn

Expand All @@ -64,7 +65,6 @@ def __init__(
self.choose_path_fn = choose_path_fn

# Planning parameters
self.max_planning_time = max_planning_time
self._degrees_of_freedom = degrees_of_freedom
if joint_bounds is None:
self.joint_bounds = np.full(degrees_of_freedom, -2 * np.pi), np.full(degrees_of_freedom, 2 * np.pi)
Expand Down Expand Up @@ -95,7 +95,7 @@ def plan_to_joint_configuration(
simple_setup = self._simple_setup

# Run planning algorithm
simple_setup.solve(self.max_planning_time)
simple_setup.solve()

if not simple_setup.haveExactSolutionPath():
return None
Expand Down Expand Up @@ -126,6 +126,7 @@ def plan_to_tcp_pose( # noqa: C901
return None

ik_solutions = self.inverse_kinematics_fn(tcp_pose_in_base)
ik_solutions = [solution.squeeze() for solution in ik_solutions]
self._ik_solutions = ik_solutions # Save for debugging

if ik_solutions is None or len(ik_solutions) == 0:
Expand All @@ -152,21 +153,47 @@ def plan_to_tcp_pose( # noqa: C901
else:
logger.info(f"Found {len(ik_solutions_valid)}/{len(ik_solutions_within_bounds)} valid solutions.")

# TODO filter the IK solutions -> warn when nothing is left

# TODO rank the IK solutions
terminate_on_first_success = self.rank_goal_configurations_fn is not None

logger.info(f"Running OMPL towards {len(ik_solutions_valid)} IK solutions.")
start_time = time.time()
# Try solving to each IK solution in joint space.
paths = []
for ik_solution in ik_solutions_valid:
for i, ik_solution in enumerate(ik_solutions_valid):
path = self.plan_to_joint_configuration(start_configuration, ik_solution)

# My attempt to get the OMPL Info: messages to show up in the correct order between the loguru logs
# This however does not seems to work, so I don't know where those prints are buffered
sys.stdout.flush()

if path is not None:
logger.info(f"Successfully found path to the valid IK solution with index {i}.")
if terminate_on_first_success is True:
end_time = time.time()
logger.info(f"Terminating on first success (planning time: {end_time - start_time:.2f} s).")
return path
paths.append(path)

end_time = time.time()
logger.info(f"Found {len(paths)} paths towards IK solutions, (planning time: {end_time - start_time:.2f} s).")

self._all_paths = paths # Save for debugging

if len(paths) == 0:
logger.info("No paths founds towards any IK solutions, returning None.")
return None

solution_path = paths[0]
solution_path = self.choose_path_fn(paths)
return solution_path

# TODO: exhaustive mode vs iterative mode (~= greedy mode)
# depending on the helper functions provide to the planner, it will operate in different modes
# by default, it operates in an exhaustive mode, meaning that it will treat all IK solutions the same, and plan a path to them all
# then it will return the shortest path

# path_distances = [np.linalg.norm(path[-1] - start_configuration) for path in paths]

# path_desirablities = None
Expand Down
10 changes: 10 additions & 0 deletions airo_planner/ompl/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ def bounds_to_ompl(joint_bounds: JointBoundsType) -> ob.RealVectorBounds:
return bounds


def allocDeterministicStateSampler(space: ob.StateSpace) -> ob.StateSampler:
from loguru import logger

logger.info("Allocating RealVectorDeterministicStateSampler")
return ob.RealVectorDeterministicStateSampler(space)


def create_simple_setup(
is_state_valid_fn: JointConfigurationCheckerType, joint_bounds: JointBoundsType
) -> og.SimpleSetup:
Expand All @@ -65,6 +72,9 @@ def create_simple_setup(

is_state_valid_ompl = function_to_ompl(is_state_valid_fn, degrees_of_freedom)

space.setStateSamplerAllocator(ob.StateSamplerAllocator(allocDeterministicStateSampler))
# space.allocStateSampler()

# Configure the SimpleSetup object
simple_setup = og.SimpleSetup(space)
simple_setup.setStateValidityChecker(ob.StateValidityCheckerFn(is_state_valid_ompl))
Expand Down
28 changes: 28 additions & 0 deletions airo_planner/selection/goal_selection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import numpy as np
from airo_typing import JointConfigurationType


def rank_by_distance_to_desirable_configurations(
configurations: list[JointConfigurationType], desirable_configurations: list[JointConfigurationType]
) -> list[JointConfigurationType]:
"""Ranks joint configurations based on their distance to a set of desirable configurations.
Args:
configurations: The list of joint configurations to be ranked.
desirable_configurations: A list of desirable joint configurations.
Returns:
A list of joint configurations, sorted in ascending order of their
distance to the closest desirable configuration.
"""

distances = []
for config in configurations:
distances_to_desirable = [
np.linalg.norm(config - desirable_config) for desirable_config in desirable_configurations
]
min_distance = min(distances_to_desirable)
distances.append(min_distance)

ranked_configurations = [x for _, x in sorted(zip(distances, configurations))]
return ranked_configurations
17 changes: 17 additions & 0 deletions airo_planner/selection/path_selection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import numpy as np
from airo_drake import calculate_joint_path_length
from airo_typing import JointPathType


def choose_shortest_path(paths: list[JointPathType]) -> JointPathType:
"""Selects the shortest path from a list of possible paths.
Args:
paths: A list of potential joint paths.
Returns:
The path from the input list that has the shortest length.
"""
path_lengths = [calculate_joint_path_length(p) for p in paths]
shortest_idx = np.argmin(path_lengths)
return paths[shortest_idx]
Loading

0 comments on commit a9db589

Please sign in to comment.