diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
index 2a72321813..8f2b184af9 100644
--- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
@@ -16,7 +16,7 @@ use std::sync::Arc;
 
 use flume::Sender;
 use tokio::sync::Mutex;
-use zenoh::{session::Session, Result as ZResult};
+use zenoh::{internal::bail, session::Session, Result as ZResult};
 use zenoh_backend_traits::{config::StorageConfig, VolumeInstance};
 
 mod service;
@@ -48,9 +48,25 @@ pub(crate) async fn create_and_start_storage(
 
     let (tx, rx) = flume::bounded(1);
 
+    let latest_updates = match storage.get_all_entries().await {
+        Ok(entries) => entries.into_iter().collect(),
+        Err(e) => {
+            bail!("Failed to retrieve entries from Storage < {storage_name} >: {e:?}");
+        }
+    };
+
     let storage = Arc::new(Mutex::new(storage));
     tokio::task::spawn(async move {
-        StorageService::start(zenoh_session, config, &name, storage, capability, rx).await;
+        StorageService::start(
+            zenoh_session,
+            config,
+            &name,
+            storage,
+            capability,
+            rx,
+            Arc::new(Mutex::new(latest_updates)),
+        )
+        .await;
     });
 
     Ok(tx)
diff --git a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
index ac52891d9e..6814c8707b 100644
--- a/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
+++ b/plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
@@ -22,7 +22,7 @@ use std::{
 use async_trait::async_trait;
 use flume::Receiver;
 use futures::select;
-use tokio::sync::{Mutex, RwLock};
+use tokio::sync::{Mutex, MutexGuard, RwLock};
 use zenoh::{
     internal::{
         buffers::{SplitBuffer, ZBuf},
@@ -75,9 +75,8 @@ impl StorageService {
         storage: Arc<Mutex<Box<dyn Storage>>>,
         capability: Capability,
         rx: Receiver<StorageMessage>,
+        latest_updates: Arc<Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>>,
     ) {
-        // @TODO: optimization: if read_cost is high for the storage, initialize a cache for the
-        // latest value
         let mut storage_service = StorageService {
             session,
             key_expr: config.key_expr,
@@ -88,7 +87,7 @@ impl StorageService {
             capability,
             tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
             wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())),
-            latest_updates: Arc::new(Mutex::new(HashMap::new())),
+            latest_updates,
         };
         if storage_service
             .capability
@@ -236,57 +235,56 @@ impl StorageService {
         );
 
         for k in matching_keys {
-            if !self.is_deleted(&k.clone(), sample_timestamp).await
-                && (self.capability.history.eq(&History::All)
-                    || (self.capability.history.eq(&History::Latest)
-                        && self.is_latest(&k, sample_timestamp).await))
-            {
-                tracing::trace!(
-                    "Sample `{:?}` identified as needed processing for key {}",
-                    sample,
-                    k
-                );
-                // there might be the case that the actual update was outdated due to a wild card
-                // update, but not stored yet in the storage. get the relevant wild
-                // card entry and use that value and timestamp to update the storage
-                let sample_to_store: Sample =
-                    if let Some(update) = self.overriding_wild_update(&k, sample_timestamp).await {
-                        match update.kind {
-                            SampleKind::Put => {
-                                SampleBuilder::put(k.clone(), update.data.value.payload().clone())
-                                    .encoding(update.data.value.encoding().clone())
-                                    .timestamp(update.data.timestamp)
-                                    .into()
-                            }
-                            SampleKind::Delete => SampleBuilder::delete(k.clone())
+            if self.is_deleted(&k, sample_timestamp).await {
+                tracing::trace!("Skipping Sample < {} > deleted later on", k);
+                continue;
+            }
+
+            tracing::trace!(
+                "Sample `{:?}` identified as needed processing for key {}",
+                sample,
+                k
+            );
+
+            // there might be the case that the actual update was outdated due to a wild card
+            // update, but not stored yet in the storage. get the relevant wild
+            // card entry and use that value and timestamp to update the storage
+            let sample_to_store: Sample =
+                if let Some(update) = self.overriding_wild_update(&k, sample_timestamp).await {
+                    match update.kind {
+                        SampleKind::Put => {
+                            SampleBuilder::put(k.clone(), update.data.value.payload().clone())
+                                .encoding(update.data.value.encoding().clone())
                                 .timestamp(update.data.timestamp)
-                                    .into(),
+                                .into()
                         }
-                    } else {
-                        SampleBuilder::from(sample.clone())
-                            .keyexpr(k.clone())
-                            .into()
-                    };
-
-                // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample
-                // generated should have a Timestamp and, in theory, this check is
-                // unneeded.
-                let sample_to_store_timestamp = match sample_to_store.timestamp() {
-                    Some(timestamp) => *timestamp,
-                    None => {
-                        tracing::error!(
-                            "Discarding `Sample` generated through `SampleBuilder` that has no \
-                            Timestamp: {:?}",
-                            sample_to_store
-                        );
-                        continue;
+                        SampleKind::Delete => SampleBuilder::delete(k.clone())
+                            .timestamp(update.data.timestamp)
+                            .into(),
                     }
+                } else {
+                    SampleBuilder::from(sample.clone())
+                        .keyexpr(k.clone())
+                        .into()
                 };
 
-                let stripped_key = match crate::strip_prefix(
-                    self.strip_prefix.as_ref(),
-                    sample_to_store.key_expr(),
-                ) {
+            // A Sample that is to be stored **must** have a Timestamp. In theory, the Sample
+            // generated should have a Timestamp and, in theory, this check is
+            // unneeded.
+            let sample_to_store_timestamp = match sample_to_store.timestamp() {
+                Some(timestamp) => *timestamp,
+                None => {
+                    tracing::error!(
+                        "Discarding `Sample` generated through `SampleBuilder` that has no \
+                        Timestamp: {:?}",
+                        sample_to_store
+                    );
+                    continue;
+                }
+            };
+
+            let stripped_key =
+                match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) {
                     Ok(stripped) => stripped,
                     Err(e) => {
                         tracing::error!("{}", e);
@@ -294,69 +292,59 @@ impl StorageService {
                     }
                 };
 
-                // If the Storage was declared as only keeping the Latest value, we ensure that, for
-                // each received Sample, it is indeed the Latest value that is processed.
-                let mut latest_updates_guard = self.latest_updates.lock().await;
-                if self.capability.history == History::Latest {
-                    if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) {
-                        if sample_to_store_timestamp < *stored_timestamp {
-                            tracing::debug!(
-                                "Skipping Sample for < {:?} >, a Value with a more recent \
-                                Timestamp is stored: (received) {} vs (stored) {}",
-                                stripped_key,
-                                sample_to_store_timestamp,
-                                stored_timestamp
-                            );
-                            continue;
-                        }
-                    }
-                }
+            // If the Storage was declared as only keeping the Latest value, we ensure that, for
+            // each received Sample, it is indeed the Latest value that is processed.
+            let mut latest_updates_guard = self.latest_updates.lock().await;
+            if self.capability.history == History::Latest
+                && !self
+                    .is_latest(
+                        &latest_updates_guard,
+                        &stripped_key,
+                        &sample_to_store_timestamp,
+                    )
+                    .await
+            {
+                tracing::trace!("Skipping outdated Sample < {} >", k);
+                continue;
+            }
 
-                let mut storage = self.storage.lock().await;
-                let storage_result = match sample.kind() {
-                    SampleKind::Put => {
-                        storage
-                            .put(
-                                stripped_key.clone(),
-                                Value::new(
-                                    sample_to_store.payload().clone(),
-                                    sample_to_store.encoding().clone(),
-                                ),
-                                sample_to_store_timestamp,
-                            )
-                            .await
-                    }
-                    SampleKind::Delete => {
-                        // register a tombstone
-                        self.mark_tombstone(&k, sample_to_store_timestamp).await;
-                        storage
-                            .delete(stripped_key.clone(), sample_to_store_timestamp)
-                            .await
-                    }
-                };
+            let mut storage = self.storage.lock().await;
+            let storage_result = match sample.kind() {
+                SampleKind::Put => {
+                    storage
+                        .put(
+                            stripped_key.clone(),
+                            Value::new(
+                                sample_to_store.payload().clone(),
+                                sample_to_store.encoding().clone(),
+                            ),
+                            sample_to_store_timestamp,
+                        )
+                        .await
+                }
+                SampleKind::Delete => {
+                    // register a tombstone
+                    self.mark_tombstone(&k, sample_to_store_timestamp).await;
+                    storage
+                        .delete(stripped_key.clone(), sample_to_store_timestamp)
+                        .await
+                }
+            };
 
-                drop(storage);
+            drop(storage);
 
-                match storage_result {
-                    Ok(StorageInsertionResult::Outdated) => {
-                        tracing::trace!(
-                            "Ignoring `Outdated` sample < {} >",
-                            sample_to_store.key_expr()
-                        );
-                    }
-                    Ok(_) => {
-                        if self.capability.history == History::Latest {
-                            latest_updates_guard.insert(stripped_key, sample_to_store_timestamp);
-                        }
-                    }
-                    Err(e) => {
-                        tracing::error!(
-                            "`{}` on < {} > failed with: {e:?}",
-                            sample.kind(),
-                            sample_to_store.key_expr()
-                        );
+            match storage_result {
+                Ok(StorageInsertionResult::Outdated) => {
+                    tracing::trace!("Ignoring `Outdated` sample < {} >", k);
+                }
+                Ok(_) => {
+                    if self.capability.history == History::Latest {
+                        latest_updates_guard.insert(stripped_key, sample_to_store_timestamp);
                     }
                 }
+                Err(e) => {
+                    tracing::error!("`{}` on < {} > failed with: {e:?}", sample.kind(), k);
+                }
             }
         }
     }
@@ -464,23 +452,46 @@ impl StorageService {
         update
     }
 
-    async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool {
-        // @TODO: if cache exists, read from there
+    /// Returns if the provided [Timestamp] is more recent than the [Timestamp] kept in the Storage
+    /// for that, stripped, key expression.
+    ///
+    /// This method will first look up any cached value and if none is found, it will request the
+    /// Storage.
+    ///
+    /// # Deadlock
+    ///
+    /// As shown by its parameters, this method takes a locked `latest_updates` structure. It can
+    /// also take the lock of the Storage if the provided key expression is not present in
+    /// `latest_updates`.
+    ///
+    /// This will not create deadlocks as the `latest_updates` structure is locked only when
+    /// processing a publication, i.e. when performing exactly the same chain of operations
+    /// (starting by calling `is_latest`).
+    ///
+    /// We lock the Storage as late as possible to minimise contention with other operations.
+    async fn is_latest(
+        &self,
+        latest_updates: &MutexGuard<'_, HashMap<Option<OwnedKeyExpr>, Timestamp>>,
+        stripped_key: &Option<OwnedKeyExpr>,
+        timestamp: &Timestamp,
+    ) -> bool {
+        if let Some(latest_timestamp) = latest_updates.get(stripped_key) {
+            return timestamp > latest_timestamp;
+        }
+
         let mut storage = self.storage.lock().await;
-        let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
-            Ok(stripped) => stripped,
-            Err(e) => {
-                tracing::error!("{}", e);
-                return false;
-            }
+        let Ok(stored_data) = storage.get(stripped_key.clone(), "").await else {
+            // FIXME: An actual error from the underlying Storage cannot be distinguished from a
+            // missing entry.
+            return true;
         };
-        if let Ok(stored_data) = storage.get(stripped_key, "").await {
-            for entry in stored_data {
-                if entry.timestamp > *timestamp {
-                    return false;
-                }
+
+        for data in stored_data {
+            if data.timestamp > *timestamp {
+                return false;
             }
         }
+
        true
    }
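Note on the pattern: the change seeds a cache of latest timestamps from `get_all_entries()` when the storage is started, consults that cache before touching the storage in `is_latest`, and records every successful put back into it. The sketch below illustrates this cache-then-storage freshness check in isolation; it is not the plugin's actual API: `LatestCache`, `String` keys and plain `u64` timestamps are simplified stand-ins for the `HashMap<Option<OwnedKeyExpr>, Timestamp>` behind a tokio `Mutex` in the patch, and the async locking is left out.

```rust
use std::collections::HashMap;

/// Simplified stand-in for the storage manager's latest-update cache.
struct LatestCache {
    /// Seeded once at startup (cf. `get_all_entries()` in the patch).
    latest_updates: HashMap<String, u64>,
    /// Stand-in for the actual storage; reading it is assumed to be expensive.
    storage: HashMap<String, u64>,
}

impl LatestCache {
    /// Returns true if `timestamp` is more recent than anything recorded for `key`,
    /// checking the cache first and falling back to the storage only on a miss.
    fn is_latest(&self, key: &str, timestamp: u64) -> bool {
        if let Some(latest) = self.latest_updates.get(key) {
            return timestamp > *latest;
        }
        match self.storage.get(key) {
            Some(stored) => timestamp > *stored,
            // As in the patch, a key that cannot be found is treated as "this sample is the latest".
            None => true,
        }
    }

    /// Record a successfully stored sample so later checks are answered from the cache.
    fn record(&mut self, key: String, timestamp: u64) {
        self.latest_updates.insert(key, timestamp);
    }
}

fn main() {
    let mut cache = LatestCache {
        latest_updates: HashMap::from([("demo/a".to_string(), 10)]),
        storage: HashMap::from([("demo/b".to_string(), 20)]),
    };

    assert!(cache.is_latest("demo/a", 11)); // cache hit, newer timestamp
    assert!(!cache.is_latest("demo/a", 9)); // cache hit, outdated
    assert!(!cache.is_latest("demo/b", 15)); // cache miss, storage holds 20
    assert!(cache.is_latest("demo/c", 1)); // unknown key, accepted

    cache.record("demo/b".to_string(), 30);
    assert!(!cache.is_latest("demo/b", 25)); // now answered by the cache alone
}
```

Seeding the cache up front is what lets a `History::Latest` storage answer the freshness check without a storage read on the hot path; the fallback lookup only runs for keys the cache has never seen, which is also why the patch inserts the timestamp back into `latest_updates` after every successful put.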