From 87f47919c24450bd802117774e6d662de158d664 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Thu, 5 Sep 2024 23:55:52 +0200 Subject: [PATCH] feat(storage-manager): enforce History::Latest based on Timestamp This commit forces all Storage that have the capability `History::Latest` to only keep the Latest value based on their Timestamp. This is achieved by (i) keeping track of the Timestamp associated to all key expressions (for which a publication was received) and (ii) discarding received publications that have an older Timestamp (compared to what is tracked for the same key expression). The main motivation is the Replication (or Storage Alignment) feature: the algorithm we use rely on the assumption that a Storage with the capability `History::Latest` only keeps the latest publication based on the Timestamp. If that property is not upheld, no guarantee can be made on the Replication algorithm. Hence, to avoid leaving this responsibility on Storage developers, it has been decided to enforce that behaviour at the Storage Manager level. This is what this commit does. Additionally, to avoid having the `latest_updates` structure grow infinitely, it was included in the `GarbageCollectionEvent` structure to be garbage collected at regular interval. This garbage collection is controlled by the `GarbageCollectionConfig` which defaults to removing entries that are more than 24h old. * plugins/zenoh-plugin-storage-manager/src/replica/storage.rs: - Add a new field `latest_updates` to the `StorageService` structure, which is used to keep track of the latest publication timestamp for all stripped key expression for which the Storage received a publication. - In the `process_sample` function, add a check to ensure that the Sample being processed is more recent than what is currently stored in the Storage. If the Sample is less recent, it is discarded. - If the `process_sample` function, after the Sample was successfully inserted, update the `latest_updates` structure with the more recent Timestamp. Note that the `StorageInsertionResult::Outdated` could still be used as an escape hatch for a Storage to filter Samples on something else than the Timestamp. - In the `GarbageCollectionEvent`, add the `last_updates` structure such that it is garbage collected. - Updated the `run` method of the `GarbageCollectionEvent` to retain only entries that are less than `GarbageCollectionConfig::lifespan` old. Signed-off-by: Julien Loudet --- .../src/replica/storage.rs | 71 ++++++++++++++----- 1 file changed, 55 insertions(+), 16 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs index d2147b137c..297188dc60 100644 --- a/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs +++ b/plugins/zenoh-plugin-storage-manager/src/replica/storage.rs @@ -71,6 +71,7 @@ pub struct StorageService { capability: Capability, tombstones: Arc>>, wildcard_updates: Arc>>, + latest_updates: Arc, Timestamp>>>, replication: Option, } @@ -94,6 +95,7 @@ impl StorageService { capability: store_intercept.capability, tombstones: Arc::new(RwLock::new(KeBoxTree::default())), wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())), + latest_updates: Arc::new(Mutex::new(HashMap::new())), replication, }; if storage_service @@ -143,6 +145,7 @@ impl StorageService { config: gc_config, tombstones: self.tombstones.clone(), wildcard_updates: self.wildcard_updates.clone(), + latest_updates: self.latest_updates.clone(), }, ); t.add_async(gc).await; @@ -335,7 +338,11 @@ impl StorageService { let sample_to_store_timestamp = match sample_to_store.timestamp() { Some(timestamp) => *timestamp, None => { - tracing::error!("Discarding `Sample` generated through `SampleBuilder` that has no Timestamp: {:?}", sample_to_store); + tracing::error!( + "Discarding `Sample` generated through `SampleBuilder` that has no \ + Timestamp: {:?}", + sample_to_store + ); continue; } }; @@ -350,12 +357,32 @@ impl StorageService { return; } }; + + // If the Storage was declared as only keeping the Latest value, we ensure that, for + // each received Sample, it is indeed the Latest value that is processed. + if self.capability.history == History::Latest { + if let Some(stored_timestamp) = + self.latest_updates.lock().await.get(&stripped_key) + { + if sample_to_store_timestamp < *stored_timestamp { + tracing::debug!( + "Skipping Sample for < {:?} >, a Value with a more recent \ + Timestamp is stored: (received) {} vs (stored) {}", + stripped_key, + sample_to_store_timestamp, + stored_timestamp + ); + continue; + } + } + } + let mut storage = self.storage.lock().await; let result = match sample.kind() { SampleKind::Put => { storage .put( - stripped_key, + stripped_key.clone(), Value::new( sample_to_store.payload().clone(), sample_to_store.encoding().clone(), @@ -368,24 +395,30 @@ impl StorageService { // register a tombstone self.mark_tombstone(&k, sample_to_store_timestamp).await; storage - .delete(stripped_key, sample_to_store_timestamp) + .delete(stripped_key.clone(), sample_to_store_timestamp) .await } }; drop(storage); - if self.replication.is_some() - && result.is_ok() - && !matches!(result.unwrap(), StorageInsertionResult::Outdated) - { - let sending = self - .replication - .as_ref() - .unwrap() - .log_propagation - .send((k.clone(), sample_to_store_timestamp)); - match sending { - Ok(_) => (), - Err(e) => { + + if result.is_ok_and(|insertion_result| { + !matches!(insertion_result, StorageInsertionResult::Outdated) + }) { + // Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps + // track of the Latest value, the timestamp is indeed more recent (it was + // checked before being processed): we update our internal structure. + if self.capability.history == History::Latest { + self.latest_updates + .lock() + .await + .insert(stripped_key, sample_to_store_timestamp); + } + + if let Some(replication) = &self.replication { + if let Err(e) = replication + .log_propagation + .send((k.clone(), sample_to_store_timestamp)) + { tracing::error!("Error in sending the sample to the log: {}", e); } } @@ -696,6 +729,7 @@ struct GarbageCollectionEvent { config: GarbageCollectionConfig, tombstones: Arc>>, wildcard_updates: Arc>>, + latest_updates: Arc, Timestamp>>>, } #[async_trait] @@ -732,6 +766,11 @@ impl Timed for GarbageCollectionEvent { wildcard_updates.remove(&k); } + self.latest_updates + .lock() + .await + .retain(|_, timestamp| timestamp.get_time() < &time_limit); + tracing::trace!("End garbage collection of obsolete data-infos"); } }