Skip to content

Commit

Permalink
fix(storage-manager): race condition on latest_updates structure
Browse files Browse the repository at this point in the history
As the lock on the `latest_updates` structure was dropped before the
operation on the Storage was performed and re-acquired afterwards, a
race condition could have happened as illustrated in the following
scenario.

---

Let us assume that the Storage and `latest_updates` have `(A, T_A0)`.

A first operation is performed with `T_A1 > T_A0`. As the timestamp is
greater, `latest_updates` will allow it. The lock on `latest_updates` is
released.

The Storage is updated and has `(A, T_A1)`.

In the meantime, another operation is received with `T_A2 > T_A0`. As
the timestamp is greater than `T_A0` (`latest_updates` was still not
updated!), it is allowed.

For scheduling reasons, the second operation is performed *entirely*:
the Storage and `latest_updates` both have `(A, T_A2)`.

The scheduler goes back to finish `T_A1` and inserts in `latest_updates`
the pair `(A, T_A1)`.

The information in the Storage (`(A, T_A2)`) and in
`latest_updates` (`(A, T_A1)`) are no longer coherent.

---

This commit fixes this issue by only releasing the lock on the
`latest_updates` structure after having updated it — instead of after
checking if the received operation is more recent.

Note that this should create very little additional contention: the bulk
of the processing is performed when interacting with the Storage, not
when checking / updating the `latest_updates` structure.

* plugins/zenoh-plugin-storage-manager/src/replica/storage.rs: only
  release the lock on the `latest_updates` structure after having
  updated it.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 11, 2024
1 parent 717e005 commit 2bc10a2
Showing 1 changed file with 4 additions and 7 deletions.
11 changes: 4 additions & 7 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,10 +359,9 @@ impl StorageService {

// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
let mut latest_updates_guard = self.latest_updates.lock().await;
if self.capability.history == History::Latest {
if let Some(stored_timestamp) =
self.latest_updates.lock().await.get(&stripped_key)
{
if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) {
if sample_to_store_timestamp < *stored_timestamp {
tracing::debug!(
"Skipping Sample for < {:?} >, a Value with a more recent \
Expand Down Expand Up @@ -407,11 +406,9 @@ impl StorageService {
// track of the Latest value, the timestamp is indeed more recent (it was
// checked before being processed): we update our internal structure.
if self.capability.history == History::Latest {
self.latest_updates
.lock()
.await
.insert(stripped_key, sample_to_store_timestamp);
latest_updates_guard.insert(stripped_key, sample_to_store_timestamp);
}
drop(latest_updates_guard);

if let Some(replication) = &self.replication {
if let Err(e) = replication
Expand Down

0 comments on commit 2bc10a2

Please sign in to comment.