Skip to content

Commit

Permalink
refactor(storage-manager): implement cache for History::Latest
Browse files Browse the repository at this point in the history
The changes introduced in #1367 were actually redundant (and
incomplete): the method `is_latest` was already checking that a received
Sample was more recent than what was currently stored.

This commit removes the redundant checks and implements a correct
caching strategy by leveraging both approaches: the method `is_latest`
will check if there is an entry in the cache and, if not, it will query
the Storage for the latest value.

In addition, the cache is now filled at initialisation.

This approach has one main advantage: the Storage will only be queried
for key expressions that are missing from the cache (because old keys
were garbage collected), hence reducing its load.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs:
* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 16, 2024
1 parent a22f1f1 commit bb28fa7
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 123 deletions.
20 changes: 18 additions & 2 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use flume::Sender;
use tokio::sync::Mutex;
use zenoh::{session::Session, Result as ZResult};
use zenoh::{internal::bail, session::Session, Result as ZResult};
use zenoh_backend_traits::{config::StorageConfig, VolumeInstance};

mod service;
Expand Down Expand Up @@ -48,9 +48,25 @@ pub(crate) async fn create_and_start_storage(

let (tx, rx) = flume::bounded(1);

let latest_updates = match storage.get_all_entries().await {
Ok(entries) => entries.into_iter().collect(),
Err(e) => {
bail!("Failed to retrieve entries from Storage < {storage_name} >: {e:?}");
}
};

let storage = Arc::new(Mutex::new(storage));
tokio::task::spawn(async move {
StorageService::start(zenoh_session, config, &name, storage, capability, rx).await;
StorageService::start(
zenoh_session,
config,
&name,
storage,
capability,
rx,
Arc::new(Mutex::new(latest_updates)),
)
.await;
});

Ok(tx)
Expand Down
253 changes: 132 additions & 121 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::{
use async_trait::async_trait;
use flume::Receiver;
use futures::select;
use tokio::sync::{Mutex, RwLock};
use tokio::sync::{Mutex, MutexGuard, RwLock};
use zenoh::{
internal::{
buffers::{SplitBuffer, ZBuf},
Expand Down Expand Up @@ -75,9 +75,8 @@ impl StorageService {
storage: Arc<Mutex<Box<dyn zenoh_backend_traits::Storage>>>,
capability: Capability,
rx: Receiver<StorageMessage>,
latest_updates: Arc<Mutex<HashMap<Option<OwnedKeyExpr>, Timestamp>>>,
) {
// @TODO: optimization: if read_cost is high for the storage, initialize a cache for the
// latest value
let mut storage_service = StorageService {
session,
key_expr: config.key_expr,
Expand All @@ -88,7 +87,7 @@ impl StorageService {
capability,
tombstones: Arc::new(RwLock::new(KeBoxTree::default())),
wildcard_updates: Arc::new(RwLock::new(KeBoxTree::default())),
latest_updates: Arc::new(Mutex::new(HashMap::new())),
latest_updates,
};
if storage_service
.capability
Expand Down Expand Up @@ -236,127 +235,116 @@ impl StorageService {
);

for k in matching_keys {
if !self.is_deleted(&k.clone(), sample_timestamp).await
&& (self.capability.history.eq(&History::All)
|| (self.capability.history.eq(&History::Latest)
&& self.is_latest(&k, sample_timestamp).await))
{
tracing::trace!(
"Sample `{:?}` identified as needed processing for key {}",
sample,
k
);
// there might be the case that the actual update was outdated due to a wild card
// update, but not stored yet in the storage. get the relevant wild
// card entry and use that value and timestamp to update the storage
let sample_to_store: Sample =
if let Some(update) = self.overriding_wild_update(&k, sample_timestamp).await {
match update.kind {
SampleKind::Put => {
SampleBuilder::put(k.clone(), update.data.value.payload().clone())
.encoding(update.data.value.encoding().clone())
.timestamp(update.data.timestamp)
.into()
}
SampleKind::Delete => SampleBuilder::delete(k.clone())
if self.is_deleted(&k, sample_timestamp).await {
tracing::trace!("Skipping Sample < {} > deleted later on", k);
continue;
}

tracing::trace!(
"Sample `{:?}` identified as needed processing for key {}",
sample,
k
);

// there might be the case that the actual update was outdated due to a wild card
// update, but not stored yet in the storage. get the relevant wild
// card entry and use that value and timestamp to update the storage
let sample_to_store: Sample =
if let Some(update) = self.overriding_wild_update(&k, sample_timestamp).await {
match update.kind {
SampleKind::Put => {
SampleBuilder::put(k.clone(), update.data.value.payload().clone())
.encoding(update.data.value.encoding().clone())
.timestamp(update.data.timestamp)
.into(),
.into()
}
} else {
SampleBuilder::from(sample.clone())
.keyexpr(k.clone())
.into()
};

// A Sample that is to be stored **must** have a Timestamp. In theory, the Sample
// generated should have a Timestamp and, in theory, this check is
// unneeded.
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!(
"Discarding `Sample` generated through `SampleBuilder` that has no \
Timestamp: {:?}",
sample_to_store
);
continue;
SampleKind::Delete => SampleBuilder::delete(k.clone())
.timestamp(update.data.timestamp)
.into(),
}
} else {
SampleBuilder::from(sample.clone())
.keyexpr(k.clone())
.into()
};

let stripped_key = match crate::strip_prefix(
self.strip_prefix.as_ref(),
sample_to_store.key_expr(),
) {
// A Sample that is to be stored **must** have a Timestamp. In theory, the Sample
// generated should have a Timestamp and, in theory, this check is
// unneeded.
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!(
"Discarding `Sample` generated through `SampleBuilder` that has no \
Timestamp: {:?}",
sample_to_store
);
continue;
}
};

let stripped_key =
match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
return;
}
};

// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
let mut latest_updates_guard = self.latest_updates.lock().await;
if self.capability.history == History::Latest {
if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) {
if sample_to_store_timestamp < *stored_timestamp {
tracing::debug!(
"Skipping Sample for < {:?} >, a Value with a more recent \
Timestamp is stored: (received) {} vs (stored) {}",
stripped_key,
sample_to_store_timestamp,
stored_timestamp
);
continue;
}
}
}
// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
let mut latest_updates_guard = self.latest_updates.lock().await;
if self.capability.history == History::Latest
&& !self
.is_latest(
&latest_updates_guard,
&stripped_key,
&sample_to_store_timestamp,
)
.await
{
tracing::trace!("Skipping outdated Sample < {} >", k);
continue;
}

let mut storage = self.storage.lock().await;
let storage_result = match sample.kind() {
SampleKind::Put => {
storage
.put(
stripped_key.clone(),
Value::new(
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
),
sample_to_store_timestamp,
)
.await
}
SampleKind::Delete => {
// register a tombstone
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key.clone(), sample_to_store_timestamp)
.await
}
};
let mut storage = self.storage.lock().await;
let storage_result = match sample.kind() {
SampleKind::Put => {
storage
.put(
stripped_key.clone(),
Value::new(
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
),
sample_to_store_timestamp,
)
.await
}
SampleKind::Delete => {
// register a tombstone
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key.clone(), sample_to_store_timestamp)
.await
}
};

drop(storage);
drop(storage);

match storage_result {
Ok(StorageInsertionResult::Outdated) => {
tracing::trace!(
"Ignoring `Outdated` sample < {} >",
sample_to_store.key_expr()
);
}
Ok(_) => {
if self.capability.history == History::Latest {
latest_updates_guard.insert(stripped_key, sample_to_store_timestamp);
}
}
Err(e) => {
tracing::error!(
"`{}` on < {} > failed with: {e:?}",
sample.kind(),
sample_to_store.key_expr()
);
match storage_result {
Ok(StorageInsertionResult::Outdated) => {
tracing::trace!("Ignoring `Outdated` sample < {} >", k);
}
Ok(_) => {
if self.capability.history == History::Latest {
latest_updates_guard.insert(stripped_key, sample_to_store_timestamp);
}
}
Err(e) => {
tracing::error!("`{}` on < {} > failed with: {e:?}", sample.kind(), k);
}
}
}
}
Expand Down Expand Up @@ -464,23 +452,46 @@ impl StorageService {
update
}

async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool {
// @TODO: if cache exists, read from there
/// Returns if the provided [Timestamp] is more recent than the [Timestamp] kept in the Storage
/// for that, stripped, key expression.
///
/// This method will first look up any cached value and if none is found, it will request the
/// Storage.
///
/// # Deadlock
///
/// As shown by its parameters, this method takes a locked `latest_updates` structure. It can
/// also take the lock of the Storage if the provided key expression is not present in
/// `latest_updates`.
///
/// This will not create deadlocks as the `latest_updates` structure is locked only when
/// processing a publication, i.e. when performing exactly the same chain of operations
/// (starting by calling `is_latest`).
///
/// We lock the Storage as late as possible to minimise contention with other operations.
async fn is_latest(
&self,
latest_updates: &MutexGuard<'_, HashMap<Option<OwnedKeyExpr>, Timestamp>>,
stripped_key: &Option<OwnedKeyExpr>,
timestamp: &Timestamp,
) -> bool {
if let Some(latest_timestamp) = latest_updates.get(stripped_key) {
return timestamp > latest_timestamp;
}

let mut storage = self.storage.lock().await;
let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
return false;
}
let Ok(stored_data) = storage.get(stripped_key.clone(), "").await else {
// FIXME: An actual error from the underlying Storage cannot be distinguished from a
// missing entry.
return true;
};
if let Ok(stored_data) = storage.get(stripped_key, "").await {
for entry in stored_data {
if entry.timestamp > *timestamp {
return false;
}

for data in stored_data {
if data.timestamp > *timestamp {
return false;
}
}

true
}

Expand Down

0 comments on commit bb28fa7

Please sign in to comment.