diff --git a/plugins/zenoh-backend-example/src/lib.rs b/plugins/zenoh-backend-example/src/lib.rs index 4899bed6b0..7f8bcc214f 100644 --- a/plugins/zenoh-backend-example/src/lib.rs +++ b/plugins/zenoh-backend-example/src/lib.rs @@ -62,7 +62,6 @@ impl Volume for ExampleBackend { Capability { persistence: Persistence::Volatile, history: History::Latest, - read_cost: 0, } } async fn create_storage(&self, _props: StorageConfig) -> ZResult> { diff --git a/plugins/zenoh-backend-traits/src/lib.rs b/plugins/zenoh-backend-traits/src/lib.rs index cd2238dc6a..3f2be96030 100644 --- a/plugins/zenoh-backend-traits/src/lib.rs +++ b/plugins/zenoh-backend-traits/src/lib.rs @@ -53,12 +53,11 @@ //! } //! //! fn get_capability(&self) -> Capability { -//! // This operation is used to confirm if the volume indeed supports +//! // This operation is used to confirm if the volume indeed supports //! // the capabilities requested by the configuration //! Capability{ //! persistence: Persistence::Volatile, //! history: History::Latest, -//! read_cost: 0, //! } //! } //! @@ -145,10 +144,6 @@ const FEATURES: &str = pub struct Capability { pub persistence: Persistence, pub history: History, - /// `read_cost` is a parameter that hels the storage manager take a decision on optimizing database roundtrips - /// If the `read_cost` is higher than a given threshold, the storage manger will maintain a cache with the keys present in the database - /// This is a placeholder, not actually utilised in the current implementation - pub read_cost: u32, } /// Persistence is the guarantee expected from a storage in case of failures @@ -185,7 +180,6 @@ pub struct StoredData { } /// Trait to be implemented by a Backend. -/// #[async_trait] pub trait Volume: Send + Sync { /// Returns the status that will be sent as a reply to a query diff --git a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs index b056cf7faf..cb6fe9be02 100644 --- a/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/memory_backend/mod.rs @@ -53,7 +53,6 @@ impl Volume for MemoryBackend { Capability { persistence: Persistence::Volatile, history: History::Latest, - read_cost: 0, } } diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs index 2a72321813..8f2b184af9 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use flume::Sender; use tokio::sync::Mutex; -use zenoh::{session::Session, Result as ZResult}; +use zenoh::{internal::bail, session::Session, Result as ZResult}; use zenoh_backend_traits::{config::StorageConfig, VolumeInstance}; mod service; @@ -48,9 +48,25 @@ pub(crate) async fn create_and_start_storage( let (tx, rx) = flume::bounded(1); + let latest_updates = match storage.get_all_entries().await { + Ok(entries) => entries.into_iter().collect(), + Err(e) => { + bail!("Failed to retrieve entries from Storage < {storage_name} >: {e:?}"); + } + }; + let storage = Arc::new(Mutex::new(storage)); tokio::task::spawn(async move { - StorageService::start(zenoh_session, config, &name, storage, capability, rx).await; + StorageService::start( + zenoh_session, + config, + &name, + storage, + capability, + rx, + Arc::new(Mutex::new(latest_updates)), + ) + .await; }); Ok(tx) diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs index ac52891d9e..a34805fe30 100644 --- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs +++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs @@ -22,7 +22,7 @@ use std::{ use async_trait::async_trait; use flume::Receiver; use futures::select; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::{Mutex, MutexGuard, RwLock}; use zenoh::{ internal::{ buffers::{SplitBuffer, ZBuf}, @@ -75,9 +75,8 @@ impl StorageService { storage: Arc>>, capability: Capability, rx: Receiver, + latest_updates: Arc, Timestamp>>>, ) { - // @TODO: optimization: if read_cost is high for the storage, initialize a cache for the - // latest value let mut storage_service = StorageService { session, key_expr: config.key_expr, @@ -88,7 +87,7 @@ impl StorageService { capability, tombstones: Arc::new(RwLock::new(KeBoxTree::default())), wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())), - latest_updates: Arc::new(Mutex::new(HashMap::new())), + latest_updates, }; if storage_service .capability @@ -236,57 +235,55 @@ impl StorageService { ); for k in matching_keys { - if !self.is_deleted(&k.clone(), sample_timestamp).await - && (self.capability.history.eq(&History::All) - || (self.capability.history.eq(&History::Latest) - && self.is_latest(&k, sample_timestamp).await)) + if self.is_deleted(&k, sample_timestamp).await { + tracing::trace!("Skipping Sample < {} > deleted later on", k); + continue; + } + + tracing::trace!( + "Sample `{:?}` identified as needed processing for key {}", + sample, + k + ); + + // there might be the case that the actual update was outdated due to a wild card + // update, but not stored yet in the storage. get the relevant wild + // card entry and use that value and timestamp to update the storage + let sample_to_store: Sample = if let Some(update) = + self.overriding_wild_update(&k, sample_timestamp).await { - tracing::trace!( - "Sample `{:?}` identified as needed processing for key {}", - sample, - k - ); - // there might be the case that the actual update was outdated due to a wild card - // update, but not stored yet in the storage. get the relevant wild - // card entry and use that value and timestamp to update the storage - let sample_to_store: Sample = - if let Some(update) = self.overriding_wild_update(&k, sample_timestamp).await { - match update.kind { - SampleKind::Put => { - SampleBuilder::put(k.clone(), update.data.value.payload().clone()) - .encoding(update.data.value.encoding().clone()) - .timestamp(update.data.timestamp) - .into() - } - SampleKind::Delete => SampleBuilder::delete(k.clone()) - .timestamp(update.data.timestamp) - .into(), - } - } else { - SampleBuilder::from(sample.clone()) - .keyexpr(k.clone()) - .into() - }; + match update.kind { + SampleKind::Put => SampleBuilder::put(k.clone(), update.data.value.payload()) + .encoding(update.data.value.encoding().clone()) + .timestamp(update.data.timestamp) + .into(), + SampleKind::Delete => SampleBuilder::delete(k.clone()) + .timestamp(update.data.timestamp) + .into(), + } + } else { + SampleBuilder::from(sample.clone()) + .keyexpr(k.clone()) + .into() + }; - // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample - // generated should have a Timestamp and, in theory, this check is - // unneeded. - let sample_to_store_timestamp = match sample_to_store.timestamp() { - Some(timestamp) => *timestamp, - None => { - tracing::error!( - "Discarding `Sample` generated through `SampleBuilder` that has no \ - Timestamp: {:?}", - sample_to_store - ); - continue; - } - }; + // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample + // generated should have a Timestamp and, in theory, this check is + // unneeded. + let sample_to_store_timestamp = match sample_to_store.timestamp() { + Some(timestamp) => *timestamp, + None => { + tracing::error!( + "Discarding `Sample` generated through `SampleBuilder` that has no \ + Timestamp: {:?}", + sample_to_store + ); + continue; + } + }; - let stripped_key = match crate::strip_prefix( - self.strip_prefix.as_ref(), - sample_to_store.key_expr(), - ) { + let stripped_key = + match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) { Ok(stripped) => stripped, Err(e) => { tracing::error!("{}", e); @@ -294,69 +291,61 @@ impl StorageService { } }; - // If the Storage was declared as only keeping the Latest value, we ensure that, for - // each received Sample, it is indeed the Latest value that is processed. - let mut latest_updates_guard = self.latest_updates.lock().await; - if self.capability.history == History::Latest { - if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) { - if sample_to_store_timestamp < *stored_timestamp { - tracing::debug!( - "Skipping Sample for < {:?} >, a Value with a more recent \ - Timestamp is stored: (received) {} vs (stored) {}", - stripped_key, - sample_to_store_timestamp, - stored_timestamp - ); - continue; - } + // If the Storage was declared as only keeping the Latest value, we ensure that, for + // each received Sample, it is indeed the Latest value that is processed. + let mut cache_guard = None; + if self.capability.history == History::Latest { + match self + .guard_cache_if_latest(&stripped_key, &sample_to_store_timestamp) + .await + { + Some(guard) => { + cache_guard = Some(guard); + } + None => { + tracing::trace!("Skipping outdated Sample < {} >", k); + continue; } } + } - let mut storage = self.storage.lock().await; - let storage_result = match sample.kind() { - SampleKind::Put => { - storage - .put( - stripped_key.clone(), - Value::new( - sample_to_store.payload().clone(), - sample_to_store.encoding().clone(), - ), - sample_to_store_timestamp, - ) - .await - } - SampleKind::Delete => { - // register a tombstone - self.mark_tombstone(&k, sample_to_store_timestamp).await; - storage - .delete(stripped_key.clone(), sample_to_store_timestamp) - .await - } - }; + let mut storage = self.storage.lock().await; + let storage_result = match sample.kind() { + SampleKind::Put => { + storage + .put( + stripped_key.clone(), + Value::new( + sample_to_store.payload(), + sample_to_store.encoding().clone(), + ), + sample_to_store_timestamp, + ) + .await + } + SampleKind::Delete => { + // register a tombstone + self.mark_tombstone(&k, sample_to_store_timestamp).await; + storage + .delete(stripped_key.clone(), sample_to_store_timestamp) + .await + } + }; - drop(storage); + drop(storage); - match storage_result { - Ok(StorageInsertionResult::Outdated) => { - tracing::trace!( - "Ignoring `Outdated` sample < {} >", - sample_to_store.key_expr() - ); - } - Ok(_) => { - if self.capability.history == History::Latest { - latest_updates_guard.insert(stripped_key, sample_to_store_timestamp); - } - } - Err(e) => { - tracing::error!( - "`{}` on < {} > failed with: {e:?}", - sample.kind(), - sample_to_store.key_expr() - ); + match storage_result { + Ok(StorageInsertionResult::Outdated) => { + tracing::trace!("Ignoring `Outdated` sample < {} >", k); + } + Ok(_) => { + if let Some(mut cache_guard) = cache_guard { + cache_guard.insert(stripped_key, sample_to_store_timestamp); } } + Err(e) => { + tracing::error!("`{}` on < {} > failed with: {e:?}", sample.kind(), k); + } } } } @@ -464,24 +453,45 @@ impl StorageService { update } - async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool { - // @TODO: if cache exists, read from there - let mut storage = self.storage.lock().await; - let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) { - Ok(stripped) => stripped, - Err(e) => { - tracing::error!("{}", e); - return false; + /// Returns a guard over the cache if the provided [Timestamp] is more recent than what is kept + /// in the Storage for the `stripped_key`. Otherwise returns `None`. + /// + /// This method will first look up any cached value and if none is found, it will request the + /// Storage. + /// + /// # ⚠️ Race-condition + /// + /// Returning a guard over the cache is not an "innocent" choice: in order to avoid + /// race-condition, the guard over the cache must be kept until the Storage has processed the + /// Sample and the Cache has been updated accordingly. + /// + /// If the lock is released before both operations are performed, the Cache and Storage could + /// end up in an inconsistent state (think two updates being processed at the same time). + async fn guard_cache_if_latest( + &self, + stripped_key: &Option, + timestamp: &Timestamp, + ) -> Option, Timestamp>>> { + let cache_guard = self.latest_updates.lock().await; + if let Some(latest_timestamp) = cache_guard.get(stripped_key) { + if timestamp > latest_timestamp { + return Some(cache_guard); + } else { + return None; } - }; - if let Ok(stored_data) = storage.get(stripped_key, "").await { - for entry in stored_data { - if entry.timestamp > *timestamp { - return false; + } + + let mut storage = self.storage.lock().await; + + if let Ok(stored_data) = storage.get(stripped_key.clone(), "").await { + for data in stored_data { + if data.timestamp > *timestamp { + return None; } } } - true + + Some(cache_guard) } async fn reply_query(&self, query: Result) {