Skip to content

Commit

Permalink
feat(storage-manager): enforce History::Latest based on Timestamp
Browse files Browse the repository at this point in the history
This commit forces all Storage that have the capability
`History::Latest` to only keep the Latest value based on their
Timestamp. This is achieved by (i) keeping track of the Timestamp
associated to all key expressions (for which a publication was received)
and (ii) discarding received publications that have an older
Timestamp (compared to what is tracked for the same key expression).

The main motivation is the Replication (or Storage Alignment) feature:
the algorithm we use rely on the assumption that a Storage with the
capability `History::Latest` only keeps the latest publication based on
the Timestamp. If that property is not upheld, no guarantee can be made
on the Replication algorithm.

Hence, to avoid leaving all the responsibilities on Storage developers,
it has been decided to enforce that behaviour at the Storage Manager
level. This is what this commit does.

* plugins/zenoh-plugin-storage-manager/src/replica/storage.rs:
  - Add a new field `latest_updates` to the `StorageService` structure,
    which is used to keep track of the latest publication timestamp for
    all stripped key expression for which the Storage received a
    publication.
  - In the `process_sample` function, add a check to ensure that the
    Sample being processed is more recent than what is currently stored in
    the Storage. If the Sample is less recent, it is discarded.
  - If the `process_sample` function, after the Sample was successfully
    inserted, update the `latest_updates` structure with the more recent
    Timestamp. Note that the `StorageInsertionResult::Outdated` could
    still be used as an escape hatch for a Storage to filter Samples on
    something else than the Timestamp.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 6, 2024
1 parent d9ce46f commit 9d6a773
Showing 1 changed file with 48 additions and 16 deletions.
64 changes: 48 additions & 16 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ pub struct StorageService {
capability: Capability,
tombstones: Arc<RwLock<KeBoxTree<Timestamp, NonWild, KeyedSetProvider>>>,
wildcard_updates: Arc<RwLock<KeBoxTree<Update, UnknownWildness, KeyedSetProvider>>>,
latest_updates: Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>,
replication: Option<ReplicationService>,
}

Expand All @@ -94,6 +95,7 @@ impl StorageService {
capability: store_intercept.capability,
tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())),
latest_updates: Mutex::new(HashMap::new()),
replication,
};
if storage_service
Expand Down Expand Up @@ -335,7 +337,11 @@ impl StorageService {
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!("Discarding `Sample` generated through `SampleBuilder` that has no Timestamp: {:?}", sample_to_store);
tracing::error!(
"Discarding `Sample` generated through `SampleBuilder` that has no \
Timestamp: {:?}",
sample_to_store
);
continue;
}
};
Expand All @@ -350,12 +356,32 @@ impl StorageService {
return;
}
};

// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
if self.capability.history == History::Latest {
if let Some(stored_timestamp) =
self.latest_updates.lock().await.get(&stripped_key)
{
if sample_to_store_timestamp < *stored_timestamp {
tracing::debug!(
"Skipping Sample for < {:?} >, a Value with a more recent \
Timestamp is stored: (received) {} vs (stored) {}",
stripped_key,
sample_to_store_timestamp,
stored_timestamp
);
continue;
}
}
}

let mut storage = self.storage.lock().await;
let result = match sample.kind() {
SampleKind::Put => {
storage
.put(
stripped_key,
stripped_key.clone(),
Value::new(
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
Expand All @@ -368,24 +394,30 @@ impl StorageService {
// register a tombstone
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key, sample_to_store_timestamp)
.delete(stripped_key.clone(), sample_to_store_timestamp)
.await
}
};
drop(storage);
if self.replication.is_some()
&& result.is_ok()
&& !matches!(result.unwrap(), StorageInsertionResult::Outdated)
{
let sending = self
.replication
.as_ref()
.unwrap()
.log_propagation
.send((k.clone(), sample_to_store_timestamp));
match sending {
Ok(_) => (),
Err(e) => {

if result.is_ok_and(|insertion_result| {
!matches!(insertion_result, StorageInsertionResult::Outdated)
}) {
// Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps
// track of the Latest value, the timestamp is indeed more recent (it was
// checked before being processed): we update our internal structure.
if self.capability.history == History::Latest {
self.latest_updates
.lock()
.await
.insert(stripped_key, sample_to_store_timestamp);
}

if let Some(replication) = &self.replication {
if let Err(e) = replication
.log_propagation
.send((k.clone(), sample_to_store_timestamp))
{
tracing::error!("Error in sending the sample to the log: {}", e);
}
}
Expand Down

0 comments on commit 9d6a773

Please sign in to comment.