From bb28fa7f8156b3ba2f558b4fc54a6865addcaab2 Mon Sep 17 00:00:00 2001 From: Julien Loudet Date: Fri, 13 Sep 2024 09:51:43 +0200 Subject: [PATCH] refactor(storage-manager): implement cache for History::Latest The changes introduced in #1367 were actually redundant (and incomplete): the method `is_latest` was checking that a received Sample was more recent than what is currently stored. This commit removes the redundant checks and implements a correct caching strategy by leveraging both approaches: the method `is_latest` will check if there is an entry in the cache and, if not, it will query the Storage for the latest value. In addition, the cache is now filled at initialisation. This approach has one main advantage: the Storage will only be queried for missing key expressions (because of garbage collection for old keys), hence reducing its load. * plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs: * plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs Signed-off-by: Julien Loudet --- .../src/storages_mgt/mod.rs | 20 +- .../src/storages_mgt/service.rs | 253 +++++++++--------- 2 files changed, 150 insertions(+), 123 deletions(-) diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs index 2a72321813..8f2b184af9 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use flume::Sender; use tokio::sync::Mutex; -use zenoh::{session::Session, Result as ZResult}; +use zenoh::{internal::bail, session::Session, Result as ZResult}; use zenoh_backend_traits::{config::StorageConfig, VolumeInstance}; mod service; @@ -48,9 +48,25 @@ pub(crate) async fn create_and_start_storage( let (tx, rx) = flume::bounded(1); + let latest_updates = match storage.get_all_entries().await { + Ok(entries) => entries.into_iter().collect(), + Err(e) => { + bail!("Failed to 
retrieve entries from Storage < {storage_name} >: {e:?}"); + } + }; + let storage = Arc::new(Mutex::new(storage)); tokio::task::spawn(async move { - StorageService::start(zenoh_session, config, &name, storage, capability, rx).await; + StorageService::start( + zenoh_session, + config, + &name, + storage, + capability, + rx, + Arc::new(Mutex::new(latest_updates)), + ) + .await; }); Ok(tx) diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs index ac52891d9e..6814c8707b 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs @@ -22,7 +22,7 @@ use std::{ use async_trait::async_trait; use flume::Receiver; use futures::select; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, MutexGuard, RwLock}; use zenoh::{ internal::{ buffers::{SplitBuffer, ZBuf}, @@ -75,9 +75,8 @@ impl StorageService { storage: Arc>>, capability: Capability, rx: Receiver, + latest_updates: Arc, Timestamp>>>, ) { - // @TODO: optimization: if read_cost is high for the storage, initialize a cache for the - // latest value let mut storage_service = StorageService { session, key_expr: config.key_expr, @@ -88,7 +87,7 @@ impl StorageService { capability, tombstones: Arc::new(RwLock::new(KeBoxTree::default())), wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())), - latest_updates: Arc::new(Mutex::new(HashMap::new())), + latest_updates, }; if storage_service .capability @@ -236,57 +235,56 @@ impl StorageService { ); for k in matching_keys { - if !self.is_deleted(&k.clone(), sample_timestamp).await - && (self.capability.history.eq(&History::All) - || (self.capability.history.eq(&History::Latest) - && self.is_latest(&k, sample_timestamp).await)) - { - tracing::trace!( - "Sample `{:?}` identified as needed processing for key {}", - sample, - k - ); - // there might be the case that the actual update was 
outdated due to a wild card - // update, but not stored yet in the storage. get the relevant wild - // card entry and use that value and timestamp to update the storage - let sample_to_store: Sample = - if let Some(update) = self.overriding_wild_update(&k, sample_timestamp).await { - match update.kind { - SampleKind::Put => { - SampleBuilder::put(k.clone(), update.data.value.payload().clone()) - .encoding(update.data.value.encoding().clone()) - .timestamp(update.data.timestamp) - .into() - } - SampleKind::Delete => SampleBuilder::delete(k.clone()) + if self.is_deleted(&k, sample_timestamp).await { + tracing::trace!("Skipping Sample < {} > deleted later on", k); + continue; + } + + tracing::trace!( + "Sample `{:?}` identified as needed processing for key {}", + sample, + k + ); + + // there might be the case that the actual update was outdated due to a wild card + // update, but not stored yet in the storage. get the relevant wild + // card entry and use that value and timestamp to update the storage + let sample_to_store: Sample = + if let Some(update) = self.overriding_wild_update(&k, sample_timestamp).await { + match update.kind { + SampleKind::Put => { + SampleBuilder::put(k.clone(), update.data.value.payload().clone()) + .encoding(update.data.value.encoding().clone()) .timestamp(update.data.timestamp) - .into(), + .into() } - } else { - SampleBuilder::from(sample.clone()) - .keyexpr(k.clone()) - .into() - }; - - // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample - // generated should have a Timestamp and, in theory, this check is - // unneeded. 
- let sample_to_store_timestamp = match sample_to_store.timestamp() { - Some(timestamp) => *timestamp, - None => { - tracing::error!( - "Discarding `Sample` generated through `SampleBuilder` that has no \ - Timestamp: {:?}", - sample_to_store - ); - continue; + SampleKind::Delete => SampleBuilder::delete(k.clone()) + .timestamp(update.data.timestamp) + .into(), } + } else { + SampleBuilder::from(sample.clone()) + .keyexpr(k.clone()) + .into() }; - let stripped_key = match crate::strip_prefix( - self.strip_prefix.as_ref(), - sample_to_store.key_expr(), - ) { + // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample + // generated should have a Timestamp and, in theory, this check is + // unneeded. + let sample_to_store_timestamp = match sample_to_store.timestamp() { + Some(timestamp) => *timestamp, + None => { + tracing::error!( + "Discarding `Sample` generated through `SampleBuilder` that has no \ + Timestamp: {:?}", + sample_to_store + ); + continue; + } + }; + + let stripped_key = + match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) { Ok(stripped) => stripped, Err(e) => { tracing::error!("{}", e); @@ -294,69 +292,59 @@ impl StorageService { } }; - // If the Storage was declared as only keeping the Latest value, we ensure that, for - // each received Sample, it is indeed the Latest value that is processed. 
- let mut latest_updates_guard = self.latest_updates.lock().await; - if self.capability.history == History::Latest { - if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) { - if sample_to_store_timestamp < *stored_timestamp { - tracing::debug!( - "Skipping Sample for < {:?} >, a Value with a more recent \ - Timestamp is stored: (received) {} vs (stored) {}", - stripped_key, - sample_to_store_timestamp, - stored_timestamp - ); - continue; - } - } - } + // If the Storage was declared as only keeping the Latest value, we ensure that, for + // each received Sample, it is indeed the Latest value that is processed. + let mut latest_updates_guard = self.latest_updates.lock().await; + if self.capability.history == History::Latest + && !self + .is_latest( + &latest_updates_guard, + &stripped_key, + &sample_to_store_timestamp, + ) + .await + { + tracing::trace!("Skipping outdated Sample < {} >", k); + continue; + } - let mut storage = self.storage.lock().await; - let storage_result = match sample.kind() { - SampleKind::Put => { - storage - .put( - stripped_key.clone(), - Value::new( - sample_to_store.payload().clone(), - sample_to_store.encoding().clone(), - ), - sample_to_store_timestamp, - ) - .await - } - SampleKind::Delete => { - // register a tombstone - self.mark_tombstone(&k, sample_to_store_timestamp).await; - storage - .delete(stripped_key.clone(), sample_to_store_timestamp) - .await - } - }; + let mut storage = self.storage.lock().await; + let storage_result = match sample.kind() { + SampleKind::Put => { + storage + .put( + stripped_key.clone(), + Value::new( + sample_to_store.payload().clone(), + sample_to_store.encoding().clone(), + ), + sample_to_store_timestamp, + ) + .await + } + SampleKind::Delete => { + // register a tombstone + self.mark_tombstone(&k, sample_to_store_timestamp).await; + storage + .delete(stripped_key.clone(), sample_to_store_timestamp) + .await + } + }; - drop(storage); + drop(storage); - match storage_result { - 
Ok(StorageInsertionResult::Outdated) => { - tracing::trace!( - "Ignoring `Outdated` sample < {} >", - sample_to_store.key_expr() - ); - } - Ok(_) => { - if self.capability.history == History::Latest { - latest_updates_guard.insert(stripped_key, sample_to_store_timestamp); - } - } - Err(e) => { - tracing::error!( - "`{}` on < {} > failed with: {e:?}", - sample.kind(), - sample_to_store.key_expr() - ); + match storage_result { + Ok(StorageInsertionResult::Outdated) => { + tracing::trace!("Ignoring `Outdated` sample < {} >", k); + } + Ok(_) => { + if self.capability.history == History::Latest { + latest_updates_guard.insert(stripped_key, sample_to_store_timestamp); } } + Err(e) => { + tracing::error!("`{}` on < {} > failed with: {e:?}", sample.kind(), k); + } } } } @@ -464,23 +452,46 @@ impl StorageService { update } - async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool { - // @TODO: if cache exists, read from there + /// Returns if the provided [Timestamp] is more recent than the [Timestamp] kept in the Storage + /// for that, stripped, key expression. + /// + /// This method will first look up any cached value and if none is found, it will request the + /// Storage. + /// + /// # Deadlock + /// + /// As shown by its parameters, this method takes a locked `latest_updates` structure. It can + /// also take the lock of the Storage if the provided key expression is not present in + /// `latest_updates`. + /// + /// This will not create deadlocks as the `latest_updates` structure is locked only when + /// processing a publication, i.e. when performing exactly the same chain of operations + /// (starting by calling `is_latest`). + /// + /// We lock the Storage as late as possible to minimise contention with other operations. 
+ async fn is_latest( + &self, + latest_updates: &MutexGuard<'_, HashMap<Option<OwnedKeyExpr>, Timestamp>>, + stripped_key: &Option<OwnedKeyExpr>, + timestamp: &Timestamp, + ) -> bool { + if let Some(latest_timestamp) = latest_updates.get(stripped_key) { + return timestamp > latest_timestamp; + } + let mut storage = self.storage.lock().await; - let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - tracing::error!("{}", e); - return false; - } + let Ok(stored_data) = storage.get(stripped_key.clone(), "").await else { + // FIXME: An actual error from the underlying Storage cannot be distinguished from a + // missing entry. + return true; }; - if let Ok(stored_data) = storage.get(stripped_key, "").await { - for entry in stored_data { - if entry.timestamp > *timestamp { - return false; - } + + for data in stored_data { + if data.timestamp > *timestamp { + return false; } } + true }