Skip to content

Commit

Permalink
[SimulationStep] Allow the specification of step seconds and a callab…
Browse files Browse the repository at this point in the history
…le to be called during simulation step.
  • Loading branch information
AbdelrhmanBassiouny committed Nov 5, 2024
1 parent 97a3167 commit dc6fa95
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 13 deletions.
14 changes: 10 additions & 4 deletions src/pycram/datastructures/world.py
Original file line number Diff line number Diff line change
Expand Up @@ -578,19 +578,21 @@ def get_object_link_names(self, obj: Object) -> List[str]:
"""
pass

def simulate(self, seconds: float, real_time: Optional[bool] = False) -> None:
def simulate(self, seconds: float, real_time: Optional[bool] = False,
func: Optional[Callable[[], None]] = None) -> None:
"""
Simulate Physics in the World for a given amount of seconds. Usually this simulation is faster than real
time. By setting the 'real_time' parameter this simulation is slowed down such that the simulated time is equal
to real time.
:param seconds: The amount of seconds that should be simulated.
:param real_time: If the simulation should happen in real time or faster.
:param func: A function that should be called during the simulation
"""
self.set_realtime(real_time)
for i in range(0, int(seconds * self.conf.simulation_frequency)):
curr_time = Time().now()
self.step()
self.step(func)
for objects, callbacks in self.coll_callbacks.items():
contact_points = self.get_contact_points_between_two_objects(objects[0], objects[1])
if len(contact_points) > 0:
Expand Down Expand Up @@ -809,9 +811,13 @@ def reset_multiple_objects_base_poses(self, objects: Dict[Object, Pose]) -> bool
pass

@abstractmethod
def step(self):
def step(self, func: Optional[Callable[[], None]] = None, step_seconds: Optional[float] = None) -> None:
"""
Step the world simulation using forward dynamics
Step the world simulation using forward dynamics.
:param func: An optional function to be called during the step.
:param step_seconds: The amount of seconds to step the simulation if None the simulation is stepped by the
simulation time step.
"""
pass

Expand Down
9 changes: 7 additions & 2 deletions src/pycram/worlds/bullet_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import numpy as np
import pycram_bullet as p
from geometry_msgs.msg import Point
from typing_extensions import List, Optional, Dict, Any
from typing_extensions import List, Optional, Dict, Any, Callable

from ..datastructures.dataclasses import Color, AxisAlignedBoundingBox, MultiBody, VisualShape, BoxVisualShape, \
ClosestPoint, LateralFriction, ContactPoint, ContactPointsList, ClosestPointsList
Expand All @@ -16,6 +16,7 @@
from ..datastructures.world import World
from ..object_descriptors.generic import ObjectDescription as GenericObjectDescription
from ..object_descriptors.urdf import ObjectDescription
from ..ros.logging import logwarn
from ..validation.goal_validator import (validate_multiple_joint_positions, validate_joint_position,
validate_object_pose, validate_multiple_object_poses)
from ..world_concepts.constraints import Constraint
Expand Down Expand Up @@ -242,7 +243,11 @@ def _set_object_pose_by_id(self, obj_id: int, pose: Pose) -> bool:
physicsClientId=self.id)
return True

def step(self):
def step(self, func: Optional[Callable[[], None]] = None, step_seconds: Optional[float] = None) -> None:
if func is not None:
logwarn("The BulletWorld step function does not support a function argument.")
if step_seconds is not None:
logwarn("The BulletWorld step function does not support a step_seconds argument.")
p.stepSimulation(physicsClientId=self.id)

def get_multiple_object_poses(self, objects: List[Object]) -> Dict[str, Pose]:
Expand Down
16 changes: 9 additions & 7 deletions src/pycram/worlds/multiverse.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,19 @@

import numpy as np
from tf.transformations import quaternion_matrix
from typing_extensions import List, Dict, Optional, Union, Tuple
from typing_extensions import List, Dict, Optional, Union, Tuple, Callable

from .multiverse_communication.client_manager import MultiverseClientManager
from .multiverse_communication.clients import MultiverseController, MultiverseReader, MultiverseWriter, MultiverseAPI
from ..config.multiverse_conf import MultiverseConfig
from ..datastructures.dataclasses import AxisAlignedBoundingBox, Color, ContactPointsList, ContactPoint, \
MultiverseContactPoint, MeshVisualShape
from ..datastructures.dataclasses import AxisAlignedBoundingBox, Color, ContactPointsList, ContactPoint
from ..datastructures.enums import WorldMode, JointType, ObjectType, MultiverseBodyProperty, MultiverseJointPosition, \
MultiverseJointCMD
from ..datastructures.pose import Pose
from ..datastructures.world import World
from ..description import Link, Joint
from ..object_descriptors.mjcf import ObjectDescription as MJCF, ObjectFactory, PrimitiveObjectFactory
from ..object_descriptors.generic import ObjectDescription as GenericObjectDescription
from ..object_descriptors.mjcf import ObjectDescription as MJCF, PrimitiveObjectFactory
from ..robot_description import RobotDescription
from ..ros.logging import logwarn, logerr
from ..utils import RayTestUtils, wxyz_to_xyzw, xyzw_to_wxyz
Expand Down Expand Up @@ -545,7 +544,7 @@ def get_object_contact_points(self, obj: Object) -> ContactPointsList:
contact_points[-1].position_on_b = point.position
return contact_points

def get_object_with_body_name(self, body_name: str) -> Tuple[Optional[Object],Optional[Link]]:
def get_object_with_body_name(self, body_name: str) -> Tuple[Optional[Object], Optional[Link]]:
"""
Get the object with the body name in the simulator, the body name can be the name of the object or the link.
Expand Down Expand Up @@ -624,9 +623,12 @@ def ray_test_batch(self, from_positions: List[List[float]],
else:
return results

def step(self):
def step(self, func: Optional[Callable[[], None]] = None, step_seconds: Optional[float] = None) -> None:
"""
Perform a simulation step in the simulator, this is useful when use_static_mode is True.
:param func: A function to be called after the simulation step.
:param step_seconds: The number of seconds to step the simulation.
"""
if self.conf.use_static_mode:
self.unpause_simulation()
Expand Down Expand Up @@ -671,7 +673,7 @@ def get_link_axis_aligned_bounding_box(self, link: Link) -> AxisAlignedBoundingB

def set_realtime(self, real_time: bool) -> None:
logwarn("set_realtime is not implemented as an API in Multiverse, it is configured in the"
"multiverse configuration file (.muv file) as rtf_required where a value of 1 means real-time")
"multiverse configuration file (.muv file) as rtf_required where a value of 1 means real-time")

def set_gravity(self, gravity_vector: List[float]) -> None:
logwarn("set_gravity is not implemented in Multiverse")
Expand Down

0 comments on commit dc6fa95

Please sign in to comment.