Skip to content

Commit

Permalink
split read-write lock
Browse files Browse the repository at this point in the history
  • Loading branch information
Victorlouisdg committed Feb 16, 2024
1 parent 25f4407 commit f6baf77
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
_INTRINSICS_SHM_NAME = "intrinsics"
_FPS_SHM_NAME = "fps"
# We use this flag as a lock (we can't use built-in events/locks because they need to be passed explicitly to the receivers.)
_READ_WRITE_LOCK_SHM_NAME = "read_write_lock"
_WRITE_LOCK_SHM_NAME = "write_lock" # Boolean: only one writer allowed at a time.
_READ_LOCK_SHM_NAME = "read_lock" # int: number of readers currently reading.


def shared_memory_block_like(array: np.ndarray, name: str) -> Tuple[shared_memory.SharedMemory, np.ndarray]:
Expand Down Expand Up @@ -85,7 +86,7 @@ def __init__(
self.timestamp_shm: Optional[shared_memory.SharedMemory] = None
self.intrinsics_shm: Optional[shared_memory.SharedMemory] = None
self.fps_shm: Optional[shared_memory.SharedMemory] = None
self.read_write_lock_shm: Optional[shared_memory.SharedMemory] = None
self.write_lock_shm: Optional[shared_memory.SharedMemory] = None

def start(self) -> None:
"""Starts the process. The process will not start until this method is called."""
Expand Down Expand Up @@ -130,7 +131,8 @@ def _setup(self) -> None:
timestamp_name = f"{self._shared_memory_namespace}_{_TIMESTAMP_SHM_NAME}"
intrinsics_name = f"{self._shared_memory_namespace}_{_INTRINSICS_SHM_NAME}"
fps_name = f"{self._shared_memory_namespace}_{_FPS_SHM_NAME}"
read_write_lock_name = f"{self._shared_memory_namespace}_{_READ_WRITE_LOCK_SHM_NAME}"
write_lock_name = f"{self._shared_memory_namespace}_{_WRITE_LOCK_SHM_NAME}"
read_lock_name = f"{self._shared_memory_namespace}_{_READ_LOCK_SHM_NAME}"

# Get the example arrays (this is the easiest way to initialize the shared memory blocks with the correct size).
rgb = self._camera.get_rgb_image_as_int() # We pass uint8 images as they consume 4x less memory
Expand All @@ -141,7 +143,8 @@ def _setup(self) -> None:
intrinsics = self._camera.intrinsics_matrix()
fps = np.array([self._camera.fps], dtype=np.float64)

read_write_lock = np.array([False], dtype=np.bool_)
write_lock = np.array([False], dtype=np.bool_)
read_lock = np.array([0], dtype=np.int_)

# Create the shared memory blocks and numpy arrays that are backed by them.
logger.info("Creating RGB shared memory blocks.")
Expand All @@ -150,9 +153,8 @@ def _setup(self) -> None:
self.timestamp_shm, self.timestamp_shm_array = shared_memory_block_like(timestamp, timestamp_name)
self.intrinsics_shm, self.intrinsics_shm_array = shared_memory_block_like(intrinsics, intrinsics_name)
self.fps_shm, self.fps_shm_array = shared_memory_block_like(fps, fps_name)
self.read_write_lock_shm, self.read_write_lock_shm_array = shared_memory_block_like(
read_write_lock, read_write_lock_name
)
self.write_lock_shm, self.write_lock_shm_array = shared_memory_block_like(write_lock, write_lock_name)
self.read_lock_shm, self.read_lock_shm_array = shared_memory_block_like(read_lock, read_lock_name)

logger.info("Created RGB shared memory blocks.")

Expand Down Expand Up @@ -180,11 +182,18 @@ def run(self) -> None:

try:
while not self.shutdown_event.is_set():
# Retrieve an image from the camera
image = self._camera.get_rgb_image_as_int()
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)

# Wait to write to the shared memory block until there are no active readers (or writers).
# (Normally we should be the only writer though.)
while self.read_lock_shm_array[0] > 0 and self.write_lock_shm_array[0]:
time.sleep(0.00001)
self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_)

