Skip to content

Commit

Permalink
refactor(storage-manager): remove redundant is_latest test
Browse files Browse the repository at this point in the history
Commit 87f4791 (#1367) introduced a
cache to keep track of the timestamps of the publication for each key
expression reaching a Storage. The purpose of this cache is to allow
only more recent publications to reach the Storage.

Before this change, the same verification was already performed through
the `is_latest` function, making the two checks redundant. Hence, only
one of the two verifications should remain.

This commit removes the `is_latest` function in favour of the more
efficient cache: `is_latest` was calling the Storage for every
publication to retrieve the latest value associated to the key
expression.

Note that, in its current state, the cache strategy is not enough: in
the unlikely scenario where no publication was made for the `lifespan`
of a cached entry (i.e. the entry was garbage collected) and a new
publication with an older timestamp is received, then this older
publication will reach the Storage.

* plugins/zenoh-plugin-storage-manager/src/replica/storage.rs: remove
  the `is_latest` function.

Signed-off-by: Julien Loudet <[email protected]>
  • Loading branch information
J-Loudet committed Sep 12, 2024
1 parent a2eaf2e commit 85fd854
Showing 1 changed file with 84 additions and 108 deletions.
192 changes: 84 additions & 108 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,21 +300,20 @@ impl StorageService {
);

for k in matching_keys {
if !self.is_deleted(&k.clone(), sample_timestamp).await
&& (self.capability.history.eq(&History::All)
|| (self.capability.history.eq(&History::Latest)
&& self.is_latest(&k, sample_timestamp).await))
{
tracing::trace!(
"Sample `{:?}` identified as needed processing for key {}",
sample,
k
);
// there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage.
// get the relevant wild card entry and use that value and timestamp to update the storage
let sample_to_store: Sample = if let Some(update) =
self.ovderriding_wild_update(&k, sample_timestamp).await
{
if self.is_deleted(&k.clone(), sample_timestamp).await {
tracing::trace!("Ignoring publication on < {} > that is deleted later on", k);
continue;
}

tracing::trace!(
"Sample `{:?}` identified as needed processing for key {}",
sample,
k
);
// there might be the case that the actual update was outdated due to a wild card update, but not stored yet in the storage.
// get the relevant wild card entry and use that value and timestamp to update the storage
let sample_to_store: Sample =
if let Some(update) = self.ovderriding_wild_update(&k, sample_timestamp).await {
match update.kind {
SampleKind::Put => {
SampleBuilder::put(k.clone(), update.data.value.payload().clone())
Expand All @@ -332,91 +331,88 @@ impl StorageService {
.into()
};

// A Sample that is to be stored **must** have a Timestamp. In theory, the Sample generated should have
// a Timestamp and, in theory, this check is unneeded.
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!(
"Discarding `Sample` generated through `SampleBuilder` that has no \
Timestamp: {:?}",
sample_to_store
);
continue;
}
};
// A Sample that is to be stored **must** have a Timestamp. In theory, the Sample generated should have
// a Timestamp and, in theory, this check is unneeded.
let sample_to_store_timestamp = match sample_to_store.timestamp() {
Some(timestamp) => *timestamp,
None => {
tracing::error!(
"Discarding `Sample` generated through `SampleBuilder` that has no \
Timestamp: {:?}",
sample_to_store
);
continue;
}
};

let stripped_key = match crate::strip_prefix(
self.strip_prefix.as_ref(),
sample_to_store.key_expr(),
) {
let stripped_key =
match crate::strip_prefix(self.strip_prefix.as_ref(), sample_to_store.key_expr()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
return;
}
};

// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
let mut latest_updates_guard = self.latest_updates.lock().await;
if self.capability.history == History::Latest {
if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) {
if sample_to_store_timestamp < *stored_timestamp {
tracing::debug!(
"Skipping Sample for < {:?} >, a Value with a more recent \
Timestamp is stored: (received) {} vs (stored) {}",
stripped_key,
sample_to_store_timestamp,
stored_timestamp
);
continue;
}
// If the Storage was declared as only keeping the Latest value, we ensure that, for
// each received Sample, it is indeed the Latest value that is processed.
let mut latest_updates_guard = self.latest_updates.lock().await;
if self.capability.history == History::Latest {
if let Some(stored_timestamp) = latest_updates_guard.get(&stripped_key) {
if sample_to_store_timestamp < *stored_timestamp {
tracing::debug!(
"Skipping Sample for < {:?} >, a Value with a more recent Timestamp \
is stored: (received) {} vs (stored) {}",
stripped_key,
sample_to_store_timestamp,
stored_timestamp
);
continue;
}
}
}

let mut storage = self.storage.lock().await;
let result = match sample.kind() {
SampleKind::Put => {
storage
.put(
stripped_key.clone(),
Value::new(
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
),
sample_to_store_timestamp,
)
.await
}
SampleKind::Delete => {
// register a tombstone
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key.clone(), sample_to_store_timestamp)
.await
}
};
drop(storage);

if result.is_ok_and(|insertion_result| {
!matches!(insertion_result, StorageInsertionResult::Outdated)
}) {
// Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps
// track of the Latest value, the timestamp is indeed more recent (it was
// checked before being processed): we update our internal structure.
if self.capability.history == History::Latest {
latest_updates_guard.insert(stripped_key, sample_to_store_timestamp);
}
drop(latest_updates_guard);
let mut storage = self.storage.lock().await;
let result = match sample.kind() {
SampleKind::Put => {
storage
.put(
stripped_key.clone(),
Value::new(
sample_to_store.payload().clone(),
sample_to_store.encoding().clone(),
),
sample_to_store_timestamp,
)
.await
}
SampleKind::Delete => {
// register a tombstone
self.mark_tombstone(&k, sample_to_store_timestamp).await;
storage
.delete(stripped_key.clone(), sample_to_store_timestamp)
.await
}
};
drop(storage);

if let Some(replication) = &self.replication {
if let Err(e) = replication
.log_propagation
.send((k.clone(), sample_to_store_timestamp))
{
tracing::error!("Error in sending the sample to the log: {}", e);
}
if result.is_ok_and(|insertion_result| {
!matches!(insertion_result, StorageInsertionResult::Outdated)
}) {
// Insertion was successful (i.e. Ok + not Outdated), the Storage only keeps
// track of the Latest value, the timestamp is indeed more recent (it was
// checked before being processed): we update our internal structure.
if self.capability.history == History::Latest {
latest_updates_guard.insert(stripped_key, sample_to_store_timestamp);
}
drop(latest_updates_guard);

if let Some(replication) = &self.replication {
if let Err(e) = replication
.log_propagation
.send((k.clone(), sample_to_store_timestamp))
{
tracing::error!("Error in sending the sample to the log: {}", e);
}
}
}
Expand Down Expand Up @@ -526,26 +522,6 @@ impl StorageService {
update
}

/// Returns `true` if `timestamp` is at least as recent as every value currently
/// stored under `key_expr`, i.e. the publication being processed is the latest
/// one known to the Storage.
///
/// Note: this queries the underlying Storage on every call (the lock is taken
/// and `storage.get` is awaited for each publication), which is the inefficiency
/// this commit removes in favour of the `latest_updates` cache.
///
/// On a key-stripping error the publication is conservatively rejected
/// (returns `false`); if the Storage lookup itself fails, the publication is
/// accepted (returns `true`) — presumably because no stored entry could be
/// found to contradict it. TODO(review): confirm that an `Err` from
/// `storage.get` can only mean "no data for this key".
async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool {
    // @TODO: if cache exists, read from there
    let mut storage = self.storage.lock().await;
    // Strip the configured prefix so the key matches the Storage's internal keys.
    let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
        Ok(stripped) => stripped,
        Err(e) => {
            tracing::error!("{}", e);
            return false;
        }
    };
    // Any stored entry with a strictly newer timestamp means this publication is stale.
    if let Ok(stored_data) = storage.get(stripped_key, "").await {
        for entry in stored_data {
            if entry.timestamp > *timestamp {
                return false;
            }
        }
    }
    true
}

async fn reply_query(&self, query: Result<zenoh::query::Query, flume::RecvError>) {
let q = match query {
Ok(q) => q,
Expand Down

0 comments on commit 85fd854

Please sign in to comment.