Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(storage-manager): move prefix related functions in crate #1325

Merged
merged 1 commit into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 64 additions & 1 deletion plugins/zenoh-plugin-storage-manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
use std::{
collections::HashMap,
convert::TryFrom,
str::FromStr,
sync::{Arc, Mutex},
};

Expand All @@ -35,7 +36,7 @@ use zenoh::{
runtime::Runtime,
zlock, LibLoader,
},
key_expr::{keyexpr, KeyExpr},
key_expr::{keyexpr, KeyExpr, OwnedKeyExpr},
prelude::Wait,
session::Session,
Result as ZResult,
Expand Down Expand Up @@ -408,3 +409,65 @@ fn with_extended_string<R, F: FnMut(&mut String) -> R>(
prefix.truncate(prefix_len);
result
}

/// Returns the key expression stripped of the provided prefix.
///
/// If no prefix is provided this function returns the key expression untouched.
///
/// If `None` is returned, it indicates that the key expression is equal to the prefix.
///
/// This function will internally call [strip_prefix], see its documentation for possible outcomes.
///
/// # Errors
///
/// This function will return an error if:
/// - The provided prefix contains a wildcard.
/// NOTE: The configuration of a Storage is checked and will reject any prefix that contains a
/// wildcard. In theory, this error should never occur.
/// - The key expression is not prefixed by the provided prefix.
/// - The resulting stripped key is not a valid key expression (this should, in theory, never
/// happen).
///
/// [strip_prefix]: zenoh::key_expr::keyexpr::strip_prefix()
pub fn strip_prefix(
maybe_prefix: Option<&OwnedKeyExpr>,
key_expr: &KeyExpr<'_>,
) -> ZResult<Option<OwnedKeyExpr>> {
match maybe_prefix {
None => Ok(Some(key_expr.clone().into())),
Some(prefix) => {
if prefix.is_wild() {
bail!(
"Prefix < {} > contains a wild character (\"**\" or \"*\")",
prefix
);
}

match key_expr.strip_prefix(prefix).as_slice() {
[stripped_key_expr] => {
if stripped_key_expr.is_empty() {
return Ok(None);
}

OwnedKeyExpr::from_str(stripped_key_expr).map(Some)
}
_ => bail!("Failed to strip prefix < {} > from: {}", prefix, key_expr),
}
}
}
}

/// Returns the key with an additional prefix, if one was provided.
///
/// If no prefix is provided, this function returns `maybe_stripped_key`.
///
/// If a prefix is provided, this function returns the concatenation of both.
pub fn prefix(
maybe_prefix: Option<&OwnedKeyExpr>,
maybe_stripped_key: &OwnedKeyExpr,
) -> OwnedKeyExpr {
match maybe_prefix {
Some(prefix) => prefix / maybe_stripped_key,
None => maybe_stripped_key.clone(),
}
}
5 changes: 1 addition & 4 deletions plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,7 @@ impl Replica {
}
} else {
result.push((
StorageService::get_prefixed(
&storage_config.strip_prefix,
&entry.0.unwrap().into(),
),
crate::prefix(storage_config.strip_prefix.as_ref(), &entry.0.unwrap()),
entry.1,
));
}
Expand Down
80 changes: 24 additions & 56 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use tokio::sync::{Mutex, RwLock};
use zenoh::{
bytes::EncodingBuilderTrait,
internal::{
bail,
buffers::{SplitBuffer, ZBuf},
zenoh_home, Timed, TimedEvent, Timer, Value,
},
Expand All @@ -39,7 +38,6 @@ use zenoh::{
sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait},
session::{Session, SessionDeclarations},
time::{Timestamp, NTP64},
Result as ZResult,
};
use zenoh_backend_traits::{
config::{GarbageCollectionConfig, StorageConfig},
Expand Down Expand Up @@ -342,7 +340,10 @@ impl StorageService {
}
};

let stripped_key = match self.strip_prefix(sample_to_store.key_expr()) {
let stripped_key = match crate::strip_prefix(
self.strip_prefix.as_ref(),
sample_to_store.key_expr(),
) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
Expand Down Expand Up @@ -463,13 +464,14 @@ impl StorageService {
if weight.is_some() && weight.unwrap().data.timestamp > *ts {
// if the key matches a wild card update, check whether it was saved in storage
// remember that wild card updates change only existing keys
let stripped_key = match self.strip_prefix(&key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
break;
}
};
let stripped_key =
match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
break;
}
};
let mut storage = self.storage.lock().await;
match storage.get(stripped_key, "").await {
Ok(stored_data) => {
Expand Down Expand Up @@ -498,7 +500,7 @@ impl StorageService {
async fn is_latest(&self, key_expr: &OwnedKeyExpr, timestamp: &Timestamp) -> bool {
// @TODO: if cache exists, read from there
let mut storage = self.storage.lock().await;
let stripped_key = match self.strip_prefix(&key_expr.into()) {
let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), &key_expr.into()) {
Ok(stripped) => stripped,
Err(e) => {
tracing::error!("{}", e);
Expand Down Expand Up @@ -529,14 +531,15 @@ impl StorageService {
let matching_keys = self.get_matching_keys(q.key_expr()).await;
let mut storage = self.storage.lock().await;
for key in matching_keys {
let stripped_key = match self.strip_prefix(&key.clone().into()) {
Ok(k) => k,
Err(e) => {
tracing::error!("{}", e);
// @TODO: return error when it is supported
return;
}
};
let stripped_key =
match crate::strip_prefix(self.strip_prefix.as_ref(), &key.clone().into()) {
Ok(k) => k,
Err(e) => {
tracing::error!("{}", e);
// @TODO: return error when it is supported
return;
}
};
match storage.get(stripped_key, q.parameters().as_str()).await {
Ok(stored_data) => {
for entry in stored_data {
Expand All @@ -561,7 +564,7 @@ impl StorageService {
}
drop(storage);
} else {
let stripped_key = match self.strip_prefix(q.key_expr()) {
let stripped_key = match crate::strip_prefix(self.strip_prefix.as_ref(), q.key_expr()) {
Ok(k) => k,
Err(e) => {
tracing::error!("{}", e);
Expand Down Expand Up @@ -603,7 +606,7 @@ impl StorageService {
for (k, _ts) in entries {
// @TODO: optimize adding back the prefix (possible inspiration from https://github.com/eclipse-zenoh/zenoh/blob/0.5.0-beta.9/backends/traits/src/utils.rs#L79)
let full_key = match k {
Some(key) => StorageService::get_prefixed(&self.strip_prefix, &key.into()),
Some(key) => crate::prefix(self.strip_prefix.as_ref(), &key),
None => self.strip_prefix.clone().unwrap(),
};
if key_expr.intersects(&full_key.clone()) {
Expand All @@ -620,41 +623,6 @@ impl StorageService {
result
}

fn strip_prefix(&self, key_expr: &KeyExpr<'_>) -> ZResult<Option<OwnedKeyExpr>> {
let key = match &self.strip_prefix {
Some(prefix) => {
if key_expr.as_str().eq(prefix.as_str()) {
""
} else {
match key_expr.strip_prefix(prefix).as_slice() {
[ke] => ke.as_str(),
_ => bail!(
"Keyexpr doesn't start with prefix '{}': '{}'",
prefix,
key_expr
),
}
}
}
None => key_expr.as_str(),
};
if key.is_empty() {
Ok(None)
} else {
Ok(Some(OwnedKeyExpr::new(key.to_string()).unwrap()))
}
}

pub fn get_prefixed(
strip_prefix: &Option<OwnedKeyExpr>,
key_expr: &KeyExpr<'_>,
) -> OwnedKeyExpr {
match strip_prefix {
Some(prefix) => prefix.join(key_expr.as_keyexpr()).unwrap(),
None => OwnedKeyExpr::from(key_expr.as_keyexpr()),
}
}

async fn initialize_if_empty(&mut self) {
if self.replication.is_some() && self.replication.as_ref().unwrap().empty_start {
// align with other storages, querying them on key_expr,
Expand Down