self.rgb_shm_array[:] = image[:]
self.timestamp_shm_array[:] = np.array([time.time()])[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.running_event.set()
except Exception as e:
logger.error(f"Error in {self.__class__.__name__}: {e}")
Expand Down Expand Up @@ -229,10 +238,15 @@ def unlink_shared_memory(self) -> None:
self.fps_shm.unlink()
self.fps_shm = None

if self.read_write_lock_shm is not None:
self.read_write_lock_shm.close()
self.read_write_lock_shm.unlink()
self.read_write_lock_shm = None
if self.write_lock_shm is not None:
self.write_lock_shm.close()
self.write_lock_shm.unlink()
self.write_lock_shm = None

if self.read_lock_shm is not None:
self.read_lock_shm.close()
self.read_lock_shm.unlink()
self.read_lock_shm = None

def __del__(self) -> None:
self.unlink_shared_memory()
Expand All @@ -255,7 +269,8 @@ def __init__(
timestamp_name = f"{self._shared_memory_namespace}_{_TIMESTAMP_SHM_NAME}"
intrinsics_name = f"{self._shared_memory_namespace}_{_INTRINSICS_SHM_NAME}"
fps_name = f"{self._shared_memory_namespace}_{_FPS_SHM_NAME}"
read_write_lock_name = f"{self._shared_memory_namespace}_{_READ_WRITE_LOCK_SHM_NAME}"
write_lock_name = f"{self._shared_memory_namespace}_{_WRITE_LOCK_SHM_NAME}"
read_lock_name = f"{self._shared_memory_namespace}_{_READ_LOCK_SHM_NAME}"

# Attach to existing shared memory blocks. Retry a few times to give the publisher time to start up (opening
# connection to a camera can take a while).
Expand All @@ -265,7 +280,8 @@ def __init__(
self.timestamp_shm = shared_memory.SharedMemory(name=timestamp_name)
self.intrinsics_shm = shared_memory.SharedMemory(name=intrinsics_name)
self.fps_shm = shared_memory.SharedMemory(name=fps_name)
self.read_write_lock_shm = shared_memory.SharedMemory(name=read_write_lock_name)
self.write_lock_shm = shared_memory.SharedMemory(name=write_lock_name)
self.read_lock_shm = shared_memory.SharedMemory(name=read_lock_name)

logger.info(f'SharedMemory namespace "{self._shared_memory_namespace}" found.')

Expand All @@ -280,7 +296,8 @@ def __init__(
resource_tracker.unregister(self.intrinsics_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.timestamp_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.fps_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.read_write_lock_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.write_lock_shm._name, "shared_memory") # type: ignore[attr-defined]
resource_tracker.unregister(self.read_lock_shm._name, "shared_memory") # type: ignore[attr-defined]

# Timestamp and intrinsics are the same shape for all images, so I decided that we could hardcode their shape.
# However, images come in many shapes, which I also decided to pass via shared memory. (Previously, I required
Expand All @@ -291,9 +308,8 @@ def __init__(
self.intrinsics_shm_array: np.ndarray = np.ndarray((3, 3), dtype=np.float64, buffer=self.intrinsics_shm.buf)
self.timestamp_shm_array: np.ndarray = np.ndarray((1,), dtype=np.float64, buffer=self.timestamp_shm.buf)
self.fps_shm_array: np.ndarray = np.ndarray((1,), dtype=np.float64, buffer=self.fps_shm.buf)
self.read_write_lock_shm_array: np.ndarray = np.ndarray(
(1,), dtype=np.bool_, buffer=self.read_write_lock_shm.buf
)
self.write_lock_shm_array: np.ndarray = np.ndarray((1,), dtype=np.bool_, buffer=self.write_lock_shm.buf)
self.read_lock_shm_array: np.ndarray = np.ndarray((1,), dtype=np.int_, buffer=self.read_lock_shm.buf)

# The shape of the image is not known in advance, so we need to retrieve it from the shared memory block.
rgb_shape = tuple(self.rgb_shape_shm_array[:])
Expand Down Expand Up @@ -331,11 +347,11 @@ def _retrieve_rgb_image(self) -> NumpyFloatImageType:

def _retrieve_rgb_image_as_int(self) -> NumpyIntImageType:
logger.debug("Retrieving RGB (TO BUFFER).")
while self.read_write_lock_shm_array[0]:
while self.write_lock_shm_array[0]:
time.sleep(0.00001)
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.read_lock_shm_array[0] += 1
self.rgb_buffer_array[:] = self.rgb_shm_array[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.read_lock_shm_array[0] -= 1
return self.rgb_buffer_array

def intrinsics_matrix(self) -> CameraIntrinsicsMatrixType:
Expand Down Expand Up @@ -365,9 +381,13 @@ def _close_shared_memory(self) -> None:
self.fps_shm.close()
self.fps_shm = None

if self.read_write_lock_shm is not None:
self.read_write_lock_shm.close()
self.read_write_lock_shm = None
if self.write_lock_shm is not None:
self.write_lock_shm.close()
self.write_lock_shm = None

if self.read_lock_shm is not None:
self.read_lock_shm.close()
self.read_lock_shm = None

def __del__(self) -> None:
self._close_shared_memory()
Expand Down Expand Up @@ -422,7 +442,7 @@ def __del__(self) -> None:
fps_str = f"{fps:.2f}".rjust(6, " ")
camera_fps_str = f"{camera_fps:.2f}".rjust(6, " ")
if fps < 0.9 * camera_fps:
logger.warning(f"FPS: {fps_str} / {camera_fps_str} (recorder might be missing frames)")
logger.warning(f"FPS: {fps_str} / {camera_fps_str} (too slow)")
else:
logger.debug(f"FPS: {fps_str} / {camera_fps_str}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,33 @@ def run(self) -> None:

try:
while not self.shutdown_event.is_set():
start_time = time.time()
image = self._camera.get_rgb_image_as_int()
depth_map = self._camera._retrieve_depth_map()
depth_image = self._camera._retrieve_depth_image()
confidence_map = self._camera._retrieve_confidence_map()
point_cloud = self._camera._retrieve_colored_point_cloud()
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
end_time = time.time()
logger.info(f"Time to grab images: {end_time - start_time:.3f} s")

start_time = time.time()
while self.read_lock_shm_array[0] > 0 and self.write_lock_shm_array[0]:
time.sleep(0.00001)
end_time = time.time()
logger.info(f"Time to wait for lock: {end_time - start_time:.3f} s")

start_time = time.time()
self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.rgb_shm_array[:] = image[:]
self.depth_shm_array[:] = depth_map[:]
self.depth_image_shm_array[:] = depth_image[:]
self.confidence_map_shm_array[:] = confidence_map[:]
self.point_cloud_positions_shm_array[:] = point_cloud.points[:]
self.point_cloud_colors_shm_array[:] = point_cloud.colors[:]
self.timestamp_shm_array[:] = np.array([time.time()])[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
end_time = time.time()
logger.info(f"Time to write to shared memory: {end_time - start_time:.3f} s")
self.running_event.set()
except Exception as e:
logger.error(f"Error in {self.__class__.__name__}: {e}")
Expand Down Expand Up @@ -293,38 +306,37 @@ def __init__(self, shared_memory_namespace: str) -> None:
self.point_cloud_colors_buffer_array = np.ndarray(point_cloud_colors_shape, dtype=np.uint8)

def _retrieve_depth_map(self) -> NumpyDepthMapType:
while self.read_write_lock_shm_array[0]:
while self.write_lock_shm_array[0]:
time.sleep(0.00001)
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.read_lock_shm_array[0] += 1
self.depth_buffer_array[:] = self.depth_shm_array[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.read_lock_shm_array[0] -= 1
return self.depth_buffer_array

def _retrieve_depth_image(self) -> NumpyIntImageType:
while self.read_write_lock_shm_array[0]:
while self.write_lock_shm_array[0]:
time.sleep(0.00001)
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.read_lock_shm_array[0] += 1
self.depth_image_buffer_array[:] = self.depth_image_shm_array[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.read_lock_shm_array[0] -= 1
return self.depth_image_buffer_array

def _retrieve_confidence_map(self) -> NumpyFloatImageType:
while self.read_write_lock_shm_array[0]:
while self.write_lock_shm_array[0]:
time.sleep(0.00001)
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.read_lock_shm_array[0] += 1
self.confidence_map_buffer_array[:] = self.confidence_map_shm_array[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.read_lock_shm_array[0] -= 1
return self.confidence_map_buffer_array

def _retrieve_colored_point_cloud(self) -> PointCloud:
logger.debug("Retrieving point cloud (TO BUFFER).")
while self.read_write_lock_shm_array[0]:
while self.write_lock_shm_array[0]:
time.sleep(0.00001)
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.read_lock_shm_array[0] += 1
self.point_cloud_positions_buffer_array[:] = self.point_cloud_positions_shm_array[:]
self.point_cloud_colors_buffer_array[:] = self.point_cloud_colors_shm_array[:]

self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.read_lock_shm_array[0] -= 1
point_cloud = PointCloud(self.point_cloud_positions_buffer_array, self.point_cloud_colors_buffer_array)
return point_cloud

Expand Down Expand Up @@ -418,7 +430,7 @@ def __del__(self) -> None:
depth_map = receiver.get_depth_map()
depth_image = receiver._retrieve_depth_image()
confidence_map = receiver._retrieve_confidence_map()
# point_cloud = receiver._retrieve_colored_point_cloud()
point_cloud = receiver._retrieve_colored_point_cloud()

cv2.imshow("Depth Map", depth_map)
cv2.imshow("Depth Image", depth_image)
Expand All @@ -434,7 +446,7 @@ def __del__(self) -> None:
fps_str = f"{fps:.2f}".rjust(6, " ")
camera_fps_str = f"{camera_fps:.2f}".rjust(6, " ")
if fps < 0.9 * camera_fps:
logger.warning(f"FPS: {fps_str} / {camera_fps_str} (recorder might be missing frames)")
logger.warning(f"FPS: {fps_str} / {camera_fps_str} (too slow)")
else:
logger.debug(f"FPS: {fps_str} / {camera_fps_str}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def run(self) -> None:
depth_image = self._camera._retrieve_depth_image()
confidence_map = self._camera._retrieve_confidence_map()
point_cloud = self._camera._retrieve_colored_point_cloud()
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.rgb_shm_array[:] = image[:]
self.rgb_right_shm_array[:] = image_right[:]
self.depth_shm_array[:] = depth_map[:]
Expand All @@ -102,7 +102,7 @@ def run(self) -> None:
self.point_cloud_positions_shm_array[:] = point_cloud.points[:]
self.point_cloud_colors_shm_array[:] = point_cloud.colors[:]
self.timestamp_shm_array[:] = np.array([time.time()])[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.running_event.set()
except Exception as e:
logger.error(f"Error in {self.__class__.__name__}: {e}")
Expand Down Expand Up @@ -184,20 +184,20 @@ def _retrieve_rgb_image(self, view: str = StereoRGBDCamera.LEFT_RGB) -> NumpyFlo
return image

def _retrieve_rgb_image_as_int(self, view: str = StereoRGBDCamera.LEFT_RGB) -> NumpyIntImageType:
while self.read_write_lock_shm_array[0]:
while self.write_lock_shm_array[0]:
time.sleep(0.00001)

# doing arr[0] = True/False might also work
if view == StereoRGBDCamera.LEFT_RGB:
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.rgb_buffer_array[:] = self.rgb_shm_array[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
return self.rgb_buffer_array
elif view == StereoRGBDCamera.RIGHT_RGB:
logger.debug("Retrieving RGB RIGHT (TO BUFFER).")
self.read_write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.write_lock_shm_array[:] = np.array([True], dtype=np.bool_)
self.rgb_right_buffer_array[:] = self.rgb_right_shm_array[:]
self.read_write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
self.write_lock_shm_array[:] = np.array([False], dtype=np.bool_)
return self.rgb_right_buffer_array
else:
raise ValueError(f"Unknown view: {view}")
Expand Down Expand Up @@ -321,7 +321,7 @@ def __del__(self) -> None:
fps_str = f"{fps:.2f}".rjust(6, " ")
camera_fps_str = f"{camera_fps:.2f}".rjust(6, " ")
if fps < 0.9 * camera_fps:
logger.warning(f"FPS: {fps_str} / {camera_fps_str} (recorder might be missing frames)")
logger.warning(f"FPS: {fps_str} / {camera_fps_str} (too slow)")
else:
logger.debug(f"FPS: {fps_str} / {camera_fps_str}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from airo_camera_toolkit.cameras.multiprocess.multiprocess_rgb_camera import MultiprocessRGBReceiver
from airo_camera_toolkit.image_transforms.image_transform import ImageTransform
from airo_camera_toolkit.utils.image_converter import ImageConverter
from loguru import logger


Expand Down Expand Up @@ -58,12 +59,12 @@ def run(self) -> None:
while not self.shutdown_event.is_set():
time_previous = time_current
time_current = time.time()
receiver.get_rgb_image_as_int()
# image = ImageConverter.from_numpy_int_format(image_rgb).image_in_opencv_format
image_rgb = receiver.get_rgb_image_as_int()
image = ImageConverter.from_numpy_int_format(image_rgb).image_in_opencv_format
# # Known bug: vertical images still give horizontal videos
# if self._image_transform is not None:
# image = self._image_transform.transform_image(image)
# video_writer.write(image)
if self._image_transform is not None:
image = self._image_transform.transform_image(image)
video_writer.write(image)
self.recording_started_event.set()

if time_previous is not None:
Expand Down

0 comments on commit f6baf77

Please sign in to comment.