diff --git a/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_rgb_camera.py b/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_rgb_camera.py index 942a426..e80ca24 100644 --- a/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_rgb_camera.py +++ b/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_rgb_camera.py @@ -18,7 +18,8 @@ _INTRINSICS_SHM_NAME = "intrinsics" _FPS_SHM_NAME = "fps" # We use this flag as a lock (we can't use built-in events/locks because they need to be passed explicitly to the receivers.) -_READ_WRITE_LOCK_SHM_NAME = "read_write_lock" +_WRITE_LOCK_SHM_NAME = "write_lock" # Boolean: only one writer allowed at a time. +_READ_LOCK_SHM_NAME = "read_lock" # int: number of readers currently reading. def shared_memory_block_like(array: np.ndarray, name: str) -> Tuple[shared_memory.SharedMemory, np.ndarray]: @@ -85,7 +86,7 @@ def __init__( self.timestamp_shm: Optional[shared_memory.SharedMemory] = None self.intrinsics_shm: Optional[shared_memory.SharedMemory] = None self.fps_shm: Optional[shared_memory.SharedMemory] = None - self.read_write_lock_shm: Optional[shared_memory.SharedMemory] = None + self.write_lock_shm: Optional[shared_memory.SharedMemory] = None def start(self) -> None: """Starts the process. The process will not start until this method is called.""" @@ -130,7 +131,8 @@ def _setup(self) -> None: timestamp_name = f"{self._shared_memory_namespace}_{_TIMESTAMP_SHM_NAME}" intrinsics_name = f"{self._shared_memory_namespace}_{_INTRINSICS_SHM_NAME}" fps_name = f"{self._shared_memory_namespace}_{_FPS_SHM_NAME}" - read_write_lock_name = f"{self._shared_memory_namespace}_{_READ_WRITE_LOCK_SHM_NAME}" + write_lock_name = f"{self._shared_memory_namespace}_{_WRITE_LOCK_SHM_NAME}" + read_lock_name = f"{self._shared_memory_namespace}_{_READ_LOCK_SHM_NAME}" # Get the example arrays (this is the easiest way to initialize the shared memory blocks with the correct size). rgb = self._camera.get_rgb_image_as_int() # We pass uint8 images as they consume 4x less memory @@ -141,7 +143,8 @@ def _setup(self) -> None: intrinsics = self._camera.intrinsics_matrix() fps = np.array([self._camera.fps], dtype=np.float64) - read_write_lock = np.array([False], dtype=np.bool_) + write_lock = np.array([False], dtype=np.bool_) + read_lock = np.array([0], dtype=np.int_) # Create the shared memory blocks and numpy arrays that are backed by them. logger.info("Creating RGB shared memory blocks.") @@ -150,9 +153,8 @@ def _setup(self) -> None: self.timestamp_shm, self.timestamp_shm_array = shared_memory_block_like(timestamp, timestamp_name) self.intrinsics_shm, self.intrinsics_shm_array = shared_memory_block_like(intrinsics, intrinsics_name) self.fps_shm, self.fps_shm_array = shared_memory_block_like(fps, fps_name) - self.read_write_lock_shm, self.read_write_lock_shm_array = shared_memory_block_like( - read_write_lock, read_write_lock_name - ) + self.write_lock_shm, self.write_lock_shm_array = shared_memory_block_like(write_lock, write_lock_name) + self.read_lock_shm, self.read_lock_shm_array = shared_memory_block_like(read_lock, read_lock_name) logger.info("Created RGB shared memory blocks.") @@ -180,11 +182,18 @@ def run(self) -> None: try: while not self.shutdown_event.is_set(): + # Retrieve an image from the camera image = self._camera.get_rgb_image_as_int() - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + + # Wait to write to the shared memory block until there are no active readers (or writers). + # (Normally we should be the only writer though.) + while self.read_lock_shm_array[0] > 0 and self.write_lock_shm_array[0]: + time.sleep(0.00001) + self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.rgb_shm_array[:] = image[:] self.timestamp_shm_array[:] = np.array([time.time()])[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_) self.running_event.set() except Exception as e: logger.error(f"Error in {self.__class__.__name__}: {e}") @@ -229,10 +238,15 @@ def unlink_shared_memory(self) -> None: self.fps_shm.unlink() self.fps_shm = None - if self.read_write_lock_shm is not None: - self.read_write_lock_shm.close() - self.read_write_lock_shm.unlink() - self.read_write_lock_shm = None + if self.write_lock_shm is not None: + self.write_lock_shm.close() + self.write_lock_shm.unlink() + self.write_lock_shm = None + + if self.read_lock_shm is not None: + self.read_lock_shm.close() + self.read_lock_shm.unlink() + self.read_lock_shm = None def __del__(self) -> None: self.unlink_shared_memory() @@ -255,7 +269,8 @@ def __init__( timestamp_name = f"{self._shared_memory_namespace}_{_TIMESTAMP_SHM_NAME}" intrinsics_name = f"{self._shared_memory_namespace}_{_INTRINSICS_SHM_NAME}" fps_name = f"{self._shared_memory_namespace}_{_FPS_SHM_NAME}" - read_write_lock_name = f"{self._shared_memory_namespace}_{_READ_WRITE_LOCK_SHM_NAME}" + write_lock_name = f"{self._shared_memory_namespace}_{_WRITE_LOCK_SHM_NAME}" + read_lock_name = f"{self._shared_memory_namespace}_{_READ_LOCK_SHM_NAME}" # Attach to existing shared memory blocks. Retry a few times to give the publisher time to start up (opening # connection to a camera can take a while). @@ -265,7 +280,8 @@ def __init__( self.timestamp_shm = shared_memory.SharedMemory(name=timestamp_name) self.intrinsics_shm = shared_memory.SharedMemory(name=intrinsics_name) self.fps_shm = shared_memory.SharedMemory(name=fps_name) - self.read_write_lock_shm = shared_memory.SharedMemory(name=read_write_lock_name) + self.write_lock_shm = shared_memory.SharedMemory(name=write_lock_name) + self.read_lock_shm = shared_memory.SharedMemory(name=read_lock_name) logger.info(f'SharedMemory namespace "{self._shared_memory_namespace}" found.') @@ -280,7 +296,8 @@ def __init__( resource_tracker.unregister(self.intrinsics_shm._name, "shared_memory") # type: ignore[attr-defined] resource_tracker.unregister(self.timestamp_shm._name, "shared_memory") # type: ignore[attr-defined] resource_tracker.unregister(self.fps_shm._name, "shared_memory") # type: ignore[attr-defined] - resource_tracker.unregister(self.read_write_lock_shm._name, "shared_memory") # type: ignore[attr-defined] + resource_tracker.unregister(self.write_lock_shm._name, "shared_memory") # type: ignore[attr-defined] + resource_tracker.unregister(self.read_lock_shm._name, "shared_memory") # type: ignore[attr-defined] # Timestamp and intrinsics are the same shape for all images, so I decided that we could hardcode their shape. # However, images come in many shapes, which I also decided to pass via shared memory. (Previously, I required @@ -291,9 +308,8 @@ def __init__( self.intrinsics_shm_array: np.ndarray = np.ndarray((3, 3), dtype=np.float64, buffer=self.intrinsics_shm.buf) self.timestamp_shm_array: np.ndarray = np.ndarray((1,), dtype=np.float64, buffer=self.timestamp_shm.buf) self.fps_shm_array: np.ndarray = np.ndarray((1,), dtype=np.float64, buffer=self.fps_shm.buf) - self.read_write_lock_shm_array: np.ndarray = np.ndarray( - (1,), dtype=np.bool_, buffer=self.read_write_lock_shm.buf - ) + self.write_lock_shm_array: np.ndarray = np.ndarray((1,), dtype=np.bool_, buffer=self.write_lock_shm.buf) + self.read_lock_shm_array: np.ndarray = np.ndarray((1,), dtype=np.int_, buffer=self.read_lock_shm.buf) # The shape of the image is not known in advance, so we need to retrieve it from the shared memory block. rgb_shape = tuple(self.rgb_shape_shm_array[:]) @@ -331,11 +347,11 @@ def _retrieve_rgb_image(self) -> NumpyFloatImageType: def _retrieve_rgb_image_as_int(self) -> NumpyIntImageType: logger.debug("Retrieving RGB (TO BUFFER).") - while self.read_write_lock_shm_array[0]: + while self.write_lock_shm_array[0]: time.sleep(0.00001) - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.read_lock_shm_array[0] += 1 self.rgb_buffer_array[:] = self.rgb_shm_array[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.read_lock_shm_array[0] -= 1 return self.rgb_buffer_array def intrinsics_matrix(self) -> CameraIntrinsicsMatrixType: @@ -365,9 +381,13 @@ def _close_shared_memory(self) -> None: self.fps_shm.close() self.fps_shm = None - if self.read_write_lock_shm is not None: - self.read_write_lock_shm.close() - self.read_write_lock_shm = None + if self.write_lock_shm is not None: + self.write_lock_shm.close() + self.write_lock_shm = None + + if self.read_lock_shm is not None: + self.read_lock_shm.close() + self.read_lock_shm = None def __del__(self) -> None: self._close_shared_memory() @@ -422,7 +442,7 @@ def __del__(self) -> None: fps_str = f"{fps:.2f}".rjust(6, " ") camera_fps_str = f"{camera_fps:.2f}".rjust(6, " ") if fps < 0.9 * camera_fps: - logger.warning(f"FPS: {fps_str} / {camera_fps_str} (recorder might be missing frames)") + logger.warning(f"FPS: {fps_str} / {camera_fps_str} (too slow)") else: logger.debug(f"FPS: {fps_str} / {camera_fps_str}") diff --git a/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_rgbd_camera.py b/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_rgbd_camera.py index 0e4bba4..6c06770 100644 --- a/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_rgbd_camera.py +++ b/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_rgbd_camera.py @@ -125,12 +125,23 @@ def run(self) -> None: try: while not self.shutdown_event.is_set(): + start_time = time.time() image = self._camera.get_rgb_image_as_int() depth_map = self._camera._retrieve_depth_map() depth_image = self._camera._retrieve_depth_image() confidence_map = self._camera._retrieve_confidence_map() point_cloud = self._camera._retrieve_colored_point_cloud() - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + end_time = time.time() + logger.info(f"Time to grab images: {end_time - start_time:.3f} s") + + start_time = time.time() + while self.read_lock_shm_array[0] > 0 and self.write_lock_shm_array[0]: + time.sleep(0.00001) + end_time = time.time() + logger.info(f"Time to wait for lock: {end_time - start_time:.3f} s") + + start_time = time.time() + self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_) self.rgb_shm_array[:] = image[:] self.depth_shm_array[:] = depth_map[:] self.depth_image_shm_array[:] = depth_image[:] @@ -138,7 +149,9 @@ def run(self) -> None: self.point_cloud_positions_shm_array[:] = point_cloud.points[:] self.point_cloud_colors_shm_array[:] = point_cloud.colors[:] self.timestamp_shm_array[:] = np.array([time.time()])[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + end_time = time.time() + logger.info(f"Time to write to shared memory: {end_time - start_time:.3f} s") self.running_event.set() except Exception as e: logger.error(f"Error in {self.__class__.__name__}: {e}") @@ -293,38 +306,37 @@ def __init__(self, shared_memory_namespace: str) -> None: self.point_cloud_colors_buffer_array = np.ndarray(point_cloud_colors_shape, dtype=np.uint8) def _retrieve_depth_map(self) -> NumpyDepthMapType: - while self.read_write_lock_shm_array[0]: + while self.write_lock_shm_array[0]: time.sleep(0.00001) - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.read_lock_shm_array[0] += 1 self.depth_buffer_array[:] = self.depth_shm_array[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.read_lock_shm_array[0] -= 1 return self.depth_buffer_array def _retrieve_depth_image(self) -> NumpyIntImageType: - while self.read_write_lock_shm_array[0]: + while self.write_lock_shm_array[0]: time.sleep(0.00001) - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.read_lock_shm_array[0] += 1 self.depth_image_buffer_array[:] = self.depth_image_shm_array[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.read_lock_shm_array[0] -= 1 return self.depth_image_buffer_array def _retrieve_confidence_map(self) -> NumpyFloatImageType: - while self.read_write_lock_shm_array[0]: + while self.write_lock_shm_array[0]: time.sleep(0.00001) - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.read_lock_shm_array[0] += 1 self.confidence_map_buffer_array[:] = self.confidence_map_shm_array[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.read_lock_shm_array[0] -= 1 return self.confidence_map_buffer_array def _retrieve_colored_point_cloud(self) -> PointCloud: logger.debug("Retrieving point cloud (TO BUFFER).") - while self.read_write_lock_shm_array[0]: + while self.write_lock_shm_array[0]: time.sleep(0.00001) - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.read_lock_shm_array[0] += 1 self.point_cloud_positions_buffer_array[:] = self.point_cloud_positions_shm_array[:] self.point_cloud_colors_buffer_array[:] = self.point_cloud_colors_shm_array[:] - - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.read_lock_shm_array[0] -= 1 point_cloud = PointCloud(self.point_cloud_positions_buffer_array, self.point_cloud_colors_buffer_array) return point_cloud @@ -418,7 +430,7 @@ def __del__(self) -> None: depth_map = receiver.get_depth_map() depth_image = receiver._retrieve_depth_image() confidence_map = receiver._retrieve_confidence_map() - # point_cloud = receiver._retrieve_colored_point_cloud() + point_cloud = receiver._retrieve_colored_point_cloud() cv2.imshow("Depth Map", depth_map) cv2.imshow("Depth Image", depth_image) @@ -434,7 +446,7 @@ def __del__(self) -> None: fps_str = f"{fps:.2f}".rjust(6, " ") camera_fps_str = f"{camera_fps:.2f}".rjust(6, " ") if fps < 0.9 * camera_fps: - logger.warning(f"FPS: {fps_str} / {camera_fps_str} (recorder might be missing frames)") + logger.warning(f"FPS: {fps_str} / {camera_fps_str} (too slow)") else: logger.debug(f"FPS: {fps_str} / {camera_fps_str}") diff --git a/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_stereo_rgbd_camera.py b/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_stereo_rgbd_camera.py index 0d4584f..9a4bd01 100644 --- a/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_stereo_rgbd_camera.py +++ b/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_stereo_rgbd_camera.py @@ -93,7 +93,7 @@ def run(self) -> None: depth_image = self._camera._retrieve_depth_image() confidence_map = self._camera._retrieve_confidence_map() point_cloud = self._camera._retrieve_colored_point_cloud() - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_) self.rgb_shm_array[:] = image[:] self.rgb_right_shm_array[:] = image_right[:] self.depth_shm_array[:] = depth_map[:] @@ -102,7 +102,7 @@ def run(self) -> None: self.point_cloud_positions_shm_array[:] = point_cloud.points[:] self.point_cloud_colors_shm_array[:] = point_cloud.colors[:] self.timestamp_shm_array[:] = np.array([time.time()])[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_) self.running_event.set() except Exception as e: logger.error(f"Error in {self.__class__.__name__}: {e}") @@ -184,20 +184,20 @@ def _retrieve_rgb_image(self, view: str = StereoRGBDCamera.LEFT_RGB) -> NumpyFlo return image def _retrieve_rgb_image_as_int(self, view: str = StereoRGBDCamera.LEFT_RGB) -> NumpyIntImageType: - while self.read_write_lock_shm_array[0]: + while self.write_lock_shm_array[0]: time.sleep(0.00001) # doing arr[0] = True/False might also work if view == StereoRGBDCamera.LEFT_RGB: - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_) self.rgb_buffer_array[:] = self.rgb_shm_array[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_) return self.rgb_buffer_array elif view == StereoRGBDCamera.RIGHT_RGB: logger.debug("Retrieving RGB RIGHT (TO BUFFER).") - self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_) + self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_) self.rgb_right_buffer_array[:] = self.rgb_right_shm_array[:] - self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_) + self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_) return self.rgb_right_buffer_array else: raise ValueError(f"Unknown view: {view}") @@ -321,7 +321,7 @@ def __del__(self) -> None: fps_str = f"{fps:.2f}".rjust(6, " ") camera_fps_str = f"{camera_fps:.2f}".rjust(6, " ") if fps < 0.9 * camera_fps: - logger.warning(f"FPS: {fps_str} / {camera_fps_str} (recorder might be missing frames)") + logger.warning(f"FPS: {fps_str} / {camera_fps_str} (too slow)") else: logger.debug(f"FPS: {fps_str} / {camera_fps_str}") diff --git a/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_video_recorder.py b/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_video_recorder.py index 89853a8..a2aa0b6 100644 --- a/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_video_recorder.py +++ b/airo-camera-toolkit/airo_camera_toolkit/cameras/multiprocess/multiprocess_video_recorder.py @@ -7,6 +7,7 @@ from airo_camera_toolkit.cameras.multiprocess.multiprocess_rgb_camera import MultiprocessRGBReceiver from airo_camera_toolkit.image_transforms.image_transform import ImageTransform +from airo_camera_toolkit.utils.image_converter import ImageConverter from loguru import logger @@ -58,12 +59,12 @@ def run(self) -> None: while not self.shutdown_event.is_set(): time_previous = time_current time_current = time.time() - receiver.get_rgb_image_as_int() - # image = ImageConverter.from_numpy_int_format(image_rgb).image_in_opencv_format + image_rgb = receiver.get_rgb_image_as_int() + image = ImageConverter.from_numpy_int_format(image_rgb).image_in_opencv_format # # Known bug: vertical images still give horizontal videos - # if self._image_transform is not None: - # image = self._image_transform.transform_image(image) - # video_writer.write(image) + if self._image_transform is not None: + image = self._image_transform.transform_image(image) + video_writer.write(image) self.recording_started_event.set() if time_previous is not None: