Skip to content

Commit

Permalink
refactor(storage-manager): move prefix related functions in crate
Browse files Browse the repository at this point in the history
This change is motivated by the refactor of the Replication feature. In
order to exchange metadata that can be processed by all Replicas, the
key expressions associated with the data stored must be prefixed (when
sent) and stripped (when received).

This commit exposes two functions, at the `zenoh-plugin-storage-manager`
crate, that perform these operations.

The objective is to reuse these functions in the Replication refactor
and, as we intend to move the Replication in its own crate, exposing
them at the crate level makes it easier to then import them.

* plugins/zenoh-plugin-storage-manager/src/lib.rs:
  - moved there the `strip_prefix` function,
  - moved there the `get_prefixed` function and renamed it to `prefix`.
* plugins/zenoh-plugin-storage-manager/src/replica/mod.rs: updated the
  call to the previously named `get_prefixed` function.
* plugins/zenoh-plugin-storage-manager/src/replica/storage.rs:
  - removed the `strip_prefix` method,
  - removed the `prefix` function,
  - updated the call to `strip_prefix` and `get_prefixed`.
  • Loading branch information
J-Loudet committed Aug 27, 2024
1 parent 8b027e9 commit 407e7ec
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 61 deletions.
65 changes: 64 additions & 1 deletion plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use std::{
collections::HashMap,
convert::TryFrom,
str::FromStr,
sync::{Arc, Mutex},
};

Expand All @@ -35,7 +36,7 @@ use zenoh::{
runtime::Runtime,
zlock, LibLoader,
},
key_expr::{keyexpr, KeyExpr},
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
prelude::Wait,
session::Session,
Result as ZResult,
Expand Down Expand Up @@ -408,3 +409,65 @@ fn with_extended_string<R, F: FnMut(&mut String) -> R>(
prefix.truncate(prefix_len);
result
}

/// Returns the key expression stripped of the provided prefix.
///
/// If no prefix is provided this function returns the key expression untouched.
///
/// If `None` is returned, it indicates that the key expression is equal to the prefix.
///
/// This function will internally call [strip_prefix], see its documentation for possible outcomes.
///
/// # Errors
///
/// This function will return an error if:
/// - The provided prefix contains a wildcard.
/// NOTE: The configuration of a Storage is checked and will reject any prefix that contains a
/// wildcard. In theory, this error should never occur.
/// - The key expression is not prefixed by the provided prefix.
/// - The resulting stripped key is not a valid key expression (this should, in theory, never
/// happen).
///
/// [strip_prefix]: zenoh::key_expr::keyexpr::strip_prefix()
pub fn strip_prefix(
maybe_prefix: Option<&OwnedKeyExpr>,
key_expr: &KeyExpr<'_>,
) -> ZResult<Option<OwnedKeyExpr>> {
match maybe_prefix {
None => Ok(Some(key_expr.clone().into())),
Some(prefix) => {
if prefix.is_wild() {
bail!(
"Prefix < {} > contains a wild character (\"**\" or \"*\")",
prefix
);
}

match key_expr.strip_prefix(prefix).as_slice() {
[stripped_key_expr] => {
if stripped_key_expr.is_empty() {
return Ok(None);
}

OwnedKeyExpr::from_str(stripped_key_expr).map(Some)
}
_ => bail!("Failed to strip prefix < {} > from: {}", prefix, key_expr),
}
}
}
}

/// Returns the key with an additional prefix, if one was provided.
///
/// If no prefix is provided, this function returns `maybe_stripped_key`.
///
/// If a prefix is provided, this function returns the concatenation of both.
pub fn prefix(
maybe_prefix: Option<&OwnedKeyExpr>,
maybe_stripped_key: &OwnedKeyExpr,
) -> OwnedKeyExpr {
match maybe_prefix {
Some(prefix) => prefix / maybe_stripped_key,
None => maybe_stripped_key.clone(),
}
}
5 changes: 1 addition & 4 deletions plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ impl Replica {
}
} else {
result.push((
StorageService::get_prefixed(
&storage_config.strip_prefix,
&entry.0.unwrap().into(),
),
crate::prefix(storage_config.strip_prefix.as_ref(), &entry.0.unwrap()),
entry.1,
));
}
Expand Down
80 changes: 24 additions & 56 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use tokio::sync::{Mutex, RwLock};
use zenoh::{
bytes::EncodingBuilderTrait,
internal::{
bail,
buffers::{SplitBuffer, ZBuf},
zenoh_home, Timed, TimedEvent, Timer, Value,
},
Expand All @@ -39,7 +38,6 @@ use zenoh::{
sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait},
session::{Session, SessionDeclarations},
time::{Timestamp, NTP64},
Result as ZResult,
};
use zenoh_backend_traits::{
config::{GarbageCollectionConfig, StorageConfig},
Expand Down Expand Up @@ -342,7 +340,10 @@ impl StorageService {
}
};

