Skip to content

Commit

Permalink
start for goal configuration filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
Victorlouisdg committed Mar 19, 2024
1 parent e798c8f commit 2e9f37f
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 14 deletions.
17 changes: 14 additions & 3 deletions airo_planner/ompl/single_arm_ompl_planner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def __init__(
inverse_kinematics_fn: InverseKinematicsFunctionType | None = None,
filter_goal_configurations_fn: JointConfigurationsModifierType | None = None,
rank_goal_configurations_fn: JointConfigurationsModifierType | None = None,
choose_path_fn: JointPathChooserType | None = choose_shortest_path,
choose_path_fn: JointPathChooserType = choose_shortest_path,
degrees_of_freedom: int = 6,
):
"""Instiatiate a single-arm motion planner that uses OMPL. This creates
Expand Down Expand Up @@ -154,15 +154,26 @@ def plan_to_tcp_pose( # noqa: C901
logger.info(f"Found {len(ik_solutions_valid)}/{len(ik_solutions_within_bounds)} valid solutions.")

# TODO filter the IK solutions -> warn when nothing is left
if self.filter_goal_configurations_fn is not None:
ik_solutions_filtered = self.filter_goal_configurations_fn(ik_solutions_valid)
if len(ik_solutions_filtered) == 0:
logger.info("All IK solutions were filtered out, returning None.")
return None
else:
logger.info(
f"Filtered IK solutions to {len(ik_solutions_filtered)}/{len(ik_solutions_valid)} solutions."
)
else:
ik_solutions_filtered = ik_solutions_valid

# TODO rank the IK solutions
terminate_on_first_success = self.rank_goal_configurations_fn is not None

logger.info(f"Running OMPL towards {len(ik_solutions_valid)} IK solutions.")
logger.info(f"Running OMPL towards {len(ik_solutions_filtered)} IK solutions.")
start_time = time.time()
# Try solving to each IK solution in joint space.
paths = []
for i, ik_solution in enumerate(ik_solutions_valid):
for i, ik_solution in enumerate(ik_solutions_filtered):
path = self.plan_to_joint_configuration(start_configuration, ik_solution)

# My attempt to get the OMPL Info: messages to show up in the correct order between the loguru logs
Expand Down
28 changes: 28 additions & 0 deletions airo_planner/selection/goal_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,31 @@ def rank_by_distance_to_desirable_configurations(

ranked_configurations = [x for _, x in sorted(zip(distances, configurations))]
return ranked_configurations


def filter_with_distance_to_configurations(
joint_configurations: list[JointConfigurationType],
joint_configurations_close: list[JointConfigurationType],
distance_threshold: float = 0.01,
) -> list[JointConfigurationType]:
"""Filters a list of joint configurations, keeping only those within a specified distance of a reference set.
Args:
joint_configurations: The list of joint configurations to be filtered.
joint_configurations_close: A list of reference joint configurations.
distance_threshold: The maximum allowable distance between a configuration
and a reference configuration for it to be kept.
Returns:
A list of filtered joint configurations that are within the distance threshold
of at least one configuration in the reference set.
"""
joint_configurations_to_keep = []

for joint_configuration in joint_configurations:
for joints_close in joint_configurations_close:
if np.linalg.norm(joint_configuration - joints_close) < distance_threshold:
joint_configurations_to_keep.append(joint_configuration)
break

return joint_configurations_to_keep
236 changes: 225 additions & 11 deletions notebooks/02_planning_to_tcp_poses.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
"\n",
"with np.printoptions(precision=3, suppress=True):\n",
" for solution in solutions:\n",
" print(solution)\n"
" print(solution)"
]
},
{
Expand Down Expand Up @@ -291,6 +291,15 @@
"print(f\"\\nLength of path that planner returned: {calculate_joint_path_length(path):.2f}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"animate_joint_configurations(scene.meshcat, scene.robot_diagram, scene.arm_index, path)"
]
},
{
"cell_type": "markdown",
"metadata": {},
Expand All @@ -311,26 +320,231 @@
"from airo_planner.interfaces import SingleArmPlanner\n",
"from pydrake.trajectories import Trajectory\n",
"\n",
"# use this a filtering example: we want to filter out the grasp paths that collide with the environment\n",
"\n",
"def plan_to_grasp(planner: SingleArmPlanner, grasp_pose: HomogeneousMatrixType) -> Trajectory:\n",
" pass\n",
" # attempt planning the the grasp pose directly\n",
" # if all goals in collision try backing-off to a pregrasp pose\n",
" # time parametrize both paths and concatenate them\n",
"\n"
"from airo_drake import visualize_frame\n",
"\n",
"\n",
"# grasp_location = np.array([0.2, 0.5, 0.5]) # This moves the robot through a singularlity for some start configurations\n",
"grasp_location = np.array([0.2, 0.3, 0.2])\n",
"\n",
"gripper_forward_direction = np.array([1, 0, 0])\n",
"\n",
"Z = gripper_forward_direction / np.linalg.norm(gripper_forward_direction)\n",
"Y = np.array([0, 0, -1]) # 0, 0, 1 is also an option\n",
"X = np.cross(Y, Z)\n",
"\n",
"grasp_orientation = np.column_stack([X, Y, Z])\n",
"grasp_pose = np.identity(4)\n",
"grasp_pose[0:3, 0:3] = grasp_orientation\n",
"grasp_pose[0:3, 3] = grasp_location\n",
"\n",
"pregrasp_pose = grasp_pose.copy()\n",
"pregrasp_pose[0:3, 3] -= 0.25 * gripper_forward_direction\n",
"\n",
"visualize_frame(scene.meshcat, \"pregrasp_pose\", pregrasp_pose)\n",
"visualize_frame(scene.meshcat, \"grasp_pose\", grasp_pose)"
]
},
{
"cell_type": "markdown",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Question: what will increase the joint distance the most? adding pi/4 to all joints or np.pi /2 to a single joint? "
"from ur_analytic_ik import ur3e\n",
"\n",
"tcp_transform = np.identity(4)\n",
"tcp_transform[2, 3] = 0.175 # 175 mm in z\n",
"\n",
"start_configurations = ur3e.inverse_kinematics_with_tcp(pregrasp_pose, tcp_transform)\n",
"goal_configurations = ur3e.inverse_kinematics_with_tcp(grasp_pose, tcp_transform)\n",
"\n",
"start_configurations = np.array(start_configurations).squeeze()\n",
"goal_configurations = np.array(goal_configurations).squeeze()\n",
"\n",
"len(start_configurations), len(goal_configurations)"
]
},
{
"cell_type": "markdown",
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def path_collisions_as_emojis(is_collision_free: list[bool]):\n",
" \"\"\"Displays an emoji-based visualization of a path's collisions.\n",
"\n",
" Example output: \"✅✅💥✅✅✅💥✅✅✅✅\"\n",
"\n",
" Args:\n",
" is_collision_free: A list of booleans, where True indicates no collision.\n",
"\n",
" Returns:\n",
" A string of emojis representing the collision status of the path.\n",
" \"\"\"\n",
" emojis = [\"\" if is_free else \"💥\" for is_free in is_collision_free]\n",
" emoji_str = \"\".join(emojis)\n",
" return emoji_str\n",
"\n",
"\n",
"print(path_collisions_as_emojis(collision_checker.CheckConfigsCollisionFree(start_configurations)))\n",
"print(path_collisions_as_emojis(collision_checker.CheckConfigsCollisionFree(goal_configurations)))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pydrake.trajectories import PiecewisePose\n",
"from pydrake.math import RigidTransform\n",
"from airo_drake import discretize_drake_pose_trajectory\n",
"\n",
"\n",
"rigid_transforms = [RigidTransform(pose) for pose in [pregrasp_pose, grasp_pose]]\n",
"times = np.linspace(0, 1, len(rigid_transforms))\n",
"pose_trajectory = PiecewisePose.MakeLinear(times=times, poses=rigid_transforms)\n",
"\n",
"\n",
"tcp_path = discretize_drake_pose_trajectory(pose_trajectory).poses\n",
"\n",
"for i, tcp_pose in enumerate(tcp_path):\n",
" visualize_frame(scene.meshcat, f\"tcp_path/pose_{i}\", tcp_pose, length=0.05, opacity=0.1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from airo_drake import calculate_valid_joint_paths, calculate_joint_paths\n",
"\n",
"\n",
"def ur3e_inverse_kinematics(tcp_pose: HomogeneousMatrixType) -> list[JointConfigurationType]:\n",
" solutions_1x6 = ur3e.inverse_kinematics_with_tcp(tcp_pose, tcp_transform)\n",
" solutions = [solution.squeeze() for solution in solutions_1x6]\n",
" return solutions\n",
"\n",
"\n",
"joint_paths_ = calculate_joint_paths(tcp_path, ur3e_inverse_kinematics)\n",
"\n",
"for i, path in enumerate(joint_paths_):\n",
" print(f\"{path_collisions_as_emojis(collision_checker.CheckConfigsCollisionFree(path))}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"joint_paths = calculate_valid_joint_paths(\n",
" tcp_path, ur3e_inverse_kinematics, collision_checker.CheckConfigCollisionFree\n",
")\n",
"\n",
"for i, path in enumerate(joint_paths):\n",
" print(f\"{path_collisions_as_emojis(collision_checker.CheckConfigsCollisionFree(path))}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"animate_joint_configurations(scene.meshcat, scene.robot_diagram, scene.arm_index, joint_paths[0])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from functools import partial\n",
"\n",
"from airo_planner.selection.goal_selection import filter_with_distance_to_configurations\n",
"\n",
"path_starts = [path[0] for path in joint_paths]\n",
"\n",
"filter_fn = partial(filter_with_distance_to_configurations, joint_configurations_close=path_starts)\n",
"\n",
"filter_fn(start_configurations)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# use this a filtering example: we want to filter out the grasp paths that collide with the environment\n",
"\n",
"# def plan_to_grasp(planner: SingleArmPlanner, grasp_pose: HomogeneousMatrixType) -> Trajectory:\n",
"# pass\n",
"# # attempt planning the the grasp pose directly\n",
"# # if all goals in collision try backing-off to a pregrasp pose\n",
"# # time parametrize both paths and concatenate them\n",
"from airo_typing import InverseKinematicsFunctionType, JointConfigurationCheckerType\n",
"\n",
"\n",
"def plan_pregrasp(\n",
" grasp_pose: HomogeneousMatrixType,\n",
" start_configuration: JointConfigurationType,\n",
" inverse_kinematics_fn: InverseKinematicsFunctionType,\n",
" is_state_valid_fn_pregrasp: JointConfigurationCheckerType,\n",
" is_state_valid_fn_grasp: JointConfigurationCheckerType,\n",
"):\n",
"\n",
" pregrasp_distances_to_try = [0.05, 0.1, 0.15, 0.2, 0.25]\n",
"\n",
" planner = SingleArmOmplPlanner(is_state_valid_fn_pregrasp, inverse_kinematics_fn=inverse_kinematics_fn)\n",
"\n",
" for distance in pregrasp_distances_to_try:\n",
" # 1. Compute pregrasp pose\n",
" pregrasp_pose = grasp_pose.copy()\n",
" pregrasp_pose[0:3, 3] -= distance * pregrasp_pose[0:3, 2]\n",
"\n",
" # 2. Compute grasp TCP path\n",
" rigid_transforms = [RigidTransform(pose) for pose in [pregrasp_pose, grasp_pose]]\n",
" times = np.linspace(0, 1, len(rigid_transforms))\n",
" pose_trajectory = PiecewisePose.MakeLinear(times=times, poses=rigid_transforms)\n",
" tcp_path = discretize_drake_pose_trajectory(pose_trajectory).poses\n",
"\n",
" # 3 Compute valid grasp joint paths\n",
" joint_paths = calculate_valid_joint_paths(tcp_path, ur3e_inverse_kinematics, is_state_valid_fn_grasp)\n",
" path_starts = [path[0] for path in joint_paths]\n",
"\n",
" # 4 plan to pregrasp tcp poses, filtering on the valid grasp joint paths\n",
" filter_fn = partial(filter_with_distance_to_configurations, joint_configurations_close=path_starts)\n",
"\n",
" planner.filter_goal_configurations_fn = filter_fn\n",
" path = planner.plan_to_tcp_pose(start_configuration, pregrasp_pose)\n",
"\n",
" if path is not None:\n",
" return path\n",
"\n",
" # Exhausted all pregrasp poses to try\n",
"\n",
"\n",
"path = plan_pregrasp(\n",
" grasp_pose,\n",
" start_joints,\n",
" ur3e_inverse_kinematics,\n",
" collision_checker.CheckConfigCollisionFree,\n",
" collision_checker.CheckConfigCollisionFree,\n",
")\n",
"\n",
"animate_joint_configurations(scene.meshcat, scene.robot_diagram, scene.arm_index, path)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
Expand Down

0 comments on commit 2e9f37f

Please sign in to comment.