From 9d6a77311a3b27f828f48b6f1bf3241b4d6b6081 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Thu, 5 Sep 2024 23:55:52 +0200 Subject: [PATCH] feat(storage-manager): enforce History::Latest based on Timestamp This commit forces all Storage that have the capability `History::Latest` to only keep the Latest value based on their Timestamp. This is achieved by (i) keeping track of the Timestamp associated to all key expressions (for which a publication was received) and (ii) discarding received publications that have an older Timestamp (compared to what is tracked for the same key expression). The main motivation is the Replication (or Storage Alignment) feature: the algorithm we use rely on the assumption that a Storage with the capability `History::Latest` only keeps the latest publication based on the Timestamp. If that property is not upheld, no guarantee can be made on the Replication algorithm. Hence, to avoid leaving all the responsibilities on Storage developers, it has been decided to enforce that behaviour at the Storage Manager level. This is what this commit does. * plugins/zenoh-plugin-storage-manager/src/replica/storage.rs: - Add a new field `latest_updates` to the `StorageService` structure, which is used to keep track of the latest publication timestamp for all stripped key expression for which the Storage received a publication. - In the `process_sample` function, add a check to ensure that the Sample being processed is more recent than what is currently stored in the Storage. If the Sample is less recent, it is discarded. - If the `process_sample` function, after the Sample was successfully inserted, update the `latest_updates` structure with the more recent Timestamp. Note that the `StorageInsertionResult::Outdated` could still be used as an escape hatch for a Storage to filter Samples on something else than the Timestamp. Signed-off-by: Julien Loudet --- .../src/replica/storage.rs | 64 ++++++++++++++----- 1 file changed, 48 insertions(+), 16 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index d2147b137c..28026840d2 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -71,6 +71,7 @@ pub struct StorageService { capability: Capability, tombstones: Arc>>, wildcard_updates: Arc>>, + latest_updates: Mutex, Timestamp>>, replication: Option, } @@ -94,6 +95,7 @@ impl StorageService { capability: store_intercept.capability, tombstones: Arc::new(RwLock::new(KeBoxTree::default())), wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())), + latest_updates: Mutex::new(HashMap::new()), replication, }; if storage_service @@ -335,7 +337,11 @@ impl StorageService { let sample_to_store_timestamp = match sample_to_store.timestamp() { Some(timestamp) => *timestamp, None => { - tracing::error!("Discarding `Sample` generated through `SampleBuilder` that has no Timestamp: {:?}", sample_to_store); + tracing::error!( + "Discarding `Sample` generated through `SampleBuilder` that has no \ + Timestamp: {:?}", + sample_to_store + ); continue; } }; @@ -350,12 +356,32 @@ impl StorageService { return; } }; + + // If the Storage was declared as only keeping the Latest value, we ensure that, for + // each received Sample, it is indeed the Latest value that is processed. + if self.capability.history == History::Latest { + if let Some(stored_timestamp) = + self.latest_updates.lock().await.get(&stripped_key) + { + if sample_to_store_timestamp < *stored_timestamp { + tracing::debug!( + "Skipping Sample for < {:?} >, a Value with a more recent \ + Timestamp is stored: (received) {} vs (stored) {}", + stripped_key, + sample_to_store_timestamp, + stored_timestamp + ); + continue; + } + } + } + let mut storage = self.storage.lock().await; let result = match sample.kind() { SampleKind::Put => { storage .put( - stripped_key, + stripped_key.clone(), Value::new( sample_to_store.payload().clone(), sample_to_store.encoding().clone(), @@ -368,24 +394,30 @@ impl StorageService { // register a tombstone self.mark_tombstone(&k, sample_to_store_timestamp).await; storage - .delete(stripped_key, sample_to_store_timestamp) + .delete(stripped_key.clone(), sample_to_store_timestamp) .await } }; drop(storage); - if self.replication.is_some() - && result.is_ok() - && !matches!(result.unwrap(), StorageInsertionResult::Outdated) - { - let sending = self - .replication - .as_ref() - .unwrap() - .log_propagation - .send((k.clone(), sample_to_store_timestamp)); - match sending { - Ok(_) => (), - Err(e) => { + + if result.is_ok_and(|insertion_result| { + !matches!(insertion_result, StorageInsertionResult::Outdated) + }) { + // Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps + // track of the Latest value, the timestamp is indeed more recent (it was + // checked before being processed): we update our internal structure. + if self.capability.history == History::Latest { + self.latest_updates + .lock() + .await + .insert(stripped_key, sample_to_store_timestamp); + } + + if let Some(replication) = &self.replication { + if let Err(e) = replication + .log_propagation + .send((k.clone(), sample_to_store_timestamp)) + { tracing::error!("Error in sending the sample to the log: {}", e); } }