let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) {
let stripped_key = match crate::strip_prefix(
self.strip_prefix.as_ref(),
sample_to_store.key_expr(),
) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
Expand Down Expand Up @@ -463,13 +464,14 @@ impl StorageService {
if weight.is_some() && weight.unwrap().data.timestamp > *ts {
// if the key matches a wild card update, check whether it was saved in storage
// remember that wild card updates change only existing keys
let stripped_key = match self.strip_prefix(&key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
break;
}
};
let stripped_key =
match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
break;
}
};
let mut storage = self.storage.lock().await;
match storage.get(stripped_key, "").await {
Ok(stored_data) => {
Expand Down Expand Up @@ -498,7 +500,7 @@ impl StorageService {
async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool {
// @TODO: if cache exists, read from there
let mut storage = self.storage.lock().await;
let stripped_key = match self.strip_prefix(&key_expr.into()) {
let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
Expand Down Expand Up @@ -529,14 +531,15 @@ impl StorageService {
let matching_keys = self.get_matching_keys(q.key_expr()).await;
let mut storage = self.storage.lock().await;
for key in matching_keys {
let stripped_key = match self.strip_prefix(&key.clone().into()) {
Ok(k) => k,
Err(e) => {
tracing::error!("{}", e);
// @TODO: return error when it is supported
return;
}
};
let stripped_key =
match crate::strip_prefix(self.strip_prefix.as_ref(), &key.clone().into()) {
Ok(k) => k,
Err(e) => {
tracing::error!("{}", e);
// @TODO: return error when it is supported
return;
}
};
match storage.get(stripped_key, q.parameters().as_str()).await {
Ok(stored_data) => {
for entry in stored_data {
Expand All @@ -561,7 +564,7 @@ impl StorageService {
}
drop(storage);
} else {
let stripped_key = match self.strip_prefix(q.key_expr()) {
let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), q.key_expr()) {
Ok(k) => k,
Err(e) => {
tracing::error!("{}", e);
Expand Down Expand Up @@ -603,7 +606,7 @@ impl StorageService {
for (k, _ts) in entries {
// @TODO: optimize adding back the prefix (possible inspiration from https://github.com/eclipse-zenoh/zenoh/blob/0.5.0-beta.9/backends/traits/src/utils.rs#L79)
let full_key = match k {
Some(key) => StorageService::get_prefixed(&self.strip_prefix, &key.into()),
Some(key) => crate::prefix(self.strip_prefix.as_ref(), &key),
None => self.strip_prefix.clone().unwrap(),
};
if key_expr.intersects(&full_key.clone()) {
Expand All @@ -620,41 +623,6 @@ impl StorageService {
result
}

fn strip_prefix(&self, key_expr: &KeyExpr<'_>) -> ZResult<Option<OwnedKeyExpr>> {
let key = match &self.strip_prefix {
Some(prefix) => {
if key_expr.as_str().eq(prefix.as_str()) {
""
} else {
match key_expr.strip_prefix(prefix).as_slice() {
[ke] => ke.as_str(),
_ => bail!(
"Keyexpr doesn't start with prefix '{}': '{}'",
prefix,
key_expr
),
}
}
}
None => key_expr.as_str(),
};
if key.is_empty() {
Ok(None)
} else {
Ok(Some(OwnedKeyExpr::new(key.to_string()).unwrap()))
}
}

pub fn get_prefixed(
strip_prefix: &Option<OwnedKeyExpr>,
key_expr: &KeyExpr<'_>,
) -> OwnedKeyExpr {
match strip_prefix {
Some(prefix) => prefix.join(key_expr.as_keyexpr()).unwrap(),
None => OwnedKeyExpr::from(key_expr.as_keyexpr()),
}
}

async fn initialize_if_empty(&mut self) {
if self.replication.is_some() && self.replication.as_ref().unwrap().empty_start {
// align with other storages, querying them on key_expr,
Expand Down

0 comments on commit 407e7ec

Please sign in to comment.